Skip to main content

lash_core/runtime/process/
registry.rs

1use crate::plugin::PluginError;
2
3use super::events::{
4    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
5};
6use super::model::{
7    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry,
8    ProcessLease, ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessScope,
9    ProcessSessionDeleteReport,
10};
11
12/// Durability-neutral process registry.
13#[async_trait::async_trait]
14pub trait ProcessRegistry: Send + Sync {
15    /// Durability tier this process registry provides; defaults to
16    /// [`DurabilityTier`](crate::DurabilityTier)`::Inline`.
17    fn durability_tier(&self) -> crate::DurabilityTier {
18        crate::DurabilityTier::Inline
19    }
20
21    async fn register_process(
22        &self,
23        registration: ProcessRegistration,
24    ) -> Result<ProcessRecord, PluginError>;
25
26    async fn set_external_ref(
27        &self,
28        process_id: &str,
29        external_ref: ProcessExternalRef,
30    ) -> Result<ProcessRecord, PluginError>;
31
32    async fn grant_handle(
33        &self,
34        owner_scope: &ProcessScope,
35        process_id: &str,
36        descriptor: ProcessHandleDescriptor,
37    ) -> Result<ProcessHandleGrant, PluginError>;
38
39    async fn revoke_handle(
40        &self,
41        owner_scope: &ProcessScope,
42        process_id: &str,
43    ) -> Result<(), PluginError>;
44
45    async fn transfer_handle_grants(
46        &self,
47        from_scope: &ProcessScope,
48        to_scope: &ProcessScope,
49        process_ids: &[String],
50    ) -> Result<(), PluginError>;
51
52    async fn list_handle_grants(
53        &self,
54        owner_scope: &ProcessScope,
55    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
56
57    async fn list_live_handle_grants(
58        &self,
59        owner_scope: &ProcessScope,
60    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
61        Ok(self
62            .list_handle_grants(owner_scope)
63            .await?
64            .into_iter()
65            .filter(|(_, record)| !record.is_terminal())
66            .collect())
67    }
68
69    async fn has_handle_grant(
70        &self,
71        owner_scope: &ProcessScope,
72        process_id: &str,
73    ) -> Result<bool, PluginError> {
74        Ok(self
75            .list_handle_grants(owner_scope)
76            .await?
77            .into_iter()
78            .any(|(grant, _)| grant.process_id == process_id))
79    }
80
81    async fn handle_grants_for_process(
82        &self,
83        process_id: &str,
84    ) -> Result<Vec<ProcessHandleGrant>, PluginError>;
85
86    async fn delete_session_process_state(
87        &self,
88        session_id: &str,
89    ) -> Result<ProcessSessionDeleteReport, PluginError>;
90
91    async fn append_event(
92        &self,
93        process_id: &str,
94        request: ProcessEventAppendRequest,
95    ) -> Result<ProcessEventAppendResult, PluginError>;
96
97    async fn events_after(
98        &self,
99        process_id: &str,
100        after_sequence: u64,
101    ) -> Result<Vec<ProcessEvent>, PluginError>;
102
103    async fn wake_events_after(
104        &self,
105        process_id: &str,
106        after_sequence: u64,
107    ) -> Result<Vec<ProcessEvent>, PluginError>;
108
109    async fn wait_event_after(
110        &self,
111        process_id: &str,
112        event_type: &str,
113        after_sequence: u64,
114    ) -> Result<ProcessEvent, PluginError>;
115
116    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
117
118    async fn complete_process(
119        &self,
120        process_id: &str,
121        await_output: ProcessAwaitOutput,
122    ) -> Result<ProcessRecord, PluginError>;
123
124    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord>;
125
126    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError>;
127
128    /// All non-terminal process records, in stable `process_id` order.
129    ///
130    /// This is the recovery sweep's worklist: every process that was started
131    /// but has not reached a terminal event is a candidate for re-execution by
132    /// a [`DurableProcessWorker`](crate::DurableProcessWorker) after a crash.
133    /// Terminal processes are excluded — they are already done and idempotent by
134    /// `process_id`, so re-running them would be wasted work.
135    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError>;
136
137    /// Claim the durable single-owner lease over a non-terminal process.
138    ///
139    /// An unexpired lease held by a *different* owner fences the claim (returns
140    /// an error); claiming a free, expired, or own lease succeeds and bumps the
141    /// `fencing_token`. The returned [`ProcessLease`]'s
142    /// `(owner_id, lease_token)` plus `fencing_token` are the contract a worker
143    /// presents on every subsequent renew/complete — a stale writer is rejected.
144    async fn claim_process_lease(
145        &self,
146        process_id: &str,
147        owner_id: &str,
148        lease_ttl_ms: u64,
149    ) -> Result<ProcessLease, PluginError>;
150
151    /// Extend the expiry of a live lease the caller still owns.
152    ///
153    /// The lease must match the persisted `(owner, lease_token, fencing_token)`
154    /// and be unexpired, else the renewal is rejected (the lease was superseded
155    /// or expired). Workers renew across long-running effects so a healthy
156    /// process is not swept out from under its live owner.
157    async fn renew_process_lease(
158        &self,
159        lease: &ProcessLease,
160        lease_ttl_ms: u64,
161    ) -> Result<ProcessLease, PluginError>;
162
163    /// Release a lease the caller owns, fenced by the completion's
164    /// `(process_id, lease_token)`.
165    ///
166    /// Mirrors clearing a runtime turn lease: a stale completion (whose token no
167    /// longer matches the live lease) is a no-op so it cannot release a lease a
168    /// newer owner now holds. Idempotent — completing an already-released lease
169    /// succeeds.
170    async fn complete_process_lease(
171        &self,
172        completion: &ProcessLeaseCompletion,
173    ) -> Result<(), PluginError>;
174}