rivetkit-core 2.3.0-rc.12

Core runtime primitives for RivetKit actor hosts
use super::*;

mod moved_tests {
	use super::{Inspector, InspectorSignal, InspectorSnapshot};
	use crate::QueueNextOpts;
	use crate::actor::connection::{
		ConnHandle, PersistedConnection, PersistedSubscription, encode_persisted_connection,
	};
	use crate::actor::context::tests::new_with_kv;
	use crate::actor::keys::{INSPECTOR_TOKEN_KEY, make_connection_key};
	use crate::actor::messages::StateDelta;
	use crate::inspector::InspectorAuth;
	use crate::inspector::auth::test_inspector_env_lock;
	use rivet_error::RivetError;
	use std::sync::Arc;
	use std::sync::atomic::{AtomicUsize, Ordering};

	#[tokio::test]
	async fn state_updates_increment_inspector_revisions() {
		let ctx = new_with_kv(
			"actor-1",
			"inspector-state",
			Vec::new(),
			"local",
			crate::kv::tests::new_in_memory(),
		);
		let inspector = Inspector::new();

		ctx.configure_inspector(Some(inspector.clone()));
		ctx.save_state(vec![StateDelta::ActorState(vec![1, 2, 3])])
			.await
			.expect("state save should succeed");

		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				state_revision: 1,
				..InspectorSnapshot::default()
			},
		);
	}

	#[tokio::test]
	async fn connection_lifecycle_updates_inspector_snapshot() {
		let kv = crate::kv::tests::new_in_memory();
		let ctx = new_with_kv(
			"actor-1",
			"inspector-connections",
			Vec::new(),
			"local",
			kv.clone(),
		);
		let inspector = Inspector::new();

		ctx.configure_inspector(Some(inspector.clone()));

		let conn = ConnHandle::new("conn-1", vec![1], vec![2], false);
		ctx.add_conn(conn.clone());
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				connections_revision: 1,
				active_connections: 1,
				..InspectorSnapshot::default()
			},
		);

		ctx.remove_conn(conn.id());
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				connections_revision: 2,
				active_connections: 0,
				..InspectorSnapshot::default()
			},
		);

		let restored = PersistedConnection {
			id: "restored-1".into(),
			parameters: vec![9],
			state: vec![8],
			subscriptions: vec![PersistedSubscription {
				event_name: "counter.updated".into(),
			}],
			gateway_id: [1, 2, 3, 4],
			request_id: [5, 6, 7, 8],
			server_message_index: 3,
			client_message_index: 4,
			request_path: "/socket".into(),
			request_headers: Default::default(),
		};
		kv.put(
			&make_connection_key(&restored.id),
			&encode_persisted_connection(&restored).expect("encode restored connection"),
		)
		.await
		.expect("persist restored connection");

		let restored_connections = ctx
			.restore_hibernatable_connections()
			.await
			.expect("restore should succeed");
		assert_eq!(restored_connections.len(), 1);
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				connections_revision: 3,
				active_connections: 1,
				..InspectorSnapshot::default()
			},
		);

		ctx.remove_conn("restored-1");
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				connections_revision: 4,
				active_connections: 0,
				..InspectorSnapshot::default()
			},
		);
	}

	#[tokio::test]
	async fn queue_lifecycle_updates_inspector_snapshot() {
		let ctx = new_with_kv(
			"actor-1",
			"inspector-queue",
			Vec::new(),
			"local",
			crate::kv::tests::new_in_memory(),
		);
		let inspector = Inspector::new();

		ctx.configure_inspector(Some(inspector.clone()));

		ctx.queue()
			.send("jobs", b"first")
			.await
			.expect("enqueue should succeed");
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				queue_revision: 1,
				queue_size: 1,
				..InspectorSnapshot::default()
			},
		);

		let received = ctx
			.queue()
			.next(QueueNextOpts::default())
			.await
			.expect("queue next should succeed")
			.expect("message should exist");
		assert_eq!(received.body, b"first".to_vec());
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				queue_revision: 2,
				queue_size: 0,
				..InspectorSnapshot::default()
			},
		);

		ctx.queue()
			.send("jobs", b"second")
			.await
			.expect("second enqueue should succeed");
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				queue_revision: 3,
				queue_size: 1,
				..InspectorSnapshot::default()
			},
		);

		let completable = ctx
			.queue()
			.next(QueueNextOpts {
				names: None,
				timeout: None,
				signal: None,
				completable: true,
			})
			.await
			.expect("completable receive should succeed")
			.expect("completable message should exist");
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				queue_revision: 4,
				queue_size: 1,
				..InspectorSnapshot::default()
			},
		);

		completable
			.complete(Some(vec![7]))
			.await
			.expect("queue ack should succeed");
		assert_eq!(
			inspector.snapshot(),
			InspectorSnapshot {
				queue_revision: 5,
				queue_size: 0,
				..InspectorSnapshot::default()
			},
		);
	}

	#[test]
	fn inspector_subscriptions_track_connected_clients_and_cleanup() {
		let inspector = Inspector::new();
		let state_updates = Arc::new(AtomicUsize::new(0));
		let queue_updates = Arc::new(AtomicUsize::new(0));
		let state_updates_clone = state_updates.clone();
		let queue_updates_clone = queue_updates.clone();

		let subscription = inspector.subscribe(Arc::new(move |signal| match signal {
			InspectorSignal::StateUpdated => {
				state_updates_clone.fetch_add(1, Ordering::SeqCst);
			}
			InspectorSignal::QueueUpdated => {
				queue_updates_clone.fetch_add(1, Ordering::SeqCst);
			}
			InspectorSignal::ConnectionsUpdated | InspectorSignal::WorkflowHistoryUpdated => {}
		}));

		assert_eq!(inspector.snapshot().connected_clients, 1);

		inspector.record_state_updated();
		inspector.record_queue_updated(3);

		assert_eq!(state_updates.load(Ordering::SeqCst), 1);
		assert_eq!(queue_updates.load(Ordering::SeqCst), 1);

		drop(subscription);

		assert_eq!(inspector.snapshot().connected_clients, 0);

		inspector.record_state_updated();
		assert_eq!(state_updates.load(Ordering::SeqCst), 1);
	}

	#[tokio::test]
	async fn inspector_auth_uses_env_token_before_kv_fallback() {
		let _env_guard = test_inspector_env_lock().lock().expect("env lock poisoned");
		unsafe {
			std::env::set_var("_RIVET_TEST_INSPECTOR_TOKEN", "env-token");
		}

		let kv = crate::kv::tests::new_in_memory();
		let ctx = new_with_kv(
			"actor-1",
			"inspector-auth-env",
			Vec::new(),
			"local",
			kv.clone(),
		);
		kv.put(&INSPECTOR_TOKEN_KEY, b"kv-token")
			.await
			.expect("kv token should persist");

		InspectorAuth::new()
			.verify(&ctx, Some("env-token"))
			.await
			.expect("env token should authorize");

		let error = InspectorAuth::new()
			.verify(&ctx, Some("kv-token"))
			.await
			.expect_err("kv token should not bypass configured env token");
		let error = RivetError::extract(&error);
		assert_eq!(error.group(), "inspector");
		assert_eq!(error.code(), "unauthorized");

		unsafe {
			std::env::remove_var("_RIVET_TEST_INSPECTOR_TOKEN");
		}
	}

	#[tokio::test]
	async fn inspector_auth_falls_back_to_actor_kv_token() {
		let _env_guard = test_inspector_env_lock().lock().expect("env lock poisoned");
		unsafe {
			std::env::remove_var("_RIVET_TEST_INSPECTOR_TOKEN");
		}

		let kv = crate::kv::tests::new_in_memory();
		let ctx = new_with_kv(
			"actor-1",
			"inspector-auth-kv",
			Vec::new(),
			"local",
			kv.clone(),
		);
		kv.put(&INSPECTOR_TOKEN_KEY, b"kv-token")
			.await
			.expect("kv token should persist");

		InspectorAuth::new()
			.verify(&ctx, Some("kv-token"))
			.await
			.expect("kv token should authorize");

		let error = InspectorAuth::new()
			.verify(&ctx, Some("nope"))
			.await
			.expect_err("wrong token should fail");
		let error = RivetError::extract(&error);
		assert_eq!(error.group(), "inspector");
		assert_eq!(error.code(), "unauthorized");
	}

	#[tokio::test]
	async fn inspector_auth_rejects_missing_token() {
		let _env_guard = test_inspector_env_lock().lock().expect("env lock poisoned");
		unsafe {
			std::env::remove_var("_RIVET_TEST_INSPECTOR_TOKEN");
		}

		let ctx = new_with_kv(
			"actor-1",
			"inspector-auth-missing",
			Vec::new(),
			"local",
			crate::kv::tests::new_in_memory(),
		);

		let error = InspectorAuth::new()
			.verify(&ctx, None)
			.await
			.expect_err("missing token should fail");
		let error = RivetError::extract(&error);
		assert_eq!(error.group(), "inspector");
		assert_eq!(error.code(), "unauthorized");
	}
}