lash-core 0.1.0-alpha.56

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;

use super::events::ProcessAwaitOutput;
use super::model::{
    ProcessExecutionContext, ProcessExecutionEnvSpec, ProcessIdentity, ProcessInput,
    ProcessRegistration,
};
use super::registry::ProcessRegistry;

pub type ProcessEngineShutdownFuture<'run> = Pin<Box<dyn Future<Output = ()> + Send + 'run>>;

pub struct ProcessEngineRunGuard<'run> {
    shutdown: Option<Box<dyn FnOnce() -> ProcessEngineShutdownFuture<'run> + Send + 'run>>,
}

impl<'run> ProcessEngineRunGuard<'run> {
    pub(crate) fn new(
        shutdown: impl FnOnce() -> ProcessEngineShutdownFuture<'run> + Send + 'run,
    ) -> Self {
        Self {
            shutdown: Some(Box::new(shutdown)),
        }
    }

    pub async fn shutdown(mut self) {
        if let Some(shutdown) = self.shutdown.take() {
            shutdown().await;
        }
    }
}

pub struct ProcessEngineRuntimeContext<'run> {
    context: crate::RuntimeExecutionContext<'run>,
    guard: ProcessEngineRunGuard<'run>,
}

impl<'run> ProcessEngineRuntimeContext<'run> {
    pub(crate) fn new(
        context: crate::RuntimeExecutionContext<'run>,
        guard: ProcessEngineRunGuard<'run>,
    ) -> Self {
        Self { context, guard }
    }

    pub fn context(&self) -> &crate::RuntimeExecutionContext<'run> {
        &self.context
    }

    pub fn context_mut(&mut self) -> &mut crate::RuntimeExecutionContext<'run> {
        &mut self.context
    }

    pub fn into_parts(
        self,
    ) -> (
        crate::RuntimeExecutionContext<'run>,
        ProcessEngineRunGuard<'run>,
    ) {
        (self.context, self.guard)
    }

    pub async fn shutdown(self) {
        self.guard.shutdown().await;
    }
}

type RuntimeContextBuilder<'run> = Box<
    dyn FnOnce(
            Arc<crate::ToolCatalog>,
        ) -> Result<ProcessEngineRuntimeContext<'run>, crate::PluginError>
        + Send
        + 'run,
>;

pub struct ProcessEngineRunContext<'run> {
    registration: ProcessRegistration,
    execution_context: ProcessExecutionContext,
    registry: Arc<dyn ProcessRegistry>,
    session_id: String,
    plugins: Arc<crate::PluginSession>,
    store: Option<Arc<dyn crate::RuntimePersistence>>,
    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
    queued_work_poke: Option<crate::QueuedWorkPoke>,
    process_registry_available: bool,
    cancellation: CancellationToken,
    turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
    runtime_context_builder: Option<RuntimeContextBuilder<'run>>,
}

impl<'run> ProcessEngineRunContext<'run> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        registration: ProcessRegistration,
        execution_context: ProcessExecutionContext,
        registry: Arc<dyn ProcessRegistry>,
        session_id: String,
        plugins: Arc<crate::PluginSession>,
        store: Option<Arc<dyn crate::RuntimePersistence>>,
        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
        queued_work_poke: Option<crate::QueuedWorkPoke>,
        process_registry_available: bool,
        cancellation: CancellationToken,
        turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
        runtime_context_builder: RuntimeContextBuilder<'run>,
    ) -> Self {
        Self {
            registration,
            execution_context,
            registry,
            session_id,
            plugins,
            store,
            session_store_factory,
            queued_work_poke,
            process_registry_available,
            cancellation,
            turn_phase_probe,
            runtime_context_builder: Some(runtime_context_builder),
        }
    }

    pub fn registration(&self) -> &ProcessRegistration {
        &self.registration
    }

    pub fn execution_context(&self) -> &ProcessExecutionContext {
        &self.execution_context
    }

    pub fn registry(&self) -> Arc<dyn ProcessRegistry> {
        Arc::clone(&self.registry)
    }

    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    pub fn plugins(&self) -> Arc<crate::PluginSession> {
        Arc::clone(&self.plugins)
    }

    pub fn store(&self) -> Option<Arc<dyn crate::RuntimePersistence>> {
        self.store.clone()
    }

    pub fn session_store_factory(&self) -> Option<Arc<dyn crate::SessionStoreFactory>> {
        self.session_store_factory.clone()
    }

    pub fn queued_work_poke(&self) -> Option<crate::QueuedWorkPoke> {
        self.queued_work_poke.clone()
    }

    pub fn process_registry_available(&self) -> bool {
        self.process_registry_available
    }

    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancellation.clone()
    }

    #[doc(hidden)]
    pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
        crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
    }

    #[doc(hidden)]
    pub fn turn_phase_probe(&self) -> Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>> {
        self.turn_phase_probe.clone()
    }

    pub fn resolved_tool_catalog(&self) -> Result<Arc<crate::ToolCatalog>, crate::PluginError> {
        self.plugins
            .resolved_tool_catalog(&self.session_id)
            .map_err(crate::PluginError::from)
    }

    pub fn into_runtime_context(
        mut self,
        tool_catalog: Arc<crate::ToolCatalog>,
    ) -> Result<ProcessEngineRuntimeContext<'run>, crate::PluginError> {
        let builder = self.runtime_context_builder.take().ok_or_else(|| {
            crate::PluginError::Session("process engine runtime context was already built".into())
        })?;
        builder(tool_catalog)
    }
}

