Skip to main content

lash_core/runtime/process/
engine.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use tokio_util::sync::CancellationToken;
7
8use super::events::ProcessAwaitOutput;
9use super::model::{
10    ProcessExecutionContext, ProcessExecutionEnvSpec, ProcessIdentity, ProcessInput,
11    ProcessRegistration,
12};
13use super::registry::ProcessRegistry;
14
15pub type ProcessEngineShutdownFuture<'run> = Pin<Box<dyn Future<Output = ()> + Send + 'run>>;
16
17pub struct ProcessEngineRunGuard<'run> {
18    shutdown: Option<Box<dyn FnOnce() -> ProcessEngineShutdownFuture<'run> + Send + 'run>>,
19}
20
21impl<'run> ProcessEngineRunGuard<'run> {
22    pub(crate) fn new(
23        shutdown: impl FnOnce() -> ProcessEngineShutdownFuture<'run> + Send + 'run,
24    ) -> Self {
25        Self {
26            shutdown: Some(Box::new(shutdown)),
27        }
28    }
29
30    pub async fn shutdown(mut self) {
31        if let Some(shutdown) = self.shutdown.take() {
32            shutdown().await;
33        }
34    }
35}
36
37pub struct ProcessEngineRuntimeContext<'run> {
38    context: crate::RuntimeExecutionContext<'run>,
39    guard: ProcessEngineRunGuard<'run>,
40}
41
42impl<'run> ProcessEngineRuntimeContext<'run> {
43    pub(crate) fn new(
44        context: crate::RuntimeExecutionContext<'run>,
45        guard: ProcessEngineRunGuard<'run>,
46    ) -> Self {
47        Self { context, guard }
48    }
49
50    pub fn context(&self) -> &crate::RuntimeExecutionContext<'run> {
51        &self.context
52    }
53
54    pub fn context_mut(&mut self) -> &mut crate::RuntimeExecutionContext<'run> {
55        &mut self.context
56    }
57
58    pub fn into_parts(
59        self,
60    ) -> (
61        crate::RuntimeExecutionContext<'run>,
62        ProcessEngineRunGuard<'run>,
63    ) {
64        (self.context, self.guard)
65    }
66
67    pub async fn shutdown(self) {
68        self.guard.shutdown().await;
69    }
70}
71
72type RuntimeContextBuilder<'run> = Box<
73    dyn FnOnce(
74            Arc<crate::ToolCatalog>,
75        ) -> Result<ProcessEngineRuntimeContext<'run>, crate::PluginError>
76        + Send
77        + 'run,
78>;
79
80pub struct ProcessEngineRunContext<'run> {
81    registration: ProcessRegistration,
82    execution_context: ProcessExecutionContext,
83    registry: Arc<dyn ProcessRegistry>,
84    session_id: String,
85    plugins: Arc<crate::PluginSession>,
86    store: Option<Arc<dyn crate::RuntimePersistence>>,
87    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
88    queued_work_poke: Option<crate::QueuedWorkPoke>,
89    process_registry_available: bool,
90    cancellation: CancellationToken,
91    turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
92    runtime_context_builder: Option<RuntimeContextBuilder<'run>>,
93}
94
95impl<'run> ProcessEngineRunContext<'run> {
96    #[allow(clippy::too_many_arguments)]
97    pub(crate) fn new(
98        registration: ProcessRegistration,
99        execution_context: ProcessExecutionContext,
100        registry: Arc<dyn ProcessRegistry>,
101        session_id: String,
102        plugins: Arc<crate::PluginSession>,
103        store: Option<Arc<dyn crate::RuntimePersistence>>,
104        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
105        queued_work_poke: Option<crate::QueuedWorkPoke>,
106        process_registry_available: bool,
107        cancellation: CancellationToken,
108        turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
109        runtime_context_builder: RuntimeContextBuilder<'run>,
110    ) -> Self {
111        Self {
112            registration,
113            execution_context,
114            registry,
115            session_id,
116            plugins,
117            store,
118            session_store_factory,
119            queued_work_poke,
120            process_registry_available,
121            cancellation,
122            turn_phase_probe,
123            runtime_context_builder: Some(runtime_context_builder),
124        }
125    }
126
127    pub fn registration(&self) -> &ProcessRegistration {
128        &self.registration
129    }
130
131    pub fn execution_context(&self) -> &ProcessExecutionContext {
132        &self.execution_context
133    }
134
135    pub fn registry(&self) -> Arc<dyn ProcessRegistry> {
136        Arc::clone(&self.registry)
137    }
138
139    pub fn session_id(&self) -> &str {
140        &self.session_id
141    }
142
143    pub fn plugins(&self) -> Arc<crate::PluginSession> {
144        Arc::clone(&self.plugins)
145    }
146
147    pub fn store(&self) -> Option<Arc<dyn crate::RuntimePersistence>> {
148        self.store.clone()
149    }
150
151    pub fn session_store_factory(&self) -> Option<Arc<dyn crate::SessionStoreFactory>> {
152        self.session_store_factory.clone()
153    }
154
155    pub fn queued_work_poke(&self) -> Option<crate::QueuedWorkPoke> {
156        self.queued_work_poke.clone()
157    }
158
159    pub fn process_registry_available(&self) -> bool {
160        self.process_registry_available
161    }
162
163    pub fn cancellation_token(&self) -> CancellationToken {
164        self.cancellation.clone()
165    }
166
167    #[doc(hidden)]
168    pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
169        crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
170    }
171
172    #[doc(hidden)]
173    pub fn turn_phase_probe(&self) -> Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>> {
174        self.turn_phase_probe.clone()
175    }
176
177    pub fn resolved_tool_catalog(&self) -> Result<Arc<crate::ToolCatalog>, crate::PluginError> {
178        self.plugins
179            .resolved_tool_catalog(&self.session_id)
180            .map_err(crate::PluginError::from)
181    }
182
183    pub fn into_runtime_context(
184        mut self,
185        tool_catalog: Arc<crate::ToolCatalog>,
186    ) -> Result<ProcessEngineRuntimeContext<'run>, crate::PluginError> {
187        let builder = self.runtime_context_builder.take().ok_or_else(|| {
188            crate::PluginError::Session("process engine runtime context was already built".into())
189        })?;
190        builder(tool_catalog)
191    }
192}
193
194pub struct ProcessEngineValidationContext<'a> {
195    plugin_host: &'a crate::PluginHost,
196    tool_catalog: Arc<crate::ToolCatalog>,
197    process_registry_available: bool,
198}
199
200impl<'a> ProcessEngineValidationContext<'a> {
201    pub(crate) fn new(
202        plugin_host: &'a crate::PluginHost,
203        tool_catalog: Arc<crate::ToolCatalog>,
204        process_registry_available: bool,
205    ) -> Self {
206        Self {
207            plugin_host,
208            tool_catalog,
209            process_registry_available,
210        }
211    }
212
213    pub fn plugin_host(&self) -> &crate::PluginHost {
214        self.plugin_host
215    }
216
217    pub fn tool_catalog(&self) -> &crate::ToolCatalog {
218        self.tool_catalog.as_ref()
219    }
220
221    pub fn process_registry_available(&self) -> bool {
222        self.process_registry_available
223    }
224}
225
226#[async_trait::async_trait]
227/// Deployment extension point for non-kernel process runtimes.
228///
229/// Core built-ins (`ToolCall`, `SessionTurn`, and `External`) are intentionally
230/// not registered here; they are kernel primitives with direct orchestration
231/// support. Implement `ProcessEngine` for process kinds stored as
232/// [`ProcessInput::Engine`](super::model::ProcessInput::Engine).
233pub trait ProcessEngine: Send + Sync {
234    fn kind(&self) -> &'static str;
235
236    async fn validate_start(
237        &self,
238        _context: ProcessEngineValidationContext<'_>,
239        _payload: &serde_json::Value,
240        _env_spec: Option<&ProcessExecutionEnvSpec>,
241    ) -> Result<(), crate::PluginError> {
242        Ok(())
243    }
244
245    async fn run(
246        &self,
247        context: ProcessEngineRunContext<'_>,
248        payload: serde_json::Value,
249    ) -> ProcessAwaitOutput;
250
251    fn identity(&self, payload: &serde_json::Value) -> ProcessIdentity {
252        let _ = payload;
253        ProcessIdentity::new(self.kind())
254    }
255}
256
257#[derive(Clone, Default)]
258pub struct ProcessEngineRegistry {
259    engines: Arc<BTreeMap<String, Arc<dyn ProcessEngine>>>,
260}
261
262impl ProcessEngineRegistry {
263    pub fn new() -> Self {
264        Self::default()
265    }
266
267    pub fn with_engine(self, engine: Arc<dyn ProcessEngine>) -> Self {
268        let mut engines = (*self.engines).clone();
269        engines.insert(engine.kind().to_string(), engine);
270        Self {
271            engines: Arc::new(engines),
272        }
273    }
274
275    pub fn get(&self, kind: &str) -> Option<Arc<dyn ProcessEngine>> {
276        self.engines.get(kind).cloned()
277    }
278
279    pub fn require(&self, kind: &str) -> Result<Arc<dyn ProcessEngine>, crate::PluginError> {
280        self.get(kind).ok_or_else(|| {
281            crate::PluginError::Session(format!("process engine `{kind}` is not configured"))
282        })
283    }
284
285    pub fn validate_input(&self, input: &ProcessInput) -> Result<(), crate::PluginError> {
286        if let ProcessInput::Engine { kind, .. } = input {
287            self.require(kind).map(|_| ())
288        } else {
289            Ok(())
290        }
291    }
292}