rivetkit-core 2.3.0-rc.12

Core runtime primitives for RivetKit actor hosts
use super::*;

mod moved_tests {
	use std::sync::Arc;
	use std::sync::Mutex;

	use anyhow::Result;
	use futures::future::BoxFuture;
	use rivet_error::INTERNAL_ERROR;

	use super::{EventBroadcaster, dispatch_request, dispatch_websocket};
	use crate::actor::callbacks::{ActorInstanceCallbacks, Request, RequestCallback, Response};
	use crate::actor::connection::{ConnHandle, EventSendCallback, OutgoingEvent};
	use crate::actor::context::ActorContext;
	use crate::websocket::{WebSocket, WebSocketCloseCallback};

	fn request_callback<F>(callback: F) -> RequestCallback
	where
		F: Fn(
				crate::actor::callbacks::OnRequestRequest,
			) -> BoxFuture<'static, Result<crate::actor::callbacks::Response>>
			+ Send
			+ Sync
			+ 'static,
	{
		Box::new(callback)
	}

	#[test]
	fn broadcaster_only_fans_out_to_subscribed_connections() {
		let sent = Arc::new(Mutex::new(Vec::<(String, OutgoingEvent)>::new()));
		let sent_clone = sent.clone();
		let subscribed = ConnHandle::new("subscribed", Vec::new(), Vec::new(), false);
		let idle = ConnHandle::new("idle", Vec::new(), Vec::new(), false);

		let sender: EventSendCallback = Arc::new(move |event| {
			sent_clone
				.lock()
				.expect("sent events lock poisoned")
				.push(("subscribed".to_owned(), event));
			Ok(())
		});

		subscribed.configure_event_sender(Some(sender));
		subscribed.subscribe("updated");

		EventBroadcaster::default().broadcast(&[subscribed, idle], "updated", b"payload");

		assert_eq!(
			*sent.lock().expect("sent events lock poisoned"),
			vec![(
				"subscribed".to_owned(),
				OutgoingEvent {
					name: "updated".to_owned(),
					args: b"payload".to_vec(),
				},
			)]
		);
	}

	#[tokio::test]
	async fn request_dispatch_returns_callback_response() {
		let mut callbacks = ActorInstanceCallbacks::default();
		callbacks.on_request = Some(request_callback(|request| {
			Box::pin(async move {
				assert_eq!(request.request.uri().path(), "/ok");
				Ok(Response::from(
					http::Response::builder()
						.status(http::StatusCode::ACCEPTED)
						.body(b"ok".to_vec())
						.expect("accepted response should build"),
				))
			})
		}));

		let response = dispatch_request(
			&callbacks,
			ActorContext::default(),
			Request::from(
				http::Request::builder()
					.uri("/ok")
					.body(Vec::new())
					.expect("request should build"),
			),
		)
		.await;

		assert_eq!(response.status(), http::StatusCode::ACCEPTED);
		assert_eq!(response.body(), b"ok");
	}

	#[tokio::test]
	async fn request_dispatch_returns_500_on_error() {
		let mut callbacks = ActorInstanceCallbacks::default();
		callbacks.on_request = Some(request_callback(|_| {
			Box::pin(async move { Err(INTERNAL_ERROR.build()) })
		}));

		let response = dispatch_request(
			&callbacks,
			ActorContext::default(),
			Request::from(
				http::Request::builder()
					.uri("/boom")
					.body(Vec::new())
					.expect("request should build"),
			),
		)
		.await;

		assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
		assert_eq!(response.body(), b"internal server error");
	}

	#[tokio::test]
	async fn websocket_dispatch_closes_on_callback_error() {
		let closed = Arc::new(Mutex::new(None::<(Option<u16>, Option<String>)>));
		let closed_clone = closed.clone();
		let ws = WebSocket::new();
		let close_callback: WebSocketCloseCallback = Arc::new(move |code, reason| {
			*closed_clone.lock().expect("closed websocket lock poisoned") = Some((code, reason));
			Ok(())
		});
		ws.configure_close_callback(Some(close_callback));

		let mut callbacks = ActorInstanceCallbacks::default();
		callbacks.on_websocket = Some(Box::new(|_| {
			Box::pin(async move { Err(INTERNAL_ERROR.build()) })
		}));

		dispatch_websocket(&callbacks, ActorContext::default(), ws).await;

		assert_eq!(
			*closed.lock().expect("closed websocket lock poisoned"),
			Some((Some(1011), Some("Server Error".to_owned())))
		);
	}
}