Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{
9    EmbeddedRuntimeBuilder, ProcessWorkDriver, QueuedWorkDriver, RUNTIME_TURN_LEASE_TTL_MS,
10    RuntimeHostConfig,
11};
12use crate::{
13    InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
14    ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
15    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
16    SessionStoreFactory,
17};
18
19/// Deployment-local configuration for rebuilding durable process executions.
20///
21/// Process rows intentionally carry only portable process input and provenance.
22/// Workers provide plugins, providers, stores, secrets, and host capabilities
23/// for the deployment that owns those rows.
24#[derive(Clone)]
25pub struct DurableProcessWorkerConfig {
26    pub plugin_host: Arc<PluginHost>,
27    pub runtime_host: RuntimeHostConfig,
28    pub session_policy: crate::SessionPolicy,
29    pub session_store_factory: Arc<dyn SessionStoreFactory>,
30    pub process_registry: Arc<dyn ProcessRegistry>,
31    pub trigger_store: Arc<dyn crate::TriggerStore>,
32    pub process_work_driver: Option<ProcessWorkDriver>,
33    pub queued_work_driver: Option<QueuedWorkDriver>,
34    #[doc(hidden)]
35    pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
36    /// Residency for sessions the worker rebuilds to run a process. Defaults to
37    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
38    /// it here so the worker's rebuilt sessions trim to the active path too,
39    /// instead of silently diverging from the live runtime by keeping the full
40    /// graph resident.
41    pub residency: crate::Residency,
42}
43
44impl DurableProcessWorkerConfig {
45    pub fn new(
46        plugin_host: Arc<PluginHost>,
47        runtime_host: RuntimeHostConfig,
48        session_store_factory: Arc<dyn SessionStoreFactory>,
49        process_registry: Arc<dyn ProcessRegistry>,
50    ) -> Self {
51        let clock = Arc::clone(&runtime_host.clock);
52        Self {
53            plugin_host,
54            runtime_host,
55            session_policy: crate::SessionPolicy::default(),
56            session_store_factory,
57            process_registry,
58            trigger_store: Arc::new(crate::InMemoryTriggerStore::with_clock(clock)),
59            process_work_driver: None,
60            queued_work_driver: None,
61            turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
62            residency: crate::Residency::default(),
63        }
64    }
65
66    pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
67        self.trigger_store = store;
68        self
69    }
70
71    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
72        self.session_policy = policy;
73        self
74    }
75
76    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
77        self.residency = residency;
78        self
79    }
80
81    pub fn with_process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
82        self.process_work_driver = Some(driver);
83        self
84    }
85
86    pub fn with_queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
87        self.queued_work_driver = Some(driver);
88        self
89    }
90
91    #[doc(hidden)]
92    pub fn with_turn_phase_probe_slot(
93        mut self,
94        slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
95    ) -> Self {
96        self.turn_phase_probe_slot = slot;
97        self
98    }
99
100    pub fn from_plugin_factories(
101        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
102        runtime_host: RuntimeHostConfig,
103        session_store_factory: Arc<dyn SessionStoreFactory>,
104        process_registry: Arc<dyn ProcessRegistry>,
105    ) -> Self {
106        Self::new(
107            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
108            runtime_host,
109            session_store_factory,
110            process_registry,
111        )
112    }
113
114    pub fn from_plugin_stack(
115        plugin_stack: PluginStack,
116        runtime_host: RuntimeHostConfig,
117        session_store_factory: Arc<dyn SessionStoreFactory>,
118        process_registry: Arc<dyn ProcessRegistry>,
119    ) -> Self {
120        Self::from_plugin_factories(
121            plugin_stack.into_factories(),
122            runtime_host,
123            session_store_factory,
124            process_registry,
125        )
126    }
127}
128
129/// Reconstructable background-process worker.
130#[derive(Clone)]
131pub struct DurableProcessWorker {
132    config: Arc<DurableProcessWorkerConfig>,
133}
134
135/// Why a recovery run did not produce a terminal outcome under the lease.
136enum RecoverFailure {
137    /// The lease was lost mid-run (another owner reclaimed an expired lease).
138    /// The losing worker must not write a terminal outcome — the new owner is
139    /// now the single writer.
140    LeaseLost(PluginError),
141    /// The process could not be run (rebuild/store-facet failure). The lease is
142    /// still held, so this worker terminalizes the row.
143    Run(PluginError),
144}
145
146impl DurableProcessWorker {
147    pub fn new(config: DurableProcessWorkerConfig) -> Self {
148        Self {
149            config: Arc::new(config),
150        }
151    }
152
153    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
154        Self { config }
155    }
156
157    pub fn config(&self) -> &DurableProcessWorkerConfig {
158        &self.config
159    }
160
161    pub async fn run_process(
162        &self,
163        registration: ProcessRegistration,
164        execution_context: ProcessExecutionContext,
165        cancellation: CancellationToken,
166    ) -> Result<ProcessAwaitOutput, PluginError> {
167        let scoped_effect_controller = self
168            .config
169            .runtime_host
170            .control
171            .effect_host
172            .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
173            .map_err(|err| PluginError::Session(err.to_string()))?
174            .ok_or_else(|| {
175                PluginError::Session(
176                    "process worker effect host must provide a static process scope".to_string(),
177                )
178            })?;
179        self.run_process_with_scoped_effect_controller(
180            registration,
181            execution_context,
182            scoped_effect_controller,
183            cancellation,
184        )
185        .await
186    }
187
188    pub async fn run_process_with_scoped_effect_controller(
189        &self,
190        registration: ProcessRegistration,
191        execution_context: ProcessExecutionContext,
192        scoped_effect_controller: crate::ScopedEffectController<'_>,
193        cancellation: CancellationToken,
194    ) -> Result<ProcessAwaitOutput, PluginError> {
195        self.ensure_stable_process_id(&registration)?;
196        self.ensure_durable_store_facets()?;
197        if let ProcessInput::External { metadata } = registration.input.as_ref() {
198            return Ok(ProcessAwaitOutput::Success {
199                value: serde_json::json!({ "metadata": metadata.clone() }),
200                control: None,
201            });
202        }
203        let mut runtime = self.runtime_for_registration(&registration).await?;
204        let originator_scope = if let crate::ProcessOriginator::Session { scope } =
205            &registration.provenance.originator
206        {
207            Some(scope)
208        } else {
209            None
210        };
211        let probe_scope = registration.wake_target.as_ref().or(originator_scope);
212        if let Some(probe) =
213            probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
214        {
215            runtime.set_turn_phase_probe(probe);
216        }
217        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
218            PluginError::Session(format!(
219                "failed to build runtime env for process `{}`: {err}",
220                registration.id
221            ))
222        })?;
223        Ok(manager
224            .run_process(
225                registration,
226                execution_context,
227                Arc::clone(&self.config.process_registry),
228                scoped_effect_controller,
229                cancellation,
230            )
231            .await)
232    }
233
234    /// Sweep the registry for non-terminal processes and re-execute the ones
235    /// this worker can claim, driving each to a terminal state.
236    ///
237    /// This is the crash-recovery counterpart to a worker that ran a process
238    /// from a live turn: a trigger/trigger-started process whose worker
239    /// died mid-flight is left non-terminal in the registry, and a subsequent
240    /// worker reopening that registry must finish it. The sweep:
241    ///
242    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
243    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
244    ///    already leased live by *another* owner is skipped (it is being run by
245    ///    that owner right now), so a non-terminal process is re-run by exactly
246    ///    one owner (lease fencing);
247    /// 3. runs the claimed process on this worker's wired controller, renewing
248    ///    the lease across the long-running execution so a healthy recovery is
249    ///    not swept out from under itself;
250    /// 4. writes the terminal outcome and releases the lease.
251    ///
252    /// Idempotent by `process_id`: terminal processes are never in the worklist,
253    /// and a process that became terminal between the list and the claim is
254    /// detected after claiming and skipped, so re-running a recovery sweep does
255    /// not double-execute completed work.
256    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
257        let records = self.config.process_registry.list_non_terminal().await?;
258        for record in records {
259            // Run each claimed process on its OWN lease-fenced task. A sequential
260            // drive that awaited each process to terminal would deadlock a process
261            // that blocks awaiting a nested child (`start child` then `await`, or a
262            // subagent fan-out): the one drive task would park inside the parent's
263            // await and never claim the child. Spawning frees the loop so a
264            // subsequent host-driven pass claims and runs the child, and the
265            // per-process `ProcessLease` fences concurrent owners — so spawning a
266            // task per pending row on every drive is idempotent (a row already
267            // running is skipped on claim conflict) and one failing row never
268            // aborts the rest of the sweep.
269            let worker = self.clone();
270            tokio::spawn(async move { worker.recover_process(record).await });
271        }
272        Ok(())
273    }
274
275    async fn recover_process(&self, record: ProcessRecord) {
276        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
277        let process_id = record.id.clone();
278        // Skip if held live by another owner: a claim conflict means a worker is
279        // already running this process, so re-running here would violate the
280        // single-owner contract. Treat any claim failure as "leased elsewhere".
281        let Ok(lease) = self
282            .config
283            .process_registry
284            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
285            .await
286        else {
287            return;
288        };
289        // The process may have reached a terminal state between the list and the
290        // claim. Idempotent by process_id: do not re-execute a finished process.
291        if self
292            .config
293            .process_registry
294            .get_process(&process_id)
295            .await
296            .is_some_and(|current| current.is_terminal())
297        {
298            self.release_or_log(&lease).await;
299            return;
300        }
301        let registration = ProcessRegistration {
302            id: record.id,
303            input: record.input,
304            identity: record.identity,
305            event_types: record.event_types,
306            provenance: record.provenance.clone(),
307            env_ref: record.env_ref.clone(),
308            wake_target: record.wake_target.clone(),
309        };
310        let execution_context = ProcessExecutionContext::default();
311        match self
312            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
313            .await
314        {
315            // Ran to a terminal outcome (success or a process-level failure) while
316            // holding the lease: this owner is the single writer of the terminal.
317            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
318            // The lease was lost mid-run — another owner reclaimed the expired
319            // lease and is now running this process. Do NOT write a terminal
320            // outcome or release the lease: that would race the new owner and
321            // could record a succeeded process as Failed. Leave the row to the
322            // lease holder; it will finish (or another sweep retries it).
323            Err(RecoverFailure::LeaseLost(err)) => {
324                tracing::warn!(
325                    process_id = %process_id,
326                    error = %err,
327                    "process recovery lost its lease mid-run; deferring to the new owner",
328                );
329            }
330            // The process could not be run at all (rebuild/store-facet failure):
331            // terminalize as a recovery failure so the row leaves the worklist.
332            Err(RecoverFailure::Run(err)) => {
333                let output = ProcessAwaitOutput::Failure {
334                    class: crate::ToolFailureClass::Execution,
335                    code: "process_recovery_failed".to_string(),
336                    message: err.to_string(),
337                    raw: None,
338                    control: None,
339                };
340                self.complete_and_release(&lease, &process_id, output).await;
341            }
342        }
343    }
344
345    /// Write a recovered process's terminal outcome (the running lease owner is
346    /// the single writer) and then release the lease, logging either failure
347    /// rather than aborting — the lease's TTL is the backstop.
348    async fn complete_and_release(
349        &self,
350        lease: &ProcessLease,
351        process_id: &str,
352        output: ProcessAwaitOutput,
353    ) {
354        // Fence the terminal write: re-confirm the lease immediately before
355        // writing. `renew_process_lease` is rejected (by owner/lease_token/
356        // fencing_token) if another owner has reclaimed an expired lease, and on
357        // success extends the window so the back-to-back write lands inside the
358        // owned interval. A worker that stalled past its TTL therefore cannot
359        // overwrite the new owner's outcome — it defers instead.
360        let fenced = match self
361            .config
362            .process_registry
363            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
364            .await
365        {
366            Ok(renewed) => renewed,
367            Err(err) => {
368                tracing::warn!(
369                    process_id = %process_id,
370                    error = %err,
371                    "lost process lease before terminal write; deferring to the new owner",
372                );
373                return;
374            }
375        };
376        if let Err(err) = self
377            .config
378            .process_registry
379            .complete_process(process_id, output)
380            .await
381        {
382            tracing::warn!(
383                process_id = %process_id,
384                error = %err,
385                "failed to write recovered process terminal outcome",
386            );
387        }
388        self.release_or_log(&fenced).await;
389    }
390
391    async fn release_or_log(&self, lease: &ProcessLease) {
392        if let Err(err) = self.release_process_lease(lease).await {
393            tracing::warn!(
394                process_id = %lease.process_id,
395                error = %err,
396                "failed to release recovered process lease",
397            );
398        }
399    }
400
401    /// Run a recovered process while renewing its lease across the execution,
402    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
403    /// from expiring under the live owner.
404    async fn run_process_with_lease_renewal(
405        &self,
406        registration: ProcessRegistration,
407        execution_context: ProcessExecutionContext,
408        mut lease: ProcessLease,
409    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
410        let process_id = registration.id.clone();
411        let cancellation = CancellationToken::new();
412        let cancel_watcher = {
413            let registry = Arc::clone(&self.config.process_registry);
414            let process_id = process_id.clone();
415            let cancellation = cancellation.clone();
416            tokio::spawn(async move {
417                match registry
418                    .wait_event_after(&process_id, "process.cancel_requested", 0)
419                    .await
420                {
421                    Ok(_) => cancellation.cancel(),
422                    Err(err) => tracing::warn!(
423                        process_id = %process_id,
424                        error = %err,
425                        "process cancel watcher stopped before observing cancellation",
426                    ),
427                }
428            })
429        };
430        let pending = self.run_process(registration, execution_context, cancellation.clone());
431        tokio::pin!(pending);
432        loop {
433            tokio::select! {
434                outcome = &mut pending => {
435                    cancel_watcher.abort();
436                    return outcome.map_err(RecoverFailure::Run);
437                }
438                _ = self.config.runtime_host.clock.sleep(process_lease_renew_interval()) => {
439                    match self
440                        .config
441                        .process_registry
442                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
443                        .await
444                    {
445                        Ok(renewed) => lease = renewed,
446                        Err(err) => {
447                            cancellation.cancel();
448                            cancel_watcher.abort();
449                            return Err(RecoverFailure::LeaseLost(err));
450                        }
451                    }
452                }
453            }
454        }
455    }
456
457    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
458        self.config
459            .process_registry
460            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
461            .await
462    }
463
464    pub async fn request_process_cancel(
465        &self,
466        process_id: &str,
467        reason: Option<String>,
468    ) -> Result<(), PluginError> {
469        self.config
470            .process_registry
471            .append_event(
472                process_id,
473                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
474            )
475            .await
476            .map(|_| ())
477    }
478
479    async fn runtime_for_registration(
480        &self,
481        registration: &ProcessRegistration,
482    ) -> Result<LashRuntime, PluginError> {
483        match registration.input.as_ref() {
484            ProcessInput::SessionTurn { create_request, .. } => {
485                self.runtime_for_session_turn(registration, create_request.as_ref())
486                    .await
487            }
488            ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. } => {
489                self.runtime_for_process_env(registration).await
490            }
491            ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
492        }
493    }
494
495    async fn runtime_for_session_turn(
496        &self,
497        registration: &ProcessRegistration,
498        create_request: &crate::SessionCreateRequest,
499    ) -> Result<LashRuntime, PluginError> {
500        let mut policy = create_request
501            .policy
502            .clone()
503            .unwrap_or_else(|| self.config.session_policy.clone());
504        if policy.recorded_provider_id().is_empty() {
505            policy.provider_id = self.config.session_policy.provider_id.clone();
506        }
507        self.build_ephemeral_runtime(
508            format!("process-session-turn:{}", registration.id),
509            policy,
510            create_request.plugin_options.clone(),
511            "session turn request",
512        )
513        .await
514    }
515
516    async fn runtime_for_process_env(
517        &self,
518        registration: &ProcessRegistration,
519    ) -> Result<LashRuntime, PluginError> {
520        let Some(env_ref) = registration.env_ref.as_ref() else {
521            return Err(PluginError::Session(format!(
522                "process `{}` is missing a captured execution env",
523                registration.id
524            )));
525        };
526        let env = crate::load_process_execution_env(
527            self.config
528                .runtime_host
529                .durability
530                .process_env_store
531                .as_ref(),
532            env_ref,
533        )
534        .await?;
535        self.build_ephemeral_runtime(
536            format!("process-env:{}", registration.id),
537            env.policy,
538            env.plugin_options,
539            env_ref.as_str(),
540        )
541        .await
542    }
543
544    async fn build_ephemeral_runtime(
545        &self,
546        session_id: String,
547        policy: crate::SessionPolicy,
548        plugin_options: crate::PluginOptions,
549        source_label: &str,
550    ) -> Result<LashRuntime, PluginError> {
551        let store = Arc::new(InMemorySessionStore::default());
552        let process_work_driver = self.config.process_work_driver.clone().unwrap_or_else(|| {
553            ProcessWorkDriver::inline(Arc::clone(&self.config.process_registry), self.clone())
554        });
555        let mut builder = EmbeddedRuntimeBuilder::new()
556            .with_session_id(session_id.to_string())
557            .with_plugin_host(self.config.plugin_host.as_ref().clone())
558            .with_runtime_host(self.config.runtime_host.clone())
559            .with_policy(policy)
560            .with_plugin_options(plugin_options)
561            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
562            .with_trigger_store(Arc::clone(&self.config.trigger_store))
563            .with_process_registry(Arc::clone(&self.config.process_registry))
564            .with_process_work_driver(process_work_driver)
565            .with_residency(self.config.residency)
566            .with_store(store);
567        if let Some(driver) = self.config.queued_work_driver.clone() {
568            builder = builder.with_queued_work_driver(driver);
569        }
570        builder.build().await.map_err(|err| {
571            PluginError::Session(format!(
572                "failed to build process worker runtime for {source_label}: {err}"
573            ))
574        })
575    }
576
577    /// Enforce the durable-first wiring invariant at the worker process-run
578    /// boundary: when the worker was wired with a durable effect host, every
579    /// store it will execute against must also be durable. A durable host
580    /// running against any ephemeral store fails loudly here rather than
581    /// silently re-executing a process against non-durable state.
582    ///
583    /// Inline controllers (the default tier) impose no requirement, so
584    /// inline/in-memory workers pass unchanged.
585    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
586        if self
587            .config
588            .runtime_host
589            .control
590            .effect_host
591            .durability_tier()
592            != crate::DurabilityTier::Durable
593        {
594            return Ok(());
595        }
596        let require = |facet: crate::DurableStoreFacet| {
597            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
598        };
599        if self
600            .config
601            .runtime_host
602            .durability
603            .attachment_store
604            .persistence()
605            .durability_tier()
606            != crate::DurabilityTier::Durable
607        {
608            return Err(require(crate::DurableStoreFacet::AttachmentStore));
609        }
610        if self
611            .config
612            .runtime_host
613            .durability
614            .process_env_store
615            .durability_tier()
616            != crate::DurabilityTier::Durable
617        {
618            return Err(require(crate::DurableStoreFacet::ProcessEnvStore));
619        }
620        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
621            return Err(require(crate::DurableStoreFacet::SessionStore));
622        }
623        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
624            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
625        }
626        if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
627            return Err(require(crate::DurableStoreFacet::TriggerStore));
628        }
629        Ok(())
630    }
631
632    /// Enforce the stable-process-id invariant at every (re-)execution: process
633    /// execution identity is the persisted `process_id`, so a retry — a Restate
634    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
635    /// recovery sweep re-running a non-terminal row — must present that stable
636    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
637    /// loudly here, mirroring how `ExecutionScope` rejects an
638    /// empty turn id at the durable-effect boundary.
639    fn ensure_stable_process_id(
640        &self,
641        registration: &ProcessRegistration,
642    ) -> Result<(), PluginError> {
643        if registration.id.trim().is_empty() {
644            return Err(PluginError::Session(
645                crate::RuntimeError::missing_process_execution_id().to_string(),
646            ));
647        }
648        Ok(())
649    }
650}
651
/// How often a recovery run renews its process lease while the process runs.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Renewal cadence under `cfg(test)`: 25 ms (presumably so tests observe
/// renewals without long waits).
#[cfg(test)]
fn process_lease_renew_interval_ms() -> u64 {
    const TEST_RENEW_INTERVAL_MS: u64 = 25;
    TEST_RENEW_INTERVAL_MS
}

