use crate::backends::OopBackend;
use crate::client_hub::ClientHub;
use crate::config::ConfigProvider;
use crate::registry::ModuleRegistry;
use crate::runtime::shutdown;
use crate::runtime::{DbOptions, HostRuntime};
use std::collections::HashMap;
use std::path::PathBuf;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
/// A deferred, one-shot registration of a client into a [`ClientHub`].
///
/// The value wraps a boxed `FnOnce` closure; nothing is registered until the
/// closure is invoked against a concrete hub.
pub struct ClientRegistration {
/// Closure that performs the registration when called with the target hub.
/// It is `FnOnce`, so it can run at most once, and `Send`, so the
/// registration can be moved across threads before being applied.
register_fn: Box<dyn FnOnce(&ClientHub) + Send>,
}
impl ClientRegistration {
    /// Builds a registration that will hand `client` to a hub under the type `T`.
    ///
    /// The `Arc<T>` is moved into a boxed closure and is only passed to
    /// `ClientHub::register::<T>` once the registration is applied. `T` may be
    /// unsized (for example a trait object), but must be `Send + Sync + 'static`.
    pub fn new<T>(client: Arc<T>) -> Self
    where
        T: ?Sized + Send + Sync + 'static,
    {
        let register_fn: Box<dyn FnOnce(&ClientHub) + Send> =
            Box::new(move |target: &ClientHub| {
                target.register::<T>(client);
            });
        Self { register_fn }
    }

    /// Consumes the registration and runs its stored closure against `hub`.
    pub(crate) fn apply(self, hub: &ClientHub) {
        let Self { register_fn } = self;
        register_fn(hub);
    }
}
/// Selects what triggers shutdown of the runtime started by [`run`].
pub enum ShutdownOptions {
/// `run` spawns a task that awaits `shutdown::wait_for_shutdown()` and then
/// cancels the runtime's token. If that waiter returns an error, the task
/// falls back to awaiting `tokio::signal::ctrl_c()` before cancelling.
Signals,
/// The caller supplies the cancellation token; `run` uses it as the
/// runtime's token and spawns no waiter task of its own.
Token(CancellationToken),
/// `run` spawns a task that awaits this future and cancels the runtime's
/// token once the future completes.
Future(Pin<Box<dyn Future<Output = ()> + Send>>),
}
/// Per-module settings carried in [`OopSpawnOptions::modules`].
///
/// NOTE(review): the code that consumes these fields is not in this file; the
/// field descriptions below are inferred from names and types only — confirm
/// against the `OopBackend` implementation.
#[derive(Clone)]
pub struct OopModuleSpawnConfig {
/// Name of the module (presumably the out-of-process module to spawn).
pub module_name: String,
/// Path to a binary (presumably the executable launched for the module).
pub binary: PathBuf,
/// Argument strings (presumably command-line arguments for `binary`).
pub args: Vec<String>,
/// Key/value string pairs (presumably environment variables for the process).
pub env: HashMap<String, String>,
/// Optional working directory, stored as a `String` rather than a path type.
pub working_directory: Option<String>,
/// A JSON string (presumably the module's already-rendered configuration).
pub rendered_config_json: String,
}
/// Bundle of module spawn configurations plus the backend to use with them.
///
/// [`run`] forwards this value unchanged (as `Option<OopSpawnOptions>`) to
/// `HostRuntime::new`.
pub struct OopSpawnOptions {
/// One spawn configuration per module.
pub modules: Vec<OopModuleSpawnConfig>,
/// Boxed [`OopBackend`] implementation (presumably the component that
/// actually launches the modules — confirm against the backend trait).
pub backend: Box<dyn OopBackend>,
}
/// Everything [`run`] needs to build and start a `HostRuntime`.
pub struct RunOptions {
/// Configuration provider; a clone of this `Arc` is passed to `HostRuntime::new`.
pub modules_cfg: Arc<dyn ConfigProvider>,
/// Database options, passed through to `HostRuntime::new`.
pub db: DbOptions,
/// Chooses what triggers cancellation of the runtime; see [`ShutdownOptions`].
pub shutdown: ShutdownOptions,
/// Client registrations applied, in order, to a freshly created `ClientHub`
/// before the runtime is constructed.
pub clients: Vec<ClientRegistration>,
/// Instance identifier, passed through to `HostRuntime::new`.
pub instance_id: Uuid,
/// Optional spawn options, passed through to `HostRuntime::new`.
pub oop: Option<OopSpawnOptions>,
/// When `Some`, applied to the runtime via `with_shutdown_deadline`; when
/// `None`, the runtime is left as `HostRuntime::new` built it.
pub shutdown_deadline: Option<std::time::Duration>,
}
/// Builds the host runtime from `opts` and drives it through its module phases.
///
/// Steps, in order:
/// 1. Resolve the cancellation token from `opts.shutdown`. An external token is
///    used as-is; for the other variants a fresh token is created and a
///    background task is spawned that cancels it once the trigger fires.
/// 2. Build the module registry via `ModuleRegistry::discover_and_build`.
/// 3. Create a default `ClientHub` and apply every client registration to it.
/// 4. Construct the `HostRuntime`, applying the shutdown deadline if one is set.
/// 5. Await `run_module_phases` and return its result.
///
/// # Errors
///
/// Propagates the error from `ModuleRegistry::discover_and_build` and whatever
/// `HostRuntime::run_module_phases` returns.
pub async fn run(opts: RunOptions) -> anyhow::Result<()> {
    let cancel = match opts.shutdown {
        ShutdownOptions::Token(external) => {
            tracing::info!("shutdown: external token will control lifecycle");
            external
        }
        ShutdownOptions::Signals => {
            let root = CancellationToken::new();
            let trigger = root.clone();
            tokio::spawn(async move {
                match shutdown::wait_for_shutdown().await {
                    Err(err) => {
                        tracing::warn!(
                            error = %err,
                            "shutdown: primary waiter failed; falling back to ctrl_c()"
                        );
                        // Best effort: the fallback's own result is deliberately ignored,
                        // cancellation follows either way.
                        _ = tokio::signal::ctrl_c().await;
                    }
                    Ok(()) => {
                        tracing::info!(target: "", "------------------");
                        tracing::info!("shutdown: signal received");
                    }
                }
                trigger.cancel();
            });
            root
        }
        ShutdownOptions::Future(waiter) => {
            let root = CancellationToken::new();
            let trigger = root.clone();
            tokio::spawn(async move {
                waiter.await;
                tracing::info!("shutdown: external future completed");
                trigger.cancel();
            });
            root
        }
    };

    let registry = ModuleRegistry::discover_and_build()?;

    // Populate the hub before it is handed over to the runtime.
    let hub = Arc::new(ClientHub::default());
    opts.clients
        .into_iter()
        .for_each(|registration| registration.apply(&hub));

    let modules_cfg = Arc::clone(&opts.modules_cfg);
    let runtime_token = cancel.clone();
    let mut host = HostRuntime::new(
        registry,
        modules_cfg,
        opts.db,
        hub,
        runtime_token,
        opts.instance_id,
        opts.oop,
    );
    if let Some(limit) = opts.shutdown_deadline {
        host = host.with_shutdown_deadline(limit);
    }

    host.run_module_phases().await
}