use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use super::events::ProcessAwaitOutput;
use super::model::{
ProcessExecutionContext, ProcessExecutionEnvSpec, ProcessIdentity, ProcessInput,
ProcessRegistration,
};
use super::registry::ProcessRegistry;
pub type ProcessEngineShutdownFuture<'run> = Pin<Box<dyn Future<Output = ()> + Send + 'run>>;
pub struct ProcessEngineRunGuard<'run> {
shutdown: Option<Box<dyn FnOnce() -> ProcessEngineShutdownFuture<'run> + Send + 'run>>,
}
impl<'run> ProcessEngineRunGuard<'run> {
pub(crate) fn new(
shutdown: impl FnOnce() -> ProcessEngineShutdownFuture<'run> + Send + 'run,
) -> Self {
Self {
shutdown: Some(Box::new(shutdown)),
}
}
pub async fn shutdown(mut self) {
if let Some(shutdown) = self.shutdown.take() {
shutdown().await;
}
}
}
pub struct ProcessEngineRuntimeContext<'run> {
context: crate::RuntimeExecutionContext<'run>,
guard: ProcessEngineRunGuard<'run>,
}
impl<'run> ProcessEngineRuntimeContext<'run> {
pub(crate) fn new(
context: crate::RuntimeExecutionContext<'run>,
guard: ProcessEngineRunGuard<'run>,
) -> Self {
Self { context, guard }
}
pub fn context(&self) -> &crate::RuntimeExecutionContext<'run> {
&self.context
}
pub fn context_mut(&mut self) -> &mut crate::RuntimeExecutionContext<'run> {
&mut self.context
}
pub fn into_parts(
self,
) -> (
crate::RuntimeExecutionContext<'run>,
ProcessEngineRunGuard<'run>,
) {
(self.context, self.guard)
}
pub async fn shutdown(self) {
self.guard.shutdown().await;
}
}
type RuntimeContextBuilder<'run> = Box<
dyn FnOnce(
Arc<crate::ToolCatalog>,
) -> Result<ProcessEngineRuntimeContext<'run>, crate::PluginError>
+ Send
+ 'run,
>;
pub struct ProcessEngineRunContext<'run> {
registration: ProcessRegistration,
execution_context: ProcessExecutionContext,
registry: Arc<dyn ProcessRegistry>,
session_id: String,
plugins: Arc<crate::PluginSession>,
store: Option<Arc<dyn crate::RuntimePersistence>>,
session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
queued_work_poke: Option<crate::QueuedWorkPoke>,
process_registry_available: bool,
cancellation: CancellationToken,
turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
runtime_context_builder: Option<RuntimeContextBuilder<'run>>,
}
impl<'run> ProcessEngineRunContext<'run> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
registration: ProcessRegistration,
execution_context: ProcessExecutionContext,
registry: Arc<dyn ProcessRegistry>,
session_id: String,
plugins: Arc<crate::PluginSession>,
store: Option<Arc<dyn crate::RuntimePersistence>>,
session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
queued_work_poke: Option<crate::QueuedWorkPoke>,
process_registry_available: bool,
cancellation: CancellationToken,
turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
runtime_context_builder: RuntimeContextBuilder<'run>,
) -> Self {
Self {
registration,
execution_context,
registry,
session_id,
plugins,
store,
session_store_factory,
queued_work_poke,
process_registry_available,
cancellation,
turn_phase_probe,
runtime_context_builder: Some(runtime_context_builder),
}
}
pub fn registration(&self) -> &ProcessRegistration {
&self.registration
}
pub fn execution_context(&self) -> &ProcessExecutionContext {
&self.execution_context
}
pub fn registry(&self) -> Arc<dyn ProcessRegistry> {
Arc::clone(&self.registry)
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn plugins(&self) -> Arc<crate::PluginSession> {
Arc::clone(&self.plugins)
}
pub fn store(&self) -> Option<Arc<dyn crate::RuntimePersistence>> {
self.store.clone()
}
pub fn session_store_factory(&self) -> Option<Arc<dyn crate::SessionStoreFactory>> {
self.session_store_factory.clone()
}
pub fn queued_work_poke(&self) -> Option<crate::QueuedWorkPoke> {
self.queued_work_poke.clone()
}
pub fn process_registry_available(&self) -> bool {
self.process_registry_available
}
pub fn cancellation_token(&self) -> CancellationToken {
self.cancellation.clone()
}
#[doc(hidden)]
pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
}
#[doc(hidden)]
pub fn turn_phase_probe(&self) -> Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>> {
self.turn_phase_probe.clone()
}
pub fn resolved_tool_catalog(&self) -> Result<Arc<crate::ToolCatalog>, crate::PluginError> {
self.plugins
.resolved_tool_catalog(&self.session_id)
.map_err(crate::PluginError::from)
}
pub fn into_runtime_context(
mut self,
tool_catalog: Arc<crate::ToolCatalog>,
) -> Result<ProcessEngineRuntimeContext<'run>, crate::PluginError> {
let builder = self.runtime_context_builder.take().ok_or_else(|| {
crate::PluginError::Session("process engine runtime context was already built".into())
})?;
builder(tool_catalog)
}
}
pub struct ProcessEngineValidationContext<'a> {
plugin_host: &'a crate::PluginHost,
tool_catalog: Arc<crate::ToolCatalog>,
process_registry_available: bool,
}
impl<'a> ProcessEngineValidationContext<'a> {
pub(crate) fn new(
plugin_host: &'a crate::PluginHost,
tool_catalog: Arc<crate::ToolCatalog>,
process_registry_available: bool,
) -> Self {
Self {
plugin_host,
tool_catalog,
process_registry_available,
}
}
pub fn plugin_host(&self) -> &crate::PluginHost {
self.plugin_host
}
pub fn tool_catalog(&self) -> &crate::ToolCatalog {
self.tool_catalog.as_ref()
}
pub fn process_registry_available(&self) -> bool {
self.process_registry_available
}
}
#[async_trait::async_trait]
pub trait ProcessEngine: Send + Sync {
fn kind(&self) -> &'static str;
async fn validate_start(
&self,
_context: ProcessEngineValidationContext<'_>,
_payload: &serde_json::Value,
_env_spec: Option<&ProcessExecutionEnvSpec>,
) -> Result<(), crate::PluginError> {
Ok(())
}
async fn run(
&self,
context: ProcessEngineRunContext<'_>,
payload: serde_json::Value,
) -> ProcessAwaitOutput;
fn identity(&self, payload: &serde_json::Value) -> ProcessIdentity {
let _ = payload;
ProcessIdentity::new(self.kind())
}
}
#[derive(Clone, Default)]
pub struct ProcessEngineRegistry {
engines: Arc<BTreeMap<String, Arc<dyn ProcessEngine>>>,
}
impl ProcessEngineRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn with_engine(self, engine: Arc<dyn ProcessEngine>) -> Self {
let mut engines = (*self.engines).clone();
engines.insert(engine.kind().to_string(), engine);
Self {
engines: Arc::new(engines),
}
}
pub fn get(&self, kind: &str) -> Option<Arc<dyn ProcessEngine>> {
self.engines.get(kind).cloned()
}
pub fn require(&self, kind: &str) -> Result<Arc<dyn ProcessEngine>, crate::PluginError> {
self.get(kind).ok_or_else(|| {
crate::PluginError::Session(format!("process engine `{kind}` is not configured"))
})
}
pub fn validate_input(&self, input: &ProcessInput) -> Result<(), crate::PluginError> {
if let ProcessInput::Engine { kind, .. } = input {
self.require(kind).map(|_| ())
} else {
Ok(())
}
}
}