rivetkit-core 2.3.0-rc.12

Core runtime primitives for RivetKit actor hosts
use super::*;

mod moved_tests {
	use super::{
		PersistedQueueMessage, QueueMetadata, QueueNextOpts, QueueWaitOpts, encode_queue_message,
		encode_queue_metadata,
	};

	use crate::actor::context::ActorContext;
	use crate::actor::keys::{
		QUEUE_MESSAGES_PREFIX, QUEUE_METADATA_KEY, decode_queue_message_key, make_queue_message_key,
	};
	use crate::actor::preload::PreloadedKv;
	use crate::kv::Kv;
	use std::time::Duration;
	use tokio::task::yield_now;
	use tokio_util::sync::CancellationToken;

	fn test_queue() -> ActorContext {
		ActorContext::new_with_kv(
			"actor-queue",
			"queue-test",
			Vec::new(),
			"local",
			Kv::new_in_memory(),
		)
	}

	fn assert_actor_aborted(error: anyhow::Error) {
		let error = rivet_error::RivetError::extract(&error);
		assert_eq!(error.group(), "actor");
		assert_eq!(error.code(), "aborted");
	}

	#[test]
	fn queue_message_keys_are_big_endian() {
		let first = make_queue_message_key(1);
		let second = make_queue_message_key(2);

		assert!(first < second);
		assert_eq!(QUEUE_METADATA_KEY, [5, 1, 1]);
		assert_eq!(first, vec![5, 1, 2, 0, 0, 0, 0, 0, 0, 0, 1]);
		assert_eq!(decode_queue_message_key(&first).expect("decode first"), 1);
		assert_eq!(decode_queue_message_key(&second).expect("decode second"), 2);
	}

	#[tokio::test]
	async fn inspect_messages_uses_preloaded_queue_entries_when_present() {
		let queue = ActorContext::new_with_kv(
			"actor-queue",
			"queue-preload",
			Vec::new(),
			"local",
			Kv::default(),
		);
		let metadata = QueueMetadata {
			next_id: 8,
			size: 1,
		};
		let persisted = PersistedQueueMessage {
			name: "preloaded".to_owned(),
			body: b"body".to_vec(),
			created_at: 42,
			failure_count: None,
			available_at: None,
			in_flight: None,
			in_flight_at: None,
		};
		queue.configure_preload(Some(PreloadedKv::new_with_requested_get_keys(
			[
				(
					QUEUE_METADATA_KEY.to_vec(),
					encode_queue_metadata(&metadata).expect("metadata should encode"),
				),
				(
					make_queue_message_key(7),
					encode_queue_message(&persisted).expect("message should encode"),
				),
			],
			vec![QUEUE_METADATA_KEY.to_vec()],
			vec![QUEUE_MESSAGES_PREFIX.to_vec()],
		)));

		let messages = queue
			.inspect_messages()
			.await
			.expect("queue should initialize from preload without touching kv");

		assert_eq!(messages.len(), 1);
		assert_eq!(messages[0].id, 7);
		assert_eq!(messages[0].name, "preloaded");
		assert_eq!(messages[0].body, b"body");
		assert_eq!(*queue.0.queue_metadata.lock().await, metadata);
	}

	#[tokio::test]
	async fn wait_for_names_returns_aborted_when_signal_is_already_cancelled() {
		let queue = test_queue();
		let signal = CancellationToken::new();
		signal.cancel();

		let error = queue
			.wait_for_names(
				vec!["missing".to_owned()],
				QueueWaitOpts {
					signal: Some(signal),
					..Default::default()
				},
			)
			.await
			.expect_err("already-cancelled waits should abort immediately");

		assert_actor_aborted(error);
	}

	#[tokio::test(start_paused = true)]
	async fn wait_for_names_returns_aborted_when_signal_cancels_during_wait() {
		let queue = test_queue();
		let signal = CancellationToken::new();
		let wait_signal = signal.clone();
		let wait_queue = queue.clone();

		let wait = tokio::spawn(async move {
			wait_queue
				.wait_for_names(
					vec!["missing".to_owned()],
					QueueWaitOpts {
						timeout: Some(Duration::from_secs(60)),
						signal: Some(wait_signal),
						..Default::default()
					},
				)
				.await
		});

		yield_now().await;
		signal.cancel();

		let error = wait
			.await
			.expect("wait task should join")
			.expect_err("cancelled waits should abort");

		assert_actor_aborted(error);
	}

	#[tokio::test(start_paused = true)]
	async fn next_returns_aborted_when_actor_signal_cancels_during_wait() {
		let queue = test_queue();

		let wait = tokio::spawn({
			let queue = queue.clone();
			async move {
				queue
					.next(QueueNextOpts {
						names: Some(vec!["missing".to_owned()]),
						timeout: Some(Duration::from_secs(60)),
						..Default::default()
					})
					.await
			}
		});

		yield_now().await;
		queue.cancel_actor_abort_signal();

		let error = wait
			.await
			.expect("wait task should join")
			.expect_err("cancelled actor waits should abort");

		assert_actor_aborted(error);
	}
}