pub struct ProcessEngineValidationContext<'a> {
    plugin_host: &'a crate::PluginHost,
    tool_catalog: Arc<crate::ToolCatalog>,
    process_registry_available: bool,
}

impl<'a> ProcessEngineValidationContext<'a> {
    pub(crate) fn new(
        plugin_host: &'a crate::PluginHost,
        tool_catalog: Arc<crate::ToolCatalog>,
        process_registry_available: bool,
    ) -> Self {
        Self {
            plugin_host,
            tool_catalog,
            process_registry_available,
        }
    }

    pub fn plugin_host(&self) -> &crate::PluginHost {
        self.plugin_host
    }

    pub fn tool_catalog(&self) -> &crate::ToolCatalog {
        self.tool_catalog.as_ref()
    }

    pub fn process_registry_available(&self) -> bool {
        self.process_registry_available
    }
}

#[async_trait::async_trait]
/// Deployment extension point for non-kernel process runtimes.
///
/// Core built-ins (`ToolCall`, `SessionTurn`, and `External`) are intentionally
/// not registered here; they are kernel primitives with direct orchestration
/// support. Implement `ProcessEngine` for process kinds stored as
/// [`ProcessInput::Engine`](super::model::ProcessInput::Engine).
pub trait ProcessEngine: Send + Sync {
    fn kind(&self) -> &'static str;

    async fn validate_start(
        &self,
        _context: ProcessEngineValidationContext<'_>,
        _payload: &serde_json::Value,
        _env_spec: Option<&ProcessExecutionEnvSpec>,
    ) -> Result<(), crate::PluginError> {
        Ok(())
    }

    async fn run(
        &self,
        context: ProcessEngineRunContext<'_>,
        payload: serde_json::Value,
    ) -> ProcessAwaitOutput;

    fn identity(&self, payload: &serde_json::Value) -> ProcessIdentity {
        let _ = payload;
        ProcessIdentity::new(self.kind())
    }
}

#[derive(Clone, Default)]
pub struct ProcessEngineRegistry {
    engines: Arc<BTreeMap<String, Arc<dyn ProcessEngine>>>,
}

impl ProcessEngineRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_engine(self, engine: Arc<dyn ProcessEngine>) -> Self {
        let mut engines = (*self.engines).clone();
        engines.insert(engine.kind().to_string(), engine);
        Self {
            engines: Arc::new(engines),
        }
    }

    pub fn get(&self, kind: &str) -> Option<Arc<dyn ProcessEngine>> {
        self.engines.get(kind).cloned()
    }

    pub fn require(&self, kind: &str) -> Result<Arc<dyn ProcessEngine>, crate::PluginError> {
        self.get(kind).ok_or_else(|| {
            crate::PluginError::Session(format!("process engine `{kind}` is not configured"))
        })
    }

    pub fn validate_input(&self, input: &ProcessInput) -> Result<(), crate::PluginError> {
        if let ProcessInput::Engine { kind, .. } = input {
            self.require(kind).map(|_| ())
        } else {
            Ok(())
        }
    }
}