/// Production renewal cadence: every 30 seconds.
#[cfg(not(test))]
fn process_lease_renew_interval_ms() -> u64 {
    const RENEW_INTERVAL_MS: u64 = 30 * 1_000;
    RENEW_INTERVAL_MS
}
665
666#[cfg(test)]
667mod boundary_tests {
668    use super::*;
669    use crate::{
670        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
671        DurableStoreFacet, InMemoryAttachmentStore, ProcessExecutionEnvRef,
672        ProcessExecutionEnvStore, ProcessInput, ProcessRegistration, RuntimeEffectController,
673        RuntimeError, StoredAttachment, TriggerStore,
674    };
675    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
676
677    /// Effect controller that reports the durable tier; the worker boundary
678    /// only reads the tier, so the effect path is never exercised here.
679    #[derive(Default)]
680    struct DurableController;
681
682    #[async_trait::async_trait]
683    impl RuntimeEffectController for DurableController {
684        fn durability_tier(&self) -> DurabilityTier {
685            DurabilityTier::Durable
686        }
687
688        async fn execute_effect(
689            &self,
690            _envelope: crate::RuntimeEffectEnvelope,
691            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
692        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
693            unreachable!("worker boundary rejects before executing any effect")
694        }
695    }
696
697    /// Attachment store reporting a durable tier over in-memory storage.
698    #[derive(Default)]
699    struct DurableAttachmentStore {
700        inner: InMemoryAttachmentStore,
701    }
702
703    #[async_trait::async_trait]
704    impl AttachmentStore for DurableAttachmentStore {
705        fn persistence(&self) -> AttachmentStorePersistence {
706            AttachmentStorePersistence::Durable
707        }
708
709        async fn put(
710            &self,
711            bytes: Vec<u8>,
712            meta: AttachmentCreateMeta,
713        ) -> Result<AttachmentRef, AttachmentStoreError> {
714            self.inner.put(bytes, meta).await
715        }
716
717        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
718            self.inner.get(id).await
719        }
720    }
721
722    /// Process env store reporting a durable tier over in-memory storage.
723    #[derive(Default)]
724    struct DurableProcessEnvStore {
725        inner: crate::InMemoryProcessExecutionEnvStore,
726    }
727
728    #[async_trait::async_trait]
729    impl ProcessExecutionEnvStore for DurableProcessEnvStore {
730        fn durability_tier(&self) -> DurabilityTier {
731            DurabilityTier::Durable
732        }
733
734        async fn put_process_execution_env(
735            &self,
736            env_ref: &ProcessExecutionEnvRef,
737            bytes: &[u8],
738        ) -> Result<(), PluginError> {
739            self.inner.put_process_execution_env(env_ref, bytes).await
740        }
741
742        async fn get_process_execution_env(
743            &self,
744            env_ref: &ProcessExecutionEnvRef,
745        ) -> Result<Option<Vec<u8>>, PluginError> {
746            self.inner.get_process_execution_env(env_ref).await
747        }
748    }
749
750    /// Session store factory whose declared tier is configurable; it never has
751    /// to create a store because the worker boundary rejects first.
752    struct TierSessionStoreFactory {
753        tier: DurabilityTier,
754    }
755
756    #[async_trait::async_trait]
757    impl SessionStoreFactory for TierSessionStoreFactory {
758        fn durability_tier(&self) -> DurabilityTier {
759            self.tier
760        }
761
762        async fn create_store(
763            &self,
764            _request: &crate::SessionStoreCreateRequest,
765        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
766            unreachable!("worker boundary rejects before creating a session store")
767        }
768
769        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
770            Ok(())
771        }
772    }
773
774    struct TierTriggerStore {
775        tier: DurabilityTier,
776        inner: crate::InMemoryTriggerStore,
777    }
778
779    impl TierTriggerStore {
780        fn new(tier: DurabilityTier) -> Self {
781            Self {
782                tier,
783                inner: crate::InMemoryTriggerStore::default(),
784            }
785        }
786    }
787
788    #[async_trait::async_trait]
789    impl TriggerStore for TierTriggerStore {
790        fn durability_tier(&self) -> DurabilityTier {
791            self.tier
792        }
793
794        async fn register_subscription(
795            &self,
796            draft: crate::TriggerSubscriptionDraft,
797        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
798            self.inner.register_subscription(draft).await
799        }
800
801        async fn list_subscriptions(
802            &self,
803            filter: crate::TriggerSubscriptionFilter,
804        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
805            self.inner.list_subscriptions(filter).await
806        }
807
808        async fn cancel_subscription(
809            &self,
810            session_id: &str,
811            handle: &str,
812        ) -> Result<bool, PluginError> {
813            self.inner.cancel_subscription(session_id, handle).await
814        }
815
816        async fn delete_session_subscriptions(
817            &self,
818            session_id: &str,
819        ) -> Result<usize, PluginError> {
820            self.inner.delete_session_subscriptions(session_id).await
821        }
822
823        async fn record_occurrence(
824            &self,
825            request: crate::TriggerOccurrenceRequest,
826        ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
827            self.inner.record_occurrence(request).await
828        }
829
830        async fn reserve_matching_deliveries(
831            &self,
832            occurrence_id: &str,
833        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
834            self.inner.reserve_matching_deliveries(occurrence_id).await
835        }
836    }
837
838    /// Build a worker whose controller is durable but whose stores can be set
839    /// per-facet to durable/ephemeral, so each facet's loud rejection can be
840    /// exercised independently.
841    fn worker(
842        attachment: Arc<dyn AttachmentStore>,
843        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
844        session_store_tier: DurabilityTier,
845    ) -> DurableProcessWorker {
846        worker_with_store_tiers(
847            attachment,
848            process_env_store,
849            session_store_tier,
850            DurabilityTier::Durable,
851            DurabilityTier::Durable,
852        )
853    }
854
855    fn worker_with_store_tiers(
856        attachment: Arc<dyn AttachmentStore>,
857        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
858        session_store_tier: DurabilityTier,
859        process_registry_tier: DurabilityTier,
860        trigger_store_tier: DurabilityTier,
861    ) -> DurableProcessWorker {
862        let mut runtime_host = RuntimeHostConfig::in_memory();
863        runtime_host.control.effect_host =
864            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
865        runtime_host.durability.attachment_store = attachment;
866        runtime_host.durability.process_env_store = process_env_store;
867        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
868        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
869            tier: session_store_tier,
870        });
871        let registry: Arc<dyn ProcessRegistry> = Arc::new(
872            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
873        );
874        let trigger_store: Arc<dyn TriggerStore> =
875            Arc::new(TierTriggerStore::new(trigger_store_tier));
876        DurableProcessWorker::new(
877            DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
878                .with_trigger_store(trigger_store),
879        )
880    }
881
882    fn external_registration() -> ProcessRegistration {
883        ProcessRegistration::new(
884            "worker-boundary-process",
885            ProcessInput::External {
886                metadata: serde_json::json!({}),
887            },
888            crate::ProcessProvenance::host(),
889        )
890    }
891
892    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
893        worker
894            .run_process(
895                external_registration(),
896                ProcessExecutionContext::default(),
897                CancellationToken::new(),
898            )
899            .await
900    }
901
902    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
903        let PluginError::Session(message) = err else {
904            panic!("expected PluginError::Session, got {err:?}");
905        };
906        let expected = RuntimeError::durable_store_required(facet).to_string();
907        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
908    }
909
910    #[tokio::test]
911    async fn durable_worker_rejects_ephemeral_attachment_store() {
912        let worker = worker(
913            Arc::new(InMemoryAttachmentStore::new()),
914            Arc::new(DurableProcessEnvStore::default()),
915            DurabilityTier::Durable,
916        );
917        let err = run(&worker)
918            .await
919            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
920        assert_facet(err, DurableStoreFacet::AttachmentStore);
921    }
922
923    #[tokio::test]
924    async fn durable_worker_rejects_ephemeral_process_env_store() {
925        let worker = worker(
926            Arc::new(DurableAttachmentStore::default()),
927            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
928            DurabilityTier::Durable,
929        );
930        let err = run(&worker)
931            .await
932            .expect_err("ephemeral process env store must be rejected at the worker boundary");
933        assert_facet(err, DurableStoreFacet::ProcessEnvStore);
934    }
935
936    #[tokio::test]
937    async fn durable_worker_rejects_ephemeral_session_store_factory() {
938        let worker = worker(
939            Arc::new(DurableAttachmentStore::default()),
940            Arc::new(DurableProcessEnvStore::default()),
941            DurabilityTier::Inline,
942        );
943        let err = run(&worker)
944            .await
945            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
946        assert_facet(err, DurableStoreFacet::SessionStore);
947    }
948
949    #[tokio::test]
950    async fn durable_worker_rejects_ephemeral_process_registry() {
951        let worker = worker_with_store_tiers(
952            Arc::new(DurableAttachmentStore::default()),
953            Arc::new(DurableProcessEnvStore::default()),
954            DurabilityTier::Durable,
955            DurabilityTier::Inline,
956            DurabilityTier::Durable,
957        );
958        let err = run(&worker)
959            .await
960            .expect_err("ephemeral process registry must be rejected at the worker boundary");
961        assert_facet(err, DurableStoreFacet::ProcessRegistry);
962    }
963
964    #[tokio::test]
965    async fn durable_worker_rejects_ephemeral_trigger_store() {
966        let worker = worker_with_store_tiers(
967            Arc::new(DurableAttachmentStore::default()),
968            Arc::new(DurableProcessEnvStore::default()),
969            DurabilityTier::Durable,
970            DurabilityTier::Durable,
971            DurabilityTier::Inline,
972        );
973        let err = run(&worker)
974            .await
975            .expect_err("ephemeral trigger store must be rejected at the worker boundary");
976        assert_facet(err, DurableStoreFacet::TriggerStore);
977    }
978
979    #[tokio::test]
980    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
981        // Positive control: a durable worker wired against fully durable stores
982        // clears the store-facet guard and proceeds to run the (External)
983        // process.
984        let worker = worker(
985            Arc::new(DurableAttachmentStore::default()),
986            Arc::new(DurableProcessEnvStore::default()),
987            DurabilityTier::Durable,
988        );
989        let output = run(&worker)
990            .await
991            .expect("all-durable worker should pass the store-facet guard");
992        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
993    }
994
995    #[tokio::test]
996    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
997        // Inline controllers impose no requirement, so an in-memory worker runs
998        // unchanged — the durable-first guard must not regress inline hosts.
999        let runtime_host = RuntimeHostConfig::in_memory();
1000        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
1001        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
1002            tier: DurabilityTier::Inline,
1003        });
1004        let registry: Arc<dyn ProcessRegistry> =
1005            Arc::new(crate::TestLocalProcessRegistry::default());
1006        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
1007            plugin_host,
1008            runtime_host,
1009            factory,
1010            registry,
1011        ));
1012        let output = run(&worker)
1013            .await
1014            .expect("inline worker should pass the store-facet guard");
1015        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1016    }
1017}