rivetkit-core 2.3.0-rc.12

Core runtime primitives for RivetKit actor hosts
use super::*;

pub(crate) fn new_in_memory() -> Kv {
	Kv::new_in_memory()
}

mod moved_tests {
	use std::{
		sync::{Arc, Condvar, Mutex, mpsc},
		time::Duration,
	};

	use crate::types::ListOpts;

	#[tokio::test]
	async fn in_memory_backend_supports_basic_crud_and_listing() {
		let kv = super::new_in_memory();

		kv.batch_put(&[(b"alpha".as_slice(), b"1".as_slice())])
			.await
			.expect("batch put should succeed");
		kv.batch_put(&[(b"beta".as_slice(), b"2".as_slice())])
			.await
			.expect("second batch put should succeed");

		let values = kv
			.batch_get(&[b"alpha".as_slice(), b"beta".as_slice()])
			.await
			.expect("batch get should succeed");
		assert_eq!(values, vec![Some(b"1".to_vec()), Some(b"2".to_vec())]);

		let prefix = kv
			.list_prefix(b"a", ListOpts::default())
			.await
			.expect("list prefix should succeed");
		assert_eq!(prefix, vec![(b"alpha".to_vec(), b"1".to_vec())]);

		let range = kv
			.list_range(
				b"alpha",
				b"gamma",
				ListOpts {
					reverse: true,
					limit: Some(1),
				},
			)
			.await
			.expect("list range should succeed");
		assert_eq!(range, vec![(b"beta".to_vec(), b"2".to_vec())]);

		kv.delete_range(b"alpha", b"beta")
			.await
			.expect("delete range should succeed");
		kv.batch_delete(&[b"beta".as_slice()])
			.await
			.expect("batch delete should succeed");

		let remaining = kv
			.batch_get(&[b"alpha".as_slice(), b"beta".as_slice()])
			.await
			.expect("batch get after deletes should succeed");
		assert_eq!(remaining, vec![None, None]);
	}

	#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
	async fn in_memory_delete_range_blocks_concurrent_put_until_delete_commits() {
		let kv = super::new_in_memory();
		kv.put(b"alpha-old", b"old")
			.await
			.expect("seed put should succeed");

		let delete_started = Arc::new((Mutex::new(false), Condvar::new()));
		let release_delete = Arc::new((Mutex::new(false), Condvar::new()));
		kv.test_set_delete_range_after_write_lock_hook({
			let delete_started = Arc::clone(&delete_started);
			let release_delete = Arc::clone(&release_delete);
			move || {
				let (started, started_cv) = &*delete_started;
				*started.lock().expect("delete-started lock poisoned") = true;
				started_cv.notify_one();

				let (release, release_cv) = &*release_delete;
				let released = release.lock().expect("delete-release lock poisoned");
				let _released = release_cv
					.wait_while(released, |released| !*released)
					.expect("delete-release lock poisoned");
			}
		});

		let delete_task = tokio::spawn({
			let kv = kv.clone();
			async move {
				kv.delete_range(b"alpha", b"beta")
					.await
					.expect("delete range should succeed");
			}
		});

		let (started, started_cv) = &*delete_started;
		let started = started.lock().expect("delete-started lock poisoned");
		let (started, _) = started_cv
			.wait_timeout_while(started, Duration::from_secs(2), |started| !*started)
			.expect("delete-started lock poisoned");
		assert!(
			*started,
			"delete_range should reach the write-locked section"
		);
		drop(started);

		let (put_attempted_tx, put_attempted_rx) = mpsc::channel();
		let (put_done_tx, put_done_rx) = mpsc::channel();
		let put_task = tokio::spawn({
			let kv = kv.clone();
			async move {
				put_attempted_tx
					.send(())
					.expect("put-attempted receiver should still be alive");
				kv.put(b"alpha-new", b"new")
					.await
					.expect("concurrent put should succeed");
				put_done_tx
					.send(())
					.expect("put-done receiver should still be alive");
			}
		});

		put_attempted_rx
			.recv_timeout(Duration::from_secs(2))
			.expect("concurrent put should start");
		assert!(
			put_done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
			"concurrent put must not commit while delete_range holds the write lock",
		);

		let (release, release_cv) = &*release_delete;
		*release.lock().expect("delete-release lock poisoned") = true;
		release_cv.notify_one();

		delete_task.await.expect("delete task should not panic");
		put_task.await.expect("put task should not panic");
		put_done_rx
			.recv_timeout(Duration::from_secs(2))
			.expect("concurrent put should finish after delete_range commits");

		assert_eq!(
			kv.get(b"alpha-old")
				.await
				.expect("old key lookup should succeed"),
			None,
		);
		assert_eq!(
			kv.get(b"alpha-new")
				.await
				.expect("new key lookup should succeed"),
			Some(b"new".to_vec()),
		);
	}
}