Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10    LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack, ProcessAwaitOutput,
11    ProcessExecutionContext, ProcessInput, ProcessLease, ProcessLeaseCompletion, ProcessRecord,
12    ProcessRegistration, ProcessRegistry, SessionStoreCreateRequest, SessionStoreFactory,
13};
14
/// Deployment-local configuration for rebuilding durable process executions.
///
/// Process rows intentionally carry only portable process input and provenance.
/// Workers provide the host profile, plugins, providers, stores, secrets, and
/// host capabilities for the deployment that owns those rows.
#[derive(Clone)]
pub struct DurableProcessWorkerConfig {
    /// Plugin host cloned into every runtime the worker rebuilds.
    pub plugin_host: Arc<PluginHost>,
    /// Host wiring for rebuilt runtimes: the effect host (whose durability tier
    /// decides whether every store must be durable), the durability stores, and
    /// the host profile id that a process's recorded provenance must match.
    pub runtime_host: RuntimeHostConfig,
    /// Session policy used both when opening a session store and when building
    /// the runtime around it.
    pub session_policy: crate::SessionPolicy,
    /// Opens the root session store of the session that owns a process.
    pub session_store_factory: Arc<dyn SessionStoreFactory>,
    /// Registry holding process rows: non-terminal listing, leases, events and
    /// terminal outcomes.
    pub process_registry: Arc<dyn ProcessRegistry>,
    /// Host event store wired into rebuilt runtimes. A worker with a durable
    /// effect host rejects runs unless this store reports the durable tier.
    pub host_event_store: Arc<dyn crate::HostEventStore>,
    /// Residency for sessions the worker rebuilds to run a process. Defaults to
    /// [`crate::Residency::KeepAll`]; a host running
    /// [`crate::Residency::ActivePathOnly`] wires it here so the worker's rebuilt
    /// sessions trim to the active path too, instead of silently diverging from
    /// the live runtime by keeping the full graph resident.
    pub residency: crate::Residency,
}
35
36impl DurableProcessWorkerConfig {
37    pub fn new(
38        plugin_host: Arc<PluginHost>,
39        runtime_host: RuntimeHostConfig,
40        session_store_factory: Arc<dyn SessionStoreFactory>,
41        process_registry: Arc<dyn ProcessRegistry>,
42    ) -> Self {
43        Self {
44            plugin_host,
45            runtime_host,
46            session_policy: crate::SessionPolicy::default(),
47            session_store_factory,
48            process_registry,
49            host_event_store: Arc::new(crate::InMemoryHostEventStore::default()),
50            residency: crate::Residency::default(),
51        }
52    }
53
54    pub fn with_host_event_store(mut self, store: Arc<dyn crate::HostEventStore>) -> Self {
55        self.host_event_store = store;
56        self
57    }
58
59    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
60        self.session_policy = policy;
61        self
62    }
63
64    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
65        self.residency = residency;
66        self
67    }
68
69    pub fn from_plugin_factories(
70        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
71        runtime_host: RuntimeHostConfig,
72        session_store_factory: Arc<dyn SessionStoreFactory>,
73        process_registry: Arc<dyn ProcessRegistry>,
74    ) -> Self {
75        Self::new(
76            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
77            runtime_host,
78            session_store_factory,
79            process_registry,
80        )
81    }
82
83    pub fn from_plugin_stack(
84        plugin_stack: PluginStack,
85        runtime_host: RuntimeHostConfig,
86        session_store_factory: Arc<dyn SessionStoreFactory>,
87        process_registry: Arc<dyn ProcessRegistry>,
88    ) -> Self {
89        Self::from_plugin_factories(
90            plugin_stack.into_factories(),
91            runtime_host,
92            session_store_factory,
93            process_registry,
94        )
95    }
96}
97
/// Reconstructable background-process worker.
///
/// Rebuilds the owning session's runtime from its [`DurableProcessWorkerConfig`]
/// to run or recover a process. Cloning is cheap: every clone shares the same
/// configuration through an `Arc`.
#[derive(Clone)]
pub struct DurableProcessWorker {
    config: Arc<DurableProcessWorkerConfig>,
}
103
/// Why a recovery run did not produce a terminal outcome under the lease.
///
/// The variant decides who writes the terminal outcome. On `LeaseLost` this
/// worker writes nothing. On `Run` it records a `process_recovery_failed`
/// failure itself.
enum RecoverFailure {
    /// Renewing the lease was rejected mid-run, typically because another owner
    /// reclaimed it after it expired. The worker cannot tell a reclaim from any
    /// other renewal failure, so it conservatively writes no terminal outcome
    /// and leaves the row to whoever holds the lease now.
    LeaseLost(PluginError),
    /// The process could not be run at all (validation, rebuild or store-facet
    /// failure). The lease is still held, so this worker terminalizes the row.
    Run(PluginError),
}
114
115impl DurableProcessWorker {
116    pub fn new(config: DurableProcessWorkerConfig) -> Self {
117        Self {
118            config: Arc::new(config),
119        }
120    }
121
122    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
123        Self { config }
124    }
125
126    pub fn config(&self) -> &DurableProcessWorkerConfig {
127        &self.config
128    }
129
130    pub async fn run_process(
131        &self,
132        registration: ProcessRegistration,
133        execution_context: ProcessExecutionContext,
134        cancellation: CancellationToken,
135    ) -> Result<ProcessAwaitOutput, PluginError> {
136        let scoped_effect_controller = self
137            .config
138            .runtime_host
139            .control
140            .effect_host
141            .scoped_static(crate::EffectScope::process(registration.id.clone()))
142            .map_err(|err| PluginError::Session(err.to_string()))?
143            .ok_or_else(|| {
144                PluginError::Session(
145                    "process worker effect host must provide a static process scope".to_string(),
146                )
147            })?;
148        self.run_process_with_scoped_effect_controller(
149            registration,
150            execution_context,
151            scoped_effect_controller,
152            cancellation,
153        )
154        .await
155    }
156
157    pub async fn run_process_with_scoped_effect_controller(
158        &self,
159        registration: ProcessRegistration,
160        execution_context: ProcessExecutionContext,
161        scoped_effect_controller: crate::ScopedEffectController<'_>,
162        cancellation: CancellationToken,
163    ) -> Result<ProcessAwaitOutput, PluginError> {
164        self.ensure_stable_process_id(&registration)?;
165        self.ensure_host_profile_matches(&registration)?;
166        self.ensure_durable_store_facets()?;
167        if let ProcessInput::External { metadata } = registration.input.as_ref() {
168            return Ok(ProcessAwaitOutput::Success {
169                value: serde_json::json!({ "metadata": metadata.clone() }),
170                control: None,
171            });
172        }
173        let session_id = registration.provenance.owner_scope.session_id.as_str();
174        if session_id.is_empty() {
175            return Err(PluginError::Session(format!(
176                "process `{}` is missing a structured owner scope",
177                registration.id
178            )));
179        }
180        let runtime = self.rebuild_runtime(session_id).await?;
181        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
182            PluginError::Session(format!(
183                "failed to rebuild runtime session `{session_id}` for process `{}`: {err}",
184                registration.id
185            ))
186        })?;
187        Ok(manager
188            .run_process(
189                registration,
190                execution_context,
191                Arc::clone(&self.config.process_registry),
192                scoped_effect_controller,
193                cancellation,
194            )
195            .await)
196    }
197
198    /// Sweep the registry for non-terminal processes and re-execute the ones
199    /// this worker can claim, driving each to a terminal state.
200    ///
201    /// This is the crash-recovery counterpart to a worker that ran a process
202    /// from a live turn: a trigger/host-event-started process whose worker
203    /// died mid-flight is left non-terminal in the registry, and a subsequent
204    /// worker reopening that registry must finish it. The sweep:
205    ///
206    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
207    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
208    ///    already leased live by *another* owner is skipped (it is being run by
209    ///    that owner right now), so a non-terminal process is re-run by exactly
210    ///    one owner (lease fencing);
211    /// 3. runs the claimed process on this worker's wired controller, renewing
212    ///    the lease across the long-running execution so a healthy recovery is
213    ///    not swept out from under itself;
214    /// 4. writes the terminal outcome and releases the lease.
215    ///
216    /// Idempotent by `process_id`: terminal processes are never in the worklist,
217    /// and a process that became terminal between the list and the claim is
218    /// detected after claiming and skipped, so re-running a recovery sweep does
219    /// not double-execute completed work.
220    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
221        let records = self.config.process_registry.list_non_terminal().await?;
222        for record in records {
223            // Run each claimed process on its OWN lease-fenced task. A sequential
224            // drive that awaited each process to terminal would deadlock a process
225            // that blocks awaiting a nested child (`start child` then `await`, or a
226            // subagent fan-out): the one drive task would park inside the parent's
227            // await and never claim the child. Spawning frees the loop so a
228            // subsequent drive (poke or poll) claims and runs the child, and the
229            // per-process `ProcessLease` fences concurrent owners — so spawning a
230            // task per pending row on every drive is idempotent (a row already
231            // running is skipped on claim conflict) and one failing row never
232            // aborts the rest of the sweep.
233            let worker = self.clone();
234            tokio::spawn(async move { worker.recover_process(record).await });
235        }
236        Ok(())
237    }
238
239    async fn recover_process(&self, record: ProcessRecord) {
240        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
241        let process_id = record.id.clone();
242        // Skip if held live by another owner: a claim conflict means a worker is
243        // already running this process, so re-running here would violate the
244        // single-owner contract. Treat any claim failure as "leased elsewhere".
245        let Ok(lease) = self
246            .config
247            .process_registry
248            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
249            .await
250        else {
251            return;
252        };
253        // The process may have reached a terminal state between the list and the
254        // claim. Idempotent by process_id: do not re-execute a finished process.
255        if self
256            .config
257            .process_registry
258            .get_process(&process_id)
259            .await
260            .is_some_and(|current| current.is_terminal())
261        {
262            self.release_or_log(&lease).await;
263            return;
264        }
265        let registration = ProcessRegistration {
266            id: record.id,
267            input: record.input,
268            event_types: record.event_types,
269            provenance: record.provenance.clone(),
270        };
271        // Wakes route to the creator scope; on recovery the owner scope persisted
272        // in provenance is that creator scope, so it is the wake target.
273        let execution_context = ProcessExecutionContext::default()
274            .with_wake_target_scope(record.provenance.owner_scope);
275        match self
276            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
277            .await
278        {
279            // Ran to a terminal outcome (success or a process-level failure) while
280            // holding the lease: this owner is the single writer of the terminal.
281            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
282            // The lease was lost mid-run — another owner reclaimed the expired
283            // lease and is now running this process. Do NOT write a terminal
284            // outcome or release the lease: that would race the new owner and
285            // could record a succeeded process as Failed. Leave the row to the
286            // lease holder; it will finish (or another sweep retries it).
287            Err(RecoverFailure::LeaseLost(err)) => {
288                tracing::warn!(
289                    process_id = %process_id,
290                    error = %err,
291                    "process recovery lost its lease mid-run; deferring to the new owner",
292                );
293            }
294            // The process could not be run at all (rebuild/store-facet failure):
295            // terminalize as a recovery failure so the row leaves the worklist.
296            Err(RecoverFailure::Run(err)) => {
297                let output = ProcessAwaitOutput::Failure {
298                    class: crate::ToolFailureClass::Execution,
299                    code: "process_recovery_failed".to_string(),
300                    message: err.to_string(),
301                    raw: None,
302                    control: None,
303                };
304                self.complete_and_release(&lease, &process_id, output).await;
305            }
306        }
307    }
308
309    /// Write a recovered process's terminal outcome (the running lease owner is
310    /// the single writer) and then release the lease, logging either failure
311    /// rather than aborting — the lease's TTL is the backstop.
312    async fn complete_and_release(
313        &self,
314        lease: &ProcessLease,
315        process_id: &str,
316        output: ProcessAwaitOutput,
317    ) {
318        // Fence the terminal write: re-confirm the lease immediately before
319        // writing. `renew_process_lease` is rejected (by owner/lease_token/
320        // fencing_token) if another owner has reclaimed an expired lease, and on
321        // success extends the window so the back-to-back write lands inside the
322        // owned interval. A worker that stalled past its TTL therefore cannot
323        // overwrite the new owner's outcome — it defers instead.
324        let fenced = match self
325            .config
326            .process_registry
327            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
328            .await
329        {
330            Ok(renewed) => renewed,
331            Err(err) => {
332                tracing::warn!(
333                    process_id = %process_id,
334                    error = %err,
335                    "lost process lease before terminal write; deferring to the new owner",
336                );
337                return;
338            }
339        };
340        if let Err(err) = self
341            .config
342            .process_registry
343            .complete_process(process_id, output)
344            .await
345        {
346            tracing::warn!(
347                process_id = %process_id,
348                error = %err,
349                "failed to write recovered process terminal outcome",
350            );
351        }
352        self.release_or_log(&fenced).await;
353    }
354
355    async fn release_or_log(&self, lease: &ProcessLease) {
356        if let Err(err) = self.release_process_lease(lease).await {
357            tracing::warn!(
358                process_id = %lease.process_id,
359                error = %err,
360                "failed to release recovered process lease",
361            );
362        }
363    }
364
365    /// Run a recovered process while renewing its lease across the execution,
366    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
367    /// from expiring under the live owner.
368    async fn run_process_with_lease_renewal(
369        &self,
370        registration: ProcessRegistration,
371        execution_context: ProcessExecutionContext,
372        mut lease: ProcessLease,
373    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
374        let process_id = registration.id.clone();
375        let cancellation = CancellationToken::new();
376        let cancel_watcher = {
377            let registry = Arc::clone(&self.config.process_registry);
378            let process_id = process_id.clone();
379            let cancellation = cancellation.clone();
380            tokio::spawn(async move {
381                match registry
382                    .wait_event_after(&process_id, "process.cancel_requested", 0)
383                    .await
384                {
385                    Ok(_) => cancellation.cancel(),
386                    Err(err) => tracing::warn!(
387                        process_id = %process_id,
388                        error = %err,
389                        "process cancel watcher stopped before observing cancellation",
390                    ),
391                }
392            })
393        };
394        let pending = self.run_process(registration, execution_context, cancellation.clone());
395        tokio::pin!(pending);
396        loop {
397            tokio::select! {
398                outcome = &mut pending => {
399                    cancel_watcher.abort();
400                    return outcome.map_err(RecoverFailure::Run);
401                }
402                _ = tokio::time::sleep(process_lease_renew_interval()) => {
403                    match self
404                        .config
405                        .process_registry
406                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
407                        .await
408                    {
409                        Ok(renewed) => lease = renewed,
410                        Err(err) => {
411                            cancellation.cancel();
412                            cancel_watcher.abort();
413                            return Err(RecoverFailure::LeaseLost(err));
414                        }
415                    }
416                }
417            }
418        }
419    }
420
421    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
422        self.config
423            .process_registry
424            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
425            .await
426    }
427
428    pub async fn request_process_cancel(
429        &self,
430        process_id: &str,
431        reason: Option<String>,
432    ) -> Result<(), PluginError> {
433        self.config
434            .process_registry
435            .append_event(
436                process_id,
437                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
438            )
439            .await
440            .map(|_| ())
441    }
442
443    async fn rebuild_runtime(&self, session_id: &str) -> Result<LashRuntime, PluginError> {
444        let store = self
445            .config
446            .session_store_factory
447            .create_store(&SessionStoreCreateRequest {
448                session_id: session_id.to_string(),
449                relation: crate::SessionRelation::Root,
450                policy: self.config.session_policy.clone(),
451            })
452            .await
453            .map_err(|err| {
454                PluginError::Session(format!(
455                    "failed to open session store for process worker session `{session_id}`: {err}"
456                ))
457            })?;
458        EmbeddedRuntimeBuilder::new()
459            .with_session_id(session_id.to_string())
460            .with_plugin_host(self.config.plugin_host.as_ref().clone())
461            .with_runtime_host(self.config.runtime_host.clone())
462            .with_policy(self.config.session_policy.clone())
463            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
464            .with_host_event_store(Arc::clone(&self.config.host_event_store))
465            .with_process_registry(Arc::clone(&self.config.process_registry))
466            .with_residency(self.config.residency)
467            .with_store(store)
468            .build()
469            .await
470            .map_err(|err| {
471                PluginError::Session(format!(
472                    "failed to rebuild process worker runtime for session `{session_id}`: {err}"
473                ))
474            })
475    }
476
477    /// Enforce the durable-first wiring invariant at the worker process-run
478    /// boundary: when the worker was wired with a durable effect host, every
479    /// store it will execute against must also be durable. A durable host
480    /// running against any ephemeral store fails loudly here rather than
481    /// silently re-executing a process against non-durable state.
482    ///
483    /// Inline controllers (the default tier) impose no requirement, so
484    /// inline/in-memory workers pass unchanged.
485    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
486        if self
487            .config
488            .runtime_host
489            .control
490            .effect_host
491            .durability_tier()
492            != crate::DurabilityTier::Durable
493        {
494            return Ok(());
495        }
496        let require = |facet: crate::DurableStoreFacet| {
497            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
498        };
499        if self
500            .config
501            .runtime_host
502            .durability
503            .attachment_store
504            .persistence()
505            .durability_tier()
506            != crate::DurabilityTier::Durable
507        {
508            return Err(require(crate::DurableStoreFacet::AttachmentStore));
509        }
510        if self
511            .config
512            .runtime_host
513            .durability
514            .lashlang_artifact_store
515            .durability_tier()
516            != crate::DurabilityTier::Durable
517        {
518            return Err(require(crate::DurableStoreFacet::ArtifactStore));
519        }
520        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
521            return Err(require(crate::DurableStoreFacet::SessionStore));
522        }
523        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
524            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
525        }
526        if self.config.host_event_store.durability_tier() != crate::DurabilityTier::Durable {
527            return Err(require(crate::DurableStoreFacet::HostEventStore));
528        }
529        Ok(())
530    }
531
532    /// Enforce the stable-process-id invariant at every (re-)execution: process
533    /// execution identity is the persisted `process_id`, so a retry — a Restate
534    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
535    /// recovery sweep re-running a non-terminal row — must present that stable
536    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
537    /// loudly here, mirroring how `EffectScope` rejects an
538    /// empty turn id at the durable-effect boundary.
539    fn ensure_stable_process_id(
540        &self,
541        registration: &ProcessRegistration,
542    ) -> Result<(), PluginError> {
543        if registration.id.trim().is_empty() {
544            return Err(PluginError::Session(
545                crate::RuntimeError::missing_process_execution_id().to_string(),
546            ));
547        }
548        Ok(())
549    }
550
551    fn ensure_host_profile_matches(
552        &self,
553        registration: &ProcessRegistration,
554    ) -> Result<(), PluginError> {
555        let actual = registration.provenance.host_profile_id.as_str();
556        let expected = self.config.runtime_host.profile.host_profile_id.as_str();
557        if actual.is_empty() || actual == expected {
558            return Ok(());
559        }
560        Err(PluginError::Session(format!(
561            "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
562            registration.id
563        )))
564    }
565}
566
/// How long the recovery loop waits between two lease renewals.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Lease-renewal period in milliseconds.
///
/// Test builds use 25 ms, presumably so renewal paths are exercised quickly.
/// Every other build uses 30 s.
///
/// NOTE(review): the non-test period must stay well below
/// `RUNTIME_TURN_LEASE_TTL_MS`, or a healthy run could lose its lease between
/// renewals. Confirm this whenever either value changes.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) {
        25
    } else {
        30_000
    }
}
580
581#[cfg(test)]
582mod boundary_tests {
583    use super::*;
584    use crate::{
585        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
586        DurableStoreFacet, HostEventStore, InMemoryAttachmentStore, LashlangArtifactStore,
587        ProcessInput, ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment,
588    };
589    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
590
591    /// Effect controller that reports the durable tier; the worker boundary
592    /// only reads the tier, so the effect path is never exercised here.
593    #[derive(Default)]
594    struct DurableController;
595
596    #[async_trait::async_trait]
597    impl RuntimeEffectController for DurableController {
598        fn durability_tier(&self) -> DurabilityTier {
599            DurabilityTier::Durable
600        }
601
602        async fn execute_effect(
603            &self,
604            _envelope: crate::RuntimeEffectEnvelope,
605            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
606        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
607            unreachable!("worker boundary rejects before executing any effect")
608        }
609    }
610
611    /// Attachment store reporting a durable tier over in-memory storage.
612    #[derive(Default)]
613    struct DurableAttachmentStore {
614        inner: InMemoryAttachmentStore,
615    }
616
617    #[async_trait::async_trait]
618    impl AttachmentStore for DurableAttachmentStore {
619        fn persistence(&self) -> AttachmentStorePersistence {
620            AttachmentStorePersistence::Durable
621        }
622
623        async fn put(
624            &self,
625            bytes: Vec<u8>,
626            meta: AttachmentCreateMeta,
627        ) -> Result<AttachmentRef, AttachmentStoreError> {
628            self.inner.put(bytes, meta).await
629        }
630
631        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
632            self.inner.get(id).await
633        }
634    }
635
636    /// Lashlang artifact store reporting a durable tier over in-memory storage.
637    #[derive(Default)]
638    struct DurableArtifactStore {
639        inner: lashlang::InMemoryLashlangArtifactStore,
640    }
641
642    #[async_trait::async_trait]
643    impl LashlangArtifactStore for DurableArtifactStore {
644        fn durability_tier(&self) -> DurabilityTier {
645            DurabilityTier::Durable
646        }
647
648        async fn put_module_artifact(
649            &self,
650            artifact: &lashlang::ModuleArtifact,
651        ) -> Result<(), lashlang::ArtifactStoreError> {
652            self.inner.put_module_artifact(artifact).await
653        }
654
655        async fn get_module_artifact(
656            &self,
657            module_ref: &lashlang::ModuleRef,
658        ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
659            self.inner.get_module_artifact(module_ref).await
660        }
661    }
662
663    /// Session store factory whose declared tier is configurable; it never has
664    /// to create a store because the worker boundary rejects first.
665    struct TierSessionStoreFactory {
666        tier: DurabilityTier,
667    }
668
669    #[async_trait::async_trait]
670    impl SessionStoreFactory for TierSessionStoreFactory {
671        fn durability_tier(&self) -> DurabilityTier {
672            self.tier
673        }
674
675        async fn create_store(
676            &self,
677            _request: &crate::SessionStoreCreateRequest,
678        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
679            unreachable!("worker boundary rejects before creating a session store")
680        }
681
682        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
683            Ok(())
684        }
685    }
686
687    struct TierHostEventStore {
688        tier: DurabilityTier,
689        inner: crate::InMemoryHostEventStore,
690    }
691
692    impl TierHostEventStore {
693        fn new(tier: DurabilityTier) -> Self {
694            Self {
695                tier,
696                inner: crate::InMemoryHostEventStore::default(),
697            }
698        }
699    }
700
701    #[async_trait::async_trait]
702    impl HostEventStore for TierHostEventStore {
703        fn durability_tier(&self) -> DurabilityTier {
704            self.tier
705        }
706
707        async fn register_subscription(
708            &self,
709            draft: crate::TriggerSubscriptionDraft,
710        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
711            self.inner.register_subscription(draft).await
712        }
713
714        async fn list_subscriptions(
715            &self,
716            filter: crate::TriggerSubscriptionFilter,
717        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
718            self.inner.list_subscriptions(filter).await
719        }
720
721        async fn cancel_subscription(
722            &self,
723            session_id: &str,
724            handle: &str,
725        ) -> Result<bool, PluginError> {
726            self.inner.cancel_subscription(session_id, handle).await
727        }
728
729        async fn record_occurrence(
730            &self,
731            request: crate::HostEventOccurrenceRequest,
732        ) -> Result<crate::HostEventOccurrenceRecord, PluginError> {
733            self.inner.record_occurrence(request).await
734        }
735
736        async fn reserve_matching_deliveries(
737            &self,
738            occurrence_id: &str,
739        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
740            self.inner.reserve_matching_deliveries(occurrence_id).await
741        }
742    }
743
744    /// Build a worker whose controller is durable but whose stores can be set
745    /// per-facet to durable/ephemeral, so each facet's loud rejection can be
746    /// exercised independently.
747    fn worker(
748        attachment: Arc<dyn AttachmentStore>,
749        artifact: Arc<dyn LashlangArtifactStore>,
750        session_store_tier: DurabilityTier,
751    ) -> DurableProcessWorker {
752        worker_with_store_tiers(
753            attachment,
754            artifact,
755            session_store_tier,
756            DurabilityTier::Durable,
757            DurabilityTier::Durable,
758        )
759    }
760
761    fn worker_with_store_tiers(
762        attachment: Arc<dyn AttachmentStore>,
763        artifact: Arc<dyn LashlangArtifactStore>,
764        session_store_tier: DurabilityTier,
765        process_registry_tier: DurabilityTier,
766        host_event_store_tier: DurabilityTier,
767    ) -> DurableProcessWorker {
768        let mut runtime_host = RuntimeHostConfig::in_memory();
769        runtime_host.control.effect_host =
770            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
771        runtime_host.durability.attachment_store = attachment;
772        runtime_host.durability.lashlang_artifact_store = artifact;
773        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
774        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
775            tier: session_store_tier,
776        });
777        let registry: Arc<dyn ProcessRegistry> = Arc::new(
778            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
779        );
780        let host_event_store: Arc<dyn HostEventStore> =
781            Arc::new(TierHostEventStore::new(host_event_store_tier));
782        DurableProcessWorker::new(
783            DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
784                .with_host_event_store(host_event_store),
785        )
786    }
787
788    fn external_registration() -> ProcessRegistration {
789        ProcessRegistration::new(
790            "worker-boundary-process",
791            ProcessInput::External {
792                metadata: serde_json::json!({}),
793            },
794        )
795    }
796
797    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
798        worker
799            .run_process(
800                external_registration(),
801                ProcessExecutionContext::default(),
802                CancellationToken::new(),
803            )
804            .await
805    }
806
807    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
808        let PluginError::Session(message) = err else {
809            panic!("expected PluginError::Session, got {err:?}");
810        };
811        let expected = RuntimeError::durable_store_required(facet).to_string();
812        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
813    }
814
815    #[tokio::test]
816    async fn durable_worker_rejects_ephemeral_attachment_store() {
817        let worker = worker(
818            Arc::new(InMemoryAttachmentStore::new()),
819            Arc::new(DurableArtifactStore::default()),
820            DurabilityTier::Durable,
821        );
822        let err = run(&worker)
823            .await
824            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
825        assert_facet(err, DurableStoreFacet::AttachmentStore);
826    }
827
828    #[tokio::test]
829    async fn durable_worker_rejects_ephemeral_artifact_store() {
830        let worker = worker(
831            Arc::new(DurableAttachmentStore::default()),
832            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
833            DurabilityTier::Durable,
834        );
835        let err = run(&worker)
836            .await
837            .expect_err("ephemeral artifact store must be rejected at the worker boundary");
838        assert_facet(err, DurableStoreFacet::ArtifactStore);
839    }
840
841    #[tokio::test]
842    async fn durable_worker_rejects_ephemeral_session_store_factory() {
843        let worker = worker(
844            Arc::new(DurableAttachmentStore::default()),
845            Arc::new(DurableArtifactStore::default()),
846            DurabilityTier::Inline,
847        );
848        let err = run(&worker)
849            .await
850            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
851        assert_facet(err, DurableStoreFacet::SessionStore);
852    }
853
854    #[tokio::test]
855    async fn durable_worker_rejects_ephemeral_process_registry() {
856        let worker = worker_with_store_tiers(
857            Arc::new(DurableAttachmentStore::default()),
858            Arc::new(DurableArtifactStore::default()),
859            DurabilityTier::Durable,
860            DurabilityTier::Inline,
861            DurabilityTier::Durable,
862        );
863        let err = run(&worker)
864            .await
865            .expect_err("ephemeral process registry must be rejected at the worker boundary");
866        assert_facet(err, DurableStoreFacet::ProcessRegistry);
867    }
868
869    #[tokio::test]
870    async fn durable_worker_rejects_ephemeral_host_event_store() {
871        let worker = worker_with_store_tiers(
872            Arc::new(DurableAttachmentStore::default()),
873            Arc::new(DurableArtifactStore::default()),
874            DurabilityTier::Durable,
875            DurabilityTier::Durable,
876            DurabilityTier::Inline,
877        );
878        let err = run(&worker)
879            .await
880            .expect_err("ephemeral host event store must be rejected at the worker boundary");
881        assert_facet(err, DurableStoreFacet::HostEventStore);
882    }
883
884    #[tokio::test]
885    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
886        // Positive control: a durable worker wired against fully durable stores
887        // clears the store-facet guard and proceeds to run the (External)
888        // process.
889        let worker = worker(
890            Arc::new(DurableAttachmentStore::default()),
891            Arc::new(DurableArtifactStore::default()),
892            DurabilityTier::Durable,
893        );
894        let output = run(&worker)
895            .await
896            .expect("all-durable worker should pass the store-facet guard");
897        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
898    }
899
900    #[tokio::test]
901    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
902        // Inline controllers impose no requirement, so an in-memory worker runs
903        // unchanged — the durable-first guard must not regress inline hosts.
904        let runtime_host = RuntimeHostConfig::in_memory();
905        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
906        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
907            tier: DurabilityTier::Inline,
908        });
909        let registry: Arc<dyn ProcessRegistry> =
910            Arc::new(crate::TestLocalProcessRegistry::default());
911        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
912            plugin_host,
913            runtime_host,
914            factory,
915            registry,
916        ));
917        let output = run(&worker)
918            .await
919            .expect("inline worker should pass the store-facet guard");
920        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
921    }
922}