Skip to main content

lash_core/runtime/
process_work_driver.rs

1use std::sync::Arc;
2
3use super::DurableProcessWorker;
4use super::process::{
5    ProcessAttach, ProcessAwaiter, ProcessChangeHub, ProcessEvent, ProcessEventSink,
6    ProcessRegistry, watch_process_registry_with_sink,
7};
8use crate::{PluginError, ProcessAwaitOutput};
9
10/// Registry and run handle for process work owned outside
11/// [`LashCore`](https://docs.rs/lash/latest/lash/struct.LashCore.html).
12///
13/// The registry non-terminal rows are the durable work queue. Hosts drive that
14/// queue explicitly by calling [`claim_and_run_pending`](Self::claim_and_run_pending)
15/// on each relevant event. Cross-process idempotency belongs to the registry
16/// claim; there is no core-owned polling loop.
17#[derive(Clone)]
18pub struct ProcessWorkDriver {
19    registry: Arc<dyn ProcessRegistry>,
20    run_handle: Arc<dyn ProcessRunHandle>,
21    awaiter: ProcessAwaiter,
22    attach: Option<Arc<dyn ProcessAttach>>,
23    hub: ProcessChangeHub,
24}
25
26impl ProcessWorkDriver {
27    pub fn new(registry: Arc<dyn ProcessRegistry>, run_handle: Arc<dyn ProcessRunHandle>) -> Self {
28        Self::new_with_sink(registry, run_handle, None)
29    }
30
31    /// Like [`new`](Self::new), but installs a host-facing
32    /// [`ProcessEventSink`] on the registry decorator this driver wraps.
33    ///
34    /// The sink receives every appended event, best-effort, after its durable
35    /// write — see [`ProcessEventSink`] for the freshness-not-truth contract.
36    pub fn new_with_sink(
37        registry: Arc<dyn ProcessRegistry>,
38        run_handle: Arc<dyn ProcessRunHandle>,
39        sink: Option<Arc<dyn ProcessEventSink>>,
40    ) -> Self {
41        let (registry, hub) = watch_process_registry_with_sink(registry, sink);
42        Self::from_watched(registry, hub, run_handle)
43    }
44
45    pub fn from_watched(
46        registry: Arc<dyn ProcessRegistry>,
47        hub: ProcessChangeHub,
48        run_handle: Arc<dyn ProcessRunHandle>,
49    ) -> Self {
50        let awaiter = ProcessAwaiter::new(Arc::clone(&registry), hub.clone());
51        Self {
52            registry,
53            run_handle,
54            awaiter,
55            attach: None,
56            hub,
57        }
58    }
59
60    pub fn with_attach(mut self, attach: Arc<dyn ProcessAttach>) -> Self {
61        self.attach = Some(attach);
62        self
63    }
64
65    pub fn inline(registry: Arc<dyn ProcessRegistry>, worker: DurableProcessWorker) -> Self {
66        Self::new(registry, Arc::new(InlineProcessRunHandle::new(worker)))
67    }
68
69    pub fn process_registry(&self) -> Arc<dyn ProcessRegistry> {
70        Arc::clone(&self.registry)
71    }
72
73    pub fn change_hub(&self) -> ProcessChangeHub {
74        self.hub.clone()
75    }
76
77    pub fn awaiter(&self) -> ProcessAwaiter {
78        self.awaiter.clone()
79    }
80
81    /// Wait for `process_id` to reach a terminal state and return its outcome.
82    ///
83    /// This is the one way to wait on a started work item (ADR 0016): never a
84    /// raw registry poll loop. The mechanism matches the deployment — an
85    /// engine-native durable promise when a [`ProcessAttach`] is installed
86    /// (Restate ingress attach), otherwise the in-process change hub plus
87    /// bounded backoff point reads. An already-terminal process returns
88    /// immediately.
89    ///
90    /// Callers must bound the wait themselves: a process that never terminates
91    /// would otherwise pin the caller forever. Wrap it in
92    /// [`tokio::time::timeout`].
93    ///
94    /// ```no_run
95    /// use std::time::Duration;
96    /// use lash_core::{PluginError, ProcessWorkDriver};
97    ///
98    /// async fn wait(driver: &ProcessWorkDriver, process_id: &str) -> Result<(), PluginError> {
99    ///     match tokio::time::timeout(Duration::from_secs(30), driver.await_terminal(process_id)).await {
100    ///         Ok(Ok(output)) => {
101    ///             // Terminal outcome (success / failure / cancelled). To reconcile
102    ///             // the full event history, read `events_after(process_id, 0)`.
103    ///             let _ = output;
104    ///             Ok(())
105    ///         }
106    ///         Ok(Err(err)) => Err(err), // e.g. unknown process, or an attach error
107    ///         Err(_elapsed) => Ok(()),  // bound exceeded; retry or surface to the caller
108    ///     }
109    /// }
110    /// ```
111    pub async fn await_terminal(
112        &self,
113        process_id: &str,
114    ) -> Result<ProcessAwaitOutput, PluginError> {
115        let record = self
116            .registry
117            .get_process(process_id)
118            .await
119            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
120        if let Some(output) = record.status.await_output() {
121            return Ok(output.clone());
122        }
123        if let Some(attach) = self.attach.as_ref() {
124            return attach.await_terminal(process_id).await;
125        }
126        self.awaiter.await_terminal(process_id).await
127    }
128
129    /// Wait for the first event of `event_type` on `process_id` with a sequence
130    /// greater than `after_sequence`, returning it once it appears.
131    ///
132    /// Like [`await_terminal`](Self::await_terminal) this rides the awaiter's
133    /// hub-plus-backoff point reads rather than a store poll loop, and callers
134    /// bound the wait with [`tokio::time::timeout`]. Historical events already
135    /// past `after_sequence` resolve immediately. This waits on a *non-terminal*
136    /// milestone; for completion use [`await_terminal`](Self::await_terminal).
137    pub async fn await_event(
138        &self,
139        process_id: &str,
140        event_type: &str,
141        after_sequence: u64,
142    ) -> Result<ProcessEvent, PluginError> {
143        self.awaiter
144            .await_event(process_id, event_type, after_sequence)
145            .await
146    }
147
148    pub async fn claim_and_run_pending(&self, reason: &str) -> Result<(), PluginError> {
149        if let Err(err) = self.run_handle.claim_and_run_pending().await {
150            tracing::warn!("process work drive ({reason}) failed: {err}");
151            return Err(err);
152        }
153        Ok(())
154    }
155}
156
157/// One lease-protected drive of the registry's pending (non-terminal) processes.
158///
159/// Implementations claim the single-owner [`ProcessLease`](crate::ProcessLease)
160/// per non-terminal row to fence execution, so a concurrent drive on another
161/// owner skips an already-leased process and a process runs exactly once.
162#[async_trait::async_trait]
163pub trait ProcessRunHandle: Send + Sync {
164    /// Claim and run every pending process this owner can claim, driving each to
165    /// a terminal state. Idempotent: leased and terminal rows are skipped.
166    async fn claim_and_run_pending(&self) -> Result<(), PluginError>;
167}
168
169/// Inline run handle: drives the worker's own lease-protected sweep in-process.
170///
171/// Delegates to [`DurableProcessWorker::drive_pending_processes`], the existing
172/// `list_non_terminal -> claim lease -> run -> complete -> release` loop, so the
173/// inline tier reuses the same coordination point as the durable tier.
174pub struct InlineProcessRunHandle {
175    worker: DurableProcessWorker,
176}
177
178impl InlineProcessRunHandle {
179    pub fn new(worker: DurableProcessWorker) -> Self {
180        Self { worker }
181    }
182}
183
184#[async_trait::async_trait]
185impl ProcessRunHandle for InlineProcessRunHandle {
186    async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
187        self.worker.drive_pending_processes().await
188    }
189}