Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10    InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
11    ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
12    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
13    SessionStoreFactory,
14};
15
16/// Deployment-local configuration for rebuilding durable process executions.
17///
18/// Process rows intentionally carry only portable process input and provenance.
19/// Workers provide the host profile, plugins, providers, stores, secrets, and
20/// host capabilities for the deployment that owns those rows.
21#[derive(Clone)]
22pub struct DurableProcessWorkerConfig {
23    pub plugin_host: Arc<PluginHost>,
24    pub runtime_host: RuntimeHostConfig,
25    pub session_policy: crate::SessionPolicy,
26    pub session_store_factory: Arc<dyn SessionStoreFactory>,
27    pub process_registry: Arc<dyn ProcessRegistry>,
28    pub trigger_store: Arc<dyn crate::TriggerStore>,
29    #[doc(hidden)]
30    pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
31    /// Residency for sessions the worker rebuilds to run a process. Defaults to
32    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
33    /// it here so the worker's rebuilt sessions trim to the active path too,
34    /// instead of silently diverging from the live runtime by keeping the full
35    /// graph resident.
36    pub residency: crate::Residency,
37}
38
39impl DurableProcessWorkerConfig {
40    pub fn new(
41        plugin_host: Arc<PluginHost>,
42        runtime_host: RuntimeHostConfig,
43        session_store_factory: Arc<dyn SessionStoreFactory>,
44        process_registry: Arc<dyn ProcessRegistry>,
45    ) -> Self {
46        Self {
47            plugin_host,
48            runtime_host,
49            session_policy: crate::SessionPolicy::default(),
50            session_store_factory,
51            process_registry,
52            trigger_store: Arc::new(crate::InMemoryTriggerStore::default()),
53            turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
54            residency: crate::Residency::default(),
55        }
56    }
57
58    pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
59        self.trigger_store = store;
60        self
61    }
62
63    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
64        self.session_policy = policy;
65        self
66    }
67
68    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
69        self.residency = residency;
70        self
71    }
72
73    #[doc(hidden)]
74    pub fn with_turn_phase_probe_slot(
75        mut self,
76        slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
77    ) -> Self {
78        self.turn_phase_probe_slot = slot;
79        self
80    }
81
82    pub fn from_plugin_factories(
83        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
84        runtime_host: RuntimeHostConfig,
85        session_store_factory: Arc<dyn SessionStoreFactory>,
86        process_registry: Arc<dyn ProcessRegistry>,
87    ) -> Self {
88        Self::new(
89            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
90            runtime_host,
91            session_store_factory,
92            process_registry,
93        )
94    }
95
96    pub fn from_plugin_stack(
97        plugin_stack: PluginStack,
98        runtime_host: RuntimeHostConfig,
99        session_store_factory: Arc<dyn SessionStoreFactory>,
100        process_registry: Arc<dyn ProcessRegistry>,
101    ) -> Self {
102        Self::from_plugin_factories(
103            plugin_stack.into_factories(),
104            runtime_host,
105            session_store_factory,
106            process_registry,
107        )
108    }
109}
110
111/// Reconstructable background-process worker.
112#[derive(Clone)]
113pub struct DurableProcessWorker {
114    config: Arc<DurableProcessWorkerConfig>,
115}
116
117/// Why a recovery run did not produce a terminal outcome under the lease.
118enum RecoverFailure {
119    /// The lease was lost mid-run (another owner reclaimed an expired lease).
120    /// The losing worker must not write a terminal outcome — the new owner is
121    /// now the single writer.
122    LeaseLost(PluginError),
123    /// The process could not be run (rebuild/store-facet failure). The lease is
124    /// still held, so this worker terminalizes the row.
125    Run(PluginError),
126}
127
128impl DurableProcessWorker {
129    pub fn new(config: DurableProcessWorkerConfig) -> Self {
130        Self {
131            config: Arc::new(config),
132        }
133    }
134
135    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
136        Self { config }
137    }
138
139    pub fn config(&self) -> &DurableProcessWorkerConfig {
140        &self.config
141    }
142
143    pub async fn run_process(
144        &self,
145        registration: ProcessRegistration,
146        execution_context: ProcessExecutionContext,
147        cancellation: CancellationToken,
148    ) -> Result<ProcessAwaitOutput, PluginError> {
149        let scoped_effect_controller = self
150            .config
151            .runtime_host
152            .control
153            .effect_host
154            .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
155            .map_err(|err| PluginError::Session(err.to_string()))?
156            .ok_or_else(|| {
157                PluginError::Session(
158                    "process worker effect host must provide a static process scope".to_string(),
159                )
160            })?;
161        self.run_process_with_scoped_effect_controller(
162            registration,
163            execution_context,
164            scoped_effect_controller,
165            cancellation,
166        )
167        .await
168    }
169
170    pub async fn run_process_with_scoped_effect_controller(
171        &self,
172        registration: ProcessRegistration,
173        execution_context: ProcessExecutionContext,
174        scoped_effect_controller: crate::ScopedEffectController<'_>,
175        cancellation: CancellationToken,
176    ) -> Result<ProcessAwaitOutput, PluginError> {
177        self.ensure_stable_process_id(&registration)?;
178        self.ensure_host_profile_matches(&registration)?;
179        self.ensure_durable_store_facets()?;
180        if let ProcessInput::External { metadata } = registration.input.as_ref() {
181            return Ok(ProcessAwaitOutput::Success {
182                value: serde_json::json!({ "metadata": metadata.clone() }),
183                control: None,
184            });
185        }
186        let mut runtime = self.runtime_for_registration(&registration).await?;
187        let originator_scope = if let crate::ProcessOriginator::Session { scope } =
188            &registration.provenance.originator
189        {
190            Some(scope)
191        } else {
192            None
193        };
194        let probe_scope = registration.wake_target.as_ref().or(originator_scope);
195        if let Some(probe) =
196            probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
197        {
198            runtime.set_turn_phase_probe(probe);
199        }
200        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
201            PluginError::Session(format!(
202                "failed to build runtime env for process `{}`: {err}",
203                registration.id
204            ))
205        })?;
206        Ok(manager
207            .run_process(
208                registration,
209                execution_context,
210                Arc::clone(&self.config.process_registry),
211                scoped_effect_controller,
212                cancellation,
213            )
214            .await)
215    }
216
217    /// Sweep the registry for non-terminal processes and re-execute the ones
218    /// this worker can claim, driving each to a terminal state.
219    ///
220    /// This is the crash-recovery counterpart to a worker that ran a process
221    /// from a live turn: a trigger/trigger-started process whose worker
222    /// died mid-flight is left non-terminal in the registry, and a subsequent
223    /// worker reopening that registry must finish it. The sweep:
224    ///
225    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
226    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
227    ///    already leased live by *another* owner is skipped (it is being run by
228    ///    that owner right now), so a non-terminal process is re-run by exactly
229    ///    one owner (lease fencing);
230    /// 3. runs the claimed process on this worker's wired controller, renewing
231    ///    the lease across the long-running execution so a healthy recovery is
232    ///    not swept out from under itself;
233    /// 4. writes the terminal outcome and releases the lease.
234    ///
235    /// Idempotent by `process_id`: terminal processes are never in the worklist,
236    /// and a process that became terminal between the list and the claim is
237    /// detected after claiming and skipped, so re-running a recovery sweep does
238    /// not double-execute completed work.
239    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
240        let records = self.config.process_registry.list_non_terminal().await?;
241        for record in records {
242            // Run each claimed process on its OWN lease-fenced task. A sequential
243            // drive that awaited each process to terminal would deadlock a process
244            // that blocks awaiting a nested child (`start child` then `await`, or a
245            // subagent fan-out): the one drive task would park inside the parent's
246            // await and never claim the child. Spawning frees the loop so a
247            // subsequent drive (poke or poll) claims and runs the child, and the
248            // per-process `ProcessLease` fences concurrent owners — so spawning a
249            // task per pending row on every drive is idempotent (a row already
250            // running is skipped on claim conflict) and one failing row never
251            // aborts the rest of the sweep.
252            let worker = self.clone();
253            tokio::spawn(async move { worker.recover_process(record).await });
254        }
255        Ok(())
256    }
257
258    async fn recover_process(&self, record: ProcessRecord) {
259        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
260        let process_id = record.id.clone();
261        // Skip if held live by another owner: a claim conflict means a worker is
262        // already running this process, so re-running here would violate the
263        // single-owner contract. Treat any claim failure as "leased elsewhere".
264        let Ok(lease) = self
265            .config
266            .process_registry
267            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
268            .await
269        else {
270            return;
271        };
272        // The process may have reached a terminal state between the list and the
273        // claim. Idempotent by process_id: do not re-execute a finished process.
274        if self
275            .config
276            .process_registry
277            .get_process(&process_id)
278            .await
279            .is_some_and(|current| current.is_terminal())
280        {
281            self.release_or_log(&lease).await;
282            return;
283        }
284        let registration = ProcessRegistration {
285            id: record.id,
286            input: record.input,
287            event_types: record.event_types,
288            provenance: record.provenance.clone(),
289            env_ref: record.env_ref.clone(),
290            wake_target: record.wake_target.clone(),
291        };
292        let execution_context = ProcessExecutionContext::default();
293        match self
294            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
295            .await
296        {
297            // Ran to a terminal outcome (success or a process-level failure) while
298            // holding the lease: this owner is the single writer of the terminal.
299            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
300            // The lease was lost mid-run — another owner reclaimed the expired
301            // lease and is now running this process. Do NOT write a terminal
302            // outcome or release the lease: that would race the new owner and
303            // could record a succeeded process as Failed. Leave the row to the
304            // lease holder; it will finish (or another sweep retries it).
305            Err(RecoverFailure::LeaseLost(err)) => {
306                tracing::warn!(
307                    process_id = %process_id,
308                    error = %err,
309                    "process recovery lost its lease mid-run; deferring to the new owner",
310                );
311            }
312            // The process could not be run at all (rebuild/store-facet failure):
313            // terminalize as a recovery failure so the row leaves the worklist.
314            Err(RecoverFailure::Run(err)) => {
315                let output = ProcessAwaitOutput::Failure {
316                    class: crate::ToolFailureClass::Execution,
317                    code: "process_recovery_failed".to_string(),
318                    message: err.to_string(),
319                    raw: None,
320                    control: None,
321                };
322                self.complete_and_release(&lease, &process_id, output).await;
323            }
324        }
325    }
326
327    /// Write a recovered process's terminal outcome (the running lease owner is
328    /// the single writer) and then release the lease, logging either failure
329    /// rather than aborting — the lease's TTL is the backstop.
330    async fn complete_and_release(
331        &self,
332        lease: &ProcessLease,
333        process_id: &str,
334        output: ProcessAwaitOutput,
335    ) {
336        // Fence the terminal write: re-confirm the lease immediately before
337        // writing. `renew_process_lease` is rejected (by owner/lease_token/
338        // fencing_token) if another owner has reclaimed an expired lease, and on
339        // success extends the window so the back-to-back write lands inside the
340        // owned interval. A worker that stalled past its TTL therefore cannot
341        // overwrite the new owner's outcome — it defers instead.
342        let fenced = match self
343            .config
344            .process_registry
345            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
346            .await
347        {
348            Ok(renewed) => renewed,
349            Err(err) => {
350                tracing::warn!(
351                    process_id = %process_id,
352                    error = %err,
353                    "lost process lease before terminal write; deferring to the new owner",
354                );
355                return;
356            }
357        };
358        if let Err(err) = self
359            .config
360            .process_registry
361            .complete_process(process_id, output)
362            .await
363        {
364            tracing::warn!(
365                process_id = %process_id,
366                error = %err,
367                "failed to write recovered process terminal outcome",
368            );
369        }
370        self.release_or_log(&fenced).await;
371    }
372
373    async fn release_or_log(&self, lease: &ProcessLease) {
374        if let Err(err) = self.release_process_lease(lease).await {
375            tracing::warn!(
376                process_id = %lease.process_id,
377                error = %err,
378                "failed to release recovered process lease",
379            );
380        }
381    }
382
383    /// Run a recovered process while renewing its lease across the execution,
384    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
385    /// from expiring under the live owner.
386    async fn run_process_with_lease_renewal(
387        &self,
388        registration: ProcessRegistration,
389        execution_context: ProcessExecutionContext,
390        mut lease: ProcessLease,
391    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
392        let process_id = registration.id.clone();
393        let cancellation = CancellationToken::new();
394        let cancel_watcher = {
395            let registry = Arc::clone(&self.config.process_registry);
396            let process_id = process_id.clone();
397            let cancellation = cancellation.clone();
398            tokio::spawn(async move {
399                match registry
400                    .wait_event_after(&process_id, "process.cancel_requested", 0)
401                    .await
402                {
403                    Ok(_) => cancellation.cancel(),
404                    Err(err) => tracing::warn!(
405                        process_id = %process_id,
406                        error = %err,
407                        "process cancel watcher stopped before observing cancellation",
408                    ),
409                }
410            })
411        };
412        let pending = self.run_process(registration, execution_context, cancellation.clone());
413        tokio::pin!(pending);
414        loop {
415            tokio::select! {
416                outcome = &mut pending => {
417                    cancel_watcher.abort();
418                    return outcome.map_err(RecoverFailure::Run);
419                }
420                _ = tokio::time::sleep(process_lease_renew_interval()) => {
421                    match self
422                        .config
423                        .process_registry
424                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
425                        .await
426                    {
427                        Ok(renewed) => lease = renewed,
428                        Err(err) => {
429                            cancellation.cancel();
430                            cancel_watcher.abort();
431                            return Err(RecoverFailure::LeaseLost(err));
432                        }
433                    }
434                }
435            }
436        }
437    }
438
439    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
440        self.config
441            .process_registry
442            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
443            .await
444    }
445
446    pub async fn request_process_cancel(
447        &self,
448        process_id: &str,
449        reason: Option<String>,
450    ) -> Result<(), PluginError> {
451        self.config
452            .process_registry
453            .append_event(
454                process_id,
455                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
456            )
457            .await
458            .map(|_| ())
459    }
460
461    async fn runtime_for_registration(
462        &self,
463        registration: &ProcessRegistration,
464    ) -> Result<LashRuntime, PluginError> {
465        match registration.input.as_ref() {
466            ProcessInput::SessionTurn { create_request, .. } => {
467                self.runtime_for_session_turn(registration, create_request.as_ref())
468                    .await
469            }
470            ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. } => {
471                self.runtime_for_process_env(registration).await
472            }
473            ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
474        }
475    }
476
477    async fn runtime_for_session_turn(
478        &self,
479        registration: &ProcessRegistration,
480        create_request: &crate::SessionCreateRequest,
481    ) -> Result<LashRuntime, PluginError> {
482        let mut policy = create_request
483            .policy
484            .clone()
485            .unwrap_or_else(|| self.config.session_policy.clone());
486        if policy.recorded_provider_id().is_empty() {
487            policy.provider_id = self.config.session_policy.provider_id.clone();
488        }
489        self.build_ephemeral_runtime(
490            format!("process-session-turn:{}", registration.id),
491            policy,
492            create_request.plugin_options.clone(),
493            "session turn request",
494        )
495        .await
496    }
497
498    async fn runtime_for_process_env(
499        &self,
500        registration: &ProcessRegistration,
501    ) -> Result<LashRuntime, PluginError> {
502        let Some(env_ref) = registration.env_ref.as_ref() else {
503            return Err(PluginError::Session(format!(
504                "process `{}` is missing a captured execution env",
505                registration.id
506            )));
507        };
508        let env = crate::load_process_execution_env(
509            self.config
510                .runtime_host
511                .durability
512                .lashlang_artifact_store
513                .as_ref(),
514            env_ref,
515        )
516        .await?;
517        self.build_ephemeral_runtime(
518            format!("process-env:{}", registration.id),
519            env.policy,
520            env.plugin_options,
521            env_ref.as_str(),
522        )
523        .await
524    }
525
526    async fn build_ephemeral_runtime(
527        &self,
528        session_id: String,
529        policy: crate::SessionPolicy,
530        plugin_options: crate::PluginOptions,
531        source_label: &str,
532    ) -> Result<LashRuntime, PluginError> {
533        let store = Arc::new(InMemorySessionStore::default());
534        EmbeddedRuntimeBuilder::new()
535            .with_session_id(session_id.to_string())
536            .with_plugin_host(self.config.plugin_host.as_ref().clone())
537            .with_runtime_host(self.config.runtime_host.clone())
538            .with_policy(policy)
539            .with_plugin_options(plugin_options)
540            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
541            .with_trigger_store(Arc::clone(&self.config.trigger_store))
542            .with_process_registry(Arc::clone(&self.config.process_registry))
543            .with_residency(self.config.residency)
544            .with_store(store)
545            .build()
546            .await
547            .map_err(|err| {
548                PluginError::Session(format!(
549                    "failed to build process worker runtime for {source_label}: {err}"
550                ))
551            })
552    }
553
554    /// Enforce the durable-first wiring invariant at the worker process-run
555    /// boundary: when the worker was wired with a durable effect host, every
556    /// store it will execute against must also be durable. A durable host
557    /// running against any ephemeral store fails loudly here rather than
558    /// silently re-executing a process against non-durable state.
559    ///
560    /// Inline controllers (the default tier) impose no requirement, so
561    /// inline/in-memory workers pass unchanged.
562    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
563        if self
564            .config
565            .runtime_host
566            .control
567            .effect_host
568            .durability_tier()
569            != crate::DurabilityTier::Durable
570        {
571            return Ok(());
572        }
573        let require = |facet: crate::DurableStoreFacet| {
574            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
575        };
576        if self
577            .config
578            .runtime_host
579            .durability
580            .attachment_store
581            .persistence()
582            .durability_tier()
583            != crate::DurabilityTier::Durable
584        {
585            return Err(require(crate::DurableStoreFacet::AttachmentStore));
586        }
587        if self
588            .config
589            .runtime_host
590            .durability
591            .lashlang_artifact_store
592            .durability_tier()
593            != crate::DurabilityTier::Durable
594        {
595            return Err(require(crate::DurableStoreFacet::ArtifactStore));
596        }
597        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
598            return Err(require(crate::DurableStoreFacet::SessionStore));
599        }
600        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
601            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
602        }
603        if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
604            return Err(require(crate::DurableStoreFacet::TriggerStore));
605        }
606        Ok(())
607    }
608
609    /// Enforce the stable-process-id invariant at every (re-)execution: process
610    /// execution identity is the persisted `process_id`, so a retry — a Restate
611    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
612    /// recovery sweep re-running a non-terminal row — must present that stable
613    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
614    /// loudly here, mirroring how `ExecutionScope` rejects an
615    /// empty turn id at the durable-effect boundary.
616    fn ensure_stable_process_id(
617        &self,
618        registration: &ProcessRegistration,
619    ) -> Result<(), PluginError> {
620        if registration.id.trim().is_empty() {
621            return Err(PluginError::Session(
622                crate::RuntimeError::missing_process_execution_id().to_string(),
623            ));
624        }
625        Ok(())
626    }
627
628    fn ensure_host_profile_matches(
629        &self,
630        registration: &ProcessRegistration,
631    ) -> Result<(), PluginError> {
632        let actual = registration.provenance.host_profile_id.as_str();
633        let expected = self.config.runtime_host.profile.host_profile_id.as_str();
634        if actual.is_empty() || actual == expected {
635            return Ok(());
636        }
637        Err(PluginError::Session(format!(
638            "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
639            registration.id
640        )))
641    }
642}
643
/// Delay between lease renewals while a recovered process is running.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Renewal cadence in milliseconds: 30 seconds in normal builds, shortened to
/// 25 ms under `cfg(test)` so tests exercise renewal quickly.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) {
        25
    } else {
        30_000
    }
}
657
658#[cfg(test)]
659mod boundary_tests {
660    use super::*;
661    use crate::{
662        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
663        DurableStoreFacet, InMemoryAttachmentStore, LashlangArtifactStore, ProcessInput,
664        ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment, TriggerStore,
665    };
666    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
667
668    /// Effect controller that reports the durable tier; the worker boundary
669    /// only reads the tier, so the effect path is never exercised here.
670    #[derive(Default)]
671    struct DurableController;
672
673    #[async_trait::async_trait]
674    impl RuntimeEffectController for DurableController {
675        fn durability_tier(&self) -> DurabilityTier {
676            DurabilityTier::Durable
677        }
678
679        async fn execute_effect(
680            &self,
681            _envelope: crate::RuntimeEffectEnvelope,
682            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
683        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
684            unreachable!("worker boundary rejects before executing any effect")
685        }
686    }
687
688    /// Attachment store reporting a durable tier over in-memory storage.
689    #[derive(Default)]
690    struct DurableAttachmentStore {
691        inner: InMemoryAttachmentStore,
692    }
693
694    #[async_trait::async_trait]
695    impl AttachmentStore for DurableAttachmentStore {
696        fn persistence(&self) -> AttachmentStorePersistence {
697            AttachmentStorePersistence::Durable
698        }
699
700        async fn put(
701            &self,
702            bytes: Vec<u8>,
703            meta: AttachmentCreateMeta,
704        ) -> Result<AttachmentRef, AttachmentStoreError> {
705            self.inner.put(bytes, meta).await
706        }
707
708        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
709            self.inner.get(id).await
710        }
711    }
712
713    /// Lashlang artifact store reporting a durable tier over in-memory storage.
714    #[derive(Default)]
715    struct DurableArtifactStore {
716        inner: lashlang::InMemoryLashlangArtifactStore,
717    }
718
719    #[async_trait::async_trait]
720    impl LashlangArtifactStore for DurableArtifactStore {
721        fn durability_tier(&self) -> DurabilityTier {
722            DurabilityTier::Durable
723        }
724
725        async fn put_module_artifact(
726            &self,
727            artifact: &lashlang::ModuleArtifact,
728        ) -> Result<(), lashlang::ArtifactStoreError> {
729            self.inner.put_module_artifact(artifact).await
730        }
731
732        async fn get_module_artifact(
733            &self,
734            module_ref: &lashlang::ModuleRef,
735        ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
736            self.inner.get_module_artifact(module_ref).await
737        }
738
739        async fn put_artifact_bytes(
740            &self,
741            artifact_ref: &str,
742            descriptor: &str,
743            bytes: &[u8],
744        ) -> Result<(), lashlang::ArtifactStoreError> {
745            self.inner
746                .put_artifact_bytes(artifact_ref, descriptor, bytes)
747                .await
748        }
749
750        async fn get_artifact_bytes(
751            &self,
752            artifact_ref: &str,
753        ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
754            self.inner.get_artifact_bytes(artifact_ref).await
755        }
756    }
757
758    /// Session store factory whose declared tier is configurable; it never has
759    /// to create a store because the worker boundary rejects first.
760    struct TierSessionStoreFactory {
761        tier: DurabilityTier,
762    }
763
764    #[async_trait::async_trait]
765    impl SessionStoreFactory for TierSessionStoreFactory {
766        fn durability_tier(&self) -> DurabilityTier {
767            self.tier
768        }
769
770        async fn create_store(
771            &self,
772            _request: &crate::SessionStoreCreateRequest,
773        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
774            unreachable!("worker boundary rejects before creating a session store")
775        }
776
777        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
778            Ok(())
779        }
780    }
781
782    struct TierTriggerStore {
783        tier: DurabilityTier,
784        inner: crate::InMemoryTriggerStore,
785    }
786
787    impl TierTriggerStore {
788        fn new(tier: DurabilityTier) -> Self {
789            Self {
790                tier,
791                inner: crate::InMemoryTriggerStore::default(),
792            }
793        }
794    }
795
796    #[async_trait::async_trait]
797    impl TriggerStore for TierTriggerStore {
798        fn durability_tier(&self) -> DurabilityTier {
799            self.tier
800        }
801
802        async fn register_subscription(
803            &self,
804            draft: crate::TriggerSubscriptionDraft,
805        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
806            self.inner.register_subscription(draft).await
807        }
808
809        async fn list_subscriptions(
810            &self,
811            filter: crate::TriggerSubscriptionFilter,
812        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
813            self.inner.list_subscriptions(filter).await
814        }
815
816        async fn cancel_subscription(
817            &self,
818            session_id: &str,
819            handle: &str,
820        ) -> Result<bool, PluginError> {
821            self.inner.cancel_subscription(session_id, handle).await
822        }
823
824        async fn delete_session_subscriptions(
825            &self,
826            session_id: &str,
827        ) -> Result<usize, PluginError> {
828            self.inner.delete_session_subscriptions(session_id).await
829        }
830
831        async fn record_occurrence(
832            &self,
833            request: crate::TriggerOccurrenceRequest,
834        ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
835            self.inner.record_occurrence(request).await
836        }
837
838        async fn reserve_matching_deliveries(
839            &self,
840            occurrence_id: &str,
841        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
842            self.inner.reserve_matching_deliveries(occurrence_id).await
843        }
844    }
845
846    /// Build a worker whose controller is durable but whose stores can be set
847    /// per-facet to durable/ephemeral, so each facet's loud rejection can be
848    /// exercised independently.
849    fn worker(
850        attachment: Arc<dyn AttachmentStore>,
851        artifact: Arc<dyn LashlangArtifactStore>,
852        session_store_tier: DurabilityTier,
853    ) -> DurableProcessWorker {
854        worker_with_store_tiers(
855            attachment,
856            artifact,
857            session_store_tier,
858            DurabilityTier::Durable,
859            DurabilityTier::Durable,
860        )
861    }
862
863    fn worker_with_store_tiers(
864        attachment: Arc<dyn AttachmentStore>,
865        artifact: Arc<dyn LashlangArtifactStore>,
866        session_store_tier: DurabilityTier,
867        process_registry_tier: DurabilityTier,
868        trigger_store_tier: DurabilityTier,
869    ) -> DurableProcessWorker {
870        let mut runtime_host = RuntimeHostConfig::in_memory();
871        runtime_host.control.effect_host =
872            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
873        runtime_host.durability.attachment_store = attachment;
874        runtime_host.durability.lashlang_artifact_store = artifact;
875        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
876        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
877            tier: session_store_tier,
878        });
879        let registry: Arc<dyn ProcessRegistry> = Arc::new(
880            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
881        );
882        let trigger_store: Arc<dyn TriggerStore> =
883            Arc::new(TierTriggerStore::new(trigger_store_tier));
884        DurableProcessWorker::new(
885            DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
886                .with_trigger_store(trigger_store),
887        )
888    }
889
890    fn external_registration() -> ProcessRegistration {
891        ProcessRegistration::new(
892            "worker-boundary-process",
893            ProcessInput::External {
894                metadata: serde_json::json!({}),
895            },
896            crate::ProcessProvenance::host("default"),
897        )
898    }
899
900    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
901        worker
902            .run_process(
903                external_registration(),
904                ProcessExecutionContext::default(),
905                CancellationToken::new(),
906            )
907            .await
908    }
909
910    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
911        let PluginError::Session(message) = err else {
912            panic!("expected PluginError::Session, got {err:?}");
913        };
914        let expected = RuntimeError::durable_store_required(facet).to_string();
915        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
916    }
917
918    #[tokio::test]
919    async fn durable_worker_rejects_ephemeral_attachment_store() {
920        let worker = worker(
921            Arc::new(InMemoryAttachmentStore::new()),
922            Arc::new(DurableArtifactStore::default()),
923            DurabilityTier::Durable,
924        );
925        let err = run(&worker)
926            .await
927            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
928        assert_facet(err, DurableStoreFacet::AttachmentStore);
929    }
930
931    #[tokio::test]
932    async fn durable_worker_rejects_ephemeral_artifact_store() {
933        let worker = worker(
934            Arc::new(DurableAttachmentStore::default()),
935            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
936            DurabilityTier::Durable,
937        );
938        let err = run(&worker)
939            .await
940            .expect_err("ephemeral artifact store must be rejected at the worker boundary");
941        assert_facet(err, DurableStoreFacet::ArtifactStore);
942    }
943
944    #[tokio::test]
945    async fn durable_worker_rejects_ephemeral_session_store_factory() {
946        let worker = worker(
947            Arc::new(DurableAttachmentStore::default()),
948            Arc::new(DurableArtifactStore::default()),
949            DurabilityTier::Inline,
950        );
951        let err = run(&worker)
952            .await
953            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
954        assert_facet(err, DurableStoreFacet::SessionStore);
955    }
956
957    #[tokio::test]
958    async fn durable_worker_rejects_ephemeral_process_registry() {
959        let worker = worker_with_store_tiers(
960            Arc::new(DurableAttachmentStore::default()),
961            Arc::new(DurableArtifactStore::default()),
962            DurabilityTier::Durable,
963            DurabilityTier::Inline,
964            DurabilityTier::Durable,
965        );
966        let err = run(&worker)
967            .await
968            .expect_err("ephemeral process registry must be rejected at the worker boundary");
969        assert_facet(err, DurableStoreFacet::ProcessRegistry);
970    }
971
972    #[tokio::test]
973    async fn durable_worker_rejects_ephemeral_trigger_store() {
974        let worker = worker_with_store_tiers(
975            Arc::new(DurableAttachmentStore::default()),
976            Arc::new(DurableArtifactStore::default()),
977            DurabilityTier::Durable,
978            DurabilityTier::Durable,
979            DurabilityTier::Inline,
980        );
981        let err = run(&worker)
982            .await
983            .expect_err("ephemeral trigger store must be rejected at the worker boundary");
984        assert_facet(err, DurableStoreFacet::TriggerStore);
985    }
986
987    #[tokio::test]
988    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
989        // Positive control: a durable worker wired against fully durable stores
990        // clears the store-facet guard and proceeds to run the (External)
991        // process.
992        let worker = worker(
993            Arc::new(DurableAttachmentStore::default()),
994            Arc::new(DurableArtifactStore::default()),
995            DurabilityTier::Durable,
996        );
997        let output = run(&worker)
998            .await
999            .expect("all-durable worker should pass the store-facet guard");
1000        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1001    }
1002
1003    #[tokio::test]
1004    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
1005        // Inline controllers impose no requirement, so an in-memory worker runs
1006        // unchanged — the durable-first guard must not regress inline hosts.
1007        let runtime_host = RuntimeHostConfig::in_memory();
1008        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
1009        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
1010            tier: DurabilityTier::Inline,
1011        });
1012        let registry: Arc<dyn ProcessRegistry> =
1013            Arc::new(crate::TestLocalProcessRegistry::default());
1014        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
1015            plugin_host,
1016            runtime_host,
1017            factory,
1018            registry,
1019        ));
1020        let output = run(&worker)
1021            .await
1022            .expect("inline worker should pass the store-facet guard");
1023        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1024    }
1025}