Skip to main content

lash_core/runtime/process_worker/
mod.rs

1use std::sync::Arc;
2
3use tokio_util::sync::CancellationToken;
4
5use super::effect::ProcessRunner;
6use super::session_manager::RuntimeSessionServices;
7use super::{EmbeddedRuntimeBuilder, ProcessWorkDriver, QueuedWorkDriver, RuntimeHostConfig};
8use crate::{
9    AbandonEvidence, AbandonWriter, InMemorySessionStore, LashRuntime, PluginError, PluginFactory,
10    PluginHost, PluginStack, ProcessAwaitOutput, ProcessExecutionContext, ProcessInput,
11    ProcessLease, ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
12    RecoveryDisposition, SessionStoreFactory,
13};
14
/// Deployment-local configuration for rebuilding durable process executions.
///
/// Process rows intentionally carry only portable process input and provenance.
/// Workers provide plugins, providers, stores, secrets, and host capabilities
/// for the deployment that owns those rows.
#[derive(Clone)]
pub struct DurableProcessWorkerConfig {
    /// Shared plugin host. The `from_plugin_factories` / `from_plugin_stack`
    /// constructors build it from a set of plugin factories.
    pub plugin_host: Arc<PluginHost>,
    /// Host configuration. Its `clock` timestamps worker-written facts and
    /// seeds the default trigger store; its `control.effect_host` supplies the
    /// static process scope that `DurableProcessWorker::run_process` resolves.
    pub runtime_host: RuntimeHostConfig,
    /// Session policy; `new` installs `SessionPolicy::default()`.
    pub session_policy: crate::SessionPolicy,
    /// Deployment-supplied session store factory.
    /// NOTE(review): presumably consulted when a session is rebuilt to run a
    /// process — the consumer is outside this section; confirm.
    pub session_store_factory: Arc<dyn SessionStoreFactory>,
    /// Durable registry of process rows. The worker lists non-terminal rows,
    /// claims and renews leases, records `first_started`, and writes terminal
    /// outcomes through it.
    pub process_registry: Arc<dyn ProcessRegistry>,
    /// Optional process change hub; `None` unless set with `with_change_hub`.
    pub process_change_hub: Option<crate::ProcessChangeHub>,
    /// Trigger store; `new` defaults to an `InMemoryTriggerStore` driven by the
    /// runtime host's clock.
    pub trigger_store: Arc<dyn crate::TriggerStore>,
    /// Optional process work driver; `None` unless set with
    /// `with_process_work_driver`.
    pub process_work_driver: Option<ProcessWorkDriver>,
    /// Optional queued work driver; `None` unless set with
    /// `with_queued_work_driver`.
    pub queued_work_driver: Option<QueuedWorkDriver>,
    // Probe slot looked up by the process's wake-target scope (falling back to
    // its session originator scope) when a process is run; a matching probe is
    // installed on the rebuilt runtime.
    #[doc(hidden)]
    pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
    /// Residency for sessions the worker rebuilds to run a process. Defaults to
    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
    /// it here so the worker's rebuilt sessions trim to the active path too,
    /// instead of silently diverging from the live runtime by keeping the full
    /// graph resident.
    pub residency: crate::Residency,
    /// Owner identity stem this worker derives per-recovery lease owners from.
    ///
    /// Each recovery attempt claims with a unique `(owner_id, incarnation_id)`
    /// derived from this identity — a live lease held by an earlier attempt
    /// must fence a later sweep pass rather than be re-entered as the same
    /// incarnation — while the liveness metadata is inherited as-is. Defaults
    /// to a fresh opaque identity per config. Hosts that run one worker per OS
    /// process should wire a
    /// [`LeaseOwnerIdentity::local_process`](crate::LeaseOwnerIdentity::local_process)
    /// identity so peers on the same host can prove a crashed worker dead and
    /// reclaim its process leases before the TTL — mirroring the session
    /// execution lane's runtime lease owner.
    pub lease_owner: crate::LeaseOwnerIdentity,
}
53
54impl DurableProcessWorkerConfig {
55    pub fn new(
56        plugin_host: Arc<PluginHost>,
57        runtime_host: RuntimeHostConfig,
58        session_store_factory: Arc<dyn SessionStoreFactory>,
59        process_registry: Arc<dyn ProcessRegistry>,
60    ) -> Self {
61        let clock = Arc::clone(&runtime_host.clock);
62        Self {
63            plugin_host,
64            runtime_host,
65            session_policy: crate::SessionPolicy::default(),
66            session_store_factory,
67            process_registry,
68            process_change_hub: None,
69            trigger_store: Arc::new(crate::InMemoryTriggerStore::with_clock(clock)),
70            process_work_driver: None,
71            queued_work_driver: None,
72            turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
73            residency: crate::Residency::default(),
74            lease_owner: crate::LeaseOwnerIdentity::opaque(
75                format!("durable-process-worker:{}", uuid::Uuid::new_v4()),
76                uuid::Uuid::new_v4().to_string(),
77            ),
78        }
79    }
80
81    pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
82        self.trigger_store = store;
83        self
84    }
85
86    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
87        self.session_policy = policy;
88        self
89    }
90
91    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
92        self.residency = residency;
93        self
94    }
95
96    pub fn with_process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
97        self.process_work_driver = Some(driver);
98        self
99    }
100
101    pub fn with_change_hub(mut self, hub: crate::ProcessChangeHub) -> Self {
102        self.process_change_hub = Some(hub);
103        self
104    }
105
106    /// Set the owner identity this worker presents when claiming process
107    /// leases. See [`DurableProcessWorkerConfig::lease_owner`].
108    pub fn with_lease_owner(mut self, lease_owner: crate::LeaseOwnerIdentity) -> Self {
109        self.lease_owner = lease_owner;
110        self
111    }
112
113    pub fn with_queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
114        self.queued_work_driver = Some(driver);
115        self
116    }
117
118    #[doc(hidden)]
119    pub fn with_turn_phase_probe_slot(
120        mut self,
121        slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
122    ) -> Self {
123        self.turn_phase_probe_slot = slot;
124        self
125    }
126
127    pub fn from_plugin_factories(
128        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
129        runtime_host: RuntimeHostConfig,
130        session_store_factory: Arc<dyn SessionStoreFactory>,
131        process_registry: Arc<dyn ProcessRegistry>,
132    ) -> Self {
133        Self::new(
134            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
135            runtime_host,
136            session_store_factory,
137            process_registry,
138        )
139    }
140
141    pub fn from_plugin_stack(
142        plugin_stack: PluginStack,
143        runtime_host: RuntimeHostConfig,
144        session_store_factory: Arc<dyn SessionStoreFactory>,
145        process_registry: Arc<dyn ProcessRegistry>,
146    ) -> Self {
147        Self::from_plugin_factories(
148            plugin_stack.into_factories(),
149            runtime_host,
150            session_store_factory,
151            process_registry,
152        )
153    }
154}
155
/// Reconstructable background-process worker.
///
/// Holds only its shared configuration behind an `Arc`, so cloning a worker is
/// a reference-count bump; the recovery sweep clones the worker once per
/// spawned task.
#[derive(Clone)]
pub struct DurableProcessWorker {
    // Shared, immutable worker configuration.
    config: Arc<DurableProcessWorkerConfig>,
}
161
/// Report from a graceful [owner drain](DurableProcessWorker::drain_owner_bound_work).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ProcessDrainReport {
    /// Process ids this host's own started `OwnerBound` work was terminalized as
    /// `Abandoned{OwnerDrain}` on, in the order they were drained.
    ///
    /// NOTE(review): an id is recorded once the drain claimed the row's lease
    /// and issued the terminal write. The write helper logs — rather than
    /// returns — a lost lease or a failed registry write, so confirm callers do
    /// not rely on this list as proof the terminal landed.
    pub abandoned: Vec<String>,
}
169
/// Why a recovery run did not produce a terminal outcome under the lease.
///
/// Returned by the lease-renewing run and interpreted by `run_and_complete`,
/// which picks between deferring to the new lease holder and terminalizing the
/// row as a recovery failure.
enum RecoverFailure {
    /// The lease was lost mid-run (another owner reclaimed an expired lease).
    /// The losing worker must not write a terminal outcome — the new owner is
    /// now the single writer.
    LeaseLost(PluginError),
    /// The process could not be run (rebuild/store-facet failure). The lease is
    /// still held, so this worker terminalizes the row.
    Run(PluginError),
}
180
181impl DurableProcessWorker {
182    pub fn new(config: DurableProcessWorkerConfig) -> Self {
183        Self {
184            config: Arc::new(config),
185        }
186    }
187
188    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
189        Self { config }
190    }
191
192    pub fn config(&self) -> &DurableProcessWorkerConfig {
193        &self.config
194    }
195
196    pub async fn run_process(
197        &self,
198        registration: ProcessRegistration,
199        execution_context: ProcessExecutionContext,
200        cancellation: CancellationToken,
201    ) -> Result<ProcessAwaitOutput, PluginError> {
202        let scoped_effect_controller = self
203            .config
204            .runtime_host
205            .control
206            .effect_host
207            .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
208            .map_err(|err| PluginError::Session(err.to_string()))?
209            .ok_or_else(|| {
210                PluginError::Session(
211                    "process worker effect host must provide a static process scope".to_string(),
212                )
213            })?;
214        self.run_process_with_scoped_effect_controller(
215            registration,
216            execution_context,
217            scoped_effect_controller,
218            cancellation,
219        )
220        .await
221    }
222
223    pub async fn run_process_with_scoped_effect_controller(
224        &self,
225        registration: ProcessRegistration,
226        execution_context: ProcessExecutionContext,
227        scoped_effect_controller: crate::ScopedEffectController<'_>,
228        cancellation: CancellationToken,
229    ) -> Result<ProcessAwaitOutput, PluginError> {
230        self.ensure_stable_process_id(&registration)?;
231        self.ensure_durable_store_facets()?;
232        // Externally-owned rows are never executed by lash (ADR 0019). Reject the
233        // disposition before touching a runtime — the old fabricated-success path
234        // for External inputs is deleted.
235        if registration.disposition == RecoveryDisposition::ExternallyOwned {
236            return Err(PluginError::Session(format!(
237                "process `{}` is externally-owned and must not be executed by lash",
238                registration.id
239            )));
240        }
241        // Durable, lease-fenced "execution started" fact: recorded immediately
242        // before executing so a later sweep can distinguish a started OwnerBound
243        // row (never re-run) from an unstarted one (runnable by anyone). This is
244        // the shared run path both the inline sweep and the Restate run handler
245        // funnel through. First-writer-wins, so a re-invocation is a no-op.
246        self.config
247            .process_registry
248            .record_first_started(
249                &registration.id,
250                crate::ProcessStarted {
251                    owner: self.config.lease_owner.clone(),
252                    started_at_ms: self.now_ms(),
253                },
254            )
255            .await?;
256        let mut runtime = self.runtime_for_registration(&registration).await?;
257        let originator_scope = if let crate::ProcessOriginator::Session { scope } =
258            &registration.provenance.originator
259        {
260            Some(scope)
261        } else {
262            None
263        };
264        let probe_scope = registration.wake_target.as_ref().or(originator_scope);
265        if let Some(probe) =
266            probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
267        {
268            runtime.set_turn_phase_probe(probe);
269        }
270        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
271            PluginError::Session(format!(
272                "failed to build runtime env for process `{}`: {err}",
273                registration.id
274            ))
275        })?;
276        Ok(manager
277            .run_process(
278                registration,
279                execution_context,
280                Arc::clone(&self.config.process_registry),
281                scoped_effect_controller,
282                cancellation,
283            )
284            .await)
285    }
286
287    /// Sweep the registry for non-terminal processes and re-execute the ones
288    /// this worker can claim, driving each to a terminal state.
289    ///
290    /// This is the crash-recovery counterpart to a worker that ran a process
291    /// from a live turn: a trigger/trigger-started process whose worker
292    /// died mid-flight is left non-terminal in the registry, and a subsequent
293    /// worker reopening that registry must finish it. The sweep:
294    ///
295    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
296    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
297    ///    already leased live by *another* owner is skipped (it is being run by
298    ///    that owner right now) unless persisted liveness metadata proves that
299    ///    owner definitely dead, in which case the lease is reclaimed with the
300    ///    fenced CAS discipline of
301    ///    [`ProcessRegistry::reclaim_process_lease`]; either way a non-terminal
302    ///    process is re-run by exactly one owner (lease fencing);
303    /// 3. runs the claimed process on this worker's wired controller, renewing
304    ///    the lease across the long-running execution so a healthy recovery is
305    ///    not swept out from under itself;
306    /// 4. writes the terminal outcome and releases the lease.
307    ///
308    /// Idempotent by `process_id`: terminal processes are never in the worklist,
309    /// and a process that became terminal between the list and the claim is
310    /// detected after claiming and skipped, so re-running a recovery sweep does
311    /// not double-execute completed work.
312    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
313        let records = self.config.process_registry.list_non_terminal().await?;
314        for record in records {
315            // Run each claimed process on its OWN lease-fenced task. A sequential
316            // drive that awaited each process to terminal would deadlock a process
317            // that blocks awaiting a nested child (`start child` then `await`, or a
318            // subagent fan-out): the one drive task would park inside the parent's
319            // await and never claim the child. Spawning frees the loop so a
320            // subsequent host-driven pass claims and runs the child, and the
321            // per-process `ProcessLease` fences concurrent owners — so spawning a
322            // task per pending row on every drive is idempotent (a row already
323            // running is skipped on claim conflict) and one failing row never
324            // aborts the rest of the sweep.
325            let worker = self.clone();
326            tokio::spawn(async move { worker.recover_process(record).await });
327        }
328        Ok(())
329    }
330
331    /// Graceful owner drain: terminalize this host's own started `OwnerBound`
332    /// work as `Abandoned{OwnerDrain}` at close (ADR 0019).
333    ///
334    /// This is an explicit **host lever on the worker**, never an implicit
335    /// consequence of closing a session. Processes are global and outlive any
336    /// one session ([ADR 0011]), so `LashSession::close`/`park` must not touch
337    /// them; a host that wants its in-flight owner-bound work terminalized at
338    /// shutdown calls this on the worker it is tearing down.
339    ///
340    /// Drain sequence (the operations runbook owns the surrounding steps; this
341    /// is the terminal-writing step):
342    /// 1. stop admitting new work to this worker;
343    /// 2. cancel or await the worker's in-flight run tasks so they release their
344    ///    per-run leases — for **Rerunnable** in-flight work that is the whole
345    ///    story: stopping the local run task without any terminal write leaves
346    ///    the row non-terminal so the next worker re-runs it (its contract);
347    /// 3. call this lever: for every non-terminal **OwnerBound** row this exact
348    ///    worker started (`first_started.owner == self.config.lease_owner`),
349    ///    claim a fresh drain lease and, being the owner completing its own
350    ///    work, write `Abandoned{OwnerDrain}` under it — the ordinary graceful
351    ///    completion path, respecting the single-writer rule.
352    ///
353    /// A row still held by a live foreign lease (an in-flight run under one of
354    /// this worker's own recovery incarnations that step 2 has not yet released)
355    /// is deferred rather than reclaimed, so the drain never races a still-live
356    /// run; such a row reaches `Abandoned` on the next drain pass or at a peer's
357    /// recovery sweep. Rows started by a different owner, not-yet-started
358    /// OwnerBound rows (still claimable by anyone), Rerunnable rows, and
359    /// Externally-Owned rows are all left untouched.
360    ///
361    /// [ADR 0011]: durable process registration is session-independent.
362    pub async fn drain_owner_bound_work(&self) -> Result<ProcessDrainReport, PluginError> {
363        let mut abandoned = Vec::new();
364        for record in self.config.process_registry.list_non_terminal().await? {
365            if record.disposition != RecoveryDisposition::OwnerBound {
366                continue;
367            }
368            let Some(first_started) = record.first_started.as_ref() else {
369                // Never started: first execution is not re-execution, so any
370                // worker may still claim it. Draining it would strand runnable
371                // work as Abandoned.
372                continue;
373            };
374            if first_started.owner != self.config.lease_owner {
375                // Started by a different owner; not this host's to drain.
376                continue;
377            }
378            let owner = first_started.owner.clone();
379            if self.drain_one_owner_bound(&record.id, owner).await {
380                abandoned.push(record.id);
381            }
382        }
383        Ok(ProcessDrainReport { abandoned })
384    }
385
386    /// Terminalize one of this host's started OwnerBound rows as
387    /// `Abandoned{OwnerDrain}` under a freshly claimed drain lease. Returns
388    /// whether the terminal was written (`false` when the row is held by a live
389    /// foreign lease, already terminal, or the claim failed).
390    async fn drain_one_owner_bound(
391        &self,
392        process_id: &str,
393        owner: crate::LeaseOwnerIdentity,
394    ) -> bool {
395        let lease_ttl_ms = self.lease_timings().ttl_ms();
396        let drain_owner = self.recovery_lease_owner();
397        let lease = match self
398            .config
399            .process_registry
400            .claim_process_lease(process_id, &drain_owner, lease_ttl_ms)
401            .await
402        {
403            Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => lease,
404            // A live run still holds the lease, or the claim failed: defer.
405            Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => return false,
406        };
407        if self
408            .config
409            .process_registry
410            .get_process(process_id)
411            .await
412            .is_some_and(|current| current.is_terminal())
413        {
414            self.release_or_log(&lease).await;
415            return false;
416        }
417        let evidence = AbandonEvidence {
418            writer: AbandonWriter::OwnerDrain,
419            owner: Some(owner),
420            epoch_ms: self.now_ms(),
421        };
422        self.complete_and_release(
423            &lease,
424            process_id,
425            ProcessAwaitOutput::Abandoned {
426                evidence: Box::new(evidence),
427                control: None,
428            },
429        )
430        .await;
431        true
432    }
433
434    /// Unique lease owner for one recovery attempt.
435    ///
436    /// Derived from [`DurableProcessWorkerConfig::lease_owner`]: a fresh
437    /// `(owner_id, incarnation_id)` per attempt keeps sweeps idempotent (a
438    /// still-running attempt's live lease fences later passes instead of being
439    /// re-entered as "own lease"), while the configured liveness metadata is
440    /// inherited so peers can prove a crashed worker dead and reclaim.
441    fn recovery_lease_owner(&self) -> crate::LeaseOwnerIdentity {
442        let attempt = uuid::Uuid::new_v4();
443        crate::LeaseOwnerIdentity {
444            owner_id: format!("{}:recovery:{attempt}", self.config.lease_owner.owner_id),
445            incarnation_id: attempt.to_string(),
446            liveness: self.config.lease_owner.liveness.clone(),
447        }
448    }
449
450    /// Recover one non-terminal row, obeying its declared recovery disposition
451    /// (ADR 0019). The verdict per disposition:
452    ///
453    /// - **ExternallyOwned**: never claimed, never run. If a pending Abandon
454    ///   Request is present it is reconciled into `Abandoned{reconciled_request}`.
455    /// - **Rerunnable**: exactly today's behavior — claim, (re-)run, complete.
456    /// - **OwnerBound, never started**: any worker may run it (first execution is
457    ///   not re-execution); the runner records `first_started` before executing.
458    /// - **OwnerBound, started**: never re-run. A provably-dead holder yields
459    ///   `Abandoned{sweep}`; a merely silent/expired holder is left non-terminal
460    ///   unless an Abandon Request is present and the lease has lapsed, which
461    ///   yields `Abandoned{reconciled_request}`. Elapsed time alone never
462    ///   terminalizes.
463    ///
464    /// Every Abandoned write goes through `complete_process` under this sweep's
465    /// own fenced lease, so the sweep stays the single writer and a revenant's
466    /// stale token is rejected.
467    async fn recover_process(&self, record: ProcessRecord) {
468        let process_id = record.id.clone();
469        // ExternallyOwned: lash never executes the row. The only recovery action
470        // is reconciling a pending Abandon Request; there is no owner lease to
471        // wait out.
472        if record.disposition == RecoveryDisposition::ExternallyOwned {
473            if record.abandon_request.is_some() {
474                self.reconcile_externally_owned_abandon(&process_id).await;
475            }
476            return;
477        }
478
479        let lease_ttl_ms = self.lease_timings().ttl_ms();
480        let owner = self.recovery_lease_owner();
481        // Claim the single-owner lease, distinguishing a fenced reclaim of a
482        // provably-dead holder (death evidence) from acquiring a free/expired
483        // lease (no death evidence). A live, not-provably-dead holder or a claim
484        // error leaves the row to its owner.
485        let Some((lease, dead_holder)) = self
486            .claim_for_recovery(&process_id, &owner, lease_ttl_ms)
487            .await
488        else {
489            return;
490        };
491        // Terminal between the list and the claim. Idempotent by process_id: do
492        // not re-execute or re-terminalize a finished process.
493        if self
494            .config
495            .process_registry
496            .get_process(&process_id)
497            .await
498            .is_some_and(|current| current.is_terminal())
499        {
500            self.release_or_log(&lease).await;
501            return;
502        }
503
504        match record.disposition {
505            // Rerunnable: claim, (re-)run, complete — exactly today's behavior.
506            RecoveryDisposition::Rerunnable => self.run_and_complete(record, lease).await,
507            RecoveryDisposition::OwnerBound if record.first_started.is_some() => {
508                // Started OwnerBound work is NEVER re-run — abandonment is the
509                // only recovery. `first_started`'s owner is the lapsed owner the
510                // reconciled-request evidence names.
511                let lapsed_owner = record
512                    .first_started
513                    .as_ref()
514                    .map(|started| started.owner.clone());
515                let evidence = if let Some(dead_holder) = dead_holder {
516                    // Holder provably dead ⇒ Abandoned{sweep}.
517                    Some(AbandonEvidence {
518                        writer: AbandonWriter::Sweep,
519                        owner: Some(dead_holder.owner),
520                        epoch_ms: self.now_ms(),
521                    })
522                } else if record.abandon_request.is_some() {
523                    // Silent/expired holder without death evidence, but an
524                    // operator authorized abandonment and the lease has lapsed
525                    // (we acquired a free/expired lease) ⇒ Abandoned{reconciled}.
526                    Some(AbandonEvidence {
527                        writer: AbandonWriter::ReconciledRequest,
528                        owner: lapsed_owner,
529                        epoch_ms: self.now_ms(),
530                    })
531                } else {
532                    // No death evidence and no authorization: elapsed time alone
533                    // never terminalizes. Leave the row non-terminal.
534                    None
535                };
536                match evidence {
537                    Some(evidence) => {
538                        self.complete_and_release(
539                            &lease,
540                            &process_id,
541                            ProcessAwaitOutput::Abandoned {
542                                evidence: Box::new(evidence),
543                                control: None,
544                            },
545                        )
546                        .await;
547                    }
548                    None => self.release_or_log(&lease).await,
549                }
550            }
551            // OwnerBound, never started: first execution is not re-execution, so
552            // any worker may run it; the runner records first_started first.
553            RecoveryDisposition::OwnerBound => self.run_and_complete(record, lease).await,
554            // Filtered above; releasing keeps the lease honest if reached.
555            RecoveryDisposition::ExternallyOwned => self.release_or_log(&lease).await,
556        }
557    }
558
559    /// Wall-clock epoch ms from the worker's configured clock.
560    fn now_ms(&self) -> u64 {
561        self.config.runtime_host.clock.timestamp_ms()
562    }
563
564    /// Claim the recovery lease. Returns the acquired lease plus, when the claim
565    /// fenced out a provably-dead holder, that holder as death evidence. Returns
566    /// `None` when the row is held by a live (not provably-dead) owner or the
567    /// claim fails — either way this pass leaves the row to its owner.
568    async fn claim_for_recovery(
569        &self,
570        process_id: &str,
571        owner: &crate::LeaseOwnerIdentity,
572        lease_ttl_ms: u64,
573    ) -> Option<(ProcessLease, Option<ProcessLease>)> {
574        match self
575            .config
576            .process_registry
577            .claim_process_lease(process_id, owner, lease_ttl_ms)
578            .await
579        {
580            Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => Some((lease, None)),
581            Ok(crate::ProcessLeaseClaimOutcome::Busy { holder })
582                if holder.owner.is_definitely_dead_for_claimant(owner) =>
583            {
584                match self
585                    .config
586                    .process_registry
587                    .reclaim_process_lease(process_id, owner, &holder, lease_ttl_ms)
588                    .await
589                {
590                    Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => {
591                        Some((lease, Some(holder)))
592                    }
593                    Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => None,
594                }
595            }
596            Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => None,
597        }
598    }
599
600    /// Reconcile a pending Abandon Request on an externally-owned row into an
601    /// `Abandoned{reconciled_request}` terminal. Lash never executed the row, so
602    /// there is no owner lease to wait out — but the terminal still goes through
603    /// the sweep's own fenced lease so the sweep stays the single writer.
604    async fn reconcile_externally_owned_abandon(&self, process_id: &str) {
605        let lease_ttl_ms = self.lease_timings().ttl_ms();
606        let owner = self.recovery_lease_owner();
607        let lease = match self
608            .config
609            .process_registry
610            .claim_process_lease(process_id, &owner, lease_ttl_ms)
611            .await
612        {
613            Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => lease,
614            // A concurrent writer holds the lease; let it finish.
615            Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => return,
616        };
617        if self
618            .config
619            .process_registry
620            .get_process(process_id)
621            .await
622            .is_some_and(|current| current.is_terminal())
623        {
624            self.release_or_log(&lease).await;
625            return;
626        }
627        let evidence = AbandonEvidence {
628            writer: AbandonWriter::ReconciledRequest,
629            // Externally-owned work has no lash execution owner to name.
630            owner: None,
631            epoch_ms: self.now_ms(),
632        };
633        self.complete_and_release(
634            &lease,
635            process_id,
636            ProcessAwaitOutput::Abandoned {
637                evidence: Box::new(evidence),
638                control: None,
639            },
640        )
641        .await;
642    }
643
644    /// (Re-)run a claimed row under its renewed lease and write the terminal
645    /// outcome, the same live-owner-is-single-writer path used before ADR 0019.
646    async fn run_and_complete(&self, record: ProcessRecord, lease: ProcessLease) {
647        let process_id = record.id.clone();
648        let registration = registration_from_record(record);
649        let execution_context = ProcessExecutionContext::default();
650        match self
651            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
652            .await
653        {
654            // Ran to a terminal outcome (success or a process-level failure) while
655            // holding the lease: this owner is the single writer of the terminal.
656            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
657            // The lease was lost mid-run — another owner reclaimed the expired
658            // lease and is now running this process. Do NOT write a terminal
659            // outcome or release the lease: that would race the new owner and
660            // could record a succeeded process as Failed. Leave the row to the
661            // lease holder; it will finish (or another sweep retries it).
662            Err(RecoverFailure::LeaseLost(err)) => {
663                tracing::warn!(
664                    process_id = %process_id,
665                    error = %err,
666                    "process recovery lost its lease mid-run; deferring to the new owner",
667                );
668            }
669            // The process could not be run at all (rebuild/store-facet failure):
670            // terminalize as a recovery failure so the row leaves the worklist.
671            Err(RecoverFailure::Run(err)) => {
672                let output = ProcessAwaitOutput::Failure {
673                    class: crate::ToolFailureClass::Execution,
674                    code: "process_recovery_failed".to_string(),
675                    message: err.to_string(),
676                    raw: None,
677                    control: None,
678                };
679                self.complete_and_release(&lease, &process_id, output).await;
680            }
681        }
682    }
683
684    /// Write a recovered process's terminal outcome (the running lease owner is
685    /// the single writer) and then release the lease, logging either failure
686    /// rather than aborting — the lease's TTL is the backstop.
687    async fn complete_and_release(
688        &self,
689        lease: &ProcessLease,
690        process_id: &str,
691        output: ProcessAwaitOutput,
692    ) {
693        // Fence the terminal write: re-confirm the lease immediately before
694        // writing. `renew_process_lease` is rejected (by owner/lease_token/
695        // fencing_token) if another owner has reclaimed an expired lease, and on
696        // success extends the window so the back-to-back write lands inside the
697        // owned interval. A worker that stalled past its TTL therefore cannot
698        // overwrite the new owner's outcome — it defers instead.
699        let fenced = match self
700            .config
701            .process_registry
702            .renew_process_lease(lease, self.lease_timings().ttl_ms())
703            .await
704        {
705            Ok(renewed) => renewed,
706            Err(err) => {
707                tracing::warn!(
708                    process_id = %process_id,
709                    error = %err,
710                    "lost process lease before terminal write; deferring to the new owner",
711                );
712                return;
713            }
714        };
715        if let Err(err) = self
716            .config
717            .process_registry
718            .complete_process(process_id, output)
719            .await
720        {
721            tracing::warn!(
722                process_id = %process_id,
723                error = %err,
724                "failed to write recovered process terminal outcome",
725            );
726        }
727        self.release_or_log(&fenced).await;
728    }
729
730    async fn release_or_log(&self, lease: &ProcessLease) {
731        if let Err(err) = self.release_process_lease(lease).await {
732            tracing::warn!(
733                process_id = %lease.process_id,
734                error = %err,
735                "failed to release recovered process lease",
736            );
737        }
738    }
739
740    /// Run a recovered process while renewing its lease across the execution,
741    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
742    /// from expiring under the live owner.
743    async fn run_process_with_lease_renewal(
744        &self,
745        registration: ProcessRegistration,
746        execution_context: ProcessExecutionContext,
747        mut lease: ProcessLease,
748    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
749        let process_id = registration.id.clone();
750        let cancellation = CancellationToken::new();
751        let cancel_watcher = {
752            let awaiter = self
753                .config
754                .process_change_hub
755                .clone()
756                .map(|hub| {
757                    crate::ProcessAwaiter::new(Arc::clone(&self.config.process_registry), hub)
758                })
759                .unwrap_or_else(|| {
760                    crate::ProcessAwaiter::polling(Arc::clone(&self.config.process_registry))
761                });
762            let process_id = process_id.clone();
763            let cancellation = cancellation.clone();
764            tokio::spawn(async move {
765                match awaiter
766                    .await_event(&process_id, "process.cancel_requested", 0)
767                    .await
768                {
769                    Ok(_) => cancellation.cancel(),
770                    Err(err) => tracing::warn!(
771                        process_id = %process_id,
772                        error = %err,
773                        "process cancel watcher stopped before observing cancellation",
774                    ),
775                }
776            })
777        };
778        let pending = self.run_process(registration, execution_context, cancellation.clone());
779        tokio::pin!(pending);
780        loop {
781            tokio::select! {
782                outcome = &mut pending => {
783                    cancel_watcher.abort();
784                    return outcome.map_err(RecoverFailure::Run);
785                }
786                _ = self.config.runtime_host.clock.sleep(self.lease_timings().renew_interval()) => {
787                    match self
788                        .config
789                        .process_registry
790                        .renew_process_lease(&lease, self.lease_timings().ttl_ms())
791                        .await
792                    {
793                        Ok(renewed) => lease = renewed,
794                        Err(err) => {
795                            cancellation.cancel();
796                            cancel_watcher.abort();
797                            return Err(RecoverFailure::LeaseLost(err));
798                        }
799                    }
800                }
801            }
802        }
803    }
804
805    fn lease_timings(&self) -> crate::LeaseTimings {
806        self.config.runtime_host.control.lease_timings
807    }
808
809    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
810        self.config
811            .process_registry
812            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
813            .await
814    }
815
816    pub async fn request_process_cancel(
817        &self,
818        process_id: &str,
819        reason: Option<String>,
820    ) -> Result<(), PluginError> {
821        self.config
822            .process_registry
823            .append_event(
824                process_id,
825                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
826            )
827            .await
828            .map(|_| ())
829    }
830
831    async fn runtime_for_registration(
832        &self,
833        registration: &ProcessRegistration,
834    ) -> Result<LashRuntime, PluginError> {
835        match registration.input.as_ref() {
836            ProcessInput::SessionTurn { create_request, .. } => {
837                self.runtime_for_session_turn(registration, create_request.as_ref())
838                    .await
839            }
840            ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. } => {
841                self.runtime_for_process_env(registration).await
842            }
843            // Externally-owned rows are rejected before dispatch (ADR 0019), so an
844            // External input has no execution runtime; fail loudly rather than
845            // fabricate one.
846            ProcessInput::External { .. } => Err(PluginError::Session(format!(
847                "process `{}` is externally-owned and has no execution runtime",
848                registration.id
849            ))),
850        }
851    }
852
853    async fn runtime_for_session_turn(
854        &self,
855        registration: &ProcessRegistration,
856        create_request: &crate::SessionCreateRequest,
857    ) -> Result<LashRuntime, PluginError> {
858        let mut policy = create_request
859            .policy
860            .clone()
861            .unwrap_or_else(|| self.config.session_policy.clone());
862        if policy.recorded_provider_id().is_empty() {
863            policy.provider_id = self.config.session_policy.provider_id.clone();
864        }
865        self.build_ephemeral_runtime(
866            format!("process-session-turn:{}", registration.id),
867            policy,
868            create_request.plugin_options.clone(),
869            "session turn request",
870        )
871        .await
872    }
873
874    async fn runtime_for_process_env(
875        &self,
876        registration: &ProcessRegistration,
877    ) -> Result<LashRuntime, PluginError> {
878        let Some(env_ref) = registration.env_ref.as_ref() else {
879            return Err(PluginError::Session(format!(
880                "process `{}` is missing a captured execution env",
881                registration.id
882            )));
883        };
884        let env = crate::load_process_execution_env(
885            self.config
886                .runtime_host
887                .durability
888                .process_env_store
889                .as_ref(),
890            env_ref,
891        )
892        .await?;
893        self.build_ephemeral_runtime(
894            format!("process-env:{}", registration.id),
895            env.policy,
896            env.plugin_options,
897            env_ref.as_str(),
898        )
899        .await
900    }
901
902    async fn build_ephemeral_runtime(
903        &self,
904        session_id: String,
905        policy: crate::SessionPolicy,
906        plugin_options: crate::PluginOptions,
907        source_label: &str,
908    ) -> Result<LashRuntime, PluginError> {
909        let store = Arc::new(InMemorySessionStore::default());
910        let process_work_driver = self.config.process_work_driver.clone().unwrap_or_else(|| {
911            if let Some(hub) = self.config.process_change_hub.clone() {
912                ProcessWorkDriver::from_watched(
913                    Arc::clone(&self.config.process_registry),
914                    hub,
915                    Arc::new(crate::InlineProcessRunHandle::new(self.clone())),
916                )
917            } else {
918                ProcessWorkDriver::inline(Arc::clone(&self.config.process_registry), self.clone())
919            }
920        });
921        let mut builder = EmbeddedRuntimeBuilder::new()
922            .with_session_id(session_id.to_string())
923            .with_plugin_host(self.config.plugin_host.as_ref().clone())
924            .with_runtime_host(self.config.runtime_host.clone())
925            .with_policy(policy)
926            .with_plugin_options(plugin_options)
927            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
928            .with_trigger_store(Arc::clone(&self.config.trigger_store))
929            .with_process_registry(Arc::clone(&self.config.process_registry))
930            .with_process_work_driver(process_work_driver)
931            .with_residency(self.config.residency)
932            .with_store(store);
933        if let Some(driver) = self.config.queued_work_driver.clone() {
934            builder = builder.with_queued_work_driver(driver);
935        }
936        builder.build().await.map_err(|err| {
937            PluginError::Session(format!(
938                "failed to build process worker runtime for {source_label}: {err}"
939            ))
940        })
941    }
942
943    /// Enforce the durable-first wiring invariant at the worker process-run
944    /// boundary: when the worker was wired with a durable effect host, every
945    /// store it will execute against must also be durable. A durable host
946    /// running against any ephemeral store fails loudly here rather than
947    /// silently re-executing a process against non-durable state.
948    ///
949    /// Inline controllers (the default tier) impose no requirement, so
950    /// inline/in-memory workers pass unchanged.
951    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
952        if self
953            .config
954            .runtime_host
955            .control
956            .effect_host
957            .durability_tier()
958            != crate::DurabilityTier::Durable
959        {
960            return Ok(());
961        }
962        let require = |facet: crate::DurableStoreFacet| {
963            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
964        };
965        if self
966            .config
967            .runtime_host
968            .durability
969            .attachment_store
970            .persistence()
971            .durability_tier()
972            != crate::DurabilityTier::Durable
973        {
974            return Err(require(crate::DurableStoreFacet::AttachmentStore));
975        }
976        if self
977            .config
978            .runtime_host
979            .durability
980            .process_env_store
981            .durability_tier()
982            != crate::DurabilityTier::Durable
983        {
984            return Err(require(crate::DurableStoreFacet::ProcessEnvStore));
985        }
986        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
987            return Err(require(crate::DurableStoreFacet::SessionStore));
988        }
989        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
990            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
991        }
992        if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
993            return Err(require(crate::DurableStoreFacet::TriggerStore));
994        }
995        Ok(())
996    }
997
998    /// Enforce the stable-process-id invariant at every (re-)execution: process
999    /// execution identity is the persisted `process_id`, so a retry — a Restate
1000    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
1001    /// recovery sweep re-running a non-terminal row — must present that stable
1002    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
1003    /// loudly here, mirroring how `ExecutionScope` rejects an
1004    /// empty turn id at the durable-effect boundary.
1005    fn ensure_stable_process_id(
1006        &self,
1007        registration: &ProcessRegistration,
1008    ) -> Result<(), PluginError> {
1009        if registration.id.trim().is_empty() {
1010            return Err(PluginError::Session(
1011                crate::RuntimeError::missing_process_execution_id().to_string(),
1012            ));
1013        }
1014        Ok(())
1015    }
1016}
1017
1018/// Rebuild a runnable registration from a persisted row, preserving its declared
1019/// disposition (ADR 0019).
1020fn registration_from_record(record: ProcessRecord) -> ProcessRegistration {
1021    ProcessRegistration {
1022        id: record.id,
1023        input: record.input,
1024        disposition: record.disposition,
1025        identity: record.identity,
1026        event_types: record.event_types,
1027        provenance: record.provenance,
1028        env_ref: record.env_ref,
1029        wake_target: record.wake_target,
1030    }
1031}
1032
1033#[cfg(test)]
1034mod boundary_tests;
1035#[cfg(test)]
1036mod recovery_tests;