Skip to main content

lash_core/runtime/process/
registry.rs

1use crate::plugin::PluginError;
2
3use super::events::{
4    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
5};
6use super::model::{
7    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry,
8    ProcessLease, ProcessLeaseCompletion, ProcessListFilter, ProcessRecord, ProcessRegistration,
9    ProcessSessionDeleteReport, SessionScope, WaitState,
10};
11
12/// Durability-neutral process registry.
13#[async_trait::async_trait]
14pub trait ProcessRegistry: Send + Sync {
15    /// Durability tier this process registry provides; defaults to
16    /// [`DurabilityTier`](crate::DurabilityTier)`::Inline`.
17    fn durability_tier(&self) -> crate::DurabilityTier {
18        crate::DurabilityTier::Inline
19    }
20
21    async fn register_process(
22        &self,
23        registration: ProcessRegistration,
24    ) -> Result<ProcessRecord, PluginError>;
25
26    /// Attach a durable backend reference to a registered process.
27    ///
28    /// Implementations must reject unknown process ids. The first assignment
29    /// stores the reference. Repeating the exact same assignment is an
30    /// idempotent no-op that returns the existing record unchanged. Assigning a
31    /// different reference after one has been stored is a registry model error.
32    async fn set_external_ref(
33        &self,
34        process_id: &str,
35        external_ref: ProcessExternalRef,
36    ) -> Result<ProcessRecord, PluginError>;
37
38    async fn grant_handle(
39        &self,
40        session_scope: &SessionScope,
41        process_id: &str,
42        descriptor: ProcessHandleDescriptor,
43    ) -> Result<ProcessHandleGrant, PluginError>;
44
45    async fn revoke_handle(
46        &self,
47        session_scope: &SessionScope,
48        process_id: &str,
49    ) -> Result<(), PluginError>;
50
51    async fn transfer_handle_grants(
52        &self,
53        from_scope: &SessionScope,
54        to_scope: &SessionScope,
55        process_ids: &[String],
56    ) -> Result<(), PluginError>;
57
58    async fn list_handle_grants(
59        &self,
60        session_scope: &SessionScope,
61    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
62
63    async fn list_live_handle_grants(
64        &self,
65        session_scope: &SessionScope,
66    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
67        Ok(self
68            .list_handle_grants(session_scope)
69            .await?
70            .into_iter()
71            .filter(|(_, record)| !record.is_terminal())
72            .collect())
73    }
74
75    async fn has_handle_grant(
76        &self,
77        session_scope: &SessionScope,
78        process_id: &str,
79    ) -> Result<bool, PluginError> {
80        Ok(self
81            .list_handle_grants(session_scope)
82            .await?
83            .into_iter()
84            .any(|(grant, _)| grant.process_id == process_id))
85    }
86
87    async fn handle_grants_for_process(
88        &self,
89        process_id: &str,
90    ) -> Result<Vec<ProcessHandleGrant>, PluginError>;
91
92    async fn delete_session_process_state(
93        &self,
94        session_id: &str,
95    ) -> Result<ProcessSessionDeleteReport, PluginError>;
96
97    async fn append_event(
98        &self,
99        process_id: &str,
100        request: ProcessEventAppendRequest,
101    ) -> Result<ProcessEventAppendResult, PluginError>;
102
103    async fn events_after(
104        &self,
105        process_id: &str,
106        after_sequence: u64,
107    ) -> Result<Vec<ProcessEvent>, PluginError>;
108
109    /// Count events of `event_type` with `sequence <= up_to_sequence`.
110    ///
111    /// This is the signal-ordinal query: the Nth occurrence of a signal event
112    /// resolves the Nth durable wait key. The default scans the event log;
113    /// store backends override it with a COUNT so per-signal cost stays flat
114    /// instead of growing with a long-lived process's history.
115    async fn count_events_through(
116        &self,
117        process_id: &str,
118        event_type: &str,
119        up_to_sequence: u64,
120    ) -> Result<u64, PluginError> {
121        Ok(self
122            .events_after(process_id, 0)
123            .await?
124            .into_iter()
125            .filter(|event| event.sequence <= up_to_sequence && event.event_type == event_type)
126            .count() as u64)
127    }
128
129    /// The most recent `limit` events, in ascending sequence order.
130    ///
131    /// Observation snapshots use this to show a bounded activity tail without
132    /// fetching a process's entire history on every poll. The default scans
133    /// the event log; store backends override it with ORDER BY ... LIMIT.
134    async fn recent_events(
135        &self,
136        process_id: &str,
137        limit: usize,
138    ) -> Result<Vec<ProcessEvent>, PluginError> {
139        let mut events = self.events_after(process_id, 0).await?;
140        if events.len() > limit {
141            events.drain(..events.len() - limit);
142        }
143        Ok(events)
144    }
145
146    async fn wake_events_after(
147        &self,
148        process_id: &str,
149        after_sequence: u64,
150    ) -> Result<Vec<ProcessEvent>, PluginError>;
151
152    async fn wait_event_after(
153        &self,
154        process_id: &str,
155        event_type: &str,
156        after_sequence: u64,
157    ) -> Result<ProcessEvent, PluginError>;
158
159    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
160
161    async fn complete_process(
162        &self,
163        process_id: &str,
164        await_output: ProcessAwaitOutput,
165    ) -> Result<ProcessRecord, PluginError>;
166
167    async fn set_process_wait(
168        &self,
169        process_id: &str,
170        wait: WaitState,
171    ) -> Result<ProcessRecord, PluginError>;
172
173    async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError>;
174
175    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord>;
176
177    async fn list_processes(
178        &self,
179        filter: &ProcessListFilter,
180    ) -> Result<Vec<ProcessRecord>, PluginError>;
181
182    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError>;
183
184    /// All non-terminal process records, in stable `process_id` order.
185    ///
186    /// This is the recovery sweep's worklist: every process that was started
187    /// but has not reached a terminal event is a candidate for re-execution by
188    /// a [`DurableProcessWorker`](crate::DurableProcessWorker) after a crash.
189    /// Terminal processes are excluded — they are already done and idempotent by
190    /// `process_id`, so re-running them would be wasted work.
191    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError>;
192
193    /// Claim the durable single-owner lease over a non-terminal process.
194    ///
195    /// An unexpired lease held by a *different* owner fences the claim (returns
196    /// an error); claiming a free, expired, or own lease succeeds and bumps the
197    /// `fencing_token`. The returned [`ProcessLease`]'s
198    /// `(owner_id, lease_token)` plus `fencing_token` are the contract a worker
199    /// presents on every subsequent renew/complete — a stale writer is rejected.
200    async fn claim_process_lease(
201        &self,
202        process_id: &str,
203        owner_id: &str,
204        lease_ttl_ms: u64,
205    ) -> Result<ProcessLease, PluginError>;
206
207    /// Extend the expiry of a live lease the caller still owns.
208    ///
209    /// The lease must match the persisted `(owner, lease_token, fencing_token)`
210    /// and be unexpired, else the renewal is rejected (the lease was superseded
211    /// or expired). Workers renew across long-running effects so a healthy
212    /// process is not swept out from under its live owner.
213    async fn renew_process_lease(
214        &self,
215        lease: &ProcessLease,
216        lease_ttl_ms: u64,
217    ) -> Result<ProcessLease, PluginError>;
218
219    /// Release a lease the caller owns, fenced by the completion's
220    /// `(process_id, lease_token)`.
221    ///
222    /// Mirrors clearing a runtime turn lease: a stale completion (whose token no
223    /// longer matches the live lease) is a no-op so it cannot release a lease a
224    /// newer owner now holds. Idempotent — completing an already-released lease
225    /// succeeds.
226    async fn complete_process_lease(
227        &self,
228        completion: &ProcessLeaseCompletion,
229    ) -> Result<(), PluginError>;
230}