lash_core/runtime/process/registry.rs
1use crate::plugin::PluginError;
2
3use super::events::{
4 ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
5};
6use super::model::{
7 AbandonRequest, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
8 ProcessHandleGrantEntry, ProcessLease, ProcessLeaseClaimOutcome, ProcessLeaseCompletion,
9 ProcessListFilter, ProcessRecord, ProcessRegistration, ProcessSessionDeleteReport,
10 ProcessStarted, SessionScope, WaitState,
11};
12
/// Counts returned by [`ProcessRegistry::prune_terminal_processes`].
///
/// Both figures are rows that were physically removed from the registry:
/// terminal process rows, and the event rows that belonged to them.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ProcessPruneReport {
    /// Number of terminal process rows removed.
    pub pruned_processes: usize,
    /// Number of event rows removed, summed over every pruned process.
    pub pruned_events: usize,
}
22
23/// Durability-neutral process registry.
24///
25/// Process waits are coordination behavior and live on
26/// [`ProcessWorkDriver`](crate::ProcessWorkDriver) /
27/// [`ProcessAwaiter`](crate::ProcessAwaiter), not on persistence
28/// implementations. Registry methods are point reads and writes only. See
29/// `docs/adr/0016-process-waits-live-on-the-work-driver-seam.md`.
30#[async_trait::async_trait]
31pub trait ProcessRegistry: Send + Sync {
32 /// Durability tier this process registry provides; defaults to
33 /// [`DurabilityTier`](crate::DurabilityTier)`::Inline`.
34 fn durability_tier(&self) -> crate::DurabilityTier {
35 crate::DurabilityTier::Inline
36 }
37
38 async fn register_process(
39 &self,
40 registration: ProcessRegistration,
41 ) -> Result<ProcessRecord, PluginError>;
42
43 /// Attach a durable backend reference to a registered process.
44 ///
45 /// Implementations must reject unknown process ids. The first assignment
46 /// stores the reference. Repeating the exact same assignment is an
47 /// idempotent no-op that returns the existing record unchanged. Assigning a
48 /// different reference after one has been stored is a registry model error.
49 async fn set_external_ref(
50 &self,
51 process_id: &str,
52 external_ref: ProcessExternalRef,
53 ) -> Result<ProcessRecord, PluginError>;
54
55 async fn grant_handle(
56 &self,
57 session_scope: &SessionScope,
58 process_id: &str,
59 descriptor: ProcessHandleDescriptor,
60 ) -> Result<ProcessHandleGrant, PluginError>;
61
62 async fn revoke_handle(
63 &self,
64 session_scope: &SessionScope,
65 process_id: &str,
66 ) -> Result<(), PluginError>;
67
68 async fn transfer_handle_grants(
69 &self,
70 from_scope: &SessionScope,
71 to_scope: &SessionScope,
72 process_ids: &[String],
73 ) -> Result<(), PluginError>;
74
75 async fn list_handle_grants(
76 &self,
77 session_scope: &SessionScope,
78 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
79
80 async fn list_live_handle_grants(
81 &self,
82 session_scope: &SessionScope,
83 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
84 Ok(self
85 .list_handle_grants(session_scope)
86 .await?
87 .into_iter()
88 .filter(|(_, record)| !record.is_terminal())
89 .collect())
90 }
91
92 async fn has_handle_grant(
93 &self,
94 session_scope: &SessionScope,
95 process_id: &str,
96 ) -> Result<bool, PluginError> {
97 Ok(self
98 .list_handle_grants(session_scope)
99 .await?
100 .into_iter()
101 .any(|(grant, _)| grant.process_id == process_id))
102 }
103
104 async fn handle_grants_for_process(
105 &self,
106 process_id: &str,
107 ) -> Result<Vec<ProcessHandleGrant>, PluginError>;
108
109 async fn delete_session_process_state(
110 &self,
111 session_id: &str,
112 ) -> Result<ProcessSessionDeleteReport, PluginError>;
113
114 async fn append_event(
115 &self,
116 process_id: &str,
117 request: ProcessEventAppendRequest,
118 ) -> Result<ProcessEventAppendResult, PluginError>;
119
120 async fn events_after(
121 &self,
122 process_id: &str,
123 after_sequence: u64,
124 ) -> Result<Vec<ProcessEvent>, PluginError>;
125
126 /// Count events of `event_type` with `sequence <= up_to_sequence`.
127 ///
128 /// This is the signal-ordinal query: the Nth occurrence of a signal event
129 /// resolves the Nth durable wait key. The default scans the event log;
130 /// store backends override it with a COUNT so per-signal cost stays flat
131 /// instead of growing with a long-lived process's history.
132 async fn count_events_through(
133 &self,
134 process_id: &str,
135 event_type: &str,
136 up_to_sequence: u64,
137 ) -> Result<u64, PluginError> {
138 Ok(self
139 .events_after(process_id, 0)
140 .await?
141 .into_iter()
142 .filter(|event| event.sequence <= up_to_sequence && event.event_type == event_type)
143 .count() as u64)
144 }
145
146 /// The most recent `limit` events, in ascending sequence order.
147 ///
148 /// Observation snapshots use this to show a bounded activity tail without
149 /// fetching a process's entire history on every poll. The default scans
150 /// the event log; store backends override it with ORDER BY ... LIMIT.
151 async fn recent_events(
152 &self,
153 process_id: &str,
154 limit: usize,
155 ) -> Result<Vec<ProcessEvent>, PluginError> {
156 let mut events = self.events_after(process_id, 0).await?;
157 if events.len() > limit {
158 events.drain(..events.len() - limit);
159 }
160 Ok(events)
161 }
162
163 async fn wake_events_after(
164 &self,
165 process_id: &str,
166 after_sequence: u64,
167 ) -> Result<Vec<ProcessEvent>, PluginError>;
168
169 async fn complete_process(
170 &self,
171 process_id: &str,
172 await_output: ProcessAwaitOutput,
173 ) -> Result<ProcessRecord, PluginError>;
174
175 /// Record the durable, lease-fenced "execution started" fact (ADR 0019).
176 ///
177 /// First-writer-wins: the first call stores `started`; a later call is an
178 /// idempotent no-op returning the existing record unchanged (the fact is
179 /// immutable once written, so the sweep can prove an OwnerBound row has
180 /// begun executing). Implementations reject unknown process ids.
181 async fn record_first_started(
182 &self,
183 process_id: &str,
184 started: ProcessStarted,
185 ) -> Result<ProcessRecord, PluginError>;
186
187 /// Set the durable, non-terminal Abandon Request marker (ADR 0019).
188 ///
189 /// First-writer-wins: if a marker is already present the call is an
190 /// idempotent no-op returning the existing record unchanged, preserving the
191 /// original recorded authorization rather than letting a later requester
192 /// clobber it. Setting it on a terminal row is a model error — a terminal
193 /// process has already recorded its outcome, so there is nothing to abandon.
194 async fn request_process_abandon(
195 &self,
196 process_id: &str,
197 request: AbandonRequest,
198 ) -> Result<ProcessRecord, PluginError>;
199
200 async fn set_process_wait(
201 &self,
202 process_id: &str,
203 wait: WaitState,
204 ) -> Result<ProcessRecord, PluginError>;
205
206 async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError>;
207
208 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord>;
209
210 async fn list_processes(
211 &self,
212 filter: &ProcessListFilter,
213 ) -> Result<Vec<ProcessRecord>, PluginError>;
214
215 async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError>;
216
217 /// All non-terminal process records, in stable `process_id` order.
218 ///
219 /// This is the recovery sweep's worklist: every process that was started
220 /// but has not reached a terminal event is a candidate for re-execution by
221 /// a [`DurableProcessWorker`](crate::DurableProcessWorker) after a crash.
222 /// Terminal processes are excluded — they are already done and idempotent by
223 /// `process_id`, so re-running them would be wasted work.
224 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError>;
225
226 /// Claim the durable single-owner lease over a non-terminal process.
227 ///
228 /// An unexpired lease held by a *different* owner returns
229 /// [`ProcessLeaseClaimOutcome::Busy`] carrying the observed holder;
230 /// claiming a free or expired lease succeeds and bumps the
231 /// `fencing_token`, and the same incarnation re-entering its own live
232 /// lease extends it without changing token or fence. The returned
233 /// [`ProcessLease`]'s `(owner, lease_token)` plus `fencing_token` are the
234 /// contract a worker presents on every subsequent renew/complete — a stale
235 /// writer is rejected.
236 async fn claim_process_lease(
237 &self,
238 process_id: &str,
239 owner: &crate::LeaseOwnerIdentity,
240 lease_ttl_ms: u64,
241 ) -> Result<ProcessLeaseClaimOutcome, PluginError>;
242
243 /// Reclaim an unexpired process lease whose observed holder is definitely
244 /// dead according to persisted local-process liveness metadata.
245 ///
246 /// Mirrors
247 /// [`RuntimePersistence::reclaim_session_execution_lease`](crate::RuntimePersistence::reclaim_session_execution_lease):
248 /// backends must CAS on `observed_holder` (owner identity, lease token,
249 /// and fencing token) so a stale claimant cannot clear a newer live lease
250 /// that won the race after the busy observation, and a successful reclaim
251 /// must advance the fencing token monotonically.
252 async fn reclaim_process_lease(
253 &self,
254 process_id: &str,
255 owner: &crate::LeaseOwnerIdentity,
256 observed_holder: &ProcessLease,
257 lease_ttl_ms: u64,
258 ) -> Result<ProcessLeaseClaimOutcome, PluginError>;
259
260 /// Extend the expiry of a live lease the caller still owns.
261 ///
262 /// The lease must match the persisted `(owner, lease_token, fencing_token)`
263 /// and be unexpired, else the renewal is rejected (the lease was superseded
264 /// or expired). Workers renew across long-running effects so a healthy
265 /// process is not swept out from under its live owner.
266 async fn renew_process_lease(
267 &self,
268 lease: &ProcessLease,
269 lease_ttl_ms: u64,
270 ) -> Result<ProcessLease, PluginError>;
271
272 /// Read the current lease row for a process without claiming it.
273 ///
274 /// Returns the persisted lease when one is held (owner and token present),
275 /// or `None` when the row is unleased or released. The returned lease may be
276 /// expired: expiry is a raw fact exposed read-side (ADR 0019) so hosts
277 /// classify staleness themselves; this never mutates the lease. Unknown
278 /// process ids return `None`.
279 async fn get_process_lease(
280 &self,
281 process_id: &str,
282 ) -> Result<Option<ProcessLease>, PluginError>;
283
284 /// Release a lease the caller owns, fenced by the completion's
285 /// `(process_id, lease_token)`.
286 ///
287 /// Mirrors clearing a runtime turn lease: a stale completion (whose token no
288 /// longer matches the live lease) is a no-op so it cannot release a lease a
289 /// newer owner now holds. Idempotent — completing an already-released lease
290 /// succeeds.
291 async fn complete_process_lease(
292 &self,
293 completion: &ProcessLeaseCompletion,
294 ) -> Result<(), PluginError>;
295
296 /// Physically delete terminal process rows whose `updated_at_ms` is older
297 /// than `cutoff_epoch_ms`, together with their events, wake acks, handle
298 /// grants, and lease rows. Host-scheduled retention: hosts that project
299 /// results/events into their own store call this to keep the registry
300 /// bounded. Non-terminal rows are never touched. Callers must choose a
301 /// retention window comfortably longer than any waiter lifetime — a
302 /// pruned process id becomes "unknown process" to late awaits.
303 ///
304 /// ```no_run
305 /// use std::time::{Duration, SystemTime, UNIX_EPOCH};
306 /// use lash_core::{PluginError, ProcessRegistry};
307 ///
308 /// async fn prune_week_old(registry: &dyn ProcessRegistry) -> Result<(), PluginError> {
309 /// let now_ms = SystemTime::now()
310 /// .duration_since(UNIX_EPOCH)
311 /// .expect("clock after epoch")
312 /// .as_millis() as u64;
313 /// // Window must exceed any in-flight await's lifetime (ADR 0017).
314 /// let cutoff = now_ms - Duration::from_secs(7 * 24 * 60 * 60).as_millis() as u64;
315 /// let report = registry.prune_terminal_processes(cutoff).await?;
316 /// eprintln!(
317 /// "pruned {} processes, {} events",
318 /// report.pruned_processes, report.pruned_events
319 /// );
320 /// Ok(())
321 /// }
322 /// ```
323 async fn prune_terminal_processes(
324 &self,
325 cutoff_epoch_ms: u64,
326 ) -> Result<ProcessPruneReport, PluginError>;
327}