lash_core/runtime/process_work_driver.rs
1use std::sync::Arc;
2
3use super::DurableProcessWorker;
4use super::process::{
5 ProcessAttach, ProcessAwaiter, ProcessChangeHub, ProcessEvent, ProcessEventSink,
6 ProcessRegistry, watch_process_registry_with_sink,
7};
8use crate::{PluginError, ProcessAwaitOutput};
9
10/// Registry and run handle for process work owned outside
11/// [`LashCore`](https://docs.rs/lash/latest/lash/struct.LashCore.html).
12///
13/// The registry non-terminal rows are the durable work queue. Hosts drive that
14/// queue explicitly by calling [`claim_and_run_pending`](Self::claim_and_run_pending)
15/// on each relevant event. Cross-process idempotency belongs to the registry
16/// claim; there is no core-owned polling loop.
17#[derive(Clone)]
18pub struct ProcessWorkDriver {
19 registry: Arc<dyn ProcessRegistry>,
20 run_handle: Arc<dyn ProcessRunHandle>,
21 awaiter: ProcessAwaiter,
22 attach: Option<Arc<dyn ProcessAttach>>,
23 hub: ProcessChangeHub,
24}
25
26impl ProcessWorkDriver {
27 pub fn new(registry: Arc<dyn ProcessRegistry>, run_handle: Arc<dyn ProcessRunHandle>) -> Self {
28 Self::new_with_sink(registry, run_handle, None)
29 }
30
31 /// Like [`new`](Self::new), but installs a host-facing
32 /// [`ProcessEventSink`] on the registry decorator this driver wraps.
33 ///
34 /// The sink receives every appended event, best-effort, after its durable
35 /// write — see [`ProcessEventSink`] for the freshness-not-truth contract.
36 pub fn new_with_sink(
37 registry: Arc<dyn ProcessRegistry>,
38 run_handle: Arc<dyn ProcessRunHandle>,
39 sink: Option<Arc<dyn ProcessEventSink>>,
40 ) -> Self {
41 let (registry, hub) = watch_process_registry_with_sink(registry, sink);
42 Self::from_watched(registry, hub, run_handle)
43 }
44
45 pub fn from_watched(
46 registry: Arc<dyn ProcessRegistry>,
47 hub: ProcessChangeHub,
48 run_handle: Arc<dyn ProcessRunHandle>,
49 ) -> Self {
50 let awaiter = ProcessAwaiter::new(Arc::clone(®istry), hub.clone());
51 Self {
52 registry,
53 run_handle,
54 awaiter,
55 attach: None,
56 hub,
57 }
58 }
59
60 pub fn with_attach(mut self, attach: Arc<dyn ProcessAttach>) -> Self {
61 self.attach = Some(attach);
62 self
63 }
64
65 pub fn inline(registry: Arc<dyn ProcessRegistry>, worker: DurableProcessWorker) -> Self {
66 Self::new(registry, Arc::new(InlineProcessRunHandle::new(worker)))
67 }
68
69 pub fn process_registry(&self) -> Arc<dyn ProcessRegistry> {
70 Arc::clone(&self.registry)
71 }
72
73 pub fn change_hub(&self) -> ProcessChangeHub {
74 self.hub.clone()
75 }
76
77 pub fn awaiter(&self) -> ProcessAwaiter {
78 self.awaiter.clone()
79 }
80
81 /// Wait for `process_id` to reach a terminal state and return its outcome.
82 ///
83 /// This is the one way to wait on a started work item (ADR 0016): never a
84 /// raw registry poll loop. The mechanism matches the deployment — an
85 /// engine-native durable promise when a [`ProcessAttach`] is installed
86 /// (Restate ingress attach), otherwise the in-process change hub plus
87 /// bounded backoff point reads. An already-terminal process returns
88 /// immediately.
89 ///
90 /// Callers must bound the wait themselves: a process that never terminates
91 /// would otherwise pin the caller forever. Wrap it in
92 /// [`tokio::time::timeout`].
93 ///
94 /// ```no_run
95 /// use std::time::Duration;
96 /// use lash_core::{PluginError, ProcessWorkDriver};
97 ///
98 /// async fn wait(driver: &ProcessWorkDriver, process_id: &str) -> Result<(), PluginError> {
99 /// match tokio::time::timeout(Duration::from_secs(30), driver.await_terminal(process_id)).await {
100 /// Ok(Ok(output)) => {
101 /// // Terminal outcome (success / failure / cancelled). To reconcile
102 /// // the full event history, read `events_after(process_id, 0)`.
103 /// let _ = output;
104 /// Ok(())
105 /// }
106 /// Ok(Err(err)) => Err(err), // e.g. unknown process, or an attach error
107 /// Err(_elapsed) => Ok(()), // bound exceeded; retry or surface to the caller
108 /// }
109 /// }
110 /// ```
111 pub async fn await_terminal(
112 &self,
113 process_id: &str,
114 ) -> Result<ProcessAwaitOutput, PluginError> {
115 let record = self
116 .registry
117 .get_process(process_id)
118 .await
119 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
120 if let Some(output) = record.status.await_output() {
121 return Ok(output.clone());
122 }
123 if let Some(attach) = self.attach.as_ref() {
124 return attach.await_terminal(process_id).await;
125 }
126 self.awaiter.await_terminal(process_id).await
127 }
128
129 /// Wait for the first event of `event_type` on `process_id` with a sequence
130 /// greater than `after_sequence`, returning it once it appears.
131 ///
132 /// Like [`await_terminal`](Self::await_terminal) this rides the awaiter's
133 /// hub-plus-backoff point reads rather than a store poll loop, and callers
134 /// bound the wait with [`tokio::time::timeout`]. Historical events already
135 /// past `after_sequence` resolve immediately. This waits on a *non-terminal*
136 /// milestone; for completion use [`await_terminal`](Self::await_terminal).
137 pub async fn await_event(
138 &self,
139 process_id: &str,
140 event_type: &str,
141 after_sequence: u64,
142 ) -> Result<ProcessEvent, PluginError> {
143 self.awaiter
144 .await_event(process_id, event_type, after_sequence)
145 .await
146 }
147
148 pub async fn claim_and_run_pending(&self, reason: &str) -> Result<(), PluginError> {
149 if let Err(err) = self.run_handle.claim_and_run_pending().await {
150 tracing::warn!("process work drive ({reason}) failed: {err}");
151 return Err(err);
152 }
153 Ok(())
154 }
155}
156
157/// One lease-protected drive of the registry's pending (non-terminal) processes.
158///
159/// Implementations claim the single-owner [`ProcessLease`](crate::ProcessLease)
160/// per non-terminal row to fence execution, so a concurrent drive on another
161/// owner skips an already-leased process and a process runs exactly once.
162#[async_trait::async_trait]
163pub trait ProcessRunHandle: Send + Sync {
164 /// Claim and run every pending process this owner can claim, driving each to
165 /// a terminal state. Idempotent: leased and terminal rows are skipped.
166 async fn claim_and_run_pending(&self) -> Result<(), PluginError>;
167}
168
169/// Inline run handle: drives the worker's own lease-protected sweep in-process.
170///
171/// Delegates to [`DurableProcessWorker::drive_pending_processes`], the existing
172/// `list_non_terminal -> claim lease -> run -> complete -> release` loop, so the
173/// inline tier reuses the same coordination point as the durable tier.
174pub struct InlineProcessRunHandle {
175 worker: DurableProcessWorker,
176}
177
178impl InlineProcessRunHandle {
179 pub fn new(worker: DurableProcessWorker) -> Self {
180 Self { worker }
181 }
182}
183
184#[async_trait::async_trait]
185impl ProcessRunHandle for InlineProcessRunHandle {
186 async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
187 self.worker.drive_pending_processes().await
188 }
189}