use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use folk_api::{Executor, Plugin, PluginContext, ServerPlugin, ServerPluginWrapper};
use tokio::sync::watch;
use tokio::time::timeout;
/// Minimal [`ServerPlugin`] used by the tests in this file.
///
/// Its `run` future waits for one notification on the shutdown watch channel
/// and then finishes successfully.
struct EchoPlugin;

#[async_trait]
impl ServerPlugin for EchoPlugin {
    /// Static plugin name: `"echo"`.
    fn name(&self) -> &'static str {
        "echo"
    }

    /// Waits until the shutdown channel reports a change, then returns `Ok(())`.
    ///
    /// If the wait itself fails, the error is logged and the plugin still
    /// returns `Ok(())`.
    async fn run(&self, mut ctx: PluginContext) -> Result<()> {
        match ctx.shutdown.changed().await {
            Ok(()) => {}
            Err(e) => {
                tracing::error!(error = %e, "shutdown sender dropped unexpectedly");
            }
        }
        Ok(())
    }
}
/// Placeholder [`Executor`] that exists only so the tests can populate the
/// `executor` field of a [`PluginContext`].
struct NoopExecutor;

#[async_trait]
impl Executor for NoopExecutor {
    /// Always panics via `unimplemented!`; both arguments are ignored.
    async fn execute_method(
        &self,
        _method: &str,
        _payload: bytes::Bytes,
    ) -> Result<bytes::Bytes> {
        unimplemented!("not used in this test")
    }
}
#[tokio::test]
async fn server_plugin_wrapper_boots_and_shuts_down() {
let mut wrapper = ServerPluginWrapper::new(EchoPlugin);
let (tx, rx) = watch::channel(false);
let ctx = PluginContext {
executor: Arc::new(NoopExecutor),
shutdown: rx,
rpc_registrar: None,
health_registry: None,
metrics_registry: None,
};
wrapper.boot(ctx).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(true).unwrap();
wrapper.shutdown().await.unwrap();
}
/// Calling `boot()` a second time on the same wrapper must fail with an error
/// whose message contains "already running".
#[tokio::test]
async fn server_plugin_wrapper_double_boot_returns_error() {
    let mut wrapper = ServerPluginWrapper::new(EchoPlugin);
    // `_tx` is never used to send; it is only kept bound until the test ends.
    let (_tx, rx) = watch::channel(false);

    // Both contexts are identical except for which receiver handle they own.
    let make_ctx = |shutdown: watch::Receiver<bool>| PluginContext {
        executor: Arc::new(NoopExecutor),
        shutdown,
        rpc_registrar: None,
        health_registry: None,
        metrics_registry: None,
    };
    let first_ctx = make_ctx(rx.clone());
    let second_ctx = make_ctx(rx);

    wrapper.boot(first_ctx).await.unwrap();

    let second_boot = wrapper.boot(second_ctx).await;
    assert!(second_boot.is_err(), "second boot() must return Err");

    let message = second_boot.unwrap_err().to_string();
    assert!(
        message.contains("already running"),
        "error message must mention 'already running'"
    );
}
/// `shutdown()` must return within five seconds, and return `Ok`, even when
/// the plugin's `run` future never completes and no shutdown signal is sent.
#[tokio::test]
async fn server_plugin_wrapper_shutdown_aborts_stuck_task() {
    /// Plugin whose `run` future never resolves.
    struct StubbornPlugin;

    #[async_trait]
    impl ServerPlugin for StubbornPlugin {
        fn name(&self) -> &'static str {
            "stubborn"
        }

        async fn run(&self, _ctx: PluginContext) -> Result<()> {
            // Never yields a value, so `run` stays pending forever.
            std::future::pending::<Result<()>>().await
        }
    }

    let mut wrapper = ServerPluginWrapper::new(StubbornPlugin);
    // The sender is kept bound but never used: no shutdown signal is delivered.
    let (_tx, rx) = watch::channel(false);
    wrapper
        .boot(PluginContext {
            executor: Arc::new(NoopExecutor),
            shutdown: rx,
            rpc_registrar: None,
            health_registry: None,
            metrics_registry: None,
        })
        .await
        .unwrap();

    // The outer timeout turns a hang into a test failure.
    match timeout(Duration::from_secs(5), wrapper.shutdown()).await {
        Ok(shutdown_result) => shutdown_result.unwrap(),
        Err(_) => panic!("shutdown() deadlocked"),
    };
}