Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10    InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
11    ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
12    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
13    SessionStoreFactory,
14};
15
/// Deployment-local configuration for rebuilding durable process executions.
///
/// Process rows intentionally carry only portable process input and provenance.
/// Workers provide plugins, providers, stores, secrets, and host capabilities
/// for the deployment that owns those rows.
#[derive(Clone)]
pub struct DurableProcessWorkerConfig {
    /// Plugin host cloned into every runtime the worker rebuilds.
    pub plugin_host: Arc<PluginHost>,
    /// Host wiring handed to rebuilt runtimes; the worker also reads its effect
    /// host and durability stores directly.
    pub runtime_host: RuntimeHostConfig,
    /// Fallback session policy, used when a session-turn request carries no
    /// policy of its own or records an empty provider id.
    pub session_policy: crate::SessionPolicy,
    /// Factory passed to rebuilt runtimes for creating session stores.
    pub session_store_factory: Arc<dyn SessionStoreFactory>,
    /// Registry the worker lists, leases, and completes processes against.
    pub process_registry: Arc<dyn ProcessRegistry>,
    /// Trigger store passed to rebuilt runtimes.
    pub trigger_store: Arc<dyn crate::TriggerStore>,
    #[doc(hidden)]
    pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
    /// Residency for sessions the worker rebuilds to run a process. Defaults to
    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
    /// it here so the worker's rebuilt sessions trim to the active path too,
    /// instead of silently diverging from the live runtime by keeping the full
    /// graph resident.
    pub residency: crate::Residency,
}
38
39impl DurableProcessWorkerConfig {
40    pub fn new(
41        plugin_host: Arc<PluginHost>,
42        runtime_host: RuntimeHostConfig,
43        session_store_factory: Arc<dyn SessionStoreFactory>,
44        process_registry: Arc<dyn ProcessRegistry>,
45    ) -> Self {
46        Self {
47            plugin_host,
48            runtime_host,
49            session_policy: crate::SessionPolicy::default(),
50            session_store_factory,
51            process_registry,
52            trigger_store: Arc::new(crate::InMemoryTriggerStore::default()),
53            turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
54            residency: crate::Residency::default(),
55        }
56    }
57
58    pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
59        self.trigger_store = store;
60        self
61    }
62
63    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
64        self.session_policy = policy;
65        self
66    }
67
68    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
69        self.residency = residency;
70        self
71    }
72
73    #[doc(hidden)]
74    pub fn with_turn_phase_probe_slot(
75        mut self,
76        slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
77    ) -> Self {
78        self.turn_phase_probe_slot = slot;
79        self
80    }
81
82    pub fn from_plugin_factories(
83        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
84        runtime_host: RuntimeHostConfig,
85        session_store_factory: Arc<dyn SessionStoreFactory>,
86        process_registry: Arc<dyn ProcessRegistry>,
87    ) -> Self {
88        Self::new(
89            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
90            runtime_host,
91            session_store_factory,
92            process_registry,
93        )
94    }
95
96    pub fn from_plugin_stack(
97        plugin_stack: PluginStack,
98        runtime_host: RuntimeHostConfig,
99        session_store_factory: Arc<dyn SessionStoreFactory>,
100        process_registry: Arc<dyn ProcessRegistry>,
101    ) -> Self {
102        Self::from_plugin_factories(
103            plugin_stack.into_factories(),
104            runtime_host,
105            session_store_factory,
106            process_registry,
107        )
108    }
109}
110
/// Reconstructable background-process worker.
///
/// The worker holds nothing but a shared configuration handle, so cloning it
/// only clones that `Arc`.
#[derive(Clone)]
pub struct DurableProcessWorker {
    /// Shared configuration every run is built from.
    config: Arc<DurableProcessWorkerConfig>,
}
116
/// Why a recovery run did not produce a terminal outcome under the lease.
///
/// Produced by `run_process_with_lease_renewal` and consumed by
/// `recover_process`, which decides from the variant whether it may still
/// write a terminal outcome.
enum RecoverFailure {
    /// The lease was lost mid-run (another owner reclaimed an expired lease).
    /// The losing worker must not write a terminal outcome — the new owner is
    /// now the single writer. Carries the error returned by the failed renewal.
    LeaseLost(PluginError),
    /// The process could not be run (rebuild/store-facet failure). The lease is
    /// still held, so this worker terminalizes the row. Carries the error
    /// returned by the run itself.
    Run(PluginError),
}
127
128impl DurableProcessWorker {
129    pub fn new(config: DurableProcessWorkerConfig) -> Self {
130        Self {
131            config: Arc::new(config),
132        }
133    }
134
135    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
136        Self { config }
137    }
138
139    pub fn config(&self) -> &DurableProcessWorkerConfig {
140        &self.config
141    }
142
143    pub async fn run_process(
144        &self,
145        registration: ProcessRegistration,
146        execution_context: ProcessExecutionContext,
147        cancellation: CancellationToken,
148    ) -> Result<ProcessAwaitOutput, PluginError> {
149        let scoped_effect_controller = self
150            .config
151            .runtime_host
152            .control
153            .effect_host
154            .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
155            .map_err(|err| PluginError::Session(err.to_string()))?
156            .ok_or_else(|| {
157                PluginError::Session(
158                    "process worker effect host must provide a static process scope".to_string(),
159                )
160            })?;
161        self.run_process_with_scoped_effect_controller(
162            registration,
163            execution_context,
164            scoped_effect_controller,
165            cancellation,
166        )
167        .await
168    }
169
170    pub async fn run_process_with_scoped_effect_controller(
171        &self,
172        registration: ProcessRegistration,
173        execution_context: ProcessExecutionContext,
174        scoped_effect_controller: crate::ScopedEffectController<'_>,
175        cancellation: CancellationToken,
176    ) -> Result<ProcessAwaitOutput, PluginError> {
177        self.ensure_stable_process_id(&registration)?;
178        self.ensure_durable_store_facets()?;
179        if let ProcessInput::External { metadata } = registration.input.as_ref() {
180            return Ok(ProcessAwaitOutput::Success {
181                value: serde_json::json!({ "metadata": metadata.clone() }),
182                control: None,
183            });
184        }
185        let mut runtime = self.runtime_for_registration(&registration).await?;
186        let originator_scope = if let crate::ProcessOriginator::Session { scope } =
187            &registration.provenance.originator
188        {
189            Some(scope)
190        } else {
191            None
192        };
193        let probe_scope = registration.wake_target.as_ref().or(originator_scope);
194        if let Some(probe) =
195            probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
196        {
197            runtime.set_turn_phase_probe(probe);
198        }
199        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
200            PluginError::Session(format!(
201                "failed to build runtime env for process `{}`: {err}",
202                registration.id
203            ))
204        })?;
205        Ok(manager
206            .run_process(
207                registration,
208                execution_context,
209                Arc::clone(&self.config.process_registry),
210                scoped_effect_controller,
211                cancellation,
212            )
213            .await)
214    }
215
216    /// Sweep the registry for non-terminal processes and re-execute the ones
217    /// this worker can claim, driving each to a terminal state.
218    ///
219    /// This is the crash-recovery counterpart to a worker that ran a process
220    /// from a live turn: a trigger/trigger-started process whose worker
221    /// died mid-flight is left non-terminal in the registry, and a subsequent
222    /// worker reopening that registry must finish it. The sweep:
223    ///
224    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
225    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
226    ///    already leased live by *another* owner is skipped (it is being run by
227    ///    that owner right now), so a non-terminal process is re-run by exactly
228    ///    one owner (lease fencing);
229    /// 3. runs the claimed process on this worker's wired controller, renewing
230    ///    the lease across the long-running execution so a healthy recovery is
231    ///    not swept out from under itself;
232    /// 4. writes the terminal outcome and releases the lease.
233    ///
234    /// Idempotent by `process_id`: terminal processes are never in the worklist,
235    /// and a process that became terminal between the list and the claim is
236    /// detected after claiming and skipped, so re-running a recovery sweep does
237    /// not double-execute completed work.
238    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
239        let records = self.config.process_registry.list_non_terminal().await?;
240        for record in records {
241            // Run each claimed process on its OWN lease-fenced task. A sequential
242            // drive that awaited each process to terminal would deadlock a process
243            // that blocks awaiting a nested child (`start child` then `await`, or a
244            // subagent fan-out): the one drive task would park inside the parent's
245            // await and never claim the child. Spawning frees the loop so a
246            // subsequent drive (poke or poll) claims and runs the child, and the
247            // per-process `ProcessLease` fences concurrent owners — so spawning a
248            // task per pending row on every drive is idempotent (a row already
249            // running is skipped on claim conflict) and one failing row never
250            // aborts the rest of the sweep.
251            let worker = self.clone();
252            tokio::spawn(async move { worker.recover_process(record).await });
253        }
254        Ok(())
255    }
256
257    async fn recover_process(&self, record: ProcessRecord) {
258        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
259        let process_id = record.id.clone();
260        // Skip if held live by another owner: a claim conflict means a worker is
261        // already running this process, so re-running here would violate the
262        // single-owner contract. Treat any claim failure as "leased elsewhere".
263        let Ok(lease) = self
264            .config
265            .process_registry
266            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
267            .await
268        else {
269            return;
270        };
271        // The process may have reached a terminal state between the list and the
272        // claim. Idempotent by process_id: do not re-execute a finished process.
273        if self
274            .config
275            .process_registry
276            .get_process(&process_id)
277            .await
278            .is_some_and(|current| current.is_terminal())
279        {
280            self.release_or_log(&lease).await;
281            return;
282        }
283        let registration = ProcessRegistration {
284            id: record.id,
285            input: record.input,
286            identity: record.identity,
287            event_types: record.event_types,
288            provenance: record.provenance.clone(),
289            env_ref: record.env_ref.clone(),
290            wake_target: record.wake_target.clone(),
291        };
292        let execution_context = ProcessExecutionContext::default();
293        match self
294            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
295            .await
296        {
297            // Ran to a terminal outcome (success or a process-level failure) while
298            // holding the lease: this owner is the single writer of the terminal.
299            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
300            // The lease was lost mid-run — another owner reclaimed the expired
301            // lease and is now running this process. Do NOT write a terminal
302            // outcome or release the lease: that would race the new owner and
303            // could record a succeeded process as Failed. Leave the row to the
304            // lease holder; it will finish (or another sweep retries it).
305            Err(RecoverFailure::LeaseLost(err)) => {
306                tracing::warn!(
307                    process_id = %process_id,
308                    error = %err,
309                    "process recovery lost its lease mid-run; deferring to the new owner",
310                );
311            }
312            // The process could not be run at all (rebuild/store-facet failure):
313            // terminalize as a recovery failure so the row leaves the worklist.
314            Err(RecoverFailure::Run(err)) => {
315                let output = ProcessAwaitOutput::Failure {
316                    class: crate::ToolFailureClass::Execution,
317                    code: "process_recovery_failed".to_string(),
318                    message: err.to_string(),
319                    raw: None,
320                    control: None,
321                };
322                self.complete_and_release(&lease, &process_id, output).await;
323            }
324        }
325    }
326
327    /// Write a recovered process's terminal outcome (the running lease owner is
328    /// the single writer) and then release the lease, logging either failure
329    /// rather than aborting — the lease's TTL is the backstop.
330    async fn complete_and_release(
331        &self,
332        lease: &ProcessLease,
333        process_id: &str,
334        output: ProcessAwaitOutput,
335    ) {
336        // Fence the terminal write: re-confirm the lease immediately before
337        // writing. `renew_process_lease` is rejected (by owner/lease_token/
338        // fencing_token) if another owner has reclaimed an expired lease, and on
339        // success extends the window so the back-to-back write lands inside the
340        // owned interval. A worker that stalled past its TTL therefore cannot
341        // overwrite the new owner's outcome — it defers instead.
342        let fenced = match self
343            .config
344            .process_registry
345            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
346            .await
347        {
348            Ok(renewed) => renewed,
349            Err(err) => {
350                tracing::warn!(
351                    process_id = %process_id,
352                    error = %err,
353                    "lost process lease before terminal write; deferring to the new owner",
354                );
355                return;
356            }
357        };
358        if let Err(err) = self
359            .config
360            .process_registry
361            .complete_process(process_id, output)
362            .await
363        {
364            tracing::warn!(
365                process_id = %process_id,
366                error = %err,
367                "failed to write recovered process terminal outcome",
368            );
369        }
370        self.release_or_log(&fenced).await;
371    }
372
373    async fn release_or_log(&self, lease: &ProcessLease) {
374        if let Err(err) = self.release_process_lease(lease).await {
375            tracing::warn!(
376                process_id = %lease.process_id,
377                error = %err,
378                "failed to release recovered process lease",
379            );
380        }
381    }
382
383    /// Run a recovered process while renewing its lease across the execution,
384    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
385    /// from expiring under the live owner.
386    async fn run_process_with_lease_renewal(
387        &self,
388        registration: ProcessRegistration,
389        execution_context: ProcessExecutionContext,
390        mut lease: ProcessLease,
391    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
392        let process_id = registration.id.clone();
393        let cancellation = CancellationToken::new();
394        let cancel_watcher = {
395            let registry = Arc::clone(&self.config.process_registry);
396            let process_id = process_id.clone();
397            let cancellation = cancellation.clone();
398            tokio::spawn(async move {
399                match registry
400                    .wait_event_after(&process_id, "process.cancel_requested", 0)
401                    .await
402                {
403                    Ok(_) => cancellation.cancel(),
404                    Err(err) => tracing::warn!(
405                        process_id = %process_id,
406                        error = %err,
407                        "process cancel watcher stopped before observing cancellation",
408                    ),
409                }
410            })
411        };
412        let pending = self.run_process(registration, execution_context, cancellation.clone());
413        tokio::pin!(pending);
414        loop {
415            tokio::select! {
416                outcome = &mut pending => {
417                    cancel_watcher.abort();
418                    return outcome.map_err(RecoverFailure::Run);
419                }
420                _ = tokio::time::sleep(process_lease_renew_interval()) => {
421                    match self
422                        .config
423                        .process_registry
424                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
425                        .await
426                    {
427                        Ok(renewed) => lease = renewed,
428                        Err(err) => {
429                            cancellation.cancel();
430                            cancel_watcher.abort();
431                            return Err(RecoverFailure::LeaseLost(err));
432                        }
433                    }
434                }
435            }
436        }
437    }
438
439    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
440        self.config
441            .process_registry
442            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
443            .await
444    }
445
446    pub async fn request_process_cancel(
447        &self,
448        process_id: &str,
449        reason: Option<String>,
450    ) -> Result<(), PluginError> {
451        self.config
452            .process_registry
453            .append_event(
454                process_id,
455                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
456            )
457            .await
458            .map(|_| ())
459    }
460
461    async fn runtime_for_registration(
462        &self,
463        registration: &ProcessRegistration,
464    ) -> Result<LashRuntime, PluginError> {
465        match registration.input.as_ref() {
466            ProcessInput::SessionTurn { create_request, .. } => {
467                self.runtime_for_session_turn(registration, create_request.as_ref())
468                    .await
469            }
470            ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. } => {
471                self.runtime_for_process_env(registration).await
472            }
473            ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
474        }
475    }
476
477    async fn runtime_for_session_turn(
478        &self,
479        registration: &ProcessRegistration,
480        create_request: &crate::SessionCreateRequest,
481    ) -> Result<LashRuntime, PluginError> {
482        let mut policy = create_request
483            .policy
484            .clone()
485            .unwrap_or_else(|| self.config.session_policy.clone());
486        if policy.recorded_provider_id().is_empty() {
487            policy.provider_id = self.config.session_policy.provider_id.clone();
488        }
489        self.build_ephemeral_runtime(
490            format!("process-session-turn:{}", registration.id),
491            policy,
492            create_request.plugin_options.clone(),
493            "session turn request",
494        )
495        .await
496    }
497
498    async fn runtime_for_process_env(
499        &self,
500        registration: &ProcessRegistration,
501    ) -> Result<LashRuntime, PluginError> {
502        let Some(env_ref) = registration.env_ref.as_ref() else {
503            return Err(PluginError::Session(format!(
504                "process `{}` is missing a captured execution env",
505                registration.id
506            )));
507        };
508        let env = crate::load_process_execution_env(
509            self.config
510                .runtime_host
511                .durability
512                .process_env_store
513                .as_ref(),
514            env_ref,
515        )
516        .await?;
517        self.build_ephemeral_runtime(
518            format!("process-env:{}", registration.id),
519            env.policy,
520            env.plugin_options,
521            env_ref.as_str(),
522        )
523        .await
524    }
525
526    async fn build_ephemeral_runtime(
527        &self,
528        session_id: String,
529        policy: crate::SessionPolicy,
530        plugin_options: crate::PluginOptions,
531        source_label: &str,
532    ) -> Result<LashRuntime, PluginError> {
533        let store = Arc::new(InMemorySessionStore::default());
534        EmbeddedRuntimeBuilder::new()
535            .with_session_id(session_id.to_string())
536            .with_plugin_host(self.config.plugin_host.as_ref().clone())
537            .with_runtime_host(self.config.runtime_host.clone())
538            .with_policy(policy)
539            .with_plugin_options(plugin_options)
540            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
541            .with_trigger_store(Arc::clone(&self.config.trigger_store))
542            .with_process_registry(Arc::clone(&self.config.process_registry))
543            .with_residency(self.config.residency)
544            .with_store(store)
545            .build()
546            .await
547            .map_err(|err| {
548                PluginError::Session(format!(
549                    "failed to build process worker runtime for {source_label}: {err}"
550                ))
551            })
552    }
553
554    /// Enforce the durable-first wiring invariant at the worker process-run
555    /// boundary: when the worker was wired with a durable effect host, every
556    /// store it will execute against must also be durable. A durable host
557    /// running against any ephemeral store fails loudly here rather than
558    /// silently re-executing a process against non-durable state.
559    ///
560    /// Inline controllers (the default tier) impose no requirement, so
561    /// inline/in-memory workers pass unchanged.
562    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
563        if self
564            .config
565            .runtime_host
566            .control
567            .effect_host
568            .durability_tier()
569            != crate::DurabilityTier::Durable
570        {
571            return Ok(());
572        }
573        let require = |facet: crate::DurableStoreFacet| {
574            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
575        };
576        if self
577            .config
578            .runtime_host
579            .durability
580            .attachment_store
581            .persistence()
582            .durability_tier()
583            != crate::DurabilityTier::Durable
584        {
585            return Err(require(crate::DurableStoreFacet::AttachmentStore));
586        }
587        if self
588            .config
589            .runtime_host
590            .durability
591            .process_env_store
592            .durability_tier()
593            != crate::DurabilityTier::Durable
594        {
595            return Err(require(crate::DurableStoreFacet::ProcessEnvStore));
596        }
597        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
598            return Err(require(crate::DurableStoreFacet::SessionStore));
599        }
600        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
601            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
602        }
603        if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
604            return Err(require(crate::DurableStoreFacet::TriggerStore));
605        }
606        Ok(())
607    }
608
609    /// Enforce the stable-process-id invariant at every (re-)execution: process
610    /// execution identity is the persisted `process_id`, so a retry — a Restate
611    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
612    /// recovery sweep re-running a non-terminal row — must present that stable
613    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
614    /// loudly here, mirroring how `ExecutionScope` rejects an
615    /// empty turn id at the durable-effect boundary.
616    fn ensure_stable_process_id(
617        &self,
618        registration: &ProcessRegistration,
619    ) -> Result<(), PluginError> {
620        if registration.id.trim().is_empty() {
621            return Err(PluginError::Session(
622                crate::RuntimeError::missing_process_execution_id().to_string(),
623            ));
624        }
625        Ok(())
626    }
627}
628
/// Delay between two lease renewals while a recovered process is running.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Renewal period in milliseconds: 25 when compiled for tests, 30 000
/// otherwise.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) { 25 } else { 30_000 }
}
642
643#[cfg(test)]
644mod boundary_tests {
645    use super::*;
646    use crate::{
647        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
648        DurableStoreFacet, InMemoryAttachmentStore, ProcessExecutionEnvRef,
649        ProcessExecutionEnvStore, ProcessInput, ProcessRegistration, RuntimeEffectController,
650        RuntimeError, StoredAttachment, TriggerStore,
651    };
652    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
653
    /// Effect controller that reports the durable tier; the worker boundary
    /// only reads the tier, so the effect path is never exercised here.
    #[derive(Default)]
    struct DurableController;

    #[async_trait::async_trait]
    impl RuntimeEffectController for DurableController {
        /// Always reports the durable tier.
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        /// Never expected to run in these tests; panics if it is reached.
        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            unreachable!("worker boundary rejects before executing any effect")
        }
    }
