1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
use std::sync::Arc;
use super::DurableProcessWorker;
use super::process::{
ProcessAttach, ProcessAwaiter, ProcessChangeHub, ProcessEvent, ProcessEventSink,
ProcessRegistry, watch_process_registry_with_sink,
};
use crate::{PluginError, ProcessAwaitOutput};
/// Registry and run handle for process work owned outside
/// [`LashCore`](https://docs.rs/lash/latest/lash/struct.LashCore.html).
///
/// The registry non-terminal rows are the durable work queue. Hosts drive that
/// queue explicitly by calling [`claim_and_run_pending`](Self::claim_and_run_pending)
/// on each relevant event. Cross-process idempotency belongs to the registry
/// claim; there is no core-owned polling loop.
#[derive(Clone)]
pub struct ProcessWorkDriver {
registry: Arc<dyn ProcessRegistry>,
run_handle: Arc<dyn ProcessRunHandle>,
awaiter: ProcessAwaiter,
attach: Option<Arc<dyn ProcessAttach>>,
hub: ProcessChangeHub,
}
impl ProcessWorkDriver {
pub fn new(registry: Arc<dyn ProcessRegistry>, run_handle: Arc<dyn ProcessRunHandle>) -> Self {
Self::new_with_sink(registry, run_handle, None)
}
/// Like [`new`](Self::new), but installs a host-facing
/// [`ProcessEventSink`] on the registry decorator this driver wraps.
///
/// The sink receives every appended event, best-effort, after its durable
/// write — see [`ProcessEventSink`] for the freshness-not-truth contract.
pub fn new_with_sink(
registry: Arc<dyn ProcessRegistry>,
run_handle: Arc<dyn ProcessRunHandle>,
sink: Option<Arc<dyn ProcessEventSink>>,
) -> Self {
let (registry, hub) = watch_process_registry_with_sink(registry, sink);
Self::from_watched(registry, hub, run_handle)
}
pub fn from_watched(
registry: Arc<dyn ProcessRegistry>,
hub: ProcessChangeHub,
run_handle: Arc<dyn ProcessRunHandle>,
) -> Self {
let awaiter = ProcessAwaiter::new(Arc::clone(®istry), hub.clone());
Self {
registry,
run_handle,
awaiter,
attach: None,
hub,
}
}
pub fn with_attach(mut self, attach: Arc<dyn ProcessAttach>) -> Self {
self.attach = Some(attach);
self
}
pub fn inline(registry: Arc<dyn ProcessRegistry>, worker: DurableProcessWorker) -> Self {
Self::new(registry, Arc::new(InlineProcessRunHandle::new(worker)))
}
pub fn process_registry(&self) -> Arc<dyn ProcessRegistry> {
Arc::clone(&self.registry)
}
pub fn change_hub(&self) -> ProcessChangeHub {
self.hub.clone()
}
pub fn awaiter(&self) -> ProcessAwaiter {
self.awaiter.clone()
}
/// Wait for `process_id` to reach a terminal state and return its outcome.
///
/// This is the one way to wait on a started work item (ADR 0016): never a
/// raw registry poll loop. The mechanism matches the deployment — an
/// engine-native durable promise when a [`ProcessAttach`] is installed
/// (Restate ingress attach), otherwise the in-process change hub plus
/// bounded backoff point reads. An already-terminal process returns
/// immediately.
///
/// Callers must bound the wait themselves: a process that never terminates
/// would otherwise pin the caller forever. Wrap it in
/// [`tokio::time::timeout`].
///
/// ```no_run
/// use std::time::Duration;
/// use lash_core::{PluginError, ProcessWorkDriver};
///
/// async fn wait(driver: &ProcessWorkDriver, process_id: &str) -> Result<(), PluginError> {
/// match tokio::time::timeout(Duration::from_secs(30), driver.await_terminal(process_id)).await {
/// Ok(Ok(output)) => {
/// // Terminal outcome (success / failure / cancelled). To reconcile
/// // the full event history, read `events_after(process_id, 0)`.
/// let _ = output;
/// Ok(())
/// }
/// Ok(Err(err)) => Err(err), // e.g. unknown process, or an attach error
/// Err(_elapsed) => Ok(()), // bound exceeded; retry or surface to the caller
/// }
/// }
/// ```
pub async fn await_terminal(
&self,
process_id: &str,
) -> Result<ProcessAwaitOutput, PluginError> {
let record = self
.registry
.get_process(process_id)
.await
.ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
if let Some(output) = record.status.await_output() {
return Ok(output.clone());
}
if let Some(attach) = self.attach.as_ref() {
return attach.await_terminal(process_id).await;
}
self.awaiter.await_terminal(process_id).await
}
/// Wait for the first event of `event_type` on `process_id` with a sequence
/// greater than `after_sequence`, returning it once it appears.
///
/// Like [`await_terminal`](Self::await_terminal) this rides the awaiter's
/// hub-plus-backoff point reads rather than a store poll loop, and callers
/// bound the wait with [`tokio::time::timeout`]. Historical events already
/// past `after_sequence` resolve immediately. This waits on a *non-terminal*
/// milestone; for completion use [`await_terminal`](Self::await_terminal).
pub async fn await_event(
&self,
process_id: &str,
event_type: &str,
after_sequence: u64,
) -> Result<ProcessEvent, PluginError> {
self.awaiter
.await_event(process_id, event_type, after_sequence)
.await
}
pub async fn claim_and_run_pending(&self, reason: &str) -> Result<(), PluginError> {
if let Err(err) = self.run_handle.claim_and_run_pending().await {
tracing::warn!("process work drive ({reason}) failed: {err}");
return Err(err);
}
Ok(())
}
}
/// One lease-protected drive of the registry's pending (non-terminal) processes.
///
/// Implementations claim the single-owner [`ProcessLease`](crate::ProcessLease)
/// per non-terminal row to fence execution, so a concurrent drive on another
/// owner skips an already-leased process and a process runs exactly once.
#[async_trait::async_trait]
pub trait ProcessRunHandle: Send + Sync {
/// Claim and run every pending process this owner can claim, driving each to
/// a terminal state. Idempotent: leased and terminal rows are skipped.
async fn claim_and_run_pending(&self) -> Result<(), PluginError>;
}
/// Inline run handle: drives the worker's own lease-protected sweep in-process.
///
/// Delegates to [`DurableProcessWorker::drive_pending_processes`], the existing
/// `list_non_terminal -> claim lease -> run -> complete -> release` loop, so the
/// inline tier reuses the same coordination point as the durable tier.
pub struct InlineProcessRunHandle {
worker: DurableProcessWorker,
}
impl InlineProcessRunHandle {
pub fn new(worker: DurableProcessWorker) -> Self {
Self { worker }
}
}
#[async_trait::async_trait]
impl ProcessRunHandle for InlineProcessRunHandle {
async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
self.worker.drive_pending_processes().await
}
}