Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2
3use tokio_util::sync::CancellationToken;
4
5use super::effect::ProcessRunner;
6use super::session_manager::RuntimeSessionServices;
7use super::{EmbeddedRuntimeBuilder, ProcessWorkDriver, QueuedWorkDriver, RuntimeHostConfig};
8use crate::{
9    InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
10    ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
11    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
12    SessionStoreFactory,
13};
14
/// Deployment-local configuration for rebuilding durable process executions.
///
/// Process rows intentionally carry only portable process input and provenance.
/// Workers provide plugins, providers, stores, secrets, and host capabilities
/// for the deployment that owns those rows.
#[derive(Clone)]
pub struct DurableProcessWorkerConfig {
    /// Plugin host cloned into every runtime the worker rebuilds.
    pub plugin_host: Arc<PluginHost>,
    /// Host wiring handed to each rebuilt runtime. The worker also reads its
    /// clock, effect host, lease timings, and durability stores directly.
    pub runtime_host: RuntimeHostConfig,
    /// Fallback policy for session-turn processes: used when the create
    /// request carries no policy, and as the source of `provider_id` when the
    /// request's policy records an empty provider id.
    pub session_policy: crate::SessionPolicy,
    /// Session store factory passed to rebuilt runtimes. Its durability tier
    /// is checked when the effect host is durable.
    pub session_store_factory: Arc<dyn SessionStoreFactory>,
    /// Registry the worker lists, leases, and completes process rows through.
    pub process_registry: Arc<dyn ProcessRegistry>,
    /// Optional change hub. When set, the cancel watcher and the default
    /// process work driver are built with it (`ProcessAwaiter::new`,
    /// `ProcessWorkDriver::from_watched`); when unset they use
    /// `ProcessAwaiter::polling` and `ProcessWorkDriver::inline`.
    pub process_change_hub: Option<crate::ProcessChangeHub>,
    /// Trigger store passed to rebuilt runtimes. [`Self::new`] defaults it to
    /// an in-memory store that shares the runtime host's clock.
    pub trigger_store: Arc<dyn crate::TriggerStore>,
    /// Explicit process work driver for rebuilt runtimes. When `None`, the
    /// worker derives one from the registry (and the change hub, if any).
    pub process_work_driver: Option<ProcessWorkDriver>,
    /// Optional queued work driver, forwarded to rebuilt runtimes only when set.
    pub queued_work_driver: Option<QueuedWorkDriver>,
    /// Slot consulted for a turn-phase probe matching the process's wake
    /// target (or, failing that, its session originator scope) before a
    /// process runs.
    #[doc(hidden)]
    pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
    /// Residency for sessions the worker rebuilds to run a process. Defaults to
    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
    /// it here so the worker's rebuilt sessions trim to the active path too,
    /// instead of silently diverging from the live runtime by keeping the full
    /// graph resident.
    pub residency: crate::Residency,
    /// Owner identity stem this worker derives per-recovery lease owners from.
    ///
    /// Each recovery attempt claims with a unique `(owner_id, incarnation_id)`
    /// derived from this identity — a live lease held by an earlier attempt
    /// must fence a later sweep pass rather than be re-entered as the same
    /// incarnation — while the liveness metadata is inherited as-is. Defaults
    /// to a fresh opaque identity per config. Hosts that run one worker per OS
    /// process should wire a
    /// [`LeaseOwnerIdentity::local_process`](crate::LeaseOwnerIdentity::local_process)
    /// identity so peers on the same host can prove a crashed worker dead and
    /// reclaim its process leases before the TTL — mirroring the session
    /// execution lane's runtime lease owner.
    pub lease_owner: crate::LeaseOwnerIdentity,
}
53
54impl DurableProcessWorkerConfig {
55    pub fn new(
56        plugin_host: Arc<PluginHost>,
57        runtime_host: RuntimeHostConfig,
58        session_store_factory: Arc<dyn SessionStoreFactory>,
59        process_registry: Arc<dyn ProcessRegistry>,
60    ) -> Self {
61        let clock = Arc::clone(&runtime_host.clock);
62        Self {
63            plugin_host,
64            runtime_host,
65            session_policy: crate::SessionPolicy::default(),
66            session_store_factory,
67            process_registry,
68            process_change_hub: None,
69            trigger_store: Arc::new(crate::InMemoryTriggerStore::with_clock(clock)),
70            process_work_driver: None,
71            queued_work_driver: None,
72            turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
73            residency: crate::Residency::default(),
74            lease_owner: crate::LeaseOwnerIdentity::opaque(
75                format!("durable-process-worker:{}", uuid::Uuid::new_v4()),
76                uuid::Uuid::new_v4().to_string(),
77            ),
78        }
79    }
80
81    pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
82        self.trigger_store = store;
83        self
84    }
85
86    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
87        self.session_policy = policy;
88        self
89    }
90
91    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
92        self.residency = residency;
93        self
94    }
95
96    pub fn with_process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
97        self.process_work_driver = Some(driver);
98        self
99    }
100
101    pub fn with_change_hub(mut self, hub: crate::ProcessChangeHub) -> Self {
102        self.process_change_hub = Some(hub);
103        self
104    }
105
106    /// Set the owner identity this worker presents when claiming process
107    /// leases. See [`DurableProcessWorkerConfig::lease_owner`].
108    pub fn with_lease_owner(mut self, lease_owner: crate::LeaseOwnerIdentity) -> Self {
109        self.lease_owner = lease_owner;
110        self
111    }
112
113    pub fn with_queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
114        self.queued_work_driver = Some(driver);
115        self
116    }
117
118    #[doc(hidden)]
119    pub fn with_turn_phase_probe_slot(
120        mut self,
121        slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
122    ) -> Self {
123        self.turn_phase_probe_slot = slot;
124        self
125    }
126
127    pub fn from_plugin_factories(
128        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
129        runtime_host: RuntimeHostConfig,
130        session_store_factory: Arc<dyn SessionStoreFactory>,
131        process_registry: Arc<dyn ProcessRegistry>,
132    ) -> Self {
133        Self::new(
134            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
135            runtime_host,
136            session_store_factory,
137            process_registry,
138        )
139    }
140
141    pub fn from_plugin_stack(
142        plugin_stack: PluginStack,
143        runtime_host: RuntimeHostConfig,
144        session_store_factory: Arc<dyn SessionStoreFactory>,
145        process_registry: Arc<dyn ProcessRegistry>,
146    ) -> Self {
147        Self::from_plugin_factories(
148            plugin_stack.into_factories(),
149            runtime_host,
150            session_store_factory,
151            process_registry,
152        )
153    }
154}
155
/// Reconstructable background-process worker.
///
/// The worker holds only a shared configuration, so cloning it is a
/// reference-count bump and every clone operates on the same config.
#[derive(Clone)]
pub struct DurableProcessWorker {
    // Shared, immutable wiring (registry, stores, hosts) for every run.
    config: Arc<DurableProcessWorkerConfig>,
}
161
/// Why a recovery run did not produce a terminal outcome under the lease.
///
/// Produced by `run_process_with_lease_renewal` and consumed by
/// `recover_process`, which decides per variant whether to terminalize.
enum RecoverFailure {
    /// The lease was lost mid-run (another owner reclaimed an expired lease).
    /// The losing worker must not write a terminal outcome — the new owner is
    /// now the single writer. Carries the error returned by the failed lease
    /// renewal.
    LeaseLost(PluginError),
    /// The process could not be run (rebuild/store-facet failure). The lease is
    /// still held, so this worker terminalizes the row. Carries the error
    /// returned by `run_process`.
    Run(PluginError),
}
172
173impl DurableProcessWorker {
174    pub fn new(config: DurableProcessWorkerConfig) -> Self {
175        Self {
176            config: Arc::new(config),
177        }
178    }
179
180    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
181        Self { config }
182    }
183
184    pub fn config(&self) -> &DurableProcessWorkerConfig {
185        &self.config
186    }
187
188    pub async fn run_process(
189        &self,
190        registration: ProcessRegistration,
191        execution_context: ProcessExecutionContext,
192        cancellation: CancellationToken,
193    ) -> Result<ProcessAwaitOutput, PluginError> {
194        let scoped_effect_controller = self
195            .config
196            .runtime_host
197            .control
198            .effect_host
199            .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
200            .map_err(|err| PluginError::Session(err.to_string()))?
201            .ok_or_else(|| {
202                PluginError::Session(
203                    "process worker effect host must provide a static process scope".to_string(),
204                )
205            })?;
206        self.run_process_with_scoped_effect_controller(
207            registration,
208            execution_context,
209            scoped_effect_controller,
210            cancellation,
211        )
212        .await
213    }
214
215    pub async fn run_process_with_scoped_effect_controller(
216        &self,
217        registration: ProcessRegistration,
218        execution_context: ProcessExecutionContext,
219        scoped_effect_controller: crate::ScopedEffectController<'_>,
220        cancellation: CancellationToken,
221    ) -> Result<ProcessAwaitOutput, PluginError> {
222        self.ensure_stable_process_id(&registration)?;
223        self.ensure_durable_store_facets()?;
224        if let ProcessInput::External { metadata } = registration.input.as_ref() {
225            return Ok(ProcessAwaitOutput::Success {
226                value: serde_json::json!({ "metadata": metadata.clone() }),
227                control: None,
228            });
229        }
230        let mut runtime = self.runtime_for_registration(&registration).await?;
231        let originator_scope = if let crate::ProcessOriginator::Session { scope } =
232            &registration.provenance.originator
233        {
234            Some(scope)
235        } else {
236            None
237        };
238        let probe_scope = registration.wake_target.as_ref().or(originator_scope);
239        if let Some(probe) =
240            probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
241        {
242            runtime.set_turn_phase_probe(probe);
243        }
244        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
245            PluginError::Session(format!(
246                "failed to build runtime env for process `{}`: {err}",
247                registration.id
248            ))
249        })?;
250        Ok(manager
251            .run_process(
252                registration,
253                execution_context,
254                Arc::clone(&self.config.process_registry),
255                scoped_effect_controller,
256                cancellation,
257            )
258            .await)
259    }
260
261    /// Sweep the registry for non-terminal processes and re-execute the ones
262    /// this worker can claim, driving each to a terminal state.
263    ///
264    /// This is the crash-recovery counterpart to a worker that ran a process
265    /// from a live turn: a trigger/trigger-started process whose worker
266    /// died mid-flight is left non-terminal in the registry, and a subsequent
267    /// worker reopening that registry must finish it. The sweep:
268    ///
269    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
270    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
271    ///    already leased live by *another* owner is skipped (it is being run by
272    ///    that owner right now) unless persisted liveness metadata proves that
273    ///    owner definitely dead, in which case the lease is reclaimed with the
274    ///    fenced CAS discipline of
275    ///    [`ProcessRegistry::reclaim_process_lease`]; either way a non-terminal
276    ///    process is re-run by exactly one owner (lease fencing);
277    /// 3. runs the claimed process on this worker's wired controller, renewing
278    ///    the lease across the long-running execution so a healthy recovery is
279    ///    not swept out from under itself;
280    /// 4. writes the terminal outcome and releases the lease.
281    ///
282    /// Idempotent by `process_id`: terminal processes are never in the worklist,
283    /// and a process that became terminal between the list and the claim is
284    /// detected after claiming and skipped, so re-running a recovery sweep does
285    /// not double-execute completed work.
286    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
287        let records = self.config.process_registry.list_non_terminal().await?;
288        for record in records {
289            // Run each claimed process on its OWN lease-fenced task. A sequential
290            // drive that awaited each process to terminal would deadlock a process
291            // that blocks awaiting a nested child (`start child` then `await`, or a
292            // subagent fan-out): the one drive task would park inside the parent's
293            // await and never claim the child. Spawning frees the loop so a
294            // subsequent host-driven pass claims and runs the child, and the
295            // per-process `ProcessLease` fences concurrent owners — so spawning a
296            // task per pending row on every drive is idempotent (a row already
297            // running is skipped on claim conflict) and one failing row never
298            // aborts the rest of the sweep.
299            let worker = self.clone();
300            tokio::spawn(async move { worker.recover_process(record).await });
301        }
302        Ok(())
303    }
304
305    /// Unique lease owner for one recovery attempt.
306    ///
307    /// Derived from [`DurableProcessWorkerConfig::lease_owner`]: a fresh
308    /// `(owner_id, incarnation_id)` per attempt keeps sweeps idempotent (a
309    /// still-running attempt's live lease fences later passes instead of being
310    /// re-entered as "own lease"), while the configured liveness metadata is
311    /// inherited so peers can prove a crashed worker dead and reclaim.
312    fn recovery_lease_owner(&self) -> crate::LeaseOwnerIdentity {
313        let attempt = uuid::Uuid::new_v4();
314        crate::LeaseOwnerIdentity {
315            owner_id: format!("{}:recovery:{attempt}", self.config.lease_owner.owner_id),
316            incarnation_id: attempt.to_string(),
317            liveness: self.config.lease_owner.liveness.clone(),
318        }
319    }
320
321    async fn recover_process(&self, record: ProcessRecord) {
322        let process_id = record.id.clone();
323        let lease_ttl_ms = self.lease_timings().ttl_ms();
324        let owner = &self.recovery_lease_owner();
325        // Skip if held live by another owner: a busy claim means a worker is
326        // already running this process, so re-running here would violate the
327        // single-owner contract. A busy holder whose persisted local-process
328        // liveness proves it definitely dead is reclaimed with the fenced CAS
329        // discipline the session execution lane uses, so an orphaned process
330        // recovers without waiting out the full lease TTL. Treat claim errors
331        // as "leased elsewhere".
332        let lease = match self
333            .config
334            .process_registry
335            .claim_process_lease(&process_id, owner, lease_ttl_ms)
336            .await
337        {
338            Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => lease,
339            Ok(crate::ProcessLeaseClaimOutcome::Busy { holder })
340                if holder.owner.is_definitely_dead_for_claimant(owner) =>
341            {
342                match self
343                    .config
344                    .process_registry
345                    .reclaim_process_lease(&process_id, owner, &holder, lease_ttl_ms)
346                    .await
347                {
348                    Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => lease,
349                    Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => return,
350                }
351            }
352            Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => return,
353        };
354        // The process may have reached a terminal state between the list and the
355        // claim. Idempotent by process_id: do not re-execute a finished process.
356        if self
357            .config
358            .process_registry
359            .get_process(&process_id)
360            .await
361            .is_some_and(|current| current.is_terminal())
362        {
363            self.release_or_log(&lease).await;
364            return;
365        }
366        let registration = ProcessRegistration {
367            id: record.id,
368            input: record.input,
369            identity: record.identity,
370            event_types: record.event_types,
371            provenance: record.provenance.clone(),
372            env_ref: record.env_ref.clone(),
373            wake_target: record.wake_target.clone(),
374        };
375        let execution_context = ProcessExecutionContext::default();
376        match self
377            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
378            .await
379        {
380            // Ran to a terminal outcome (success or a process-level failure) while
381            // holding the lease: this owner is the single writer of the terminal.
382            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
383            // The lease was lost mid-run — another owner reclaimed the expired
384            // lease and is now running this process. Do NOT write a terminal
385            // outcome or release the lease: that would race the new owner and
386            // could record a succeeded process as Failed. Leave the row to the
387            // lease holder; it will finish (or another sweep retries it).
388            Err(RecoverFailure::LeaseLost(err)) => {
389                tracing::warn!(
390                    process_id = %process_id,
391                    error = %err,
392                    "process recovery lost its lease mid-run; deferring to the new owner",
393                );
394            }
395            // The process could not be run at all (rebuild/store-facet failure):
396            // terminalize as a recovery failure so the row leaves the worklist.
397            Err(RecoverFailure::Run(err)) => {
398                let output = ProcessAwaitOutput::Failure {
399                    class: crate::ToolFailureClass::Execution,
400                    code: "process_recovery_failed".to_string(),
401                    message: err.to_string(),
402                    raw: None,
403                    control: None,
404                };
405                self.complete_and_release(&lease, &process_id, output).await;
406            }
407        }
408    }
409
410    /// Write a recovered process's terminal outcome (the running lease owner is
411    /// the single writer) and then release the lease, logging either failure
412    /// rather than aborting — the lease's TTL is the backstop.
413    async fn complete_and_release(
414        &self,
415        lease: &ProcessLease,
416        process_id: &str,
417        output: ProcessAwaitOutput,
418    ) {
419        // Fence the terminal write: re-confirm the lease immediately before
420        // writing. `renew_process_lease` is rejected (by owner/lease_token/
421        // fencing_token) if another owner has reclaimed an expired lease, and on
422        // success extends the window so the back-to-back write lands inside the
423        // owned interval. A worker that stalled past its TTL therefore cannot
424        // overwrite the new owner's outcome — it defers instead.
425        let fenced = match self
426            .config
427            .process_registry
428            .renew_process_lease(lease, self.lease_timings().ttl_ms())
429            .await
430        {
431            Ok(renewed) => renewed,
432            Err(err) => {
433                tracing::warn!(
434                    process_id = %process_id,
435                    error = %err,
436                    "lost process lease before terminal write; deferring to the new owner",
437                );
438                return;
439            }
440        };
441        if let Err(err) = self
442            .config
443            .process_registry
444            .complete_process(process_id, output)
445            .await
446        {
447            tracing::warn!(
448                process_id = %process_id,
449                error = %err,
450                "failed to write recovered process terminal outcome",
451            );
452        }
453        self.release_or_log(&fenced).await;
454    }
455
456    async fn release_or_log(&self, lease: &ProcessLease) {
457        if let Err(err) = self.release_process_lease(lease).await {
458            tracing::warn!(
459                process_id = %lease.process_id,
460                error = %err,
461                "failed to release recovered process lease",
462            );
463        }
464    }
465
    /// Run a recovered process while renewing its lease across the execution,
    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
    /// from expiring under the live owner.
    ///
    /// A background task watches for a `process.cancel_requested` event and
    /// cancels the run's token when one is observed. The run itself is raced
    /// against a renewal timer: each time the timer fires the lease is renewed,
    /// and the loop continues until the run finishes or a renewal fails.
    ///
    /// # Errors
    ///
    /// - [`RecoverFailure::Run`] when `run_process` returns an error.
    /// - [`RecoverFailure::LeaseLost`] when a lease renewal is rejected; the
    ///   run is cancelled and dropped before returning.
    async fn run_process_with_lease_renewal(
        &self,
        registration: ProcessRegistration,
        execution_context: ProcessExecutionContext,
        mut lease: ProcessLease,
    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
        let process_id = registration.id.clone();
        let cancellation = CancellationToken::new();
        let cancel_watcher = {
            // Use the change hub when one is configured; otherwise fall back to
            // a polling awaiter over the registry.
            let awaiter = self
                .config
                .process_change_hub
                .clone()
                .map(|hub| {
                    crate::ProcessAwaiter::new(Arc::clone(&self.config.process_registry), hub)
                })
                .unwrap_or_else(|| {
                    crate::ProcessAwaiter::polling(Arc::clone(&self.config.process_registry))
                });
            // Owned copies for the spawned task.
            let process_id = process_id.clone();
            let cancellation = cancellation.clone();
            tokio::spawn(async move {
                match awaiter
                    .await_event(&process_id, "process.cancel_requested", 0)
                    .await
                {
                    // A cancel request was recorded: propagate it to the run.
                    Ok(_) => cancellation.cancel(),
                    Err(err) => tracing::warn!(
                        process_id = %process_id,
                        error = %err,
                        "process cancel watcher stopped before observing cancellation",
                    ),
                }
            })
        };
        let pending = self.run_process(registration, execution_context, cancellation.clone());
        // Pin the run so it can be polled by reference across loop iterations.
        tokio::pin!(pending);
        loop {
            tokio::select! {
                // The run finished: stop the watcher and report the outcome.
                outcome = &mut pending => {
                    cancel_watcher.abort();
                    return outcome.map_err(RecoverFailure::Run);
                }
                // Renewal timer fired before the run finished. A new sleep is
                // created on every loop iteration.
                _ = self.config.runtime_host.clock.sleep(self.lease_timings().renew_interval()) => {
                    match self
                        .config
                        .process_registry
                        .renew_process_lease(&lease, self.lease_timings().ttl_ms())
                        .await
                    {
                        // Keep the renewed lease for the next renewal.
                        Ok(renewed) => lease = renewed,
                        Err(err) => {
                            // Renewal rejected: signal cancellation, stop the
                            // watcher, and return (dropping the pending run).
                            cancellation.cancel();
                            cancel_watcher.abort();
                            return Err(RecoverFailure::LeaseLost(err));
                        }
                    }
                }
            }
        }
    }
