Skip to main content

lash_core/runtime/process/
registry.rs

1use crate::plugin::PluginError;
2
3use super::events::{
4    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
5};
6use super::model::{
7    AbandonRequest, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
8    ProcessHandleGrantEntry, ProcessLease, ProcessLeaseClaimOutcome, ProcessLeaseCompletion,
9    ProcessListFilter, ProcessRecord, ProcessRegistration, ProcessSessionDeleteReport,
10    ProcessStarted, SessionScope, WaitState,
11};
12
/// Summary returned by [`ProcessRegistry::prune_terminal_processes`].
///
/// Both counters describe rows that were physically deleted by one prune
/// call: terminal process rows, and the event rows that belonged to them.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct ProcessPruneReport {
    /// Number of terminal process rows that were deleted.
    pub pruned_processes: usize,
    /// Number of event rows deleted across those processes.
    pub pruned_events: usize,
}
22
23/// Durability-neutral process registry.
24///
25/// Process waits are coordination behavior and live on
26/// [`ProcessWorkDriver`](crate::ProcessWorkDriver) /
27/// [`ProcessAwaiter`](crate::ProcessAwaiter), not on persistence
28/// implementations. Registry methods are point reads and writes only. See
29/// `docs/adr/0016-process-waits-live-on-the-work-driver-seam.md`.
30#[async_trait::async_trait]
31pub trait ProcessRegistry: Send + Sync {
32    /// Durability tier this process registry provides; defaults to
33    /// [`DurabilityTier`](crate::DurabilityTier)`::Inline`.
34    fn durability_tier(&self) -> crate::DurabilityTier {
35        crate::DurabilityTier::Inline
36    }
37
38    async fn register_process(
39        &self,
40        registration: ProcessRegistration,
41    ) -> Result<ProcessRecord, PluginError>;
42
43    /// Attach a durable backend reference to a registered process.
44    ///
45    /// Implementations must reject unknown process ids. The first assignment
46    /// stores the reference. Repeating the exact same assignment is an
47    /// idempotent no-op that returns the existing record unchanged. Assigning a
48    /// different reference after one has been stored is a registry model error.
49    async fn set_external_ref(
50        &self,
51        process_id: &str,
52        external_ref: ProcessExternalRef,
53    ) -> Result<ProcessRecord, PluginError>;
54
55    async fn grant_handle(
56        &self,
57        session_scope: &SessionScope,
58        process_id: &str,
59        descriptor: ProcessHandleDescriptor,
60    ) -> Result<ProcessHandleGrant, PluginError>;
61
62    async fn revoke_handle(
63        &self,
64        session_scope: &SessionScope,
65        process_id: &str,
66    ) -> Result<(), PluginError>;
67
68    async fn transfer_handle_grants(
69        &self,
70        from_scope: &SessionScope,
71        to_scope: &SessionScope,
72        process_ids: &[String],
73    ) -> Result<(), PluginError>;
74
75    async fn list_handle_grants(
76        &self,
77        session_scope: &SessionScope,
78    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
79
80    async fn list_live_handle_grants(
81        &self,
82        session_scope: &SessionScope,
83    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
84        Ok(self
85            .list_handle_grants(session_scope)
86            .await?
87            .into_iter()
88            .filter(|(_, record)| !record.is_terminal())
89            .collect())
90    }
91
92    async fn has_handle_grant(
93        &self,
94        session_scope: &SessionScope,
95        process_id: &str,
96    ) -> Result<bool, PluginError> {
97        Ok(self
98            .list_handle_grants(session_scope)
99            .await?
100            .into_iter()
101            .any(|(grant, _)| grant.process_id == process_id))
102    }
103
104    async fn handle_grants_for_process(
105        &self,
106        process_id: &str,
107    ) -> Result<Vec<ProcessHandleGrant>, PluginError>;
108
109    async fn delete_session_process_state(
110        &self,
111        session_id: &str,
112    ) -> Result<ProcessSessionDeleteReport, PluginError>;
113
114    async fn append_event(
115        &self,
116        process_id: &str,
117        request: ProcessEventAppendRequest,
118    ) -> Result<ProcessEventAppendResult, PluginError>;
119
120    async fn events_after(
121        &self,
122        process_id: &str,
123        after_sequence: u64,
124    ) -> Result<Vec<ProcessEvent>, PluginError>;
125
126    /// Count events of `event_type` with `sequence <= up_to_sequence`.
127    ///
128    /// This is the signal-ordinal query: the Nth occurrence of a signal event
129    /// resolves the Nth durable wait key. The default scans the event log;
130    /// store backends override it with a COUNT so per-signal cost stays flat
131    /// instead of growing with a long-lived process's history.
132    async fn count_events_through(
133        &self,
134        process_id: &str,
135        event_type: &str,
136        up_to_sequence: u64,
137    ) -> Result<u64, PluginError> {
138        Ok(self
139            .events_after(process_id, 0)
140            .await?
141            .into_iter()
142            .filter(|event| event.sequence <= up_to_sequence && event.event_type == event_type)
143            .count() as u64)
144    }
145
146    /// The most recent `limit` events, in ascending sequence order.
147    ///
148    /// Observation snapshots use this to show a bounded activity tail without
149    /// fetching a process's entire history on every poll. The default scans
150    /// the event log; store backends override it with ORDER BY ... LIMIT.
151    async fn recent_events(
152        &self,
153        process_id: &str,
154        limit: usize,
155    ) -> Result<Vec<ProcessEvent>, PluginError> {
156        let mut events = self.events_after(process_id, 0).await?;
157        if events.len() > limit {
158            events.drain(..events.len() - limit);
159        }
160        Ok(events)
161    }
162
163    async fn wake_events_after(
164        &self,
165        process_id: &str,
166        after_sequence: u64,
167    ) -> Result<Vec<ProcessEvent>, PluginError>;
168
169    async fn complete_process(
170        &self,
171        process_id: &str,
172        await_output: ProcessAwaitOutput,
173    ) -> Result<ProcessRecord, PluginError>;
174
175    /// Record the durable, lease-fenced "execution started" fact (ADR 0019).
176    ///
177    /// First-writer-wins: the first call stores `started`; a later call is an
178    /// idempotent no-op returning the existing record unchanged (the fact is
179    /// immutable once written, so the sweep can prove an OwnerBound row has
180    /// begun executing). Implementations reject unknown process ids.
181    async fn record_first_started(
182        &self,
183        process_id: &str,
184        started: ProcessStarted,
185    ) -> Result<ProcessRecord, PluginError>;
186
187    /// Set the durable, non-terminal Abandon Request marker (ADR 0019).
188    ///
189    /// First-writer-wins: if a marker is already present the call is an
190    /// idempotent no-op returning the existing record unchanged, preserving the
191    /// original recorded authorization rather than letting a later requester
192    /// clobber it. Setting it on a terminal row is a model error — a terminal
193    /// process has already recorded its outcome, so there is nothing to abandon.
194    async fn request_process_abandon(
195        &self,
196        process_id: &str,
197        request: AbandonRequest,
198    ) -> Result<ProcessRecord, PluginError>;
199
200    async fn set_process_wait(
201        &self,
202        process_id: &str,
203        wait: WaitState,
204    ) -> Result<ProcessRecord, PluginError>;
205
206    async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError>;
207
208    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord>;
209
210    async fn list_processes(
211        &self,
212        filter: &ProcessListFilter,
213    ) -> Result<Vec<ProcessRecord>, PluginError>;
214
215    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError>;
216
217    /// All non-terminal process records, in stable `process_id` order.
218    ///
219    /// This is the recovery sweep's worklist: every process that was started
220    /// but has not reached a terminal event is a candidate for re-execution by
221    /// a [`DurableProcessWorker`](crate::DurableProcessWorker) after a crash.
222    /// Terminal processes are excluded — they are already done and idempotent by
223    /// `process_id`, so re-running them would be wasted work.
224    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError>;
225
226    /// Claim the durable single-owner lease over a non-terminal process.
227    ///
228    /// An unexpired lease held by a *different* owner returns
229    /// [`ProcessLeaseClaimOutcome::Busy`] carrying the observed holder;
230    /// claiming a free or expired lease succeeds and bumps the
231    /// `fencing_token`, and the same incarnation re-entering its own live
232    /// lease extends it without changing token or fence. The returned
233    /// [`ProcessLease`]'s `(owner, lease_token)` plus `fencing_token` are the
234    /// contract a worker presents on every subsequent renew/complete — a stale
235    /// writer is rejected.
236    async fn claim_process_lease(
237        &self,
238        process_id: &str,
239        owner: &crate::LeaseOwnerIdentity,
240        lease_ttl_ms: u64,
241    ) -> Result<ProcessLeaseClaimOutcome, PluginError>;
242
243    /// Reclaim an unexpired process lease whose observed holder is definitely
244    /// dead according to persisted local-process liveness metadata.
245    ///
246    /// Mirrors
247    /// [`RuntimePersistence::reclaim_session_execution_lease`](crate::RuntimePersistence::reclaim_session_execution_lease):
248    /// backends must CAS on `observed_holder` (owner identity, lease token,
249    /// and fencing token) so a stale claimant cannot clear a newer live lease
250    /// that won the race after the busy observation, and a successful reclaim
251    /// must advance the fencing token monotonically.
252    async fn reclaim_process_lease(
253        &self,
254        process_id: &str,
255        owner: &crate::LeaseOwnerIdentity,
256        observed_holder: &ProcessLease,
257        lease_ttl_ms: u64,
258    ) -> Result<ProcessLeaseClaimOutcome, PluginError>;
259
260    /// Extend the expiry of a live lease the caller still owns.
261    ///
262    /// The lease must match the persisted `(owner, lease_token, fencing_token)`
263    /// and be unexpired, else the renewal is rejected (the lease was superseded
264    /// or expired). Workers renew across long-running effects so a healthy
265    /// process is not swept out from under its live owner.
266    async fn renew_process_lease(
267        &self,
268        lease: &ProcessLease,
269        lease_ttl_ms: u64,
270    ) -> Result<ProcessLease, PluginError>;
271
272    /// Read the current lease row for a process without claiming it.
273    ///
274    /// Returns the persisted lease when one is held (owner and token present),
275    /// or `None` when the row is unleased or released. The returned lease may be
276    /// expired: expiry is a raw fact exposed read-side (ADR 0019) so hosts
277    /// classify staleness themselves; this never mutates the lease. Unknown
278    /// process ids return `None`.
279    async fn get_process_lease(
280        &self,
281        process_id: &str,
282    ) -> Result<Option<ProcessLease>, PluginError>;
283
284    /// Release a lease the caller owns, fenced by the completion's
285    /// `(process_id, lease_token)`.
286    ///
287    /// Mirrors clearing a runtime turn lease: a stale completion (whose token no
288    /// longer matches the live lease) is a no-op so it cannot release a lease a
289    /// newer owner now holds. Idempotent — completing an already-released lease
290    /// succeeds.
291    async fn complete_process_lease(
292        &self,
293        completion: &ProcessLeaseCompletion,
294    ) -> Result<(), PluginError>;
295
296    /// Physically delete terminal process rows whose `updated_at_ms` is older
297    /// than `cutoff_epoch_ms`, together with their events, wake acks, handle
298    /// grants, and lease rows. Host-scheduled retention: hosts that project
299    /// results/events into their own store call this to keep the registry
300    /// bounded. Non-terminal rows are never touched. Callers must choose a
301    /// retention window comfortably longer than any waiter lifetime — a
302    /// pruned process id becomes "unknown process" to late awaits.
303    ///
304    /// ```no_run
305    /// use std::time::{Duration, SystemTime, UNIX_EPOCH};
306    /// use lash_core::{PluginError, ProcessRegistry};
307    ///
308    /// async fn prune_week_old(registry: &dyn ProcessRegistry) -> Result<(), PluginError> {
309    ///     let now_ms = SystemTime::now()
310    ///         .duration_since(UNIX_EPOCH)
311    ///         .expect("clock after epoch")
312    ///         .as_millis() as u64;
313    ///     // Window must exceed any in-flight await's lifetime (ADR 0017).
314    ///     let cutoff = now_ms - Duration::from_secs(7 * 24 * 60 * 60).as_millis() as u64;
315    ///     let report = registry.prune_terminal_processes(cutoff).await?;
316    ///     eprintln!(
317    ///         "pruned {} processes, {} events",
318    ///         report.pruned_processes, report.pruned_events
319    ///     );
320    ///     Ok(())
321    /// }
322    /// ```
323    async fn prune_terminal_processes(
324        &self,
325        cutoff_epoch_ms: u64,
326    ) -> Result<ProcessPruneReport, PluginError>;
327}