rivetkit-core 2.3.0-rc.12

Core runtime primitives for RivetKit actor hosts
use super::*;

mod moved_tests {
	use std::sync::Arc;
	use std::sync::atomic::{AtomicUsize, Ordering};
	use std::time::Duration;

	use anyhow::{Result, anyhow};
	use futures::future::BoxFuture;

	use super::Schedule;
	use crate::actor::action::ActionInvoker;
	use crate::actor::callbacks::{ActionHandler, ActorInstanceCallbacks};
	use crate::actor::config::ActorConfig;
	use crate::actor::context::ActorContext;
	use crate::actor::state::ActorState;

	fn action_handler<F>(handler: F) -> ActionHandler
	where
		F: Fn(crate::actor::callbacks::ActionRequest) -> BoxFuture<'static, Result<Vec<u8>>>
			+ Send
			+ Sync
			+ 'static,
	{
		Box::new(handler)
	}

	#[test]
	fn at_inserts_events_in_timestamp_order() {
		let schedule = Schedule::default();

		schedule.at(50, "later", b"");
		schedule.at(10, "sooner", b"");
		schedule.at(30, "middle", b"");

		let actions: Vec<_> = schedule
			.all_events()
			.into_iter()
			.map(|event| event.action)
			.collect();

		assert_eq!(actions, vec!["sooner", "middle", "later"]);
	}

	#[test]
	fn after_creates_future_event() {
		let schedule = Schedule::default();

		schedule.after(Duration::from_millis(5), "ping", b"abc");

		let event = schedule.next_event().expect("scheduled event should exist");
		assert_eq!(event.action, "ping");
		assert_eq!(event.args.as_deref(), Some(b"abc".as_slice()));
		assert!(event.timestamp >= super::now_timestamp_ms());
	}

	#[tokio::test]
	async fn handle_alarm_dispatches_due_events_and_removes_them() {
		let schedule = Schedule::new(ActorState::default(), "actor-1", ActorConfig::default());
		let ctx = ActorContext::default();
		let mut callbacks = ActorInstanceCallbacks::default();
		let seen = Arc::new(AtomicUsize::new(0));
		let seen_clone = seen.clone();
		callbacks.actions.insert(
			"run".to_owned(),
			action_handler(move |request| {
				let seen_clone = seen_clone.clone();
				Box::pin(async move {
					assert_eq!(request.args, b"payload");
					seen_clone.fetch_add(1, Ordering::SeqCst);
					Ok(Vec::new())
				})
			}),
		);

		let invoker = ActionInvoker::new(ActorConfig::default(), callbacks);
		schedule.at(
			super::now_timestamp_ms().saturating_sub(1),
			"run",
			b"payload",
		);
		schedule.at(
			super::now_timestamp_ms().saturating_add(60_000),
			"later",
			b"",
		);

		let executed = schedule.handle_alarm(&ctx, &invoker).await;

		assert_eq!(executed, 1);
		assert_eq!(seen.load(Ordering::SeqCst), 1);
		assert_eq!(schedule.all_events().len(), 1);
		assert_eq!(schedule.next_event().expect("future event").action, "later");
	}

	#[tokio::test]
	async fn handle_alarm_continues_after_errors_and_uses_keep_awake_wrapper() {
		let schedule = Schedule::new(ActorState::default(), "actor-1", ActorConfig::default());
		let ctx = ActorContext::default();
		let mut callbacks = ActorInstanceCallbacks::default();
		let keep_awake_calls = Arc::new(AtomicUsize::new(0));
		let keep_awake_calls_clone = keep_awake_calls.clone();
		schedule.set_internal_keep_awake(Some(Arc::new(move |future| {
			let keep_awake_calls_clone = keep_awake_calls_clone.clone();
			Box::pin(async move {
				keep_awake_calls_clone.fetch_add(1, Ordering::SeqCst);
				future.await
			})
		})));

		let succeeded = Arc::new(AtomicUsize::new(0));
		let succeeded_clone = succeeded.clone();
		callbacks.actions.insert(
			"ok".to_owned(),
			action_handler(move |_| {
				let succeeded_clone = succeeded_clone.clone();
				Box::pin(async move {
					succeeded_clone.fetch_add(1, Ordering::SeqCst);
					Ok(Vec::new())
				})
			}),
		);
		callbacks.actions.insert(
			"fail".to_owned(),
			action_handler(|_| Box::pin(async move { Err(anyhow!("boom")) })),
		);

		let invoker = ActionInvoker::new(ActorConfig::default(), callbacks);
		schedule.at(super::now_timestamp_ms().saturating_sub(1), "fail", b"");
		schedule.at(super::now_timestamp_ms().saturating_sub(1), "ok", b"");

		let executed = schedule.handle_alarm(&ctx, &invoker).await;

		assert_eq!(executed, 2);
		assert_eq!(keep_awake_calls.load(Ordering::SeqCst), 2);
		assert_eq!(succeeded.load(Ordering::SeqCst), 1);
		assert!(schedule.all_events().is_empty());
	}

	#[test]
	fn set_alarm_requires_envoy_handle() {
		let schedule = Schedule::default();
		let error = schedule
			.set_alarm(Some(123))
			.expect_err("set_alarm should fail without envoy");

		assert!(
			error
				.to_string()
				.contains("schedule alarm handle is not configured")
		);
	}
}