530
531    fn lease_timings(&self) -> crate::LeaseTimings {
532        self.config.runtime_host.control.lease_timings
533    }
534
535    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
536        self.config
537            .process_registry
538            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
539            .await
540    }
541
542    pub async fn request_process_cancel(
543        &self,
544        process_id: &str,
545        reason: Option<String>,
546    ) -> Result<(), PluginError> {
547        self.config
548            .process_registry
549            .append_event(
550                process_id,
551                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
552            )
553            .await
554            .map(|_| ())
555    }
556
557    async fn runtime_for_registration(
558        &self,
559        registration: &ProcessRegistration,
560    ) -> Result<LashRuntime, PluginError> {
561        match registration.input.as_ref() {
562            ProcessInput::SessionTurn { create_request, .. } => {
563                self.runtime_for_session_turn(registration, create_request.as_ref())
564                    .await
565            }
566            ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. } => {
567                self.runtime_for_process_env(registration).await
568            }
569            ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
570        }
571    }
572
573    async fn runtime_for_session_turn(
574        &self,
575        registration: &ProcessRegistration,
576        create_request: &crate::SessionCreateRequest,
577    ) -> Result<LashRuntime, PluginError> {
578        let mut policy = create_request
579            .policy
580            .clone()
581            .unwrap_or_else(|| self.config.session_policy.clone());
582        if policy.recorded_provider_id().is_empty() {
583            policy.provider_id = self.config.session_policy.provider_id.clone();
584        }
585        self.build_ephemeral_runtime(
586            format!("process-session-turn:{}", registration.id),
587            policy,
588            create_request.plugin_options.clone(),
589            "session turn request",
590        )
591        .await
592    }
593
594    async fn runtime_for_process_env(
595        &self,
596        registration: &ProcessRegistration,
597    ) -> Result<LashRuntime, PluginError> {
598        let Some(env_ref) = registration.env_ref.as_ref() else {
599            return Err(PluginError::Session(format!(
600                "process `{}` is missing a captured execution env",
601                registration.id
602            )));
603        };
604        let env = crate::load_process_execution_env(
605            self.config
606                .runtime_host
607                .durability
608                .process_env_store
609                .as_ref(),
610            env_ref,
611        )
612        .await?;
613        self.build_ephemeral_runtime(
614            format!("process-env:{}", registration.id),
615            env.policy,
616            env.plugin_options,
617            env_ref.as_str(),
618        )
619        .await
620    }
621
622    async fn build_ephemeral_runtime(
623        &self,
624        session_id: String,
625        policy: crate::SessionPolicy,
626        plugin_options: crate::PluginOptions,
627        source_label: &str,
628    ) -> Result<LashRuntime, PluginError> {
629        let store = Arc::new(InMemorySessionStore::default());
630        let process_work_driver = self.config.process_work_driver.clone().unwrap_or_else(|| {
631            if let Some(hub) = self.config.process_change_hub.clone() {
632                ProcessWorkDriver::from_watched(
633                    Arc::clone(&self.config.process_registry),
634                    hub,
635                    Arc::new(crate::InlineProcessRunHandle::new(self.clone())),
636                )
637            } else {
638                ProcessWorkDriver::inline(Arc::clone(&self.config.process_registry), self.clone())
639            }
640        });
641        let mut builder = EmbeddedRuntimeBuilder::new()
642            .with_session_id(session_id.to_string())
643            .with_plugin_host(self.config.plugin_host.as_ref().clone())
644            .with_runtime_host(self.config.runtime_host.clone())
645            .with_policy(policy)
646            .with_plugin_options(plugin_options)
647            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
648            .with_trigger_store(Arc::clone(&self.config.trigger_store))
649            .with_process_registry(Arc::clone(&self.config.process_registry))
650            .with_process_work_driver(process_work_driver)
651            .with_residency(self.config.residency)
652            .with_store(store);
653        if let Some(driver) = self.config.queued_work_driver.clone() {
654            builder = builder.with_queued_work_driver(driver);
655        }
656        builder.build().await.map_err(|err| {
657            PluginError::Session(format!(
658                "failed to build process worker runtime for {source_label}: {err}"
659            ))
660        })
661    }
662
663    /// Enforce the durable-first wiring invariant at the worker process-run
664    /// boundary: when the worker was wired with a durable effect host, every
665    /// store it will execute against must also be durable. A durable host
666    /// running against any ephemeral store fails loudly here rather than
667    /// silently re-executing a process against non-durable state.
668    ///
669    /// Inline controllers (the default tier) impose no requirement, so
670    /// inline/in-memory workers pass unchanged.
671    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
672        if self
673            .config
674            .runtime_host
675            .control
676            .effect_host
677            .durability_tier()
678            != crate::DurabilityTier::Durable
679        {
680            return Ok(());
681        }
682        let require = |facet: crate::DurableStoreFacet| {
683            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
684        };
685        if self
686            .config
687            .runtime_host
688            .durability
689            .attachment_store
690            .persistence()
691            .durability_tier()
692            != crate::DurabilityTier::Durable
693        {
694            return Err(require(crate::DurableStoreFacet::AttachmentStore));
695        }
696        if self
697            .config
698            .runtime_host
699            .durability
700            .process_env_store
701            .durability_tier()
702            != crate::DurabilityTier::Durable
703        {
704            return Err(require(crate::DurableStoreFacet::ProcessEnvStore));
705        }
706        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
707            return Err(require(crate::DurableStoreFacet::SessionStore));
708        }
709        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
710            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
711        }
712        if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
713            return Err(require(crate::DurableStoreFacet::TriggerStore));
714        }
715        Ok(())
716    }
717
718    /// Enforce the stable-process-id invariant at every (re-)execution: process
719    /// execution identity is the persisted `process_id`, so a retry — a Restate
720    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
721    /// recovery sweep re-running a non-terminal row — must present that stable
722    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
723    /// loudly here, mirroring how `ExecutionScope` rejects an
724    /// empty turn id at the durable-effect boundary.
725    fn ensure_stable_process_id(
726        &self,
727        registration: &ProcessRegistration,
728    ) -> Result<(), PluginError> {
729        if registration.id.trim().is_empty() {
730            return Err(PluginError::Session(
731                crate::RuntimeError::missing_process_execution_id().to_string(),
732            ));
733        }
734        Ok(())
735    }
736}
737
738#[cfg(test)]
739mod recovery_tests {
740    use super::*;
741    use crate::{
742        DurabilityTier, LeaseOwnerIdentity, LeaseOwnerLiveness, ProcessInput, ProcessRegistration,
743    };
744
745    fn inline_worker(
746        registry: Arc<dyn ProcessRegistry>,
747        lease_owner: LeaseOwnerIdentity,
748    ) -> DurableProcessWorker {
749        struct InlineSessionStoreFactory;
750
751        #[async_trait::async_trait]
752        impl SessionStoreFactory for InlineSessionStoreFactory {
753            fn durability_tier(&self) -> DurabilityTier {
754                DurabilityTier::Inline
755            }
756
757            async fn create_store(
758                &self,
759                _request: &crate::SessionStoreCreateRequest,
760            ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
761                Ok(Arc::new(InMemorySessionStore::default()))
762            }
763
764            async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
765                Ok(())
766            }
767        }
768
769        DurableProcessWorker::new(
770            DurableProcessWorkerConfig::new(
771                Arc::new(PluginHost::new(Vec::new())),
772                RuntimeHostConfig::in_memory(),
773                Arc::new(InlineSessionStoreFactory),
774                registry,
775            )
776            .with_lease_owner(lease_owner),
777        )
778    }
779
780    fn external_registration(id: &str) -> ProcessRegistration {
781        ProcessRegistration::new(
782            id,
783            ProcessInput::External {
784                metadata: serde_json::json!({}),
785            },
786            crate::ProcessProvenance::host(),
787        )
788    }
789
790    fn local_owner(owner_id: &str, host_id: &str, process_start: &str) -> LeaseOwnerIdentity {
791        LeaseOwnerIdentity {
792            owner_id: owner_id.to_string(),
793            incarnation_id: format!("{owner_id}:incarnation"),
794            liveness: LeaseOwnerLiveness::local_process_for_test(
795                host_id,
796                "boot-a",
797                std::process::id(),
798                process_start,
799            ),
800        }
801    }
802
803    async fn await_terminal(registry: &Arc<dyn ProcessRegistry>, process_id: &str) {
804        let awaiter = crate::ProcessAwaiter::polling(Arc::clone(registry));
805        tokio::time::timeout(
806            std::time::Duration::from_secs(5),
807            awaiter.await_terminal(process_id),
808        )
809        .await
810        .expect("recovered process reaches terminal within the sweep")
811        .expect("recovered process terminal output");
812    }
813
814    /// The sweep reclaims a lease whose holder is provably dead (same
815    /// host/boot, process gone) instead of waiting out the full TTL.
816    #[tokio::test]
817    async fn sweep_reclaims_dead_holder_lease_and_recovers_the_process() {
818        let registry: Arc<dyn ProcessRegistry> =
819            Arc::new(crate::TestLocalProcessRegistry::default());
820        registry
821            .register_process(external_registration("proc-sweep-reclaim"))
822            .await
823            .expect("register");
824        let dead_holder = local_owner("dead-worker", "host-a", "not-the-current-process-start");
825        registry
826            .claim_process_lease("proc-sweep-reclaim", &dead_holder, 60_000)
827            .await
828            .expect("dead holder claims")
829            .acquired()
830            .expect("dead holder lease acquired");
831
832        let claimant = local_owner("live-worker", "host-a", "claimant-start");
833        let worker = inline_worker(Arc::clone(&registry), claimant);
834        worker
835            .drive_pending_processes()
836            .await
837            .expect("sweep dispatches");
838        await_terminal(&registry, "proc-sweep-reclaim").await;
839    }
840
841    /// A live-leased row whose holder is not provably dead is skipped.
842    #[tokio::test]
843    async fn sweep_skips_rows_whose_holder_is_not_provably_dead() {
844        let registry: Arc<dyn ProcessRegistry> =
845            Arc::new(crate::TestLocalProcessRegistry::default());
846        registry
847            .register_process(external_registration("proc-sweep-skip"))
848            .await
849            .expect("register");
850        // Opaque holders carry no liveness proof, so they are never reclaimed.
851        registry
852            .claim_process_lease(
853                "proc-sweep-skip",
854                &LeaseOwnerIdentity::opaque("other-worker", "other-incarnation"),
855                60_000,
856            )
857            .await
858            .expect("live holder claims")
859            .acquired()
860            .expect("live holder lease acquired");
861
862        let claimant = local_owner("live-worker", "host-a", "claimant-start");
863        let worker = inline_worker(Arc::clone(&registry), claimant);
864        worker
865            .drive_pending_processes()
866            .await
867            .expect("sweep dispatches");
868        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
869        let record = registry
870            .get_process("proc-sweep-skip")
871            .await
872            .expect("process exists");
873        assert!(
874            !record.is_terminal(),
875            "a live-leased process must not be re-run by the sweep"
876        );
877    }
878}
879
880#[cfg(test)]
881mod boundary_tests {
882    use super::*;
883    use crate::{
884        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
885        DurableStoreFacet, InMemoryAttachmentStore, ProcessExecutionEnvRef,
886        ProcessExecutionEnvStore, ProcessInput, ProcessRegistration, RuntimeEffectController,
887        RuntimeError, StoredAttachment, TriggerStore,
888    };
889    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
890
891    /// Effect controller that reports the durable tier; the worker boundary
892    /// only reads the tier, so the effect path is never exercised here.
893    #[derive(Default)]
894    struct DurableController;
895
896    impl crate::AwaitEventResolver for DurableController {
897        fn durability_tier(&self) -> DurabilityTier {
898            DurabilityTier::Durable
899        }
900    }
901
902    #[async_trait::async_trait]
903    impl RuntimeEffectController for DurableController {
904        async fn execute_effect(
905            &self,
906            _envelope: crate::RuntimeEffectEnvelope,
907            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
908        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
909            unreachable!("worker boundary rejects before executing any effect")
910        }
911    }
912
913    /// Attachment store reporting a durable tier over in-memory storage.
914    #[derive(Default)]
915    struct DurableAttachmentStore {
916        inner: InMemoryAttachmentStore,
917    }
918
919    #[async_trait::async_trait]
920    impl AttachmentStore for DurableAttachmentStore {
921        fn persistence(&self) -> AttachmentStorePersistence {
922            AttachmentStorePersistence::Durable
923        }
924
925        async fn put(
926            &self,
927            bytes: Vec<u8>,
928            meta: AttachmentCreateMeta,
929        ) -> Result<AttachmentRef, AttachmentStoreError> {
930            self.inner.put(bytes, meta).await
931        }
932
933        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
934            self.inner.get(id).await
935        }
936
937        async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError> {
938            self.inner.delete(id).await
939        }
940    }
941
942    /// Process env store reporting a durable tier over in-memory storage.
943    #[derive(Default)]
944    struct DurableProcessEnvStore {
945        inner: crate::InMemoryProcessExecutionEnvStore,
946    }
947
948    #[async_trait::async_trait]
949    impl ProcessExecutionEnvStore for DurableProcessEnvStore {
950        fn durability_tier(&self) -> DurabilityTier {
951            DurabilityTier::Durable
952        }
953
954        async fn put_process_execution_env(
955            &self,
956            env_ref: &ProcessExecutionEnvRef,
957            bytes: &[u8],
958        ) -> Result<(), PluginError> {
959            self.inner.put_process_execution_env(env_ref, bytes).await
960        }
961
962        async fn get_process_execution_env(
963            &self,
964            env_ref: &ProcessExecutionEnvRef,
965        ) -> Result<Option<Vec<u8>>, PluginError> {
966            self.inner.get_process_execution_env(env_ref).await
967        }
968    }
969
970    /// Session store factory whose declared tier is configurable; it never has
971    /// to create a store because the worker boundary rejects first.
972    struct TierSessionStoreFactory {
973        tier: DurabilityTier,
974    }
975
976    #[async_trait::async_trait]
977    impl SessionStoreFactory for TierSessionStoreFactory {
978        fn durability_tier(&self) -> DurabilityTier {
979            self.tier
980        }
981
982        async fn create_store(
983            &self,
984            _request: &crate::SessionStoreCreateRequest,
985        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
986            unreachable!("worker boundary rejects before creating a session store")
987        }
988
989        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
990            Ok(())
991        }
992    }
993
994    struct TierTriggerStore {
995        tier: DurabilityTier,
996        inner: crate::InMemoryTriggerStore,
997    }
998
999    impl TierTriggerStore {
1000        fn new(tier: DurabilityTier) -> Self {
1001            Self {
1002                tier,
1003                inner: crate::InMemoryTriggerStore::default(),
1004            }
1005        }
1006    }
1007
1008    #[async_trait::async_trait]
1009    impl TriggerStore for TierTriggerStore {
1010        fn durability_tier(&self) -> DurabilityTier {
1011            self.tier
1012        }
1013
1014        async fn register_subscription(
1015            &self,
1016            draft: crate::TriggerSubscriptionDraft,
1017        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
1018            self.inner.register_subscription(draft).await
1019        }
1020
1021        async fn list_subscriptions(
1022            &self,
1023            filter: crate::TriggerSubscriptionFilter,
1024        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
1025            self.inner.list_subscriptions(filter).await
1026        }
1027
1028        async fn cancel_subscription(
1029            &self,
1030            session_id: &str,
1031            handle: &str,
1032        ) -> Result<bool, PluginError> {
1033            self.inner.cancel_subscription(session_id, handle).await
1034        }
1035
1036        async fn delete_session_subscriptions(
1037            &self,
1038            session_id: &str,
1039        ) -> Result<usize, PluginError> {
1040            self.inner.delete_session_subscriptions(session_id).await
1041        }
1042
1043        async fn record_occurrence(
1044            &self,
1045            request: crate::TriggerOccurrenceRequest,
1046        ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
1047            self.inner.record_occurrence(request).await
1048        }
1049
1050        async fn reserve_matching_deliveries(
1051            &self,
1052            occurrence_id: &str,
1053        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
1054            self.inner.reserve_matching_deliveries(occurrence_id).await
1055        }
1056    }
1057
1058    /// Build a worker whose controller is durable but whose stores can be set
1059    /// per-facet to durable/ephemeral, so each facet's loud rejection can be
1060    /// exercised independently.
1061    fn worker(
1062        attachment: Arc<dyn AttachmentStore>,
1063        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
1064        session_store_tier: DurabilityTier,
1065    ) -> DurableProcessWorker {
1066        worker_with_store_tiers(
1067            attachment,
1068            process_env_store,
1069            session_store_tier,
1070            DurabilityTier::Durable,
1071            DurabilityTier::Durable,
1072        )
1073    }
1074
1075    fn worker_with_store_tiers(
1076        attachment: Arc<dyn AttachmentStore>,
1077        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
1078        session_store_tier: DurabilityTier,
1079        process_registry_tier: DurabilityTier,
1080        trigger_store_tier: DurabilityTier,
1081    ) -> DurableProcessWorker {
1082        let mut runtime_host = RuntimeHostConfig::in_memory();
1083        runtime_host.control.effect_host =
1084            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
1085        runtime_host.durability.attachment_store = attachment;
1086        runtime_host.durability.process_env_store = process_env_store;
1087        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
1088        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
1089            tier: session_store_tier,
1090        });
1091        let registry: Arc<dyn ProcessRegistry> = Arc::new(
1092            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
1093        );
1094        let trigger_store: Arc<dyn TriggerStore> =
1095            Arc::new(TierTriggerStore::new(trigger_store_tier));
1096        DurableProcessWorker::new(
1097            DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
1098                .with_trigger_store(trigger_store),
1099        )
1100    }
1101
1102    fn external_registration() -> ProcessRegistration {
1103        ProcessRegistration::new(
1104            "worker-boundary-process",
1105            ProcessInput::External {
1106                metadata: serde_json::json!({}),
1107            },
1108            crate::ProcessProvenance::host(),
1109        )
1110    }
1111
1112    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
1113        worker
1114            .run_process(
1115                external_registration(),
1116                ProcessExecutionContext::default(),
1117                CancellationToken::new(),
1118            )
1119            .await
1120    }
1121
1122    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
1123        let PluginError::Session(message) = err else {
1124            panic!("expected PluginError::Session, got {err:?}");
1125        };
1126        let expected = RuntimeError::durable_store_required(facet).to_string();
1127        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
1128    }
1129
1130    #[tokio::test]
1131    async fn durable_worker_rejects_ephemeral_attachment_store() {
1132        let worker = worker(
1133            Arc::new(InMemoryAttachmentStore::new()),
1134            Arc::new(DurableProcessEnvStore::default()),
1135            DurabilityTier::Durable,
1136        );
1137        let err = run(&worker)
1138            .await
1139            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
1140        assert_facet(err, DurableStoreFacet::AttachmentStore);
1141    }
1142
1143    #[tokio::test]
1144    async fn durable_worker_rejects_ephemeral_process_env_store() {
1145        let worker = worker(
1146            Arc::new(DurableAttachmentStore::default()),
1147            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
1148            DurabilityTier::Durable,
1149        );
1150        let err = run(&worker)
1151            .await
1152            .expect_err("ephemeral process env store must be rejected at the worker boundary");
1153        assert_facet(err, DurableStoreFacet::ProcessEnvStore);
1154    }
1155
1156    #[tokio::test]
1157    async fn durable_worker_rejects_ephemeral_session_store_factory() {
1158        let worker = worker(
1159            Arc::new(DurableAttachmentStore::default()),
1160            Arc::new(DurableProcessEnvStore::default()),
1161            DurabilityTier::Inline,
1162        );
1163        let err = run(&worker)
1164            .await
1165            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
1166        assert_facet(err, DurableStoreFacet::SessionStore);
1167    }
1168
1169    #[tokio::test]
1170    async fn durable_worker_rejects_ephemeral_process_registry() {
1171        let worker = worker_with_store_tiers(
1172            Arc::new(DurableAttachmentStore::default()),
1173            Arc::new(DurableProcessEnvStore::default()),
1174            DurabilityTier::Durable,
1175            DurabilityTier::Inline,
1176            DurabilityTier::Durable,
1177        );
1178        let err = run(&worker)
1179            .await
1180            .expect_err("ephemeral process registry must be rejected at the worker boundary");
1181        assert_facet(err, DurableStoreFacet::ProcessRegistry);
1182    }
1183
1184    #[tokio::test]
1185    async fn durable_worker_rejects_ephemeral_trigger_store() {
1186        let worker = worker_with_store_tiers(
1187            Arc::new(DurableAttachmentStore::default()),
1188            Arc::new(DurableProcessEnvStore::default()),
1189            DurabilityTier::Durable,
1190            DurabilityTier::Durable,
1191            DurabilityTier::Inline,
1192        );
1193        let err = run(&worker)
1194            .await
1195            .expect_err("ephemeral trigger store must be rejected at the worker boundary");
1196        assert_facet(err, DurableStoreFacet::TriggerStore);
1197    }
1198
1199    #[tokio::test]
1200    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
1201        // Positive control: a durable worker wired against fully durable stores
1202        // clears the store-facet guard and proceeds to run the (External)
1203        // process.
1204        let worker = worker(
1205            Arc::new(DurableAttachmentStore::default()),
1206            Arc::new(DurableProcessEnvStore::default()),
1207            DurabilityTier::Durable,
1208        );
1209        let output = run(&worker)
1210            .await
1211            .expect("all-durable worker should pass the store-facet guard");
1212        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1213    }
1214
1215    #[tokio::test]
1216    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
1217        // Inline controllers impose no requirement, so an in-memory worker runs
1218        // unchanged — the durable-first guard must not regress inline hosts.
1219        let runtime_host = RuntimeHostConfig::in_memory();
1220        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
1221        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
1222            tier: DurabilityTier::Inline,
1223        });
1224        let registry: Arc<dyn ProcessRegistry> =
1225            Arc::new(crate::TestLocalProcessRegistry::default());
1226        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
1227            plugin_host,
1228            runtime_host,
1229            factory,
1230            registry,
1231        ));
1232        let output = run(&worker)
1233            .await
1234            .expect("inline worker should pass the store-facet guard");
1235        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1236    }
1237}