1use crate::support::*;
14
15#[derive(Clone)]
16pub struct Processes {
17 pub(crate) core: LashCore,
18}
19
20impl Processes {
21 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
22 self.core
23 .env
24 .process_registry
25 .as_ref()
26 .cloned()
27 .ok_or_else(|| {
28 EmbedError::Plugin(lash_core::PluginError::Session(
29 "process registry is unavailable in this runtime".to_string(),
30 ))
31 })
32 }
33
34 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
35 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
36 }
37
38 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
39 let effect_id = command.effect_id();
40 lash_core::RuntimeInvocation::effect(
41 lash_core::runtime::RuntimeScope::new("runtime"),
42 effect_id.clone(),
43 lash_core::RuntimeEffectKind::Process,
44 effect_id,
45 )
46 }
47
48 async fn run_command(
49 &self,
50 command: lash_core::ProcessCommand,
51 scoped_effect_controller: ScopedEffectController<'_>,
52 ) -> Result<lash_core::ProcessEffectOutcome> {
53 let registry = self.registry()?;
54 let invocation = Self::process_invocation(&command);
55 let outcome = scoped_effect_controller
56 .controller()
57 .execute_effect(
58 lash_core::RuntimeEffectEnvelope::new(
59 invocation,
60 lash_core::RuntimeEffectCommand::process(command),
61 ),
62 lash_core::RuntimeEffectLocalExecutor::processes(
63 registry,
64 self.core.env.process_work_driver.clone(),
65 ),
66 )
67 .await
68 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
69 match outcome {
70 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
71 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
72 "process effect returned non-process outcome".to_string(),
73 ))),
74 }
75 }
76
77 pub async fn start(
78 &self,
79 request: lash_core::ProcessStartRequest,
80 scoped_effect_controller: ScopedEffectController<'_>,
81 ) -> Result<lash_core::ProcessRecord> {
82 let env_ref = match request.env_spec.as_ref() {
83 Some(env_spec) => Some(
84 lash_core::runtime::persist_process_execution_env(
85 self.core.env.core.durability.process_env_store.as_ref(),
86 env_spec,
87 )
88 .await?,
89 ),
90 None => None,
91 };
92 let grant = request.grant.clone();
93 let registration = request.into_registration(env_ref);
94 let command = lash_core::ProcessCommand::Start {
95 registration,
96 grant,
97 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
98 };
99 let outcome = self
100 .run_command(command, scoped_effect_controller.clone())
101 .await?;
102 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
103 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
104 "process start returned the wrong outcome".to_string(),
105 )));
106 };
107 if let Some(driver) = self.core.work_driver.drivers().await.process {
108 driver.claim_and_run_pending("admin_process_start").await?;
109 }
110 Ok(*record)
111 }
112
113 pub async fn list(
114 &self,
115 filter: &lash_core::ProcessListFilter,
116 ) -> Result<Vec<lash_core::ObservedProcess>> {
117 self.make_observer()?.list(filter).await.map_err(Into::into)
118 }
119
120 pub async fn list_granted_to(
125 &self,
126 session_scope: &lash_core::SessionScope,
127 filter: &lash_core::ProcessListFilter,
128 ) -> Result<Vec<lash_core::ObservedProcess>> {
129 self.make_observer()?
130 .list_granted_to(session_scope, filter)
131 .await
132 .map_err(Into::into)
133 }
134
135 pub async fn list_originated_by(
141 &self,
142 session_scope: &lash_core::SessionScope,
143 filter: &lash_core::ProcessListFilter,
144 ) -> Result<Vec<lash_core::ObservedProcess>> {
145 self.make_observer()?
146 .list_originated_by(session_scope, filter)
147 .await
148 .map_err(Into::into)
149 }
150
151 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
152 Ok(self.make_observer()?.process(process_id).await)
153 }
154
155 pub async fn events(
156 &self,
157 process_id: &str,
158 after_sequence: u64,
159 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
160 self.make_observer()?
161 .events_after(process_id, after_sequence)
162 .await
163 .map_err(Into::into)
164 }
165
166 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
167 if let Some(driver) = self.core.env.process_work_driver.as_ref() {
168 return driver.await_terminal(process_id).await.map_err(Into::into);
169 }
170 lash_core::ProcessAwaiter::polling(self.registry()?)
171 .await_terminal(process_id)
172 .await
173 .map_err(Into::into)
174 }
175
176 pub async fn cancel(
177 &self,
178 process_id: &str,
179 scoped_effect_controller: ScopedEffectController<'_>,
180 ) -> Result<lash_core::ProcessCancelSummary> {
181 let command = lash_core::ProcessCommand::Cancel {
182 process_id: process_id.to_string(),
183 reason: Some("requested by host".to_string()),
184 };
185 let outcome = self
186 .run_command(command, scoped_effect_controller.clone())
187 .await?;
188 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
189 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
190 "process cancel returned the wrong outcome".to_string(),
191 )));
192 };
193 Ok(lash_core::ProcessCancelSummary::from_record(*record))
194 }
195
196 pub async fn signal(
197 &self,
198 process_id: &str,
199 signal_name: impl Into<String>,
200 signal_id: impl Into<String>,
201 request: lash_core::ProcessEventAppendRequest,
202 scoped_effect_controller: ScopedEffectController<'_>,
203 ) -> Result<lash_core::ProcessEvent> {
204 let signal_name = signal_name.into();
205 let event_type = request.event_type.clone();
206 let payload = request.payload.clone();
207 let command = lash_core::ProcessCommand::Signal {
208 process_id: process_id.to_string(),
209 signal_name: signal_name.clone(),
210 signal_id: signal_id.into(),
211 request,
212 };
213 let outcome = self
214 .run_command(command, scoped_effect_controller.clone())
215 .await?;
216 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
217 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
218 "process signal returned the wrong outcome".to_string(),
219 )));
220 };
221 let registry = self.registry()?;
222 let waiting_ordinal =
223 registry
224 .get_process(process_id)
225 .await
226 .and_then(|record| match record.wait {
227 Some(lash_core::WaitState {
228 kind:
229 lash_core::WaitKind::Signal {
230 name,
231 event_type: wait_event_type,
232 ordinal,
233 ..
234 },
235 ..
236 }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
237 _ => None,
238 });
239 let ordinal = match waiting_ordinal {
240 Some(ordinal) => ordinal,
241 None => {
242 registry
243 .count_events_through(process_id, &event_type, event.sequence)
244 .await?
245 }
246 };
247 if ordinal > 0 {
248 let key = scoped_effect_controller
249 .controller()
250 .await_event_key(
251 &lash_core::ExecutionScope::process(process_id),
252 lash_core::AwaitEventWaitIdentity::process_signal(
253 process_id,
254 &signal_name,
255 ordinal,
256 ),
257 )
258 .await
259 .map_err(|err| {
260 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
261 })?;
262 let _ = scoped_effect_controller
263 .controller()
264 .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
265 .await
266 .map_err(|err| {
267 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
268 })?;
269 }
270 Ok(*event)
271 }
272
273 pub async fn session_snapshot(
274 &self,
275 session_id: impl Into<String>,
276 ) -> Result<lash_core::ProcessWorkSnapshot> {
277 self.make_observer()?
278 .snapshot_for_session(session_id)
279 .await
280 .map_err(Into::into)
281 }
282
283 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
284 self.make_observer()
285 }
286
287 pub async fn cancel_all(
290 &self,
291 scoped_effect_controller: ScopedEffectController<'_>,
292 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
293 let running = self
294 .list(&lash_core::ProcessListFilter {
295 status: lash_core::ProcessStatusFilter::Running,
296 ..lash_core::ProcessListFilter::default()
297 })
298 .await?;
299 let mut summaries = Vec::with_capacity(running.len());
300 for process in running {
301 summaries.push(
302 self.cancel(&process.process_id, scoped_effect_controller.clone())
303 .await?,
304 );
305 }
306 Ok(summaries)
307 }
308
309 pub async fn transfer(
313 &self,
314 from_scope: &lash_core::SessionScope,
315 to_scope: &lash_core::SessionScope,
316 process_ids: &[String],
317 ) -> Result<()> {
318 self.registry()?
319 .transfer_handle_grants(from_scope, to_scope, process_ids)
320 .await
321 .map_err(Into::into)
322 }
323
324 pub async fn prune(&self, cutoff_epoch_ms: u64) -> Result<lash_core::ProcessPruneReport> {
329 self.registry()?
330 .prune_terminal_processes(cutoff_epoch_ms)
331 .await
332 .map_err(Into::into)
333 }
334
335 pub async fn request_abandon(
342 &self,
343 process_id: &str,
344 requested_by: impl Into<String>,
345 reason: Option<String>,
346 ) -> Result<lash_core::ObservedProcess> {
347 let request = lash_core::AbandonRequest {
348 requested_by: requested_by.into(),
349 requested_at_ms: now_epoch_ms(),
350 reason,
351 };
352 self.registry()?
353 .request_process_abandon(process_id, request)
354 .await?;
355 self.get(process_id).await?.ok_or_else(|| {
356 EmbedError::Plugin(lash_core::PluginError::Session(format!(
357 "process `{process_id}` vanished after recording its abandon request"
358 )))
359 })
360 }
361}
362
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch. The
/// millisecond count is a `u128`; it saturates at `u64::MAX` instead of being
/// silently truncated by an `as` cast.
pub(crate) fn now_epoch_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            let millis = elapsed.as_millis();
            if millis > u128::from(u64::MAX) {
                u64::MAX
            } else {
                // Guarded above: the value fits in a u64, so this cast is lossless.
                millis as u64
            }
        })
}