673
    /// Attachment store reporting a durable tier over in-memory storage.
    #[derive(Default)]
    struct DurableAttachmentStore {
        /// In-memory store that actually holds the attachments.
        inner: InMemoryAttachmentStore,
    }

    #[async_trait::async_trait]
    impl AttachmentStore for DurableAttachmentStore {
        /// Always claims durable persistence, regardless of the in-memory
        /// backing.
        fn persistence(&self) -> AttachmentStorePersistence {
            AttachmentStorePersistence::Durable
        }

        /// Stores the attachment in the wrapped in-memory store.
        async fn put(
            &self,
            bytes: Vec<u8>,
            meta: AttachmentCreateMeta,
        ) -> Result<AttachmentRef, AttachmentStoreError> {
            self.inner.put(bytes, meta).await
        }

        /// Reads the attachment back from the wrapped in-memory store.
        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
            self.inner.get(id).await
        }
    }
698
    /// Process env store reporting a durable tier over in-memory storage.
    #[derive(Default)]
    struct DurableProcessEnvStore {
        /// In-memory store that actually holds the serialized environments.
        inner: crate::InMemoryProcessExecutionEnvStore,
    }

    #[async_trait::async_trait]
    impl ProcessExecutionEnvStore for DurableProcessEnvStore {
        /// Always reports the durable tier.
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        /// Writes the environment bytes to the wrapped in-memory store.
        async fn put_process_execution_env(
            &self,
            env_ref: &ProcessExecutionEnvRef,
            bytes: &[u8],
        ) -> Result<(), PluginError> {
            self.inner.put_process_execution_env(env_ref, bytes).await
        }

        /// Reads the environment bytes from the wrapped in-memory store.
        async fn get_process_execution_env(
            &self,
            env_ref: &ProcessExecutionEnvRef,
        ) -> Result<Option<Vec<u8>>, PluginError> {
            self.inner.get_process_execution_env(env_ref).await
        }
    }
