Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10    LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack, ProcessAwaitOutput,
11    ProcessExecutionContext, ProcessInput, ProcessLease, ProcessLeaseCompletion, ProcessRecord,
12    ProcessRegistration, ProcessRegistry, SessionStoreCreateRequest, SessionStoreFactory,
13};
14
15/// Deployment-local configuration for rebuilding durable process executions.
16///
17/// Process rows intentionally carry only portable process input and provenance.
18/// Workers provide the host profile, plugins, providers, stores, secrets, and
19/// host capabilities for the deployment that owns those rows.
20#[derive(Clone)]
21pub struct DurableProcessWorkerConfig {
22    pub plugin_host: Arc<PluginHost>,
23    pub runtime_host: RuntimeHostConfig,
24    pub session_policy: crate::SessionPolicy,
25    pub session_store_factory: Arc<dyn SessionStoreFactory>,
26    pub process_registry: Arc<dyn ProcessRegistry>,
27    pub host_event_store: Arc<dyn crate::HostEventStore>,
28    /// Residency for sessions the worker rebuilds to run a process. Defaults to
29    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
30    /// it here so the worker's rebuilt sessions trim to the active path too,
31    /// instead of silently diverging from the live runtime by keeping the full
32    /// graph resident.
33    pub residency: crate::Residency,
34}
35
36impl DurableProcessWorkerConfig {
37    pub fn new(
38        plugin_host: Arc<PluginHost>,
39        runtime_host: RuntimeHostConfig,
40        session_store_factory: Arc<dyn SessionStoreFactory>,
41        process_registry: Arc<dyn ProcessRegistry>,
42    ) -> Self {
43        Self {
44            plugin_host,
45            runtime_host,
46            session_policy: crate::SessionPolicy::default(),
47            session_store_factory,
48            process_registry,
49            host_event_store: Arc::new(crate::InMemoryHostEventStore::default()),
50            residency: crate::Residency::default(),
51        }
52    }
53
54    pub fn with_host_event_store(mut self, store: Arc<dyn crate::HostEventStore>) -> Self {
55        self.host_event_store = store;
56        self
57    }
58
59    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
60        self.session_policy = policy;
61        self
62    }
63
64    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
65        self.residency = residency;
66        self
67    }
68
69    pub fn from_plugin_factories(
70        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
71        runtime_host: RuntimeHostConfig,
72        session_store_factory: Arc<dyn SessionStoreFactory>,
73        process_registry: Arc<dyn ProcessRegistry>,
74    ) -> Self {
75        Self::new(
76            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
77            runtime_host,
78            session_store_factory,
79            process_registry,
80        )
81    }
82
83    pub fn from_plugin_stack(
84        plugin_stack: PluginStack,
85        runtime_host: RuntimeHostConfig,
86        session_store_factory: Arc<dyn SessionStoreFactory>,
87        process_registry: Arc<dyn ProcessRegistry>,
88    ) -> Self {
89        Self::from_plugin_factories(
90            plugin_stack.into_factories(),
91            runtime_host,
92            session_store_factory,
93            process_registry,
94        )
95    }
96}
97
98/// Reconstructable background-process worker.
99#[derive(Clone)]
100pub struct DurableProcessWorker {
101    config: Arc<DurableProcessWorkerConfig>,
102}
103
104/// Why a recovery run did not produce a terminal outcome under the lease.
105enum RecoverFailure {
106    /// The lease was lost mid-run (another owner reclaimed an expired lease).
107    /// The losing worker must not write a terminal outcome — the new owner is
108    /// now the single writer.
109    LeaseLost(PluginError),
110    /// The process could not be run (rebuild/store-facet failure). The lease is
111    /// still held, so this worker terminalizes the row.
112    Run(PluginError),
113}
114
115impl DurableProcessWorker {
116    pub fn new(config: DurableProcessWorkerConfig) -> Self {
117        Self {
118            config: Arc::new(config),
119        }
120    }
121
122    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
123        Self { config }
124    }
125
126    pub fn config(&self) -> &DurableProcessWorkerConfig {
127        &self.config
128    }
129
130    pub async fn run_process(
131        &self,
132        registration: ProcessRegistration,
133        execution_context: ProcessExecutionContext,
134        cancellation: CancellationToken,
135    ) -> Result<ProcessAwaitOutput, PluginError> {
136        let scoped_effect_controller = self
137            .config
138            .runtime_host
139            .control
140            .effect_host
141            .scoped_static(crate::EffectScope::process(registration.id.clone()))
142            .map_err(|err| PluginError::Session(err.to_string()))?
143            .ok_or_else(|| {
144                PluginError::Session(
145                    "process worker effect host must provide a static process scope".to_string(),
146                )
147            })?;
148        self.run_process_with_scoped_effect_controller(
149            registration,
150            execution_context,
151            scoped_effect_controller,
152            cancellation,
153        )
154        .await
155    }
156
157    pub async fn run_process_with_scoped_effect_controller(
158        &self,
159        registration: ProcessRegistration,
160        execution_context: ProcessExecutionContext,
161        scoped_effect_controller: crate::ScopedEffectController<'_>,
162        cancellation: CancellationToken,
163    ) -> Result<ProcessAwaitOutput, PluginError> {
164        self.ensure_stable_process_id(&registration)?;
165        self.ensure_host_profile_matches(&registration)?;
166        self.ensure_durable_store_facets()?;
167        if let ProcessInput::External { metadata } = registration.input.as_ref() {
168            return Ok(ProcessAwaitOutput::Success {
169                value: serde_json::json!({ "metadata": metadata.clone() }),
170                control: None,
171            });
172        }
173        let session_id = registration.provenance.owner_scope.session_id.as_str();
174        if session_id.is_empty() {
175            return Err(PluginError::Session(format!(
176                "process `{}` is missing a structured owner scope",
177                registration.id
178            )));
179        }
180        let runtime = self.rebuild_runtime(session_id).await?;
181        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
182            PluginError::Session(format!(
183                "failed to rebuild runtime session `{session_id}` for process `{}`: {err}",
184                registration.id
185            ))
186        })?;
187        Ok(manager
188            .run_process(
189                registration,
190                execution_context,
191                Arc::clone(&self.config.process_registry),
192                scoped_effect_controller,
193                cancellation,
194            )
195            .await)
196    }
197
198    /// Sweep the registry for non-terminal processes and re-execute the ones
199    /// this worker can claim, driving each to a terminal state.
200    ///
201    /// This is the crash-recovery counterpart to a worker that ran a process
202    /// from a live turn: a trigger/host-event-started process whose worker
203    /// died mid-flight is left non-terminal in the registry, and a subsequent
204    /// worker reopening that registry must finish it. The sweep:
205    ///
206    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
207    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
208    ///    already leased live by *another* owner is skipped (it is being run by
209    ///    that owner right now), so a non-terminal process is re-run by exactly
210    ///    one owner (lease fencing);
211    /// 3. runs the claimed process on this worker's wired controller, renewing
212    ///    the lease across the long-running execution so a healthy recovery is
213    ///    not swept out from under itself;
214    /// 4. writes the terminal outcome and releases the lease.
215    ///
216    /// Idempotent by `process_id`: terminal processes are never in the worklist,
217    /// and a process that became terminal between the list and the claim is
218    /// detected after claiming and skipped, so re-running a recovery sweep does
219    /// not double-execute completed work.
220    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
221        let records = self.config.process_registry.list_non_terminal().await?;
222        for record in records {
223            // Run each claimed process on its OWN lease-fenced task. A sequential
224            // drive that awaited each process to terminal would deadlock a process
225            // that blocks awaiting a nested child (`start child` then `await`, or a
226            // subagent fan-out): the one drive task would park inside the parent's
227            // await and never claim the child. Spawning frees the loop so a
228            // subsequent drive (poke or poll) claims and runs the child, and the
229            // per-process `ProcessLease` fences concurrent owners — so spawning a
230            // task per pending row on every drive is idempotent (a row already
231            // running is skipped on claim conflict) and one failing row never
232            // aborts the rest of the sweep.
233            let worker = self.clone();
234            tokio::spawn(async move { worker.recover_process(record).await });
235        }
236        Ok(())
237    }
238
239    async fn recover_process(&self, record: ProcessRecord) {
240        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
241        let process_id = record.id.clone();
242        // Skip if held live by another owner: a claim conflict means a worker is
243        // already running this process, so re-running here would violate the
244        // single-owner contract. Treat any claim failure as "leased elsewhere".
245        let Ok(lease) = self
246            .config
247            .process_registry
248            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
249            .await
250        else {
251            return;
252        };
253        // The process may have reached a terminal state between the list and the
254        // claim. Idempotent by process_id: do not re-execute a finished process.
255        if self
256            .config
257            .process_registry
258            .get_process(&process_id)
259            .await
260            .is_some_and(|current| current.is_terminal())
261        {
262            self.release_or_log(&lease).await;
263            return;
264        }
265        let registration = ProcessRegistration {
266            id: record.id,
267            input: record.input,
268            event_types: record.event_types,
269            provenance: record.provenance.clone(),
270        };
271        // Wakes route to the creator scope; on recovery the owner scope persisted
272        // in provenance is that creator scope, so it is the wake target.
273        let execution_context = ProcessExecutionContext::default()
274            .with_wake_target_scope(record.provenance.owner_scope);
275        match self
276            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
277            .await
278        {
279            // Ran to a terminal outcome (success or a process-level failure) while
280            // holding the lease: this owner is the single writer of the terminal.
281            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
282            // The lease was lost mid-run — another owner reclaimed the expired
283            // lease and is now running this process. Do NOT write a terminal
284            // outcome or release the lease: that would race the new owner and
285            // could record a succeeded process as Failed. Leave the row to the
286            // lease holder; it will finish (or another sweep retries it).
287            Err(RecoverFailure::LeaseLost(err)) => {
288                tracing::warn!(
289                    process_id = %process_id,
290                    error = %err,
291                    "process recovery lost its lease mid-run; deferring to the new owner",
292                );
293            }
294            // The process could not be run at all (rebuild/store-facet failure):
295            // terminalize as a recovery failure so the row leaves the worklist.
296            Err(RecoverFailure::Run(err)) => {
297                let output = ProcessAwaitOutput::Failure {
298                    class: crate::ToolFailureClass::Execution,
299                    code: "process_recovery_failed".to_string(),
300                    message: err.to_string(),
301                    raw: None,
302                    control: None,
303                };
304                self.complete_and_release(&lease, &process_id, output).await;
305            }
306        }
307    }
308
309    /// Write a recovered process's terminal outcome (the running lease owner is
310    /// the single writer) and then release the lease, logging either failure
311    /// rather than aborting — the lease's TTL is the backstop.
312    async fn complete_and_release(
313        &self,
314        lease: &ProcessLease,
315        process_id: &str,
316        output: ProcessAwaitOutput,
317    ) {
318        // Fence the terminal write: re-confirm the lease immediately before
319        // writing. `renew_process_lease` is rejected (by owner/lease_token/
320        // fencing_token) if another owner has reclaimed an expired lease, and on
321        // success extends the window so the back-to-back write lands inside the
322        // owned interval. A worker that stalled past its TTL therefore cannot
323        // overwrite the new owner's outcome — it defers instead.
324        let fenced = match self
325            .config
326            .process_registry
327            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
328            .await
329        {
330            Ok(renewed) => renewed,
331            Err(err) => {
332                tracing::warn!(
333                    process_id = %process_id,
334                    error = %err,
335                    "lost process lease before terminal write; deferring to the new owner",
336                );
337                return;
338            }
339        };
340        if let Err(err) = self
341            .config
342            .process_registry
343            .complete_process(process_id, output)
344            .await
345        {
346            tracing::warn!(
347                process_id = %process_id,
348                error = %err,
349                "failed to write recovered process terminal outcome",
350            );
351        }
352        self.release_or_log(&fenced).await;
353    }
354
355    async fn release_or_log(&self, lease: &ProcessLease) {
356        if let Err(err) = self.release_process_lease(lease).await {
357            tracing::warn!(
358                process_id = %lease.process_id,
359                error = %err,
360                "failed to release recovered process lease",
361            );
362        }
363    }
364
365    /// Run a recovered process while renewing its lease across the execution,
366    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
367    /// from expiring under the live owner.
368    async fn run_process_with_lease_renewal(
369        &self,
370        registration: ProcessRegistration,
371        execution_context: ProcessExecutionContext,
372        mut lease: ProcessLease,
373    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
374        let process_id = registration.id.clone();
375        let cancellation = CancellationToken::new();
376        let cancel_watcher = {
377            let registry = Arc::clone(&self.config.process_registry);
378            let process_id = process_id.clone();
379            let cancellation = cancellation.clone();
380            tokio::spawn(async move {
381                match registry
382                    .wait_event_after(&process_id, "process.cancel_requested", 0)
383                    .await
384                {
385                    Ok(_) => cancellation.cancel(),
386                    Err(err) => tracing::warn!(
387                        process_id = %process_id,
388                        error = %err,
389                        "process cancel watcher stopped before observing cancellation",
390                    ),
391                }
392            })
393        };
394        let pending = self.run_process(registration, execution_context, cancellation.clone());
395        tokio::pin!(pending);
396        loop {
397            tokio::select! {
398                outcome = &mut pending => {
399                    cancel_watcher.abort();
400                    return outcome.map_err(RecoverFailure::Run);
401                }
402                _ = tokio::time::sleep(process_lease_renew_interval()) => {
403                    match self
404                        .config
405                        .process_registry
406                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
407                        .await
408                    {
409                        Ok(renewed) => lease = renewed,
410                        Err(err) => {
411                            cancellation.cancel();
412                            cancel_watcher.abort();
413                            return Err(RecoverFailure::LeaseLost(err));
414                        }
415                    }
416                }
417            }
418        }
419    }
420
421    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
422        self.config
423            .process_registry
424            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
425            .await
426    }
427
428    pub async fn request_process_cancel(
429        &self,
430        process_id: &str,
431        reason: Option<String>,
432    ) -> Result<(), PluginError> {
433        self.config
434            .process_registry
435            .append_event(
436                process_id,
437                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
438            )
439            .await
440            .map(|_| ())
441    }
442
443    async fn rebuild_runtime(&self, session_id: &str) -> Result<LashRuntime, PluginError> {
444        let store = self
445            .config
446            .session_store_factory
447            .create_store(&SessionStoreCreateRequest {
448                session_id: session_id.to_string(),
449                relation: crate::SessionRelation::Root,
450                policy: self.config.session_policy.clone(),
451            })
452            .await
453            .map_err(|err| {
454                PluginError::Session(format!(
455                    "failed to open session store for process worker session `{session_id}`: {err}"
456                ))
457            })?;
458        EmbeddedRuntimeBuilder::new()
459            .with_session_id(session_id.to_string())
460            .with_plugin_host(self.config.plugin_host.as_ref().clone())
461            .with_runtime_host(self.config.runtime_host.clone())
462            .with_policy(self.config.session_policy.clone())
463            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
464            .with_host_event_store(Arc::clone(&self.config.host_event_store))
465            .with_process_registry(Arc::clone(&self.config.process_registry))
466            .with_residency(self.config.residency)
467            .with_store(store)
468            .build()
469            .await
470            .map_err(|err| {
471                PluginError::Session(format!(
472                    "failed to rebuild process worker runtime for session `{session_id}`: {err}"
473                ))
474            })
475    }
476
477    /// Enforce the durable-first wiring invariant at the worker process-run
478    /// boundary: when the worker was wired with a durable effect host, every
479    /// store it will execute against must also be durable. A durable host
480    /// running against any ephemeral store fails loudly here rather than
481    /// silently re-executing a process against non-durable state.
482    ///
483    /// Inline controllers (the default tier) impose no requirement, so
484    /// inline/in-memory workers pass unchanged.
485    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
486        if self
487            .config
488            .runtime_host
489            .control
490            .effect_host
491            .durability_tier()
492            != crate::DurabilityTier::Durable
493        {
494            return Ok(());
495        }
496        let require = |facet: crate::DurableStoreFacet| {
497            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
498        };
499        if self
500            .config
501            .runtime_host
502            .durability
503            .attachment_store
504            .persistence()
505            .durability_tier()
506            != crate::DurabilityTier::Durable
507        {
508            return Err(require(crate::DurableStoreFacet::AttachmentStore));
509        }
510        if self
511            .config
512            .runtime_host
513            .durability
514            .lashlang_artifact_store
515            .durability_tier()
516            != crate::DurabilityTier::Durable
517        {
518            return Err(require(crate::DurableStoreFacet::ArtifactStore));
519        }
520        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
521            return Err(require(crate::DurableStoreFacet::SessionStore));
522        }
523        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
524            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
525        }
526        if self.config.host_event_store.durability_tier() != crate::DurabilityTier::Durable {
527            return Err(require(crate::DurableStoreFacet::HostEventStore));
528        }
529        Ok(())
530    }
531
532    /// Enforce the stable-process-id invariant at every (re-)execution: process
533    /// execution identity is the persisted `process_id`, so a retry — a Restate
534    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
535    /// recovery sweep re-running a non-terminal row — must present that stable
536    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
537    /// loudly here, mirroring how `EffectScope` rejects an
538    /// empty turn id at the durable-effect boundary.
539    fn ensure_stable_process_id(
540        &self,
541        registration: &ProcessRegistration,
542    ) -> Result<(), PluginError> {
543        if registration.id.trim().is_empty() {
544            return Err(PluginError::Session(
545                crate::RuntimeError::missing_process_execution_id().to_string(),
546            ));
547        }
548        Ok(())
549    }
550
551    fn ensure_host_profile_matches(
552        &self,
553        registration: &ProcessRegistration,
554    ) -> Result<(), PluginError> {
555        let actual = registration.provenance.host_profile_id.as_str();
556        let expected = self.config.runtime_host.profile.host_profile_id.as_str();
557        if actual.is_empty() || actual == expected {
558            return Ok(());
559        }
560        Err(PluginError::Session(format!(
561            "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
562            registration.id
563        )))
564    }
565}
566
/// Delay between successive renewals of a recovered process's lease.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Lease renewal period in milliseconds.
///
/// Normal builds use 30 s. Builds with `cfg(test)` use 25 ms, so tests
/// exercise renewal without real waiting. `cfg!` is resolved at compile time,
/// so each build still returns a single constant.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) { 25 } else { 30_000 }
}
580
581#[cfg(test)]
582mod boundary_tests {
583    use super::*;
584    use crate::{
585        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
586        DurableStoreFacet, HostEventStore, InMemoryAttachmentStore, LashlangArtifactStore,
587        ProcessInput, ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment,
588    };
589    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
590
591    /// Effect controller that reports the durable tier; the worker boundary
592    /// only reads the tier, so the effect path is never exercised here.
593    #[derive(Default)]
594    struct DurableController;
595
596    #[async_trait::async_trait]
597    impl RuntimeEffectController for DurableController {
598        fn durability_tier(&self) -> DurabilityTier {
599            DurabilityTier::Durable
600        }
601
602        async fn execute_effect(
603            &self,
604            _envelope: crate::RuntimeEffectEnvelope,
605            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
606        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
607            unreachable!("worker boundary rejects before executing any effect")
608        }
609    }
610
611    /// Attachment store reporting a durable tier over in-memory storage.
612    #[derive(Default)]
613    struct DurableAttachmentStore {
614        inner: InMemoryAttachmentStore,
615    }
616
617    impl AttachmentStore for DurableAttachmentStore {
618        fn persistence(&self) -> AttachmentStorePersistence {
619            AttachmentStorePersistence::Durable
620        }
621
622        fn put(
623            &self,
624            bytes: Vec<u8>,
625            meta: AttachmentCreateMeta,
626        ) -> Result<AttachmentRef, AttachmentStoreError> {
627            self.inner.put(bytes, meta)
628        }
629
630        fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
631            self.inner.get(id)
632        }
633    }
634
635    /// Lashlang artifact store reporting a durable tier over in-memory storage.
636    #[derive(Default)]
637    struct DurableArtifactStore {
638        inner: lashlang::InMemoryLashlangArtifactStore,
639    }
640
641    #[async_trait::async_trait]
642    impl LashlangArtifactStore for DurableArtifactStore {
643        fn durability_tier(&self) -> DurabilityTier {
644            DurabilityTier::Durable
645        }
646
647        async fn put_module_artifact(
648            &self,
649            artifact: &lashlang::ModuleArtifact,
650        ) -> Result<(), lashlang::ArtifactStoreError> {
651            self.inner.put_module_artifact(artifact).await
652        }
653
654        async fn get_module_artifact(
655            &self,
656            module_ref: &lashlang::ModuleRef,
657        ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
658            self.inner.get_module_artifact(module_ref).await
659        }
660    }
661
662    /// Session store factory whose declared tier is configurable; it never has
663    /// to create a store because the worker boundary rejects first.
664    struct TierSessionStoreFactory {
665        tier: DurabilityTier,
666    }
667
668    #[async_trait::async_trait]
669    impl SessionStoreFactory for TierSessionStoreFactory {
670        fn durability_tier(&self) -> DurabilityTier {
671            self.tier
672        }
673
674        async fn create_store(
675            &self,
676            _request: &crate::SessionStoreCreateRequest,
677        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
678            unreachable!("worker boundary rejects before creating a session store")
679        }
680
681        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
682            Ok(())
683        }
684    }
685
686    struct TierHostEventStore {
687        tier: DurabilityTier,
688        inner: crate::InMemoryHostEventStore,
689    }
690
691    impl TierHostEventStore {
692        fn new(tier: DurabilityTier) -> Self {
693            Self {
694                tier,
695                inner: crate::InMemoryHostEventStore::default(),
696            }
697        }
698    }
699
700    #[async_trait::async_trait]
701    impl HostEventStore for TierHostEventStore {
702        fn durability_tier(&self) -> DurabilityTier {
703            self.tier
704        }
705
706        async fn register_subscription(
707            &self,
708            draft: crate::TriggerSubscriptionDraft,
709        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
710            self.inner.register_subscription(draft).await
711        }
712
713        async fn list_subscriptions(
714            &self,
715            filter: crate::TriggerSubscriptionFilter,
716        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
717            self.inner.list_subscriptions(filter).await
718        }
719
720        async fn cancel_subscription(
721            &self,
722            session_id: &str,
723            handle: &str,
724        ) -> Result<bool, PluginError> {
725            self.inner.cancel_subscription(session_id, handle).await
726        }
727
728        async fn record_occurrence(
729            &self,
730            request: crate::HostEventOccurrenceRequest,
731        ) -> Result<crate::HostEventOccurrenceRecord, PluginError> {
732            self.inner.record_occurrence(request).await
733        }
734
735        async fn reserve_matching_deliveries(
736            &self,
737            occurrence_id: &str,
738        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
739            self.inner.reserve_matching_deliveries(occurrence_id).await
740        }
741    }
742
743    /// Build a worker whose controller is durable but whose stores can be set
744    /// per-facet to durable/ephemeral, so each facet's loud rejection can be
745    /// exercised independently.
746    fn worker(
747        attachment: Arc<dyn AttachmentStore>,
748        artifact: Arc<dyn LashlangArtifactStore>,
749        session_store_tier: DurabilityTier,
750    ) -> DurableProcessWorker {
751        worker_with_store_tiers(
752            attachment,
753            artifact,
754            session_store_tier,
755            DurabilityTier::Durable,
756            DurabilityTier::Durable,
757        )
758    }
759
760    fn worker_with_store_tiers(
761        attachment: Arc<dyn AttachmentStore>,
762        artifact: Arc<dyn LashlangArtifactStore>,
763        session_store_tier: DurabilityTier,
764        process_registry_tier: DurabilityTier,
765        host_event_store_tier: DurabilityTier,
766    ) -> DurableProcessWorker {
767        let mut runtime_host = RuntimeHostConfig::in_memory();
768        runtime_host.control.effect_host =
769            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
770        runtime_host.durability.attachment_store = attachment;
771        runtime_host.durability.lashlang_artifact_store = artifact;
772        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
773        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
774            tier: session_store_tier,
775        });
776        let registry: Arc<dyn ProcessRegistry> = Arc::new(
777            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
778        );
779        let host_event_store: Arc<dyn HostEventStore> =
780            Arc::new(TierHostEventStore::new(host_event_store_tier));
781        DurableProcessWorker::new(
782            DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
783                .with_host_event_store(host_event_store),
784        )
785    }
786
787    fn external_registration() -> ProcessRegistration {
788        ProcessRegistration::new(
789            "worker-boundary-process",
790            ProcessInput::External {
791                metadata: serde_json::json!({}),
792            },
793        )
794    }
795
796    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
797        worker
798            .run_process(
799                external_registration(),
800                ProcessExecutionContext::default(),
801                CancellationToken::new(),
802            )
803            .await
804    }
805
806    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
807        let PluginError::Session(message) = err else {
808            panic!("expected PluginError::Session, got {err:?}");
809        };
810        let expected = RuntimeError::durable_store_required(facet).to_string();
811        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
812    }
813
814    #[tokio::test]
815    async fn durable_worker_rejects_ephemeral_attachment_store() {
816        let worker = worker(
817            Arc::new(InMemoryAttachmentStore::new()),
818            Arc::new(DurableArtifactStore::default()),
819            DurabilityTier::Durable,
820        );
821        let err = run(&worker)
822            .await
823            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
824        assert_facet(err, DurableStoreFacet::AttachmentStore);
825    }
826
827    #[tokio::test]
828    async fn durable_worker_rejects_ephemeral_artifact_store() {
829        let worker = worker(
830            Arc::new(DurableAttachmentStore::default()),
831            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
832            DurabilityTier::Durable,
833        );
834        let err = run(&worker)
835            .await
836            .expect_err("ephemeral artifact store must be rejected at the worker boundary");
837        assert_facet(err, DurableStoreFacet::ArtifactStore);
838    }
839
840    #[tokio::test]
841    async fn durable_worker_rejects_ephemeral_session_store_factory() {
842        let worker = worker(
843            Arc::new(DurableAttachmentStore::default()),
844            Arc::new(DurableArtifactStore::default()),
845            DurabilityTier::Inline,
846        );
847        let err = run(&worker)
848            .await
849            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
850        assert_facet(err, DurableStoreFacet::SessionStore);
851    }
852
853    #[tokio::test]
854    async fn durable_worker_rejects_ephemeral_process_registry() {
855        let worker = worker_with_store_tiers(
856            Arc::new(DurableAttachmentStore::default()),
857            Arc::new(DurableArtifactStore::default()),
858            DurabilityTier::Durable,
859            DurabilityTier::Inline,
860            DurabilityTier::Durable,
861        );
862        let err = run(&worker)
863            .await
864            .expect_err("ephemeral process registry must be rejected at the worker boundary");
865        assert_facet(err, DurableStoreFacet::ProcessRegistry);
866    }
867
868    #[tokio::test]
869    async fn durable_worker_rejects_ephemeral_host_event_store() {
870        let worker = worker_with_store_tiers(
871            Arc::new(DurableAttachmentStore::default()),
872            Arc::new(DurableArtifactStore::default()),
873            DurabilityTier::Durable,
874            DurabilityTier::Durable,
875            DurabilityTier::Inline,
876        );
877        let err = run(&worker)
878            .await
879            .expect_err("ephemeral host event store must be rejected at the worker boundary");
880        assert_facet(err, DurableStoreFacet::HostEventStore);
881    }
882
883    #[tokio::test]
884    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
885        // Positive control: a durable worker wired against fully durable stores
886        // clears the store-facet guard and proceeds to run the (External)
887        // process.
888        let worker = worker(
889            Arc::new(DurableAttachmentStore::default()),
890            Arc::new(DurableArtifactStore::default()),
891            DurabilityTier::Durable,
892        );
893        let output = run(&worker)
894            .await
895            .expect("all-durable worker should pass the store-facet guard");
896        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
897    }
898
899    #[tokio::test]
900    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
901        // Inline controllers impose no requirement, so an in-memory worker runs
902        // unchanged — the durable-first guard must not regress inline hosts.
903        let runtime_host = RuntimeHostConfig::in_memory();
904        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
905        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
906            tier: DurabilityTier::Inline,
907        });
908        let registry: Arc<dyn ProcessRegistry> =
909            Arc::new(crate::TestLocalProcessRegistry::default());
910        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
911            plugin_host,
912            runtime_host,
913            factory,
914            registry,
915        ));
916        let output = run(&worker)
917            .await
918            .expect("inline worker should pass the store-facet guard");
919        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
920    }
921}