rivetkit-core 2.3.0-rc.12

Core runtime primitives for RivetKit actor hosts
mod moved_tests {
	use std::sync::atomic::{AtomicUsize, Ordering};
	use std::sync::{Arc, Mutex};
	use std::time::Duration;

	use tokio::sync::{mpsc, oneshot};
	use tokio::task::yield_now;
	use tokio::time::advance;

	use crate::actor::messages::{ActorEvent, StateDelta};
	use crate::actor::preload::PreloadedPersistedActor;
	use crate::actor::state::PersistedActor;
	use crate::actor::task::{ActorTask, LifecycleCommand};
	use crate::actor::task_types::ShutdownKind;
	use crate::{ActorConfig, ActorContext, ActorFactory};

	/// Builds an `ActorTask` for `ctx` backed by `factory`, wired to freshly
	/// created lifecycle, dispatch, and event channels.
	///
	/// Only the receiver halves are handed to the task. The sender halves stay
	/// local to this helper and are dropped when it returns; callers in this
	/// module drive the task by invoking its methods directly. The literal `0`
	/// and the two trailing `None` arguments are forwarded to `ActorTask::new`
	/// as-is; their meaning is defined by that constructor.
	fn new_task_with_factory(ctx: ActorContext, factory: Arc<ActorFactory>) -> ActorTask {
		// Copy the id out first so `ctx` itself can be moved into the task below.
		let actor_id = ctx.actor_id().to_owned();

		// The senders are bound to named locals (not `_`) so they stay alive
		// until this function returns.
		let (_lifecycle_sender, lifecycle_receiver) = mpsc::unbounded_channel();
		let (_dispatch_sender, dispatch_receiver) = mpsc::unbounded_channel();
		let (_events_sender, events_receiver) = mpsc::unbounded_channel();

		ActorTask::new(
			actor_id,
			0,
			lifecycle_receiver,
			dispatch_receiver,
			events_receiver,
			factory,
			ctx,
			None,
			None,
		)
	}

	/// Guard that sends `()` on a oneshot channel when it is dropped.
	///
	/// Tests park one of these inside a future to observe the moment that
	/// future is dropped.
	struct NotifyOnDrop(Mutex<Option<oneshot::Sender<()>>>);

	impl NotifyOnDrop {
		/// Wraps `sender` so that it is fired once, when the guard is dropped.
		fn new(sender: oneshot::Sender<()>) -> Self {
			Self(Mutex::new(Some(sender)))
		}
	}

	impl Drop for NotifyOnDrop {
		fn drop(&mut self) {
			// `&mut self` already guarantees exclusive access, so reach into the
			// mutex without locking it. A poisoned mutex is recovered instead of
			// panicking: a panic inside `drop` while the thread is already
			// unwinding would abort the whole process.
			let slot = self
				.0
				.get_mut()
				.unwrap_or_else(|poisoned| poisoned.into_inner());

			if let Some(sender) = slot.take() {
				// The receiver may already be gone; that is not an error here.
				let _ = sender.send(());
			}
		}
	}

	/// Verifies that a task built with a preloaded persisted actor exposes the
	/// preloaded input and state to the run handler, reports the actor as
	/// initialized, and has no event queued when the handler first runs.
	#[tokio::test]
	async fn startup_loads_preloaded_state_and_input_before_run_handler() {
		let actor_ctx = crate::actor::context::tests::new_with_kv(
			"actor-preloaded-startup",
			"task-lifecycle",
			Vec::new(),
			"local",
			crate::kv::tests::new_in_memory(),
		);

		// The run handler reports what it saw at startup through a oneshot. The
		// sender sits in a shared `Option` so the factory closure can hand it
		// to the single handler invocation that is expected.
		let (report_tx, report_rx) = oneshot::channel();
		let report_slot = Arc::new(Mutex::new(Some(report_tx)));

		let factory = Arc::new(ActorFactory::new(Default::default(), move |start| {
			let report_slot = Arc::clone(&report_slot);
			Box::pin(async move {
				let mut events = start.events;

				// Claim the sender first, then snapshot the startup view.
				let report_tx = report_slot
					.lock()
					.expect("observed lock poisoned")
					.take()
					.expect("observed sender should exist");
				let observation = (
					start.input,
					start.snapshot,
					start.ctx.state(),
					start.ctx.persisted_actor().has_initialized,
					events.try_recv().is_none(),
				);
				report_tx
					.send(observation)
					.expect("startup observation should send");

				// Answer the requests that carry a reply; ignore everything else
				// until the event stream ends.
				while let Some(event) = events.recv().await {
					match event {
						ActorEvent::RunGracefulCleanup { reply, .. } => {
							reply.send(Ok(()));
						}
						ActorEvent::SerializeState { reply, .. } => {
							let deltas = vec![StateDelta::ActorState(start.ctx.state())];
							reply.send(Ok(deltas));
						}
						_ => {}
					}
				}
				Ok(())
			})
		}));

		// Persisted record handed to the task up front. `has_initialized` starts
		// out false here, while the handler is asserted to observe true below.
		let preloaded = PreloadedPersistedActor::Some(PersistedActor {
			input: Some(vec![1, 2, 3]),
			has_initialized: false,
			state: vec![9, 8, 7],
			scheduled_events: Vec::new(),
		});

		// NOTE(review): `Some(vec![7, 7, 7])` is presumably a caller-supplied
		// input; the assertions below expect the preloaded input instead.
		// Confirm against `ActorTask::new`.
		let mut task = ActorTask::new(
			"actor-preloaded-startup".to_owned(),
			0,
			mpsc::unbounded_channel().1,
			mpsc::unbounded_channel().1,
			mpsc::unbounded_channel().1,
			factory,
			actor_ctx.clone(),
			Some(vec![7, 7, 7]),
			None,
		)
		.with_preloaded_persisted_actor(preloaded);

		let (started_tx, started_rx) = oneshot::channel();
		task.handle_lifecycle(LifecycleCommand::Start { reply: started_tx })
			.await;
		let start_outcome = started_rx.await.expect("start reply should send");
		start_outcome.expect("start should succeed");

		let observation = report_rx.await.expect("startup observation should send");
		let (seen_input, seen_snapshot, seen_state, initialized, stream_was_empty) = observation;
		assert_eq!(seen_input, Some(vec![1, 2, 3]));
		assert_eq!(seen_snapshot, None);
		assert_eq!(seen_state, vec![9, 8, 7]);
		assert!(initialized);
		assert!(stream_was_empty);

		// Tear the run handler down explicitly so it does not outlive the test.
		let handle = task.run_handle.take().expect("run handle should exist");
		handle.abort();
		let _ = handle.await;
	}