726
    /// Session store factory whose declared tier is configurable; it never has
    /// to create a store because the worker boundary rejects first.
    struct TierSessionStoreFactory {
        /// Tier reported by `durability_tier`.
        tier: DurabilityTier,
    }

    #[async_trait::async_trait]
    impl SessionStoreFactory for TierSessionStoreFactory {
        /// Reports the tier the factory was constructed with.
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        /// Never expected to run in these tests; panics if it is reached.
        async fn create_store(
            &self,
            _request: &crate::SessionStoreCreateRequest,
        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
            unreachable!("worker boundary rejects before creating a session store")
        }

        /// No-op: this factory holds no sessions to delete.
        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
            Ok(())
        }
    }
750
    /// Trigger store whose declared tier is configurable; every store operation
    /// is forwarded to a wrapped in-memory trigger store.
    struct TierTriggerStore {
        /// Tier reported by `durability_tier`.
        tier: DurabilityTier,
        /// In-memory store that serves all trigger operations.
        inner: crate::InMemoryTriggerStore,
    }

    impl TierTriggerStore {
        /// Create a store that reports `tier` over a fresh in-memory backing.
        fn new(tier: DurabilityTier) -> Self {
            Self {
                tier,
                inner: crate::InMemoryTriggerStore::default(),
            }
        }
    }

    #[async_trait::async_trait]
    impl TriggerStore for TierTriggerStore {
        /// Reports the tier the store was constructed with.
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        // Every method below forwards unchanged to the wrapped in-memory store.

        async fn register_subscription(
            &self,
            draft: crate::TriggerSubscriptionDraft,
        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
            self.inner.register_subscription(draft).await
        }

        async fn list_subscriptions(
            &self,
            filter: crate::TriggerSubscriptionFilter,
        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
            self.inner.list_subscriptions(filter).await
        }

        async fn cancel_subscription(
            &self,
            session_id: &str,
            handle: &str,
        ) -> Result<bool, PluginError> {
            self.inner.cancel_subscription(session_id, handle).await
        }

        async fn delete_session_subscriptions(
            &self,
            session_id: &str,
        ) -> Result<usize, PluginError> {
            self.inner.delete_session_subscriptions(session_id).await
        }

        async fn record_occurrence(
            &self,
            request: crate::TriggerOccurrenceRequest,
        ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
            self.inner.record_occurrence(request).await
        }

        async fn reserve_matching_deliveries(
            &self,
            occurrence_id: &str,
        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
            self.inner.reserve_matching_deliveries(occurrence_id).await
        }
    }
