// lash_core/runtime/process/registry.rs

1use crate::plugin::PluginError;
2
3use super::events::{
4    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
5};
6use super::model::{
7    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry,
8    ProcessLease, ProcessLeaseCompletion, ProcessListFilter, ProcessRecord, ProcessRegistration,
9    ProcessSessionDeleteReport, SessionScope, WaitState,
10};
11
12/// Durability-neutral process registry.
13#[async_trait::async_trait]
14pub trait ProcessRegistry: Send + Sync {
15    /// Durability tier this process registry provides; defaults to
16    /// [`DurabilityTier`](crate::DurabilityTier)`::Inline`.
17    fn durability_tier(&self) -> crate::DurabilityTier {
18        crate::DurabilityTier::Inline
19    }
20
21    async fn register_process(
22        &self,
23        registration: ProcessRegistration,
24    ) -> Result<ProcessRecord, PluginError>;
25
26    async fn set_external_ref(
27        &self,
28        process_id: &str,
29        external_ref: ProcessExternalRef,
30    ) -> Result<ProcessRecord, PluginError>;
31
32    async fn grant_handle(
33        &self,
34        session_scope: &SessionScope,
35        process_id: &str,
36        descriptor: ProcessHandleDescriptor,
37    ) -> Result<ProcessHandleGrant, PluginError>;
38
39    async fn revoke_handle(
40        &self,
41        session_scope: &SessionScope,
42        process_id: &str,
43    ) -> Result<(), PluginError>;
44
45    async fn transfer_handle_grants(
46        &self,
47        from_scope: &SessionScope,
48        to_scope: &SessionScope,
49        process_ids: &[String],
50    ) -> Result<(), PluginError>;
51
52    async fn list_handle_grants(
53        &self,
54        session_scope: &SessionScope,
55    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
56
57    async fn list_live_handle_grants(
58        &self,
59        session_scope: &SessionScope,
60    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
61        Ok(self
62            .list_handle_grants(session_scope)
63            .await?
64            .into_iter()
65            .filter(|(_, record)| !record.is_terminal())
66            .collect())
67    }
68
69    async fn has_handle_grant(
70        &self,
71        session_scope: &SessionScope,
72        process_id: &str,
73    ) -> Result<bool, PluginError> {
74        Ok(self
75            .list_handle_grants(session_scope)
76            .await?
77            .into_iter()
78            .any(|(grant, _)| grant.process_id == process_id))
79    }
80
81    async fn handle_grants_for_process(
82        &self,
83        process_id: &str,
84    ) -> Result<Vec<ProcessHandleGrant>, PluginError>;
85
86    async fn delete_session_process_state(
87        &self,
88        session_id: &str,
89    ) -> Result<ProcessSessionDeleteReport, PluginError>;
90
91    async fn append_event(
92        &self,
93        process_id: &str,
94        request: ProcessEventAppendRequest,
95    ) -> Result<ProcessEventAppendResult, PluginError>;
96
97    async fn events_after(
98        &self,
99        process_id: &str,
100        after_sequence: u64,
101    ) -> Result<Vec<ProcessEvent>, PluginError>;
102
103    async fn wake_events_after(
104        &self,
105        process_id: &str,
106        after_sequence: u64,
107    ) -> Result<Vec<ProcessEvent>, PluginError>;
108
109    async fn wait_event_after(
110        &self,
111        process_id: &str,
112        event_type: &str,
113        after_sequence: u64,
114    ) -> Result<ProcessEvent, PluginError>;
115
116    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
117
118    async fn complete_process(
119        &self,
120        process_id: &str,
121        await_output: ProcessAwaitOutput,
122    ) -> Result<ProcessRecord, PluginError>;
123
124    async fn set_process_wait(
125        &self,
126        process_id: &str,
127        wait: WaitState,
128    ) -> Result<ProcessRecord, PluginError>;
129
130    async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError>;
131
132    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord>;
133
134    async fn list_processes(
135        &self,
136        filter: &ProcessListFilter,
137    ) -> Result<Vec<ProcessRecord>, PluginError>;
138
139    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError>;
140
141    /// All non-terminal process records, in stable `process_id` order.
142    ///
143    /// This is the recovery sweep's worklist: every process that was started
144    /// but has not reached a terminal event is a candidate for re-execution by
145    /// a [`DurableProcessWorker`](crate::DurableProcessWorker) after a crash.
146    /// Terminal processes are excluded — they are already done and idempotent by
147    /// `process_id`, so re-running them would be wasted work.
148    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError>;
149
150    /// Claim the durable single-owner lease over a non-terminal process.
151    ///
152    /// An unexpired lease held by a *different* owner fences the claim (returns
153    /// an error); claiming a free, expired, or own lease succeeds and bumps the
154    /// `fencing_token`. The returned [`ProcessLease`]'s
155    /// `(owner_id, lease_token)` plus `fencing_token` are the contract a worker
156    /// presents on every subsequent renew/complete — a stale writer is rejected.
157    async fn claim_process_lease(
158        &self,
159        process_id: &str,
160        owner_id: &str,
161        lease_ttl_ms: u64,
162    ) -> Result<ProcessLease, PluginError>;
163
164    /// Extend the expiry of a live lease the caller still owns.
165    ///
166    /// The lease must match the persisted `(owner, lease_token, fencing_token)`
167    /// and be unexpired, else the renewal is rejected (the lease was superseded
168    /// or expired). Workers renew across long-running effects so a healthy
169    /// process is not swept out from under its live owner.
170    async fn renew_process_lease(
171        &self,
172        lease: &ProcessLease,
173        lease_ttl_ms: u64,
174    ) -> Result<ProcessLease, PluginError>;
175
176    /// Release a lease the caller owns, fenced by the completion's
177    /// `(process_id, lease_token)`.
178    ///
179    /// Mirrors clearing a runtime turn lease: a stale completion (whose token no
180    /// longer matches the live lease) is a no-op so it cannot release a lease a
181    /// newer owner now holds. Idempotent — completing an already-released lease
182    /// succeeds.
183    async fn complete_process_lease(
184        &self,
185        completion: &ProcessLeaseCompletion,
186    ) -> Result<(), PluginError>;
187}