Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10    LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack, ProcessAwaitOutput,
11    ProcessExecutionContext, ProcessInput, ProcessLease, ProcessLeaseCompletion, ProcessRecord,
12    ProcessRegistration, ProcessRegistry, SessionStoreCreateRequest, SessionStoreFactory,
13};
14
15/// Deployment-local configuration for rebuilding durable process executions.
16///
17/// Process rows intentionally carry only portable process input and provenance.
18/// Workers provide the host profile, plugins, providers, stores, secrets, and
19/// host capabilities for the deployment that owns those rows.
20#[derive(Clone)]
21pub struct DurableProcessWorkerConfig {
22    pub plugin_host: Arc<PluginHost>,
23    pub runtime_host: RuntimeHostConfig,
24    pub session_policy: crate::SessionPolicy,
25    pub session_store_factory: Arc<dyn SessionStoreFactory>,
26    pub process_registry: Arc<dyn ProcessRegistry>,
27    /// Residency for sessions the worker rebuilds to run a process. Defaults to
28    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
29    /// it here so the worker's rebuilt sessions trim to the active path too,
30    /// instead of silently diverging from the live runtime by keeping the full
31    /// graph resident.
32    pub residency: crate::Residency,
33}
34
35impl DurableProcessWorkerConfig {
36    pub fn new(
37        plugin_host: Arc<PluginHost>,
38        runtime_host: RuntimeHostConfig,
39        session_store_factory: Arc<dyn SessionStoreFactory>,
40        process_registry: Arc<dyn ProcessRegistry>,
41    ) -> Self {
42        Self {
43            plugin_host,
44            runtime_host,
45            session_policy: crate::SessionPolicy::default(),
46            session_store_factory,
47            process_registry,
48            residency: crate::Residency::default(),
49        }
50    }
51
52    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
53        self.session_policy = policy;
54        self
55    }
56
57    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
58        self.residency = residency;
59        self
60    }
61
62    pub fn from_plugin_factories(
63        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
64        runtime_host: RuntimeHostConfig,
65        session_store_factory: Arc<dyn SessionStoreFactory>,
66        process_registry: Arc<dyn ProcessRegistry>,
67    ) -> Self {
68        Self::new(
69            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
70            runtime_host,
71            session_store_factory,
72            process_registry,
73        )
74    }
75
76    pub fn from_plugin_stack(
77        plugin_stack: PluginStack,
78        runtime_host: RuntimeHostConfig,
79        session_store_factory: Arc<dyn SessionStoreFactory>,
80        process_registry: Arc<dyn ProcessRegistry>,
81    ) -> Self {
82        Self::from_plugin_factories(
83            plugin_stack.into_factories(),
84            runtime_host,
85            session_store_factory,
86            process_registry,
87        )
88    }
89}
90
91/// Reconstructable background-process worker.
92#[derive(Clone)]
93pub struct DurableProcessWorker {
94    config: Arc<DurableProcessWorkerConfig>,
95}
96
97/// Why a recovery run did not produce a terminal outcome under the lease.
98enum RecoverFailure {
99    /// The lease was lost mid-run (another owner reclaimed an expired lease).
100    /// The losing worker must not write a terminal outcome — the new owner is
101    /// now the single writer.
102    LeaseLost(PluginError),
103    /// The process could not be run (rebuild/store-facet failure). The lease is
104    /// still held, so this worker terminalizes the row.
105    Run(PluginError),
106}
107
108impl DurableProcessWorker {
109    pub fn new(config: DurableProcessWorkerConfig) -> Self {
110        Self {
111            config: Arc::new(config),
112        }
113    }
114
115    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
116        Self { config }
117    }
118
119    pub fn config(&self) -> &DurableProcessWorkerConfig {
120        &self.config
121    }
122
123    pub async fn run_process(
124        &self,
125        registration: ProcessRegistration,
126        execution_context: ProcessExecutionContext,
127        cancellation: CancellationToken,
128    ) -> Result<ProcessAwaitOutput, PluginError> {
129        let scoped_effect_controller = self
130            .config
131            .runtime_host
132            .control
133            .effect_host
134            .scoped_static(crate::EffectScope::process(registration.id.clone()))
135            .map_err(|err| PluginError::Session(err.to_string()))?
136            .ok_or_else(|| {
137                PluginError::Session(
138                    "process worker effect host must provide a static process scope".to_string(),
139                )
140            })?;
141        self.run_process_with_scoped_effect_controller(
142            registration,
143            execution_context,
144            scoped_effect_controller,
145            cancellation,
146        )
147        .await
148    }
149
150    pub async fn run_process_with_scoped_effect_controller(
151        &self,
152        registration: ProcessRegistration,
153        execution_context: ProcessExecutionContext,
154        scoped_effect_controller: crate::ScopedEffectController<'_>,
155        cancellation: CancellationToken,
156    ) -> Result<ProcessAwaitOutput, PluginError> {
157        self.ensure_stable_process_id(&registration)?;
158        self.ensure_host_profile_matches(&registration)?;
159        self.ensure_durable_store_facets()?;
160        if let ProcessInput::External { metadata } = registration.input.as_ref() {
161            return Ok(ProcessAwaitOutput::Success {
162                value: serde_json::json!({ "metadata": metadata.clone() }),
163                control: None,
164            });
165        }
166        let session_id = registration.provenance.owner_scope.session_id.as_str();
167        if session_id.is_empty() {
168            return Err(PluginError::Session(format!(
169                "process `{}` is missing a structured owner scope",
170                registration.id
171            )));
172        }
173        let runtime = self.rebuild_runtime(session_id).await?;
174        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
175            PluginError::Session(format!(
176                "failed to rebuild runtime session `{session_id}` for process `{}`: {err}",
177                registration.id
178            ))
179        })?;
180        Ok(manager
181            .run_process(
182                registration,
183                execution_context,
184                Arc::clone(&self.config.process_registry),
185                scoped_effect_controller,
186                cancellation,
187            )
188            .await)
189    }
190
191    /// Sweep the registry for non-terminal processes and re-execute the ones
192    /// this worker can claim, driving each to a terminal state.
193    ///
194    /// This is the crash-recovery counterpart to a worker that ran a process
195    /// from a live turn: a trigger/host-event-started process whose worker
196    /// died mid-flight is left non-terminal in the registry, and a subsequent
197    /// worker reopening that registry must finish it. The sweep:
198    ///
199    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
200    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
201    ///    already leased live by *another* owner is skipped (it is being run by
202    ///    that owner right now), so a non-terminal process is re-run by exactly
203    ///    one owner (lease fencing);
204    /// 3. runs the claimed process on this worker's wired controller, renewing
205    ///    the lease across the long-running execution so a healthy recovery is
206    ///    not swept out from under itself;
207    /// 4. writes the terminal outcome and releases the lease.
208    ///
209    /// Idempotent by `process_id`: terminal processes are never in the worklist,
210    /// and a process that became terminal between the list and the claim is
211    /// detected after claiming and skipped, so re-running a recovery sweep does
212    /// not double-execute completed work.
213    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
214        let records = self.config.process_registry.list_non_terminal().await?;
215        for record in records {
216            // Run each claimed process on its OWN lease-fenced task. A sequential
217            // drive that awaited each process to terminal would deadlock a process
218            // that blocks awaiting a nested child (`start child` then `await`, or a
219            // subagent fan-out): the one drive task would park inside the parent's
220            // await and never claim the child. Spawning frees the loop so a
221            // subsequent drive (poke or poll) claims and runs the child, and the
222            // per-process `ProcessLease` fences concurrent owners — so spawning a
223            // task per pending row on every drive is idempotent (a row already
224            // running is skipped on claim conflict) and one failing row never
225            // aborts the rest of the sweep.
226            let worker = self.clone();
227            tokio::spawn(async move { worker.recover_process(record).await });
228        }
229        Ok(())
230    }
231
232    async fn recover_process(&self, record: ProcessRecord) {
233        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
234        let process_id = record.id.clone();
235        // Skip if held live by another owner: a claim conflict means a worker is
236        // already running this process, so re-running here would violate the
237        // single-owner contract. Treat any claim failure as "leased elsewhere".
238        let Ok(lease) = self
239            .config
240            .process_registry
241            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
242            .await
243        else {
244            return;
245        };
246        // The process may have reached a terminal state between the list and the
247        // claim. Idempotent by process_id: do not re-execute a finished process.
248        if self
249            .config
250            .process_registry
251            .get_process(&process_id)
252            .await
253            .is_some_and(|current| current.is_terminal())
254        {
255            self.release_or_log(&lease).await;
256            return;
257        }
258        let registration = ProcessRegistration {
259            id: record.id,
260            input: record.input,
261            event_types: record.event_types,
262            provenance: record.provenance.clone(),
263        };
264        // Wakes route to the creator scope; on recovery the owner scope persisted
265        // in provenance is that creator scope, so it is the wake target.
266        let execution_context = ProcessExecutionContext::default()
267            .with_wake_target_scope(record.provenance.owner_scope);
268        match self
269            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
270            .await
271        {
272            // Ran to a terminal outcome (success or a process-level failure) while
273            // holding the lease: this owner is the single writer of the terminal.
274            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
275            // The lease was lost mid-run — another owner reclaimed the expired
276            // lease and is now running this process. Do NOT write a terminal
277            // outcome or release the lease: that would race the new owner and
278            // could record a succeeded process as Failed. Leave the row to the
279            // lease holder; it will finish (or another sweep retries it).
280            Err(RecoverFailure::LeaseLost(err)) => {
281                tracing::warn!(
282                    process_id = %process_id,
283                    error = %err,
284                    "process recovery lost its lease mid-run; deferring to the new owner",
285                );
286            }
287            // The process could not be run at all (rebuild/store-facet failure):
288            // terminalize as a recovery failure so the row leaves the worklist.
289            Err(RecoverFailure::Run(err)) => {
290                let output = ProcessAwaitOutput::Failure {
291                    class: crate::ToolFailureClass::Execution,
292                    code: "process_recovery_failed".to_string(),
293                    message: err.to_string(),
294                    raw: None,
295                    control: None,
296                };
297                self.complete_and_release(&lease, &process_id, output).await;
298            }
299        }
300    }
301
302    /// Write a recovered process's terminal outcome (the running lease owner is
303    /// the single writer) and then release the lease, logging either failure
304    /// rather than aborting — the lease's TTL is the backstop.
305    async fn complete_and_release(
306        &self,
307        lease: &ProcessLease,
308        process_id: &str,
309        output: ProcessAwaitOutput,
310    ) {
311        // Fence the terminal write: re-confirm the lease immediately before
312        // writing. `renew_process_lease` is rejected (by owner/lease_token/
313        // fencing_token) if another owner has reclaimed an expired lease, and on
314        // success extends the window so the back-to-back write lands inside the
315        // owned interval. A worker that stalled past its TTL therefore cannot
316        // overwrite the new owner's outcome — it defers instead.
317        let fenced = match self
318            .config
319            .process_registry
320            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
321            .await
322        {
323            Ok(renewed) => renewed,
324            Err(err) => {
325                tracing::warn!(
326                    process_id = %process_id,
327                    error = %err,
328                    "lost process lease before terminal write; deferring to the new owner",
329                );
330                return;
331            }
332        };
333        if let Err(err) = self
334            .config
335            .process_registry
336            .complete_process(process_id, output)
337            .await
338        {
339            tracing::warn!(
340                process_id = %process_id,
341                error = %err,
342                "failed to write recovered process terminal outcome",
343            );
344        }
345        self.release_or_log(&fenced).await;
346    }
347
348    async fn release_or_log(&self, lease: &ProcessLease) {
349        if let Err(err) = self.release_process_lease(lease).await {
350            tracing::warn!(
351                process_id = %lease.process_id,
352                error = %err,
353                "failed to release recovered process lease",
354            );
355        }
356    }
357
358    /// Run a recovered process while renewing its lease across the execution,
359    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
360    /// from expiring under the live owner.
361    async fn run_process_with_lease_renewal(
362        &self,
363        registration: ProcessRegistration,
364        execution_context: ProcessExecutionContext,
365        mut lease: ProcessLease,
366    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
367        let process_id = registration.id.clone();
368        let cancellation = CancellationToken::new();
369        let cancel_watcher = {
370            let registry = Arc::clone(&self.config.process_registry);
371            let process_id = process_id.clone();
372            let cancellation = cancellation.clone();
373            tokio::spawn(async move {
374                match registry
375                    .wait_event_after(&process_id, "process.cancel_requested", 0)
376                    .await
377                {
378                    Ok(_) => cancellation.cancel(),
379                    Err(err) => tracing::warn!(
380                        process_id = %process_id,
381                        error = %err,
382                        "process cancel watcher stopped before observing cancellation",
383                    ),
384                }
385            })
386        };
387        let pending = self.run_process(registration, execution_context, cancellation.clone());
388        tokio::pin!(pending);
389        loop {
390            tokio::select! {
391                outcome = &mut pending => {
392                    cancel_watcher.abort();
393                    return outcome.map_err(RecoverFailure::Run);
394                }
395                _ = tokio::time::sleep(process_lease_renew_interval()) => {
396                    match self
397                        .config
398                        .process_registry
399                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
400                        .await
401                    {
402                        Ok(renewed) => lease = renewed,
403                        Err(err) => {
404                            cancellation.cancel();
405                            cancel_watcher.abort();
406                            return Err(RecoverFailure::LeaseLost(err));
407                        }
408                    }
409                }
410            }
411        }
412    }
413
414    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
415        self.config
416            .process_registry
417            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
418            .await
419    }
420
421    pub async fn request_process_cancel(
422        &self,
423        process_id: &str,
424        reason: Option<String>,
425    ) -> Result<(), PluginError> {
426        self.config
427            .process_registry
428            .append_event(
429                process_id,
430                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
431            )
432            .await
433            .map(|_| ())
434    }
435
436    async fn rebuild_runtime(&self, session_id: &str) -> Result<LashRuntime, PluginError> {
437        let store = self
438            .config
439            .session_store_factory
440            .create_store(&SessionStoreCreateRequest {
441                session_id: session_id.to_string(),
442                relation: crate::SessionRelation::Root,
443                policy: self.config.session_policy.clone(),
444            })
445            .await
446            .map_err(|err| {
447                PluginError::Session(format!(
448                    "failed to open session store for process worker session `{session_id}`: {err}"
449                ))
450            })?;
451        EmbeddedRuntimeBuilder::new()
452            .with_session_id(session_id.to_string())
453            .with_plugin_host(self.config.plugin_host.as_ref().clone())
454            .with_runtime_host(self.config.runtime_host.clone())
455            .with_policy(self.config.session_policy.clone())
456            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
457            .with_process_registry(Arc::clone(&self.config.process_registry))
458            .with_residency(self.config.residency)
459            .with_store(store)
460            .build()
461            .await
462            .map_err(|err| {
463                PluginError::Session(format!(
464                    "failed to rebuild process worker runtime for session `{session_id}`: {err}"
465                ))
466            })
467    }
468
469    /// Enforce the durable-first wiring invariant at the worker process-run
470    /// boundary: when the worker was wired with a durable effect host, every
471    /// store it will execute against must also be durable. A durable host
472    /// running against any ephemeral store fails loudly here rather than
473    /// silently re-executing a process against non-durable state.
474    ///
475    /// Inline controllers (the default tier) impose no requirement, so
476    /// inline/in-memory workers pass unchanged.
477    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
478        if self
479            .config
480            .runtime_host
481            .control
482            .effect_host
483            .durability_tier()
484            != crate::DurabilityTier::Durable
485        {
486            return Ok(());
487        }
488        let require = |facet: crate::DurableStoreFacet| {
489            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
490        };
491        if self
492            .config
493            .runtime_host
494            .durability
495            .attachment_store
496            .persistence()
497            .durability_tier()
498            != crate::DurabilityTier::Durable
499        {
500            return Err(require(crate::DurableStoreFacet::AttachmentStore));
501        }
502        if self
503            .config
504            .runtime_host
505            .durability
506            .lashlang_artifact_store
507            .durability_tier()
508            != crate::DurabilityTier::Durable
509        {
510            return Err(require(crate::DurableStoreFacet::ArtifactStore));
511        }
512        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
513            return Err(require(crate::DurableStoreFacet::SessionStore));
514        }
515        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
516            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
517        }
518        Ok(())
519    }
520
521    /// Enforce the stable-process-id invariant at every (re-)execution: process
522    /// execution identity is the persisted `process_id`, so a retry — a Restate
523    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
524    /// recovery sweep re-running a non-terminal row — must present that stable
525    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
526    /// loudly here, mirroring how `EffectScope` rejects an
527    /// empty turn id at the durable-effect boundary.
528    fn ensure_stable_process_id(
529        &self,
530        registration: &ProcessRegistration,
531    ) -> Result<(), PluginError> {
532        if registration.id.trim().is_empty() {
533            return Err(PluginError::Session(
534                crate::RuntimeError::missing_process_execution_id().to_string(),
535            ));
536        }
537        Ok(())
538    }
539
540    fn ensure_host_profile_matches(
541        &self,
542        registration: &ProcessRegistration,
543    ) -> Result<(), PluginError> {
544        let actual = registration.provenance.host_profile_id.as_str();
545        let expected = self.config.runtime_host.profile.host_profile_id.as_str();
546        if actual.is_empty() || actual == expected {
547            return Ok(());
548        }
549        Err(PluginError::Session(format!(
550            "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
551            registration.id
552        )))
553    }
554}
555
556fn process_lease_renew_interval() -> Duration {
557    Duration::from_millis(process_lease_renew_interval_ms())
558}
559
#[cfg(test)]
fn process_lease_renew_interval_ms() -> u64 {
    // Test builds renew every 25 ms, so the renewal path runs within short
    // tests.
    const TEST_RENEW_INTERVAL_MS: u64 = 25;
    TEST_RENEW_INTERVAL_MS
}
564
#[cfg(not(test))]
fn process_lease_renew_interval_ms() -> u64 {
    // Non-test builds renew the recovery lease every 30 seconds.
    // NOTE(review): presumably chosen to sit well inside
    // RUNTIME_TURN_LEASE_TTL_MS; confirm if that TTL changes.
    const RENEW_INTERVAL_MS: u64 = 30 * 1_000;
    RENEW_INTERVAL_MS
}
569
#[cfg(test)]
mod boundary_tests {
    use super::*;
    use crate::{
        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
        DurableStoreFacet, InMemoryAttachmentStore, LashlangArtifactStore, ProcessInput,
        ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment,
    };
    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};

    /// Effect controller that only reports the durable tier. The worker
    /// boundary reads nothing but the tier, so executing an effect is a test
    /// bug.
    #[derive(Default)]
    struct DurableController;

    #[async_trait::async_trait]
    impl RuntimeEffectController for DurableController {
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            unreachable!("worker boundary rejects before executing any effect")
        }
    }

    /// In-memory attachment store that claims durable persistence.
    #[derive(Default)]
    struct DurableAttachmentStore {
        inner: InMemoryAttachmentStore,
    }

    impl AttachmentStore for DurableAttachmentStore {
        fn persistence(&self) -> AttachmentStorePersistence {
            AttachmentStorePersistence::Durable
        }

        fn put(
            &self,
            bytes: Vec<u8>,
            meta: AttachmentCreateMeta,
        ) -> Result<AttachmentRef, AttachmentStoreError> {
            self.inner.put(bytes, meta)
        }

        fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
            self.inner.get(id)
        }
    }

    /// In-memory Lashlang artifact store that claims the durable tier.
    #[derive(Default)]
    struct DurableArtifactStore {
        inner: lashlang::InMemoryLashlangArtifactStore,
    }

    #[async_trait::async_trait]
    impl LashlangArtifactStore for DurableArtifactStore {
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        async fn put_module_artifact(
            &self,
            artifact: &lashlang::ModuleArtifact,
        ) -> Result<(), lashlang::ArtifactStoreError> {
            self.inner.put_module_artifact(artifact).await
        }

        async fn get_module_artifact(
            &self,
            module_ref: &lashlang::ModuleRef,
        ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
            self.inner.get_module_artifact(module_ref).await
        }
    }

    /// Session store factory reporting a configurable tier. It never has to
    /// open a store, because the worker boundary rejects (or short-circuits
    /// External inputs) first.
    struct TierSessionStoreFactory {
        tier: DurabilityTier,
    }

    #[async_trait::async_trait]
    impl SessionStoreFactory for TierSessionStoreFactory {
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        async fn create_store(
            &self,
            _request: &crate::SessionStoreCreateRequest,
        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
            unreachable!("worker boundary rejects before creating a session store")
        }

        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
            Ok(())
        }
    }

    /// Durable-controller worker with per-facet store tiers and a durable
    /// process registry.
    fn worker(
        attachment: Arc<dyn AttachmentStore>,
        artifact: Arc<dyn LashlangArtifactStore>,
        session_store_tier: DurabilityTier,
    ) -> DurableProcessWorker {
        worker_with_process_registry_tier(
            attachment,
            artifact,
            session_store_tier,
            DurabilityTier::Durable,
        )
    }

    /// Durable-controller worker where every store facet, including the
    /// process registry, can be made durable or ephemeral independently. This
    /// lets each facet's loud rejection be exercised in isolation.
    fn worker_with_process_registry_tier(
        attachment: Arc<dyn AttachmentStore>,
        artifact: Arc<dyn LashlangArtifactStore>,
        session_store_tier: DurabilityTier,
        process_registry_tier: DurabilityTier,
    ) -> DurableProcessWorker {
        let mut host = RuntimeHostConfig::in_memory();
        host.control.effect_host =
            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
        host.durability.attachment_store = attachment;
        host.durability.lashlang_artifact_store = artifact;
        let sessions: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
            tier: session_store_tier,
        });
        let processes: Arc<dyn ProcessRegistry> = Arc::new(
            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
        );
        let config = DurableProcessWorkerConfig::new(
            Arc::new(crate::PluginHost::new(Vec::new())),
            host,
            sessions,
            processes,
        );
        DurableProcessWorker::new(config)
    }

    /// An External-input registration: it needs no session and succeeds as
    /// soon as the boundary checks pass.
    fn external_registration() -> ProcessRegistration {
        let input = ProcessInput::External {
            metadata: serde_json::json!({}),
        };
        ProcessRegistration::new("worker-boundary-process", input)
    }

    /// Run the External registration through the public entry point.
    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
        let registration = external_registration();
        let context = ProcessExecutionContext::default();
        worker
            .run_process(registration, context, CancellationToken::new())
            .await
    }

    /// Assert that `err` is exactly the durable-store-required error for
    /// `facet`.
    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
        match err {
            PluginError::Session(message) => {
                let expected = RuntimeError::durable_store_required(facet).to_string();
                assert_eq!(message, expected, "worker must reject the {facet:?} facet");
            }
            other => panic!("expected PluginError::Session, got {other:?}"),
        }
    }

    /// Run `worker` and require a rejection naming `facet`.
    async fn assert_rejects(worker: DurableProcessWorker, facet: DurableStoreFacet, why: &str) {
        let err = run(&worker).await.expect_err(why);
        assert_facet(err, facet);
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_attachment_store() {
        let subject = worker(
            Arc::new(InMemoryAttachmentStore::new()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Durable,
        );
        assert_rejects(
            subject,
            DurableStoreFacet::AttachmentStore,
            "ephemeral attachment store must be rejected at the worker boundary",
        )
        .await;
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_artifact_store() {
        let subject = worker(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
            DurabilityTier::Durable,
        );
        assert_rejects(
            subject,
            DurableStoreFacet::ArtifactStore,
            "ephemeral artifact store must be rejected at the worker boundary",
        )
        .await;
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_session_store_factory() {
        let subject = worker(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Inline,
        );
        assert_rejects(
            subject,
            DurableStoreFacet::SessionStore,
            "ephemeral session store factory must be rejected at the worker boundary",
        )
        .await;
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_process_registry() {
        let subject = worker_with_process_registry_tier(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Durable,
            DurabilityTier::Inline,
        );
        assert_rejects(
            subject,
            DurableStoreFacet::ProcessRegistry,
            "ephemeral process registry must be rejected at the worker boundary",
        )
        .await;
    }

    #[tokio::test]
    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
        // Positive control: a durable worker wired against fully durable
        // stores clears the store-facet guard and runs the (External) process.
        let subject = worker(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Durable,
        );
        let output = run(&subject)
            .await
            .expect("all-durable worker should pass the store-facet guard");
        let succeeded = matches!(output, ProcessAwaitOutput::Success { .. });
        assert!(succeeded);
    }

    #[tokio::test]
    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
        // Inline controllers impose no requirement, so an in-memory worker
        // runs unchanged: the durable-first guard must not regress inline
        // hosts.
        let sessions: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
            tier: DurabilityTier::Inline,
        });
        let processes: Arc<dyn ProcessRegistry> =
            Arc::new(crate::TestLocalProcessRegistry::default());
        let subject = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
            Arc::new(crate::PluginHost::new(Vec::new())),
            RuntimeHostConfig::in_memory(),
            sessions,
            processes,
        ));
        let output = run(&subject)
            .await
            .expect("inline worker should pass the store-facet guard");
        let succeeded = matches!(output, ProcessAwaitOutput::Success { .. });
        assert!(succeeded);
    }
}