814
815    /// Build a worker whose controller is durable but whose stores can be set
816    /// per-facet to durable/ephemeral, so each facet's loud rejection can be
817    /// exercised independently.
818    fn worker(
819        attachment: Arc<dyn AttachmentStore>,
820        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
821        session_store_tier: DurabilityTier,
822    ) -> DurableProcessWorker {
823        worker_with_store_tiers(
824            attachment,
825            process_env_store,
826            session_store_tier,
827            DurabilityTier::Durable,
828            DurabilityTier::Durable,
829        )
830    }
831
832    fn worker_with_store_tiers(
833        attachment: Arc<dyn AttachmentStore>,
834        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
835        session_store_tier: DurabilityTier,
836        process_registry_tier: DurabilityTier,
837        trigger_store_tier: DurabilityTier,
838    ) -> DurableProcessWorker {
839        let mut runtime_host = RuntimeHostConfig::in_memory();
840        runtime_host.control.effect_host =
841            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
842        runtime_host.durability.attachment_store = attachment;
843        runtime_host.durability.process_env_store = process_env_store;
844        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
845        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
846            tier: session_store_tier,
847        });
848        let registry: Arc<dyn ProcessRegistry> = Arc::new(
849            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
850        );
851        let trigger_store: Arc<dyn TriggerStore> =
852            Arc::new(TierTriggerStore::new(trigger_store_tier));
853        DurableProcessWorker::new(
854            DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
855                .with_trigger_store(trigger_store),
856        )
857    }
858
859    fn external_registration() -> ProcessRegistration {
860        ProcessRegistration::new(
861            "worker-boundary-process",
862            ProcessInput::External {
863                metadata: serde_json::json!({}),
864            },
865            crate::ProcessProvenance::host(),
866        )
867    }
868
869    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
870        worker
871            .run_process(
872                external_registration(),
873                ProcessExecutionContext::default(),
874                CancellationToken::new(),
875            )
876            .await
877    }
878
879    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
880        let PluginError::Session(message) = err else {
881            panic!("expected PluginError::Session, got {err:?}");
882        };
883        let expected = RuntimeError::durable_store_required(facet).to_string();
884        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
885    }
886
887    #[tokio::test]
888    async fn durable_worker_rejects_ephemeral_attachment_store() {
889        let worker = worker(
890            Arc::new(InMemoryAttachmentStore::new()),
891            Arc::new(DurableProcessEnvStore::default()),
892            DurabilityTier::Durable,
893        );
894        let err = run(&worker)
895            .await
896            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
897        assert_facet(err, DurableStoreFacet::AttachmentStore);
898    }
899
900    #[tokio::test]
901    async fn durable_worker_rejects_ephemeral_process_env_store() {
902        let worker = worker(
903            Arc::new(DurableAttachmentStore::default()),
904            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
905            DurabilityTier::Durable,
906        );
907        let err = run(&worker)
908            .await
909            .expect_err("ephemeral process env store must be rejected at the worker boundary");
910        assert_facet(err, DurableStoreFacet::ProcessEnvStore);
911    }
912
913    #[tokio::test]
914    async fn durable_worker_rejects_ephemeral_session_store_factory() {
915        let worker = worker(
916            Arc::new(DurableAttachmentStore::default()),
917            Arc::new(DurableProcessEnvStore::default()),
918            DurabilityTier::Inline,
919        );
920        let err = run(&worker)
921            .await
922            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
923        assert_facet(err, DurableStoreFacet::SessionStore);
924    }
925
926    #[tokio::test]
927    async fn durable_worker_rejects_ephemeral_process_registry() {
928        let worker = worker_with_store_tiers(
929            Arc::new(DurableAttachmentStore::default()),
930            Arc::new(DurableProcessEnvStore::default()),
931            DurabilityTier::Durable,
932            DurabilityTier::Inline,
933            DurabilityTier::Durable,
934        );
935        let err = run(&worker)
936            .await
937            .expect_err("ephemeral process registry must be rejected at the worker boundary");
938        assert_facet(err, DurableStoreFacet::ProcessRegistry);
939    }
940
941    #[tokio::test]
942    async fn durable_worker_rejects_ephemeral_trigger_store() {
943        let worker = worker_with_store_tiers(
944            Arc::new(DurableAttachmentStore::default()),
945            Arc::new(DurableProcessEnvStore::default()),
946            DurabilityTier::Durable,
947            DurabilityTier::Durable,
948            DurabilityTier::Inline,
949        );
950        let err = run(&worker)
951            .await
952            .expect_err("ephemeral trigger store must be rejected at the worker boundary");
953        assert_facet(err, DurableStoreFacet::TriggerStore);
954    }
955
956    #[tokio::test]
957    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
958        // Positive control: a durable worker wired against fully durable stores
959        // clears the store-facet guard and proceeds to run the (External)
960        // process.
961        let worker = worker(
962            Arc::new(DurableAttachmentStore::default()),
963            Arc::new(DurableProcessEnvStore::default()),
964            DurabilityTier::Durable,
965        );
966        let output = run(&worker)
967            .await
968            .expect("all-durable worker should pass the store-facet guard");
969        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
970    }
971
972    #[tokio::test]
973    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
974        // Inline controllers impose no requirement, so an in-memory worker runs
975        // unchanged — the durable-first guard must not regress inline hosts.
976        let runtime_host = RuntimeHostConfig::in_memory();
977        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
978        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
979            tier: DurabilityTier::Inline,
980        });
981        let registry: Arc<dyn ProcessRegistry> =
982            Arc::new(crate::TestLocalProcessRegistry::default());
983        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
984            plugin_host,
985            runtime_host,
986            factory,
987            registry,
988        ));
989        let output = run(&worker)
990            .await
991            .expect("inline worker should pass the store-facet guard");
992        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
993    }
994}