use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result};
use rivetkit_client::{Client, ClientConfig, GetOrCreateOptions};
use rivetkit_core::ServeConfig;
use serde_json::Value as JsonValue;
use tokio::net::TcpStream;
use tokio::sync::{Mutex, oneshot};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::{
Action, Actor, Handles, IntoActorKey, TypedActorConnection, TypedActorHandle,
registry::Registry, typed_client::encode_action_args,
};
const ENDPOINT: &str = "http://127.0.0.1:6420";
const ENGINE_PORT: u16 = 6420;
const TOKEN: &str = "dev";
const NAMESPACE: &str = "default";
const READY_TIMEOUT: Duration = Duration::from_secs(30);
static ENGINE_LOCK: Mutex<()> = Mutex::const_new(());
static POOL_SEQ: AtomicU64 = AtomicU64::new(0);
pub async fn setup(registry: Registry) -> Result<TestHandle> {
let pool_name = format!("rivetkit-test-{}", POOL_SEQ.fetch_add(1, Ordering::Relaxed));
let config = ServeConfig {
endpoint: ENDPOINT.to_owned(),
token: Some(TOKEN.to_owned()),
namespace: NAMESPACE.to_owned(),
pool_name: pool_name.clone(),
..ServeConfig::default()
};
let shutdown = CancellationToken::new();
let serve = {
let _guard = ENGINE_LOCK.lock().await;
let (ready_tx, ready_rx) = oneshot::channel();
let serve = tokio::spawn({
let shutdown = shutdown.clone();
async move {
registry
.serve_with_config_and_handle_observer(config, shutdown, move |_| {
let _ = ready_tx.send(());
})
.await
}
});
wait_for_port().await;
ready_rx.await.context("wait for registry envoy startup")?;
serve
};
let client = Client::new(
ClientConfig::new(ENDPOINT)
.token(TOKEN)
.namespace(NAMESPACE)
.pool_name(pool_name),
);
Ok(TestHandle {
client,
shutdown,
serve: Some(serve),
})
}
pub struct TestHandle {
client: Client,
shutdown: CancellationToken,
serve: Option<JoinHandle<Result<()>>>,
}
impl TestHandle {
pub fn actor<A: Actor>(&self, name: &str) -> TestActor<A> {
self.actor_with_key(name, vec![format!("{name}-{}", unique_suffix())])
}
pub fn actor_with_key<A: Actor>(&self, name: &str, key: impl IntoActorKey) -> TestActor<A> {
self.actor_with_options(name, key, GetOrCreateOptions::default())
}
pub fn actor_with_options<A: Actor>(
&self,
name: &str,
key: impl IntoActorKey,
opts: GetOrCreateOptions,
) -> TestActor<A> {
TestActor {
handle: TypedActorHandle::new(
self.client
.get_or_create(name, key.into_actor_key(), opts)
.expect("build actor handle"),
),
}
}
pub fn client(&self) -> &Client {
&self.client
}
pub async fn shutdown(mut self) {
self.shutdown.cancel();
if let Some(serve) = self.serve.take() {
let _ = serve.await;
}
}
}
impl Drop for TestHandle {
fn drop(&mut self) {
self.shutdown.cancel();
}
}
pub struct TestActor<A: Actor> {
handle: TypedActorHandle<A>,
}
impl<A: Actor> TestActor<A> {
pub async fn action(&self, name: &str, args: Vec<JsonValue>) -> Result<JsonValue> {
let deadline = Instant::now() + READY_TIMEOUT;
loop {
match self.handle.inner().action(name, args.clone()).await {
Ok(value) => return Ok(value),
Err(error) if is_transient(&error) && Instant::now() < deadline => {
tokio::time::sleep(Duration::from_millis(250)).await;
}
Err(error) => return Err(error),
}
}
}
pub async fn send<M>(&self, action: M) -> Result<M::Output>
where
A: Handles<M>,
M: Action,
{
let args = encode_action_args(&action)?;
let output = self.action(M::NAME, args).await?;
serde_json::from_value(output)
.map_err(anyhow::Error::from)
.context("decode typed test action output")
}
pub fn handle(&self) -> &TypedActorHandle<A> {
&self.handle
}
pub fn connect(&self) -> TypedActorConnection<A> {
self.handle.connect()
}
}
fn is_transient(error: &anyhow::Error) -> bool {
let message = error.to_string();
message.contains("actor_ready_timeout")
|| message.contains("no_runner_config_configured")
|| message.contains("Service Unavailable")
}
async fn wait_for_port() {
let deadline = Instant::now() + Duration::from_secs(60);
while Instant::now() < deadline {
if TcpStream::connect(("127.0.0.1", ENGINE_PORT)).await.is_ok() {
return;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
fn unique_suffix() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or_default()
}