rivetkit 2.3.0-rc.12

Rust SDK for RivetKit actors, actions, events, queues, and test harnesses
mod moved_tests {
	use futures::StreamExt;
	use tokio_util::sync::CancellationToken;

	use crate::queue::{QueueStreamExt, QueueStreamOpts};
	use rivetkit_core::{ActorContext, Kv};

	#[tokio::test]
	async fn queue_stream_yields_messages_through_stream_ext_combinators() {
		let ctx = ActorContext::new_with_kv(
			"actor-id",
			"test",
			Vec::new(),
			"local",
			Kv::new_in_memory(),
		);
		let queue = ctx.queue();

		queue.send("alpha", br#"{"value":1}"#).await.expect("send alpha");
		queue.send("beta", br#"{"value":2}"#).await.expect("send beta");

		let names = queue
			.stream(QueueStreamOpts::default())
			.map(|message| message.name)
			.take(2)
			.collect::<Vec<_>>()
			.await;

		assert_eq!(names, vec!["alpha".to_owned(), "beta".to_owned()]);
	}

	#[tokio::test]
	async fn queue_stream_honors_name_filters() {
		let ctx = ActorContext::new_with_kv(
			"actor-id",
			"test",
			Vec::new(),
			"local",
			Kv::new_in_memory(),
		);
		let queue = ctx.queue();

		queue.send("skip", b"1").await.expect("send skip");
		queue.send("keep", b"2").await.expect("send keep");

		let message = queue
			.stream(QueueStreamOpts {
				names: Some(vec!["keep".to_owned()]),
				signal: None,
			})
			.next()
			.await
			.expect("filtered stream should yield");

		assert_eq!(message.name, "keep");
		assert_eq!(message.body, b"2");
	}

	#[tokio::test]
	async fn queue_stream_ends_when_cancellation_signal_is_already_fired() {
		let ctx = ActorContext::new_with_kv(
			"actor-id",
			"test",
			Vec::new(),
			"local",
			Kv::new_in_memory(),
		);
		let queue = ctx.queue();
		let signal = CancellationToken::new();
		signal.cancel();

		let next = queue
			.stream(QueueStreamOpts {
				names: None,
				signal: Some(signal),
			})
			.next()
			.await;

		assert!(next.is_none());
	}
}