lash_core/runtime/process/
engine.rs1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use tokio_util::sync::CancellationToken;
7
8use super::events::ProcessAwaitOutput;
9use super::model::{
10 ProcessExecutionContext, ProcessExecutionEnvSpec, ProcessIdentity, ProcessInput,
11 ProcessRegistration,
12};
13use super::registry::ProcessRegistry;
14
15pub type ProcessEngineShutdownFuture<'run> = Pin<Box<dyn Future<Output = ()> + Send + 'run>>;
16
17pub struct ProcessEngineRunGuard<'run> {
18 shutdown: Option<Box<dyn FnOnce() -> ProcessEngineShutdownFuture<'run> + Send + 'run>>,
19}
20
21impl<'run> ProcessEngineRunGuard<'run> {
22 pub(crate) fn new(
23 shutdown: impl FnOnce() -> ProcessEngineShutdownFuture<'run> + Send + 'run,
24 ) -> Self {
25 Self {
26 shutdown: Some(Box::new(shutdown)),
27 }
28 }
29
30 pub async fn shutdown(mut self) {
31 if let Some(shutdown) = self.shutdown.take() {
32 shutdown().await;
33 }
34 }
35}
36
37pub struct ProcessEngineRuntimeContext<'run> {
38 context: crate::RuntimeExecutionContext<'run>,
39 guard: ProcessEngineRunGuard<'run>,
40}
41
42impl<'run> ProcessEngineRuntimeContext<'run> {
43 pub(crate) fn new(
44 context: crate::RuntimeExecutionContext<'run>,
45 guard: ProcessEngineRunGuard<'run>,
46 ) -> Self {
47 Self { context, guard }
48 }
49
50 pub fn context(&self) -> &crate::RuntimeExecutionContext<'run> {
51 &self.context
52 }
53
54 pub fn context_mut(&mut self) -> &mut crate::RuntimeExecutionContext<'run> {
55 &mut self.context
56 }
57
58 pub fn into_parts(
59 self,
60 ) -> (
61 crate::RuntimeExecutionContext<'run>,
62 ProcessEngineRunGuard<'run>,
63 ) {
64 (self.context, self.guard)
65 }
66
67 pub async fn shutdown(self) {
68 self.guard.shutdown().await;
69 }
70}
71
72type RuntimeContextBuilder<'run> = Box<
73 dyn FnOnce(
74 Arc<crate::ToolCatalog>,
75 ) -> Result<ProcessEngineRuntimeContext<'run>, crate::PluginError>
76 + Send
77 + 'run,
78>;
79
80pub struct ProcessEngineRunContext<'run> {
81 registration: ProcessRegistration,
82 execution_context: ProcessExecutionContext,
83 registry: Arc<dyn ProcessRegistry>,
84 session_id: String,
85 plugins: Arc<crate::PluginSession>,
86 store: Option<Arc<dyn crate::RuntimePersistence>>,
87 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
88 queued_work_poke: Option<crate::QueuedWorkPoke>,
89 process_registry_available: bool,
90 cancellation: CancellationToken,
91 turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
92 runtime_context_builder: Option<RuntimeContextBuilder<'run>>,
93}
94
95impl<'run> ProcessEngineRunContext<'run> {
96 #[allow(clippy::too_many_arguments)]
97 pub(crate) fn new(
98 registration: ProcessRegistration,
99 execution_context: ProcessExecutionContext,
100 registry: Arc<dyn ProcessRegistry>,
101 session_id: String,
102 plugins: Arc<crate::PluginSession>,
103 store: Option<Arc<dyn crate::RuntimePersistence>>,
104 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
105 queued_work_poke: Option<crate::QueuedWorkPoke>,
106 process_registry_available: bool,
107 cancellation: CancellationToken,
108 turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
109 runtime_context_builder: RuntimeContextBuilder<'run>,
110 ) -> Self {
111 Self {
112 registration,
113 execution_context,
114 registry,
115 session_id,
116 plugins,
117 store,
118 session_store_factory,
119 queued_work_poke,
120 process_registry_available,
121 cancellation,
122 turn_phase_probe,
123 runtime_context_builder: Some(runtime_context_builder),
124 }
125 }
126
127 pub fn registration(&self) -> &ProcessRegistration {
128 &self.registration
129 }
130
131 pub fn execution_context(&self) -> &ProcessExecutionContext {
132 &self.execution_context
133 }
134
135 pub fn registry(&self) -> Arc<dyn ProcessRegistry> {
136 Arc::clone(&self.registry)
137 }
138
139 pub fn session_id(&self) -> &str {
140 &self.session_id
141 }
142
143 pub fn plugins(&self) -> Arc<crate::PluginSession> {
144 Arc::clone(&self.plugins)
145 }
146
147 pub fn store(&self) -> Option<Arc<dyn crate::RuntimePersistence>> {
148 self.store.clone()
149 }
150
151 pub fn session_store_factory(&self) -> Option<Arc<dyn crate::SessionStoreFactory>> {
152 self.session_store_factory.clone()
153 }
154
155 pub fn queued_work_poke(&self) -> Option<crate::QueuedWorkPoke> {
156 self.queued_work_poke.clone()
157 }
158
159 pub fn process_registry_available(&self) -> bool {
160 self.process_registry_available
161 }
162
163 pub fn cancellation_token(&self) -> CancellationToken {
164 self.cancellation.clone()
165 }
166
167 #[doc(hidden)]
168 pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
169 crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
170 }
171
172 #[doc(hidden)]
173 pub fn turn_phase_probe(&self) -> Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>> {
174 self.turn_phase_probe.clone()
175 }
176
177 pub fn resolved_tool_catalog(&self) -> Result<Arc<crate::ToolCatalog>, crate::PluginError> {
178 self.plugins
179 .resolved_tool_catalog(&self.session_id)
180 .map_err(crate::PluginError::from)
181 }
182
183 pub fn into_runtime_context(
184 mut self,
185 tool_catalog: Arc<crate::ToolCatalog>,
186 ) -> Result<ProcessEngineRuntimeContext<'run>, crate::PluginError> {
187 let builder = self.runtime_context_builder.take().ok_or_else(|| {
188 crate::PluginError::Session("process engine runtime context was already built".into())
189 })?;
190 builder(tool_catalog)
191 }
192}
193
194pub struct ProcessEngineValidationContext<'a> {
195 plugin_host: &'a crate::PluginHost,
196 tool_catalog: Arc<crate::ToolCatalog>,
197 process_registry_available: bool,
198}
199
200impl<'a> ProcessEngineValidationContext<'a> {
201 pub(crate) fn new(
202 plugin_host: &'a crate::PluginHost,
203 tool_catalog: Arc<crate::ToolCatalog>,
204 process_registry_available: bool,
205 ) -> Self {
206 Self {
207 plugin_host,
208 tool_catalog,
209 process_registry_available,
210 }
211 }
212
213 pub fn plugin_host(&self) -> &crate::PluginHost {
214 self.plugin_host
215 }
216
217 pub fn tool_catalog(&self) -> &crate::ToolCatalog {
218 self.tool_catalog.as_ref()
219 }
220
221 pub fn process_registry_available(&self) -> bool {
222 self.process_registry_available
223 }
224}
225
226#[async_trait::async_trait]
227pub trait ProcessEngine: Send + Sync {
234 fn kind(&self) -> &'static str;
235
236 async fn validate_start(
237 &self,
238 _context: ProcessEngineValidationContext<'_>,
239 _payload: &serde_json::Value,
240 _env_spec: Option<&ProcessExecutionEnvSpec>,
241 ) -> Result<(), crate::PluginError> {
242 Ok(())
243 }
244
245 async fn run(
246 &self,
247 context: ProcessEngineRunContext<'_>,
248 payload: serde_json::Value,
249 ) -> ProcessAwaitOutput;
250
251 fn identity(&self, payload: &serde_json::Value) -> ProcessIdentity {
252 let _ = payload;
253 ProcessIdentity::new(self.kind())
254 }
255}
256
257#[derive(Clone, Default)]
258pub struct ProcessEngineRegistry {
259 engines: Arc<BTreeMap<String, Arc<dyn ProcessEngine>>>,
260}
261
262impl ProcessEngineRegistry {
263 pub fn new() -> Self {
264 Self::default()
265 }
266
267 pub fn with_engine(self, engine: Arc<dyn ProcessEngine>) -> Self {
268 let mut engines = (*self.engines).clone();
269 engines.insert(engine.kind().to_string(), engine);
270 Self {
271 engines: Arc::new(engines),
272 }
273 }
274
275 pub fn get(&self, kind: &str) -> Option<Arc<dyn ProcessEngine>> {
276 self.engines.get(kind).cloned()
277 }
278
279 pub fn require(&self, kind: &str) -> Result<Arc<dyn ProcessEngine>, crate::PluginError> {
280 self.get(kind).ok_or_else(|| {
281 crate::PluginError::Session(format!("process engine `{kind}` is not configured"))
282 })
283 }
284
285 pub fn validate_input(&self, input: &ProcessInput) -> Result<(), crate::PluginError> {
286 if let ProcessInput::Engine { kind, .. } = input {
287 self.require(kind).map(|_| ())
288 } else {
289 Ok(())
290 }
291 }
292}