Skip to main content

lash_core/runtime/process/
registry.rs

1use crate::plugin::PluginError;
2
3use super::events::{
4    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
5};
6use super::model::{
7    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry,
8    ProcessLease, ProcessLeaseCompletion, ProcessListFilter, ProcessRecord, ProcessRegistration,
9    ProcessSessionDeleteReport, SessionScope, WaitState,
10};
11
12/// Durability-neutral process registry.
13#[async_trait::async_trait]
14pub trait ProcessRegistry: Send + Sync {
15    /// Durability tier this process registry provides; defaults to
16    /// [`DurabilityTier`](crate::DurabilityTier)`::Inline`.
17    fn durability_tier(&self) -> crate::DurabilityTier {
18        crate::DurabilityTier::Inline
19    }
20
21    async fn register_process(
22        &self,
23        registration: ProcessRegistration,
24    ) -> Result<ProcessRecord, PluginError>;
25
26    async fn set_external_ref(
27        &self,
28        process_id: &str,
29        external_ref: ProcessExternalRef,
30    ) -> Result<ProcessRecord, PluginError>;
31
32    async fn grant_handle(
33        &self,
34        session_scope: &SessionScope,
35        process_id: &str,
36        descriptor: ProcessHandleDescriptor,
37    ) -> Result<ProcessHandleGrant, PluginError>;
38
39    async fn revoke_handle(
40        &self,
41        session_scope: &SessionScope,
42        process_id: &str,
43    ) -> Result<(), PluginError>;
44
45    async fn transfer_handle_grants(
46        &self,
47        from_scope: &SessionScope,
48        to_scope: &SessionScope,
49        process_ids: &[String],
50    ) -> Result<(), PluginError>;
51
52    async fn list_handle_grants(
53        &self,
54        session_scope: &SessionScope,
55    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
56
57    async fn list_live_handle_grants(
58        &self,
59        session_scope: &SessionScope,
60    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
61        Ok(self
62            .list_handle_grants(session_scope)
63            .await?
64            .into_iter()
65            .filter(|(_, record)| !record.is_terminal())
66            .collect())
67    }
68
69    async fn has_handle_grant(
70        &self,
71        session_scope: &SessionScope,
72        process_id: &str,
73    ) -> Result<bool, PluginError> {
74        Ok(self
75            .list_handle_grants(session_scope)
76            .await?
77            .into_iter()
78            .any(|(grant, _)| grant.process_id == process_id))
79    }
80
81    async fn handle_grants_for_process(
82        &self,
83        process_id: &str,
84    ) -> Result<Vec<ProcessHandleGrant>, PluginError>;
85
86    async fn delete_session_process_state(
87        &self,
88        session_id: &str,
89    ) -> Result<ProcessSessionDeleteReport, PluginError>;
90
91    async fn append_event(
92        &self,
93        process_id: &str,
94        request: ProcessEventAppendRequest,
95    ) -> Result<ProcessEventAppendResult, PluginError>;
96
97    async fn events_after(
98        &self,
99        process_id: &str,
100        after_sequence: u64,
101    ) -> Result<Vec<ProcessEvent>, PluginError>;
102
103    /// Count events of `event_type` with `sequence <= up_to_sequence`.
104    ///
105    /// This is the signal-ordinal query: the Nth occurrence of a signal event
106    /// resolves the Nth durable wait key. The default scans the event log;
107    /// store backends override it with a COUNT so per-signal cost stays flat
108    /// instead of growing with a long-lived process's history.
109    async fn count_events_through(
110        &self,
111        process_id: &str,
112        event_type: &str,
113        up_to_sequence: u64,
114    ) -> Result<u64, PluginError> {
115        Ok(self
116            .events_after(process_id, 0)
117            .await?
118            .into_iter()
119            .filter(|event| event.sequence <= up_to_sequence && event.event_type == event_type)
120            .count() as u64)
121    }
122
123    /// The most recent `limit` events, in ascending sequence order.
124    ///
125    /// Observation snapshots use this to show a bounded activity tail without
126    /// fetching a process's entire history on every poll. The default scans
127    /// the event log; store backends override it with ORDER BY ... LIMIT.
128    async fn recent_events(
129        &self,
130        process_id: &str,
131        limit: usize,
132    ) -> Result<Vec<ProcessEvent>, PluginError> {
133        let mut events = self.events_after(process_id, 0).await?;
134        if events.len() > limit {
135            events.drain(..events.len() - limit);
136        }
137        Ok(events)
138    }
139
140    async fn wake_events_after(
141        &self,
142        process_id: &str,
143        after_sequence: u64,
144    ) -> Result<Vec<ProcessEvent>, PluginError>;
145
146    async fn wait_event_after(
147        &self,
148        process_id: &str,
149        event_type: &str,
150        after_sequence: u64,
151    ) -> Result<ProcessEvent, PluginError>;
152
153    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
154
155    async fn complete_process(
156        &self,
157        process_id: &str,
158        await_output: ProcessAwaitOutput,
159    ) -> Result<ProcessRecord, PluginError>;
160
161    async fn set_process_wait(
162        &self,
163        process_id: &str,
164        wait: WaitState,
165    ) -> Result<ProcessRecord, PluginError>;
166
167    async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError>;
168
169    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord>;
170
171    async fn list_processes(
172        &self,
173        filter: &ProcessListFilter,
174    ) -> Result<Vec<ProcessRecord>, PluginError>;
175
176    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError>;
177
178    /// All non-terminal process records, in stable `process_id` order.
179    ///
180    /// This is the recovery sweep's worklist: every process that was started
181    /// but has not reached a terminal event is a candidate for re-execution by
182    /// a [`DurableProcessWorker`](crate::DurableProcessWorker) after a crash.
183    /// Terminal processes are excluded — they are already done and idempotent by
184    /// `process_id`, so re-running them would be wasted work.
185    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError>;
186
187    /// Claim the durable single-owner lease over a non-terminal process.
188    ///
189    /// An unexpired lease held by a *different* owner fences the claim (returns
190    /// an error); claiming a free, expired, or own lease succeeds and bumps the
191    /// `fencing_token`. The returned [`ProcessLease`]'s
192    /// `(owner_id, lease_token)` plus `fencing_token` are the contract a worker
193    /// presents on every subsequent renew/complete — a stale writer is rejected.
194    async fn claim_process_lease(
195        &self,
196        process_id: &str,
197        owner_id: &str,
198        lease_ttl_ms: u64,
199    ) -> Result<ProcessLease, PluginError>;
200
201    /// Extend the expiry of a live lease the caller still owns.
202    ///
203    /// The lease must match the persisted `(owner, lease_token, fencing_token)`
204    /// and be unexpired, else the renewal is rejected (the lease was superseded
205    /// or expired). Workers renew across long-running effects so a healthy
206    /// process is not swept out from under its live owner.
207    async fn renew_process_lease(
208        &self,
209        lease: &ProcessLease,
210        lease_ttl_ms: u64,
211    ) -> Result<ProcessLease, PluginError>;
212
213    /// Release a lease the caller owns, fenced by the completion's
214    /// `(process_id, lease_token)`.
215    ///
216    /// Mirrors clearing a runtime turn lease: a stale completion (whose token no
217    /// longer matches the live lease) is a no-op so it cannot release a lease a
218    /// newer owner now holds. Idempotent — completing an already-released lease
219    /// succeeds.
220    async fn complete_process_lease(
221        &self,
222        completion: &ProcessLeaseCompletion,
223    ) -> Result<(), PluginError>;
224}