Skip to main content

lash/
process_admin.rs

1//! The global process facade surface.
2//!
3//! [`Processes`] (reached via [`LashCore::processes`](crate::LashCore::processes),
4//! re-exported as [`lash::process::Processes`](crate::process::Processes)) is THE
5//! host-level process surface (ADR 0019 grill): start, observe, signal, cancel,
6//! transfer, prune, and abandon-request every process, with the two distinct
7//! scope filters — `granted_to` (what a session may address) and `originated_by`
8//! (what a session created). The session-scoped
9//! [`SessionProcessAdmin`](crate::admin::SessionProcessAdmin) is thin sugar over
10//! this surface pre-filtered by a session's grant; it lives in `admin` because it
11//! wraps a [`SessionAdmin`](crate::admin::SessionAdmin).
12
13use crate::support::*;
14
15#[derive(Clone)]
16pub struct Processes {
17    pub(crate) core: LashCore,
18}
19
20impl Processes {
21    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
22        self.core
23            .env
24            .process_registry
25            .as_ref()
26            .cloned()
27            .ok_or_else(|| {
28                EmbedError::Plugin(lash_core::PluginError::Session(
29                    "process registry is unavailable in this runtime".to_string(),
30                ))
31            })
32    }
33
34    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
35        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
36    }
37
38    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
39        let effect_id = command.effect_id();
40        lash_core::RuntimeInvocation::effect(
41            lash_core::runtime::RuntimeScope::new("runtime"),
42            effect_id.clone(),
43            lash_core::RuntimeEffectKind::Process,
44            effect_id,
45        )
46    }
47
48    async fn run_command(
49        &self,
50        command: lash_core::ProcessCommand,
51        scoped_effect_controller: ScopedEffectController<'_>,
52    ) -> Result<lash_core::ProcessEffectOutcome> {
53        let registry = self.registry()?;
54        let invocation = Self::process_invocation(&command);
55        let outcome = scoped_effect_controller
56            .controller()
57            .execute_effect(
58                lash_core::RuntimeEffectEnvelope::new(
59                    invocation,
60                    lash_core::RuntimeEffectCommand::process(command),
61                ),
62                lash_core::RuntimeEffectLocalExecutor::processes(
63                    registry,
64                    self.core.env.process_work_driver.clone(),
65                ),
66            )
67            .await
68            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
69        match outcome {
70            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
71            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
72                "process effect returned non-process outcome".to_string(),
73            ))),
74        }
75    }
76
77    pub async fn start(
78        &self,
79        request: lash_core::ProcessStartRequest,
80        scoped_effect_controller: ScopedEffectController<'_>,
81    ) -> Result<lash_core::ProcessRecord> {
82        let env_ref = match request.env_spec.as_ref() {
83            Some(env_spec) => Some(
84                lash_core::runtime::persist_process_execution_env(
85                    self.core.env.core.durability.process_env_store.as_ref(),
86                    env_spec,
87                )
88                .await?,
89            ),
90            None => None,
91        };
92        let grant = request.grant.clone();
93        let registration = request.into_registration(env_ref);
94        let command = lash_core::ProcessCommand::Start {
95            registration,
96            grant,
97            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
98        };
99        let outcome = self
100            .run_command(command, scoped_effect_controller.clone())
101            .await?;
102        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
103            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
104                "process start returned the wrong outcome".to_string(),
105            )));
106        };
107        if let Some(driver) = self.core.work_driver.drivers().await.process {
108            driver.claim_and_run_pending("admin_process_start").await?;
109        }
110        Ok(*record)
111    }
112
113    pub async fn list(
114        &self,
115        filter: &lash_core::ProcessListFilter,
116    ) -> Result<Vec<lash_core::ObservedProcess>> {
117        self.make_observer()?.list(filter).await.map_err(Into::into)
118    }
119
120    /// List processes a session may address — the **grant** filter (ADR 0019).
121    /// This is the security lens (what a session is authorized to see), distinct
122    /// from [`list_originated_by`](Self::list_originated_by). `session.processes()`
123    /// is thin sugar over this method pre-scoped to the session's own grant.
124    pub async fn list_granted_to(
125        &self,
126        session_scope: &lash_core::SessionScope,
127        filter: &lash_core::ProcessListFilter,
128    ) -> Result<Vec<lash_core::ObservedProcess>> {
129        self.make_observer()?
130            .list_granted_to(session_scope, filter)
131            .await
132            .map_err(Into::into)
133    }
134
135    /// List processes a session originated — the **provenance** filter (ADR
136    /// 0019). This is the lineage lens (what a session created), distinct from
137    /// [`list_granted_to`](Self::list_granted_to): a process a session started
138    /// then transferred away still matches here, and one merely granted to it
139    /// does not.
140    pub async fn list_originated_by(
141        &self,
142        session_scope: &lash_core::SessionScope,
143        filter: &lash_core::ProcessListFilter,
144    ) -> Result<Vec<lash_core::ObservedProcess>> {
145        self.make_observer()?
146            .list_originated_by(session_scope, filter)
147            .await
148            .map_err(Into::into)
149    }
150
151    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
152        Ok(self.make_observer()?.process(process_id).await)
153    }
154
155    pub async fn events(
156        &self,
157        process_id: &str,
158        after_sequence: u64,
159    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
160        self.make_observer()?
161            .events_after(process_id, after_sequence)
162            .await
163            .map_err(Into::into)
164    }
165
166    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
167        if let Some(driver) = self.core.env.process_work_driver.as_ref() {
168            return driver.await_terminal(process_id).await.map_err(Into::into);
169        }
170        lash_core::ProcessAwaiter::polling(self.registry()?)
171            .await_terminal(process_id)
172            .await
173            .map_err(Into::into)
174    }
175
176    pub async fn cancel(
177        &self,
178        process_id: &str,
179        scoped_effect_controller: ScopedEffectController<'_>,
180    ) -> Result<lash_core::ProcessCancelSummary> {
181        let command = lash_core::ProcessCommand::Cancel {
182            process_id: process_id.to_string(),
183            reason: Some("requested by host".to_string()),
184        };
185        let outcome = self
186            .run_command(command, scoped_effect_controller.clone())
187            .await?;
188        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
189            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
190                "process cancel returned the wrong outcome".to_string(),
191            )));
192        };
193        Ok(lash_core::ProcessCancelSummary::from_record(*record))
194    }
195
196    pub async fn signal(
197        &self,
198        process_id: &str,
199        signal_name: impl Into<String>,
200        signal_id: impl Into<String>,
201        request: lash_core::ProcessEventAppendRequest,
202        scoped_effect_controller: ScopedEffectController<'_>,
203    ) -> Result<lash_core::ProcessEvent> {
204        let signal_name = signal_name.into();
205        let event_type = request.event_type.clone();
206        let payload = request.payload.clone();
207        let command = lash_core::ProcessCommand::Signal {
208            process_id: process_id.to_string(),
209            signal_name: signal_name.clone(),
210            signal_id: signal_id.into(),
211            request,
212        };
213        let outcome = self
214            .run_command(command, scoped_effect_controller.clone())
215            .await?;
216        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
217            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
218                "process signal returned the wrong outcome".to_string(),
219            )));
220        };
221        let registry = self.registry()?;
222        let waiting_ordinal =
223            registry
224                .get_process(process_id)
225                .await
226                .and_then(|record| match record.wait {
227                    Some(lash_core::WaitState {
228                        kind:
229                            lash_core::WaitKind::Signal {
230                                name,
231                                event_type: wait_event_type,
232                                ordinal,
233                                ..
234                            },
235                        ..
236                    }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
237                    _ => None,
238                });
239        let ordinal = match waiting_ordinal {
240            Some(ordinal) => ordinal,
241            None => {
242                registry
243                    .count_events_through(process_id, &event_type, event.sequence)
244                    .await?
245            }
246        };
247        if ordinal > 0 {
248            let key = scoped_effect_controller
249                .controller()
250                .await_event_key(
251                    &lash_core::ExecutionScope::process(process_id),
252                    lash_core::AwaitEventWaitIdentity::process_signal(
253                        process_id,
254                        &signal_name,
255                        ordinal,
256                    ),
257                )
258                .await
259                .map_err(|err| {
260                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
261                })?;
262            let _ = scoped_effect_controller
263                .controller()
264                .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
265                .await
266                .map_err(|err| {
267                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
268                })?;
269        }
270        Ok(*event)
271    }
272
273    pub async fn session_snapshot(
274        &self,
275        session_id: impl Into<String>,
276    ) -> Result<lash_core::ProcessWorkSnapshot> {
277        self.make_observer()?
278            .snapshot_for_session(session_id)
279            .await
280            .map_err(Into::into)
281    }
282
283    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
284        self.make_observer()
285    }
286
287    /// Cancel every currently-running process. A host-wide lever; for a
288    /// session-scoped stop use [`SessionProcessAdmin::cancel_all`](crate::admin::SessionProcessAdmin::cancel_all).
289    pub async fn cancel_all(
290        &self,
291        scoped_effect_controller: ScopedEffectController<'_>,
292    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
293        let running = self
294            .list(&lash_core::ProcessListFilter {
295                status: lash_core::ProcessStatusFilter::Running,
296                ..lash_core::ProcessListFilter::default()
297            })
298            .await?;
299        let mut summaries = Vec::with_capacity(running.len());
300        for process in running {
301            summaries.push(
302                self.cancel(&process.process_id, scoped_effect_controller.clone())
303                    .await?,
304            );
305        }
306        Ok(summaries)
307    }
308
309    /// Move handle grants for `process_ids` from one session scope to another.
310    /// Processes are global; this re-homes only the addressing grant, never the
311    /// process itself.
312    pub async fn transfer(
313        &self,
314        from_scope: &lash_core::SessionScope,
315        to_scope: &lash_core::SessionScope,
316        process_ids: &[String],
317    ) -> Result<()> {
318        self.registry()?
319            .transfer_handle_grants(from_scope, to_scope, process_ids)
320            .await
321            .map_err(Into::into)
322    }
323
324    /// Host-scheduled retention lever (ADR 0017): physically delete terminal
325    /// process rows (and their events, grants, leases) older than
326    /// `cutoff_epoch_ms`, returning what was reclaimed. Non-terminal rows are
327    /// never touched. Choose a cutoff comfortably longer than any live await.
328    pub async fn prune(&self, cutoff_epoch_ms: u64) -> Result<lash_core::ProcessPruneReport> {
329        self.registry()?
330            .prune_terminal_processes(cutoff_epoch_ms)
331            .await
332            .map_err(Into::into)
333    }
334
335    /// Record a durable, non-terminal **Abandon Request** on a process (ADR
336    /// 0019): a third party's authorization to accept uncertainty about an
337    /// owner. This never terminalizes anything itself — the recovery sweep
338    /// reconciles it into `Abandoned` only once the owner's lease has lapsed;
339    /// the marker stays visible to observers while pending. Returns the process
340    /// as observed after the marker is written.
341    pub async fn request_abandon(
342        &self,
343        process_id: &str,
344        requested_by: impl Into<String>,
345        reason: Option<String>,
346    ) -> Result<lash_core::ObservedProcess> {
347        let request = lash_core::AbandonRequest {
348            requested_by: requested_by.into(),
349            requested_at_ms: now_epoch_ms(),
350            reason,
351        };
352        self.registry()?
353            .request_process_abandon(process_id, request)
354            .await?;
355        self.get(process_id).await?.ok_or_else(|| {
356            EmbedError::Plugin(lash_core::PluginError::Session(format!(
357                "process `{process_id}` vanished after recording its abandon request"
358            )))
359        })
360    }
361}
362
363/// Host wall-clock epoch milliseconds for facade-issued markers (e.g. the
364/// Abandon Request timestamp). The registry stays state-only, so the facade
365/// stamps the request time itself. Shared with the session-scoped abandon lever
366/// in [`crate::admin`].
367pub(crate) fn now_epoch_ms() -> u64 {
368    std::time::SystemTime::now()
369        .duration_since(std::time::UNIX_EPOCH)
370        .map(|elapsed| elapsed.as_millis() as u64)
371        .unwrap_or(0)
372}