Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10    InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
11    ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
12    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
13    SessionStoreFactory,
14};
15
16/// Deployment-local configuration for rebuilding durable process executions.
17///
18/// Process rows intentionally carry only portable process input and provenance.
19/// Workers provide plugins, providers, stores, secrets, and host capabilities
20/// for the deployment that owns those rows.
21#[derive(Clone)]
22pub struct DurableProcessWorkerConfig {
23    pub plugin_host: Arc<PluginHost>,
24    pub runtime_host: RuntimeHostConfig,
25    pub session_policy: crate::SessionPolicy,
26    pub session_store_factory: Arc<dyn SessionStoreFactory>,
27    pub process_registry: Arc<dyn ProcessRegistry>,
28    pub trigger_store: Arc<dyn crate::TriggerStore>,
29    #[doc(hidden)]
30    pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
31    /// Residency for sessions the worker rebuilds to run a process. Defaults to
32    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
33    /// it here so the worker's rebuilt sessions trim to the active path too,
34    /// instead of silently diverging from the live runtime by keeping the full
35    /// graph resident.
36    pub residency: crate::Residency,
37}
38
39impl DurableProcessWorkerConfig {
40    pub fn new(
41        plugin_host: Arc<PluginHost>,
42        runtime_host: RuntimeHostConfig,
43        session_store_factory: Arc<dyn SessionStoreFactory>,
44        process_registry: Arc<dyn ProcessRegistry>,
45    ) -> Self {
46        Self {
47            plugin_host,
48            runtime_host,
49            session_policy: crate::SessionPolicy::default(),
50            session_store_factory,
51            process_registry,
52            trigger_store: Arc::new(crate::InMemoryTriggerStore::default()),
53            turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
54            residency: crate::Residency::default(),
55        }
56    }
57
58    pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
59        self.trigger_store = store;
60        self
61    }
62
63    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
64        self.session_policy = policy;
65        self
66    }
67
68    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
69        self.residency = residency;
70        self
71    }
72
73    #[doc(hidden)]
74    pub fn with_turn_phase_probe_slot(
75        mut self,
76        slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
77    ) -> Self {
78        self.turn_phase_probe_slot = slot;
79        self
80    }
81
82    pub fn from_plugin_factories(
83        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
84        runtime_host: RuntimeHostConfig,
85        session_store_factory: Arc<dyn SessionStoreFactory>,
86        process_registry: Arc<dyn ProcessRegistry>,
87    ) -> Self {
88        Self::new(
89            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
90            runtime_host,
91            session_store_factory,
92            process_registry,
93        )
94    }
95
96    pub fn from_plugin_stack(
97        plugin_stack: PluginStack,
98        runtime_host: RuntimeHostConfig,
99        session_store_factory: Arc<dyn SessionStoreFactory>,
100        process_registry: Arc<dyn ProcessRegistry>,
101    ) -> Self {
102        Self::from_plugin_factories(
103            plugin_stack.into_factories(),
104            runtime_host,
105            session_store_factory,
106            process_registry,
107        )
108    }
109}
110
111/// Reconstructable background-process worker.
112#[derive(Clone)]
113pub struct DurableProcessWorker {
114    config: Arc<DurableProcessWorkerConfig>,
115}
116
117/// Why a recovery run did not produce a terminal outcome under the lease.
118enum RecoverFailure {
119    /// The lease was lost mid-run (another owner reclaimed an expired lease).
120    /// The losing worker must not write a terminal outcome — the new owner is
121    /// now the single writer.
122    LeaseLost(PluginError),
123    /// The process could not be run (rebuild/store-facet failure). The lease is
124    /// still held, so this worker terminalizes the row.
125    Run(PluginError),
126}
127
128impl DurableProcessWorker {
129    pub fn new(config: DurableProcessWorkerConfig) -> Self {
130        Self {
131            config: Arc::new(config),
132        }
133    }
134
135    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
136        Self { config }
137    }
138
139    pub fn config(&self) -> &DurableProcessWorkerConfig {
140        &self.config
141    }
142
143    pub async fn run_process(
144        &self,
145        registration: ProcessRegistration,
146        execution_context: ProcessExecutionContext,
147        cancellation: CancellationToken,
148    ) -> Result<ProcessAwaitOutput, PluginError> {
149        let scoped_effect_controller = self
150            .config
151            .runtime_host
152            .control
153            .effect_host
154            .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
155            .map_err(|err| PluginError::Session(err.to_string()))?
156            .ok_or_else(|| {
157                PluginError::Session(
158                    "process worker effect host must provide a static process scope".to_string(),
159                )
160            })?;
161        self.run_process_with_scoped_effect_controller(
162            registration,
163            execution_context,
164            scoped_effect_controller,
165            cancellation,
166        )
167        .await
168    }
169
170    pub async fn run_process_with_scoped_effect_controller(
171        &self,
172        registration: ProcessRegistration,
173        execution_context: ProcessExecutionContext,
174        scoped_effect_controller: crate::ScopedEffectController<'_>,
175        cancellation: CancellationToken,
176    ) -> Result<ProcessAwaitOutput, PluginError> {
177        self.ensure_stable_process_id(&registration)?;
178        self.ensure_durable_store_facets()?;
179        if let ProcessInput::External { metadata } = registration.input.as_ref() {
180            return Ok(ProcessAwaitOutput::Success {
181                value: serde_json::json!({ "metadata": metadata.clone() }),
182                control: None,
183            });
184        }
185        let mut runtime = self.runtime_for_registration(&registration).await?;
186        let originator_scope = if let crate::ProcessOriginator::Session { scope } =
187            &registration.provenance.originator
188        {
189            Some(scope)
190        } else {
191            None
192        };
193        let probe_scope = registration.wake_target.as_ref().or(originator_scope);
194        if let Some(probe) =
195            probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
196        {
197            runtime.set_turn_phase_probe(probe);
198        }
199        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
200            PluginError::Session(format!(
201                "failed to build runtime env for process `{}`: {err}",
202                registration.id
203            ))
204        })?;
205        Ok(manager
206            .run_process(
207                registration,
208                execution_context,
209                Arc::clone(&self.config.process_registry),
210                scoped_effect_controller,
211                cancellation,
212            )
213            .await)
214    }
215
216    /// Sweep the registry for non-terminal processes and re-execute the ones
217    /// this worker can claim, driving each to a terminal state.
218    ///
219    /// This is the crash-recovery counterpart to a worker that ran a process
220    /// from a live turn: a trigger/trigger-started process whose worker
221    /// died mid-flight is left non-terminal in the registry, and a subsequent
222    /// worker reopening that registry must finish it. The sweep:
223    ///
224    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
225    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
226    ///    already leased live by *another* owner is skipped (it is being run by
227    ///    that owner right now), so a non-terminal process is re-run by exactly
228    ///    one owner (lease fencing);
229    /// 3. runs the claimed process on this worker's wired controller, renewing
230    ///    the lease across the long-running execution so a healthy recovery is
231    ///    not swept out from under itself;
232    /// 4. writes the terminal outcome and releases the lease.
233    ///
234    /// Idempotent by `process_id`: terminal processes are never in the worklist,
235    /// and a process that became terminal between the list and the claim is
236    /// detected after claiming and skipped, so re-running a recovery sweep does
237    /// not double-execute completed work.
238    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
239        let records = self.config.process_registry.list_non_terminal().await?;
240        for record in records {
241            // Run each claimed process on its OWN lease-fenced task. A sequential
242            // drive that awaited each process to terminal would deadlock a process
243            // that blocks awaiting a nested child (`start child` then `await`, or a
244            // subagent fan-out): the one drive task would park inside the parent's
245            // await and never claim the child. Spawning frees the loop so a
246            // subsequent drive (poke or poll) claims and runs the child, and the
247            // per-process `ProcessLease` fences concurrent owners — so spawning a
248            // task per pending row on every drive is idempotent (a row already
249            // running is skipped on claim conflict) and one failing row never
250            // aborts the rest of the sweep.
251            let worker = self.clone();
252            tokio::spawn(async move { worker.recover_process(record).await });
253        }
254        Ok(())
255    }
256
257    async fn recover_process(&self, record: ProcessRecord) {
258        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
259        let process_id = record.id.clone();
260        // Skip if held live by another owner: a claim conflict means a worker is
261        // already running this process, so re-running here would violate the
262        // single-owner contract. Treat any claim failure as "leased elsewhere".
263        let Ok(lease) = self
264            .config
265            .process_registry
266            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
267            .await
268        else {
269            return;
270        };
271        // The process may have reached a terminal state between the list and the
272        // claim. Idempotent by process_id: do not re-execute a finished process.
273        if self
274            .config
275            .process_registry
276            .get_process(&process_id)
277            .await
278            .is_some_and(|current| current.is_terminal())
279        {
280            self.release_or_log(&lease).await;
281            return;
282        }
283        let registration = ProcessRegistration {
284            id: record.id,
285            input: record.input,
286            event_types: record.event_types,
287            provenance: record.provenance.clone(),
288            env_ref: record.env_ref.clone(),
289            wake_target: record.wake_target.clone(),
290        };
291        let execution_context = ProcessExecutionContext::default();
292        match self
293            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
294            .await
295        {
296            // Ran to a terminal outcome (success or a process-level failure) while
297            // holding the lease: this owner is the single writer of the terminal.
298            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
299            // The lease was lost mid-run — another owner reclaimed the expired
300            // lease and is now running this process. Do NOT write a terminal
301            // outcome or release the lease: that would race the new owner and
302            // could record a succeeded process as Failed. Leave the row to the
303            // lease holder; it will finish (or another sweep retries it).
304            Err(RecoverFailure::LeaseLost(err)) => {
305                tracing::warn!(
306                    process_id = %process_id,
307                    error = %err,
308                    "process recovery lost its lease mid-run; deferring to the new owner",
309                );
310            }
311            // The process could not be run at all (rebuild/store-facet failure):
312            // terminalize as a recovery failure so the row leaves the worklist.
313            Err(RecoverFailure::Run(err)) => {
314                let output = ProcessAwaitOutput::Failure {
315                    class: crate::ToolFailureClass::Execution,
316                    code: "process_recovery_failed".to_string(),
317                    message: err.to_string(),
318                    raw: None,
319                    control: None,
320                };
321                self.complete_and_release(&lease, &process_id, output).await;
322            }
323        }
324    }
325
326    /// Write a recovered process's terminal outcome (the running lease owner is
327    /// the single writer) and then release the lease, logging either failure
328    /// rather than aborting — the lease's TTL is the backstop.
329    async fn complete_and_release(
330        &self,
331        lease: &ProcessLease,
332        process_id: &str,
333        output: ProcessAwaitOutput,
334    ) {
335        // Fence the terminal write: re-confirm the lease immediately before
336        // writing. `renew_process_lease` is rejected (by owner/lease_token/
337        // fencing_token) if another owner has reclaimed an expired lease, and on
338        // success extends the window so the back-to-back write lands inside the
339        // owned interval. A worker that stalled past its TTL therefore cannot
340        // overwrite the new owner's outcome — it defers instead.
341        let fenced = match self
342            .config
343            .process_registry
344            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
345            .await
346        {
347            Ok(renewed) => renewed,
348            Err(err) => {
349                tracing::warn!(
350                    process_id = %process_id,
351                    error = %err,
352                    "lost process lease before terminal write; deferring to the new owner",
353                );
354                return;
355            }
356        };
357        if let Err(err) = self
358            .config
359            .process_registry
360            .complete_process(process_id, output)
361            .await
362        {
363            tracing::warn!(
364                process_id = %process_id,
365                error = %err,
366                "failed to write recovered process terminal outcome",
367            );
368        }
369        self.release_or_log(&fenced).await;
370    }
371
372    async fn release_or_log(&self, lease: &ProcessLease) {
373        if let Err(err) = self.release_process_lease(lease).await {
374            tracing::warn!(
375                process_id = %lease.process_id,
376                error = %err,
377                "failed to release recovered process lease",
378            );
379        }
380    }
381
382    /// Run a recovered process while renewing its lease across the execution,
383    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
384    /// from expiring under the live owner.
385    async fn run_process_with_lease_renewal(
386        &self,
387        registration: ProcessRegistration,
388        execution_context: ProcessExecutionContext,
389        mut lease: ProcessLease,
390    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
391        let process_id = registration.id.clone();
392        let cancellation = CancellationToken::new();
393        let cancel_watcher = {
394            let registry = Arc::clone(&self.config.process_registry);
395            let process_id = process_id.clone();
396            let cancellation = cancellation.clone();
397            tokio::spawn(async move {
398                match registry
399                    .wait_event_after(&process_id, "process.cancel_requested", 0)
400                    .await
401                {
402                    Ok(_) => cancellation.cancel(),
403                    Err(err) => tracing::warn!(
404                        process_id = %process_id,
405                        error = %err,
406                        "process cancel watcher stopped before observing cancellation",
407                    ),
408                }
409            })
410        };
411        let pending = self.run_process(registration, execution_context, cancellation.clone());
412        tokio::pin!(pending);
413        loop {
414            tokio::select! {
415                outcome = &mut pending => {
416                    cancel_watcher.abort();
417                    return outcome.map_err(RecoverFailure::Run);
418                }
419                _ = tokio::time::sleep(process_lease_renew_interval()) => {
420                    match self
421                        .config
422                        .process_registry
423                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
424                        .await
425                    {
426                        Ok(renewed) => lease = renewed,
427                        Err(err) => {
428                            cancellation.cancel();
429                            cancel_watcher.abort();
430                            return Err(RecoverFailure::LeaseLost(err));
431                        }
432                    }
433                }
434            }
435        }
436    }
437
438    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
439        self.config
440            .process_registry
441            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
442            .await
443    }
444
445    pub async fn request_process_cancel(
446        &self,
447        process_id: &str,
448        reason: Option<String>,
449    ) -> Result<(), PluginError> {
450        self.config
451            .process_registry
452            .append_event(
453                process_id,
454                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
455            )
456            .await
457            .map(|_| ())
458    }
459
460    async fn runtime_for_registration(
461        &self,
462        registration: &ProcessRegistration,
463    ) -> Result<LashRuntime, PluginError> {
464        match registration.input.as_ref() {
465            ProcessInput::SessionTurn { create_request, .. } => {
466                self.runtime_for_session_turn(registration, create_request.as_ref())
467                    .await
468            }
469            ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. } => {
470                self.runtime_for_process_env(registration).await
471            }
472            ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
473        }
474    }
475
476    async fn runtime_for_session_turn(
477        &self,
478        registration: &ProcessRegistration,
479        create_request: &crate::SessionCreateRequest,
480    ) -> Result<LashRuntime, PluginError> {
481        let mut policy = create_request
482            .policy
483            .clone()
484            .unwrap_or_else(|| self.config.session_policy.clone());
485        if policy.recorded_provider_id().is_empty() {
486            policy.provider_id = self.config.session_policy.provider_id.clone();
487        }
488        self.build_ephemeral_runtime(
489            format!("process-session-turn:{}", registration.id),
490            policy,
491            create_request.plugin_options.clone(),
492            "session turn request",
493        )
494        .await
495    }
496
497    async fn runtime_for_process_env(
498        &self,
499        registration: &ProcessRegistration,
500    ) -> Result<LashRuntime, PluginError> {
501        let Some(env_ref) = registration.env_ref.as_ref() else {
502            return Err(PluginError::Session(format!(
503                "process `{}` is missing a captured execution env",
504                registration.id
505            )));
506        };
507        let env = crate::load_process_execution_env(
508            self.config
509                .runtime_host
510                .durability
511                .lashlang_artifact_store
512                .as_ref(),
513            env_ref,
514        )
515        .await?;
516        self.build_ephemeral_runtime(
517            format!("process-env:{}", registration.id),
518            env.policy,
519            env.plugin_options,
520            env_ref.as_str(),
521        )
522        .await
523    }
524
525    async fn build_ephemeral_runtime(
526        &self,
527        session_id: String,
528        policy: crate::SessionPolicy,
529        plugin_options: crate::PluginOptions,
530        source_label: &str,
531    ) -> Result<LashRuntime, PluginError> {
532        let store = Arc::new(InMemorySessionStore::default());
533        EmbeddedRuntimeBuilder::new()
534            .with_session_id(session_id.to_string())
535            .with_plugin_host(self.config.plugin_host.as_ref().clone())
536            .with_runtime_host(self.config.runtime_host.clone())
537            .with_policy(policy)
538            .with_plugin_options(plugin_options)
539            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
540            .with_trigger_store(Arc::clone(&self.config.trigger_store))
541            .with_process_registry(Arc::clone(&self.config.process_registry))
542            .with_residency(self.config.residency)
543            .with_store(store)
544            .build()
545            .await
546            .map_err(|err| {
547                PluginError::Session(format!(
548                    "failed to build process worker runtime for {source_label}: {err}"
549                ))
550            })
551    }
552
553    /// Enforce the durable-first wiring invariant at the worker process-run
554    /// boundary: when the worker was wired with a durable effect host, every
555    /// store it will execute against must also be durable. A durable host
556    /// running against any ephemeral store fails loudly here rather than
557    /// silently re-executing a process against non-durable state.
558    ///
559    /// Inline controllers (the default tier) impose no requirement, so
560    /// inline/in-memory workers pass unchanged.
561    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
562        if self
563            .config
564            .runtime_host
565            .control
566            .effect_host
567            .durability_tier()
568            != crate::DurabilityTier::Durable
569        {
570            return Ok(());
571        }
572        let require = |facet: crate::DurableStoreFacet| {
573            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
574        };
575        if self
576            .config
577            .runtime_host
578            .durability
579            .attachment_store
580            .persistence()
581            .durability_tier()
582            != crate::DurabilityTier::Durable
583        {
584            return Err(require(crate::DurableStoreFacet::AttachmentStore));
585        }
586        if self
587            .config
588            .runtime_host
589            .durability
590            .lashlang_artifact_store
591            .durability_tier()
592            != crate::DurabilityTier::Durable
593        {
594            return Err(require(crate::DurableStoreFacet::ArtifactStore));
595        }
596        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
597            return Err(require(crate::DurableStoreFacet::SessionStore));
598        }
599        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
600            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
601        }
602        if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
603            return Err(require(crate::DurableStoreFacet::TriggerStore));
604        }
605        Ok(())
606    }
607
608    /// Enforce the stable-process-id invariant at every (re-)execution: process
609    /// execution identity is the persisted `process_id`, so a retry — a Restate
610    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
611    /// recovery sweep re-running a non-terminal row — must present that stable
612    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
613    /// loudly here, mirroring how `ExecutionScope` rejects an
614    /// empty turn id at the durable-effect boundary.
615    fn ensure_stable_process_id(
616        &self,
617        registration: &ProcessRegistration,
618    ) -> Result<(), PluginError> {
619        if registration.id.trim().is_empty() {
620            return Err(PluginError::Session(
621                crate::RuntimeError::missing_process_execution_id().to_string(),
622            ));
623        }
624        Ok(())
625    }
626}
627
/// Interval between lease renewals while a recovered process runs.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Renewal interval in milliseconds: 30 s in normal builds, shortened to
/// 25 ms under `cfg(test)` so tests exercise renewal quickly.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) { 25 } else { 30_000 }
}
641
642#[cfg(test)]
643mod boundary_tests {
644    use super::*;
645    use crate::{
646        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
647        DurableStoreFacet, InMemoryAttachmentStore, LashlangArtifactStore, ProcessInput,
648        ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment, TriggerStore,
649    };
650    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
651
652    /// Effect controller that reports the durable tier; the worker boundary
653    /// only reads the tier, so the effect path is never exercised here.
654    #[derive(Default)]
655    struct DurableController;
656
657    #[async_trait::async_trait]
658    impl RuntimeEffectController for DurableController {
659        fn durability_tier(&self) -> DurabilityTier {
660            DurabilityTier::Durable
661        }
662
663        async fn execute_effect(
664            &self,
665            _envelope: crate::RuntimeEffectEnvelope,
666            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
667        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
668            unreachable!("worker boundary rejects before executing any effect")
669        }
670    }
671
672    /// Attachment store reporting a durable tier over in-memory storage.
673    #[derive(Default)]
674    struct DurableAttachmentStore {
675        inner: InMemoryAttachmentStore,
676    }
677
678    #[async_trait::async_trait]
679    impl AttachmentStore for DurableAttachmentStore {
680        fn persistence(&self) -> AttachmentStorePersistence {
681            AttachmentStorePersistence::Durable
682        }
683
684        async fn put(
685            &self,
686            bytes: Vec<u8>,
687            meta: AttachmentCreateMeta,
688        ) -> Result<AttachmentRef, AttachmentStoreError> {
689            self.inner.put(bytes, meta).await
690        }
691
692        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
693            self.inner.get(id).await
694        }
695    }
696
697    /// Lashlang artifact store reporting a durable tier over in-memory storage.
698    #[derive(Default)]
699    struct DurableArtifactStore {
700        inner: lashlang::InMemoryLashlangArtifactStore,
701    }
702
703    #[async_trait::async_trait]
704    impl LashlangArtifactStore for DurableArtifactStore {
705        fn durability_tier(&self) -> DurabilityTier {
706            DurabilityTier::Durable
707        }
708
709        async fn put_module_artifact(
710            &self,
711            artifact: &lashlang::ModuleArtifact,
712        ) -> Result<(), lashlang::ArtifactStoreError> {
713            self.inner.put_module_artifact(artifact).await
714        }
715
716        async fn get_module_artifact(
717            &self,
718            module_ref: &lashlang::ModuleRef,
719        ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
720            self.inner.get_module_artifact(module_ref).await
721        }
722
723        async fn put_artifact_bytes(
724            &self,
725            artifact_ref: &str,
726            descriptor: &str,
727            bytes: &[u8],
728        ) -> Result<(), lashlang::ArtifactStoreError> {
729            self.inner
730                .put_artifact_bytes(artifact_ref, descriptor, bytes)
731                .await
732        }
733
734        async fn get_artifact_bytes(
735            &self,
736            artifact_ref: &str,
737        ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
738            self.inner.get_artifact_bytes(artifact_ref).await
739        }
740    }
741
742    /// Session store factory whose declared tier is configurable; it never has
743    /// to create a store because the worker boundary rejects first.
744    struct TierSessionStoreFactory {
745        tier: DurabilityTier,
746    }
747
748    #[async_trait::async_trait]
749    impl SessionStoreFactory for TierSessionStoreFactory {
750        fn durability_tier(&self) -> DurabilityTier {
751            self.tier
752        }
753
754        async fn create_store(
755            &self,
756            _request: &crate::SessionStoreCreateRequest,
757        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
758            unreachable!("worker boundary rejects before creating a session store")
759        }
760
761        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
762            Ok(())
763        }
764    }
765
766    struct TierTriggerStore {
767        tier: DurabilityTier,
768        inner: crate::InMemoryTriggerStore,
769    }
770
771    impl TierTriggerStore {
772        fn new(tier: DurabilityTier) -> Self {
773            Self {
774                tier,
775                inner: crate::InMemoryTriggerStore::default(),
776            }
777        }
778    }
779
780    #[async_trait::async_trait]
781    impl TriggerStore for TierTriggerStore {
782        fn durability_tier(&self) -> DurabilityTier {
783            self.tier
784        }
785
786        async fn register_subscription(
787            &self,
788            draft: crate::TriggerSubscriptionDraft,
789        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
790            self.inner.register_subscription(draft).await
791        }
792
793        async fn list_subscriptions(
794            &self,
795            filter: crate::TriggerSubscriptionFilter,
796        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
797            self.inner.list_subscriptions(filter).await
798        }
799
800        async fn cancel_subscription(
801            &self,
802            session_id: &str,
803            handle: &str,
804        ) -> Result<bool, PluginError> {
805            self.inner.cancel_subscription(session_id, handle).await
806        }
807
808        async fn delete_session_subscriptions(
809            &self,
810            session_id: &str,
811        ) -> Result<usize, PluginError> {
812            self.inner.delete_session_subscriptions(session_id).await
813        }
814
815        async fn record_occurrence(
816            &self,
817            request: crate::TriggerOccurrenceRequest,
818        ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
819            self.inner.record_occurrence(request).await
820        }
821
822        async fn reserve_matching_deliveries(
823            &self,
824            occurrence_id: &str,
825        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
826            self.inner.reserve_matching_deliveries(occurrence_id).await
827        }
828    }
829
830    /// Build a worker whose controller is durable but whose stores can be set
831    /// per-facet to durable/ephemeral, so each facet's loud rejection can be
832    /// exercised independently.
833    fn worker(
834        attachment: Arc<dyn AttachmentStore>,
835        artifact: Arc<dyn LashlangArtifactStore>,
836        session_store_tier: DurabilityTier,
837    ) -> DurableProcessWorker {
838        worker_with_store_tiers(
839            attachment,
840            artifact,
841            session_store_tier,
842            DurabilityTier::Durable,
843            DurabilityTier::Durable,
844        )
845    }
846
847    fn worker_with_store_tiers(
848        attachment: Arc<dyn AttachmentStore>,
849        artifact: Arc<dyn LashlangArtifactStore>,
850        session_store_tier: DurabilityTier,
851        process_registry_tier: DurabilityTier,
852        trigger_store_tier: DurabilityTier,
853    ) -> DurableProcessWorker {
854        let mut runtime_host = RuntimeHostConfig::in_memory();
855        runtime_host.control.effect_host =
856            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
857        runtime_host.durability.attachment_store = attachment;
858        runtime_host.durability.lashlang_artifact_store = artifact;
859        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
860        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
861            tier: session_store_tier,
862        });
863        let registry: Arc<dyn ProcessRegistry> = Arc::new(
864            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
865        );
866        let trigger_store: Arc<dyn TriggerStore> =
867            Arc::new(TierTriggerStore::new(trigger_store_tier));
868        DurableProcessWorker::new(
869            DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
870                .with_trigger_store(trigger_store),
871        )
872    }
873
874    fn external_registration() -> ProcessRegistration {
875        ProcessRegistration::new(
876            "worker-boundary-process",
877            ProcessInput::External {
878                metadata: serde_json::json!({}),
879            },
880            crate::ProcessProvenance::host(),
881        )
882    }
883
884    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
885        worker
886            .run_process(
887                external_registration(),
888                ProcessExecutionContext::default(),
889                CancellationToken::new(),
890            )
891            .await
892    }
893
894    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
895        let PluginError::Session(message) = err else {
896            panic!("expected PluginError::Session, got {err:?}");
897        };
898        let expected = RuntimeError::durable_store_required(facet).to_string();
899        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
900    }
901
902    #[tokio::test]
903    async fn durable_worker_rejects_ephemeral_attachment_store() {
904        let worker = worker(
905            Arc::new(InMemoryAttachmentStore::new()),
906            Arc::new(DurableArtifactStore::default()),
907            DurabilityTier::Durable,
908        );
909        let err = run(&worker)
910            .await
911            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
912        assert_facet(err, DurableStoreFacet::AttachmentStore);
913    }
914
915    #[tokio::test]
916    async fn durable_worker_rejects_ephemeral_artifact_store() {
917        let worker = worker(
918            Arc::new(DurableAttachmentStore::default()),
919            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
920            DurabilityTier::Durable,
921        );
922        let err = run(&worker)
923            .await
924            .expect_err("ephemeral artifact store must be rejected at the worker boundary");
925        assert_facet(err, DurableStoreFacet::ArtifactStore);
926    }
927
928    #[tokio::test]
929    async fn durable_worker_rejects_ephemeral_session_store_factory() {
930        let worker = worker(
931            Arc::new(DurableAttachmentStore::default()),
932            Arc::new(DurableArtifactStore::default()),
933            DurabilityTier::Inline,
934        );
935        let err = run(&worker)
936            .await
937            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
938        assert_facet(err, DurableStoreFacet::SessionStore);
939    }
940
941    #[tokio::test]
942    async fn durable_worker_rejects_ephemeral_process_registry() {
943        let worker = worker_with_store_tiers(
944            Arc::new(DurableAttachmentStore::default()),
945            Arc::new(DurableArtifactStore::default()),
946            DurabilityTier::Durable,
947            DurabilityTier::Inline,
948            DurabilityTier::Durable,
949        );
950        let err = run(&worker)
951            .await
952            .expect_err("ephemeral process registry must be rejected at the worker boundary");
953        assert_facet(err, DurableStoreFacet::ProcessRegistry);
954    }
955
956    #[tokio::test]
957    async fn durable_worker_rejects_ephemeral_trigger_store() {
958        let worker = worker_with_store_tiers(
959            Arc::new(DurableAttachmentStore::default()),
960            Arc::new(DurableArtifactStore::default()),
961            DurabilityTier::Durable,
962            DurabilityTier::Durable,
963            DurabilityTier::Inline,
964        );
965        let err = run(&worker)
966            .await
967            .expect_err("ephemeral trigger store must be rejected at the worker boundary");
968        assert_facet(err, DurableStoreFacet::TriggerStore);
969    }
970
971    #[tokio::test]
972    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
973        // Positive control: a durable worker wired against fully durable stores
974        // clears the store-facet guard and proceeds to run the (External)
975        // process.
976        let worker = worker(
977            Arc::new(DurableAttachmentStore::default()),
978            Arc::new(DurableArtifactStore::default()),
979            DurabilityTier::Durable,
980        );
981        let output = run(&worker)
982            .await
983            .expect("all-durable worker should pass the store-facet guard");
984        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
985    }
986
987    #[tokio::test]
988    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
989        // Inline controllers impose no requirement, so an in-memory worker runs
990        // unchanged — the durable-first guard must not regress inline hosts.
991        let runtime_host = RuntimeHostConfig::in_memory();
992        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
993        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
994            tier: DurabilityTier::Inline,
995        });
996        let registry: Arc<dyn ProcessRegistry> =
997            Arc::new(crate::TestLocalProcessRegistry::default());
998        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
999            plugin_host,
1000            runtime_host,
1001            factory,
1002            registry,
1003        ));
1004        let output = run(&worker)
1005            .await
1006            .expect("inline worker should pass the store-facet guard");
1007        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1008    }
1009}