	// Verifies that a sleep shutdown waits out the configured grace period for
	// a run handler that never returns, and that the handler's future has been
	// dropped once the stop completes at the deadline.
	//
	// The tokio clock starts paused, so time only moves through the explicit
	// `advance` calls below.
	#[tokio::test(start_paused = true)]
	async fn sleep_shutdown_aborts_stuck_run_handler_at_grace_deadline() {
		let ctx = crate::actor::context::tests::new_with_kv(
			"actor-stuck-run-grace",
			"task-lifecycle",
			Vec::new(),
			"local",
			crate::kv::tests::new_in_memory(),
		);
		let grace_period = Duration::from_millis(100);
		// NOTE: despite its name, this counter is incremented when the run
		// handler starts executing (see the `fetch_add` below), not when
		// anything is dropped.
		let dropped_count = Arc::new(AtomicUsize::new(0));
		// `drop_rx` resolves once the `NotifyOnDrop` guard held by the run
		// handler is dropped.
		let (drop_tx, drop_rx) = oneshot::channel();
		let drop_tx = Arc::new(Mutex::new(Some(drop_tx)));
		let dropped_count_for_factory = dropped_count.clone();
		let factory = Arc::new(ActorFactory::new(
			ActorConfig {
				sleep_grace_period: grace_period,
				sleep_grace_period_overridden: true,
				..ActorConfig::default()
			},
			move |_start| {
				let dropped_count = dropped_count_for_factory.clone();
				let drop_tx = drop_tx.clone();
				Box::pin(async move {
					let sender = drop_tx
						.lock()
						.expect("drop sender lock poisoned")
						.take()
						.expect("drop sender should exist");
					// Lives for as long as this future does; its drop signals `drop_rx`.
					let _notify = NotifyOnDrop::new(sender);
					dropped_count.fetch_add(1, Ordering::SeqCst);
					// Never completes: models a run handler that is stuck.
					std::future::pending::<()>().await;
					Ok(())
				})
			},
		));
		let mut task = new_task_with_factory(ctx, factory);

		let (start_tx, start_rx) = oneshot::channel();
		task.handle_lifecycle(LifecycleCommand::Start { reply: start_tx })
			.await;
		start_rx
			.await
			.expect("start reply should send")
			.expect("start should succeed");
		// Yield once, then check that the run handler has run up to its pending point.
		yield_now().await;
		assert_eq!(dropped_count.load(Ordering::SeqCst), 1);

		// Run the stop on its own task so the test can step the clock meanwhile.
		let stop = tokio::spawn(async move { task.handle_stop(ShutdownKind::Sleep).await });
		yield_now().await;
		assert!(
			!stop.is_finished(),
			"sleep shutdown should wait for the shared grace deadline"
		);

		// One millisecond short of the deadline: the stop must still be waiting.
		advance(grace_period - Duration::from_millis(1)).await;
		yield_now().await;
		assert!(
			!stop.is_finished(),
			"stuck run handler should not be aborted before the grace deadline"
		);

		// Reach the deadline exactly; the stop is now expected to complete.
		advance(Duration::from_millis(1)).await;
		stop.await
			.expect("sleep stop join should succeed")
			.expect("sleep stop should succeed after grace timeout");
		// Receiving here means the guard's `drop` sent its notification.
		drop_rx
			.await
			.expect("stuck run handler should be aborted at the grace deadline");
	}
}