Skip to main content

lash_core/runtime/
process_worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10    InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
11    ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
12    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
13    SessionStoreFactory,
14};
15
16/// Deployment-local configuration for rebuilding durable process executions.
17///
18/// Process rows intentionally carry only portable process input and provenance.
19/// Workers provide the host profile, plugins, providers, stores, secrets, and
20/// host capabilities for the deployment that owns those rows.
21#[derive(Clone)]
22pub struct DurableProcessWorkerConfig {
23    pub plugin_host: Arc<PluginHost>,
24    pub runtime_host: RuntimeHostConfig,
25    pub session_policy: crate::SessionPolicy,
26    pub session_store_factory: Arc<dyn SessionStoreFactory>,
27    pub process_registry: Arc<dyn ProcessRegistry>,
28    pub host_event_store: Arc<dyn crate::HostEventStore>,
29    /// Residency for sessions the worker rebuilds to run a process. Defaults to
30    /// [`Residency::KeepAll`]; a host running [`Residency::ActivePathOnly`] wires
31    /// it here so the worker's rebuilt sessions trim to the active path too,
32    /// instead of silently diverging from the live runtime by keeping the full
33    /// graph resident.
34    pub residency: crate::Residency,
35}
36
37impl DurableProcessWorkerConfig {
38    pub fn new(
39        plugin_host: Arc<PluginHost>,
40        runtime_host: RuntimeHostConfig,
41        session_store_factory: Arc<dyn SessionStoreFactory>,
42        process_registry: Arc<dyn ProcessRegistry>,
43    ) -> Self {
44        Self {
45            plugin_host,
46            runtime_host,
47            session_policy: crate::SessionPolicy::default(),
48            session_store_factory,
49            process_registry,
50            host_event_store: Arc::new(crate::InMemoryHostEventStore::default()),
51            residency: crate::Residency::default(),
52        }
53    }
54
55    pub fn with_host_event_store(mut self, store: Arc<dyn crate::HostEventStore>) -> Self {
56        self.host_event_store = store;
57        self
58    }
59
60    pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
61        self.session_policy = policy;
62        self
63    }
64
65    pub fn with_residency(mut self, residency: crate::Residency) -> Self {
66        self.residency = residency;
67        self
68    }
69
70    pub fn from_plugin_factories(
71        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
72        runtime_host: RuntimeHostConfig,
73        session_store_factory: Arc<dyn SessionStoreFactory>,
74        process_registry: Arc<dyn ProcessRegistry>,
75    ) -> Self {
76        Self::new(
77            Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
78            runtime_host,
79            session_store_factory,
80            process_registry,
81        )
82    }
83
84    pub fn from_plugin_stack(
85        plugin_stack: PluginStack,
86        runtime_host: RuntimeHostConfig,
87        session_store_factory: Arc<dyn SessionStoreFactory>,
88        process_registry: Arc<dyn ProcessRegistry>,
89    ) -> Self {
90        Self::from_plugin_factories(
91            plugin_stack.into_factories(),
92            runtime_host,
93            session_store_factory,
94            process_registry,
95        )
96    }
97}
98
99/// Reconstructable background-process worker.
100#[derive(Clone)]
101pub struct DurableProcessWorker {
102    config: Arc<DurableProcessWorkerConfig>,
103}
104
105/// Why a recovery run did not produce a terminal outcome under the lease.
106enum RecoverFailure {
107    /// The lease was lost mid-run (another owner reclaimed an expired lease).
108    /// The losing worker must not write a terminal outcome — the new owner is
109    /// now the single writer.
110    LeaseLost(PluginError),
111    /// The process could not be run (rebuild/store-facet failure). The lease is
112    /// still held, so this worker terminalizes the row.
113    Run(PluginError),
114}
115
116impl DurableProcessWorker {
117    pub fn new(config: DurableProcessWorkerConfig) -> Self {
118        Self {
119            config: Arc::new(config),
120        }
121    }
122
123    pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
124        Self { config }
125    }
126
127    pub fn config(&self) -> &DurableProcessWorkerConfig {
128        &self.config
129    }
130
131    pub async fn run_process(
132        &self,
133        registration: ProcessRegistration,
134        execution_context: ProcessExecutionContext,
135        cancellation: CancellationToken,
136    ) -> Result<ProcessAwaitOutput, PluginError> {
137        let scoped_effect_controller = self
138            .config
139            .runtime_host
140            .control
141            .effect_host
142            .scoped_static(crate::EffectScope::process(registration.id.clone()))
143            .map_err(|err| PluginError::Session(err.to_string()))?
144            .ok_or_else(|| {
145                PluginError::Session(
146                    "process worker effect host must provide a static process scope".to_string(),
147                )
148            })?;
149        self.run_process_with_scoped_effect_controller(
150            registration,
151            execution_context,
152            scoped_effect_controller,
153            cancellation,
154        )
155        .await
156    }
157
158    pub async fn run_process_with_scoped_effect_controller(
159        &self,
160        registration: ProcessRegistration,
161        execution_context: ProcessExecutionContext,
162        scoped_effect_controller: crate::ScopedEffectController<'_>,
163        cancellation: CancellationToken,
164    ) -> Result<ProcessAwaitOutput, PluginError> {
165        self.ensure_stable_process_id(&registration)?;
166        self.ensure_host_profile_matches(&registration)?;
167        self.ensure_durable_store_facets()?;
168        if let ProcessInput::External { metadata } = registration.input.as_ref() {
169            return Ok(ProcessAwaitOutput::Success {
170                value: serde_json::json!({ "metadata": metadata.clone() }),
171                control: None,
172            });
173        }
174        let runtime = self.runtime_for_registration(&registration).await?;
175        let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
176            PluginError::Session(format!(
177                "failed to build runtime env for process `{}`: {err}",
178                registration.id
179            ))
180        })?;
181        Ok(manager
182            .run_process(
183                registration,
184                execution_context,
185                Arc::clone(&self.config.process_registry),
186                scoped_effect_controller,
187                cancellation,
188            )
189            .await)
190    }
191
192    /// Sweep the registry for non-terminal processes and re-execute the ones
193    /// this worker can claim, driving each to a terminal state.
194    ///
195    /// This is the crash-recovery counterpart to a worker that ran a process
196    /// from a live turn: a trigger/host-event-started process whose worker
197    /// died mid-flight is left non-terminal in the registry, and a subsequent
198    /// worker reopening that registry must finish it. The sweep:
199    ///
200    /// 1. lists every non-terminal process ([`ProcessRegistry::list_non_terminal`]);
201    /// 2. claims the durable single-owner [`ProcessLease`] over each — a process
202    ///    already leased live by *another* owner is skipped (it is being run by
203    ///    that owner right now), so a non-terminal process is re-run by exactly
204    ///    one owner (lease fencing);
205    /// 3. runs the claimed process on this worker's wired controller, renewing
206    ///    the lease across the long-running execution so a healthy recovery is
207    ///    not swept out from under itself;
208    /// 4. writes the terminal outcome and releases the lease.
209    ///
210    /// Idempotent by `process_id`: terminal processes are never in the worklist,
211    /// and a process that became terminal between the list and the claim is
212    /// detected after claiming and skipped, so re-running a recovery sweep does
213    /// not double-execute completed work.
214    pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
215        let records = self.config.process_registry.list_non_terminal().await?;
216        for record in records {
217            // Run each claimed process on its OWN lease-fenced task. A sequential
218            // drive that awaited each process to terminal would deadlock a process
219            // that blocks awaiting a nested child (`start child` then `await`, or a
220            // subagent fan-out): the one drive task would park inside the parent's
221            // await and never claim the child. Spawning frees the loop so a
222            // subsequent drive (poke or poll) claims and runs the child, and the
223            // per-process `ProcessLease` fences concurrent owners — so spawning a
224            // task per pending row on every drive is idempotent (a row already
225            // running is skipped on claim conflict) and one failing row never
226            // aborts the rest of the sweep.
227            let worker = self.clone();
228            tokio::spawn(async move { worker.recover_process(record).await });
229        }
230        Ok(())
231    }
232
233    async fn recover_process(&self, record: ProcessRecord) {
234        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
235        let process_id = record.id.clone();
236        // Skip if held live by another owner: a claim conflict means a worker is
237        // already running this process, so re-running here would violate the
238        // single-owner contract. Treat any claim failure as "leased elsewhere".
239        let Ok(lease) = self
240            .config
241            .process_registry
242            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
243            .await
244        else {
245            return;
246        };
247        // The process may have reached a terminal state between the list and the
248        // claim. Idempotent by process_id: do not re-execute a finished process.
249        if self
250            .config
251            .process_registry
252            .get_process(&process_id)
253            .await
254            .is_some_and(|current| current.is_terminal())
255        {
256            self.release_or_log(&lease).await;
257            return;
258        }
259        let registration = ProcessRegistration {
260            id: record.id,
261            input: record.input,
262            event_types: record.event_types,
263            provenance: record.provenance.clone(),
264            env_ref: record.env_ref.clone(),
265            wake_target: record.wake_target.clone(),
266        };
267        let execution_context = ProcessExecutionContext::default();
268        match self
269            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
270            .await
271        {
272            // Ran to a terminal outcome (success or a process-level failure) while
273            // holding the lease: this owner is the single writer of the terminal.
274            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
275            // The lease was lost mid-run — another owner reclaimed the expired
276            // lease and is now running this process. Do NOT write a terminal
277            // outcome or release the lease: that would race the new owner and
278            // could record a succeeded process as Failed. Leave the row to the
279            // lease holder; it will finish (or another sweep retries it).
280            Err(RecoverFailure::LeaseLost(err)) => {
281                tracing::warn!(
282                    process_id = %process_id,
283                    error = %err,
284                    "process recovery lost its lease mid-run; deferring to the new owner",
285                );
286            }
287            // The process could not be run at all (rebuild/store-facet failure):
288            // terminalize as a recovery failure so the row leaves the worklist.
289            Err(RecoverFailure::Run(err)) => {
290                let output = ProcessAwaitOutput::Failure {
291                    class: crate::ToolFailureClass::Execution,
292                    code: "process_recovery_failed".to_string(),
293                    message: err.to_string(),
294                    raw: None,
295                    control: None,
296                };
297                self.complete_and_release(&lease, &process_id, output).await;
298            }
299        }
300    }
301
302    /// Write a recovered process's terminal outcome (the running lease owner is
303    /// the single writer) and then release the lease, logging either failure
304    /// rather than aborting — the lease's TTL is the backstop.
305    async fn complete_and_release(
306        &self,
307        lease: &ProcessLease,
308        process_id: &str,
309        output: ProcessAwaitOutput,
310    ) {
311        // Fence the terminal write: re-confirm the lease immediately before
312        // writing. `renew_process_lease` is rejected (by owner/lease_token/
313        // fencing_token) if another owner has reclaimed an expired lease, and on
314        // success extends the window so the back-to-back write lands inside the
315        // owned interval. A worker that stalled past its TTL therefore cannot
316        // overwrite the new owner's outcome — it defers instead.
317        let fenced = match self
318            .config
319            .process_registry
320            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
321            .await
322        {
323            Ok(renewed) => renewed,
324            Err(err) => {
325                tracing::warn!(
326                    process_id = %process_id,
327                    error = %err,
328                    "lost process lease before terminal write; deferring to the new owner",
329                );
330                return;
331            }
332        };
333        if let Err(err) = self
334            .config
335            .process_registry
336            .complete_process(process_id, output)
337            .await
338        {
339            tracing::warn!(
340                process_id = %process_id,
341                error = %err,
342                "failed to write recovered process terminal outcome",
343            );
344        }
345        self.release_or_log(&fenced).await;
346    }
347
348    async fn release_or_log(&self, lease: &ProcessLease) {
349        if let Err(err) = self.release_process_lease(lease).await {
350            tracing::warn!(
351                process_id = %lease.process_id,
352                error = %err,
353                "failed to release recovered process lease",
354            );
355        }
356    }
357
358    /// Run a recovered process while renewing its lease across the execution,
359    /// mirroring the turn-lease renewal that keeps a long-running effect's lease
360    /// from expiring under the live owner.
361    async fn run_process_with_lease_renewal(
362        &self,
363        registration: ProcessRegistration,
364        execution_context: ProcessExecutionContext,
365        mut lease: ProcessLease,
366    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
367        let process_id = registration.id.clone();
368        let cancellation = CancellationToken::new();
369        let cancel_watcher = {
370            let registry = Arc::clone(&self.config.process_registry);
371            let process_id = process_id.clone();
372            let cancellation = cancellation.clone();
373            tokio::spawn(async move {
374                match registry
375                    .wait_event_after(&process_id, "process.cancel_requested", 0)
376                    .await
377                {
378                    Ok(_) => cancellation.cancel(),
379                    Err(err) => tracing::warn!(
380                        process_id = %process_id,
381                        error = %err,
382                        "process cancel watcher stopped before observing cancellation",
383                    ),
384                }
385            })
386        };
387        let pending = self.run_process(registration, execution_context, cancellation.clone());
388        tokio::pin!(pending);
389        loop {
390            tokio::select! {
391                outcome = &mut pending => {
392                    cancel_watcher.abort();
393                    return outcome.map_err(RecoverFailure::Run);
394                }
395                _ = tokio::time::sleep(process_lease_renew_interval()) => {
396                    match self
397                        .config
398                        .process_registry
399                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
400                        .await
401                    {
402                        Ok(renewed) => lease = renewed,
403                        Err(err) => {
404                            cancellation.cancel();
405                            cancel_watcher.abort();
406                            return Err(RecoverFailure::LeaseLost(err));
407                        }
408                    }
409                }
410            }
411        }
412    }
413
414    async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
415        self.config
416            .process_registry
417            .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
418            .await
419    }
420
421    pub async fn request_process_cancel(
422        &self,
423        process_id: &str,
424        reason: Option<String>,
425    ) -> Result<(), PluginError> {
426        self.config
427            .process_registry
428            .append_event(
429                process_id,
430                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
431            )
432            .await
433            .map(|_| ())
434    }
435
436    async fn runtime_for_registration(
437        &self,
438        registration: &ProcessRegistration,
439    ) -> Result<LashRuntime, PluginError> {
440        match registration.input.as_ref() {
441            ProcessInput::SessionTurn { create_request, .. } => {
442                self.runtime_for_session_turn(registration, create_request.as_ref())
443                    .await
444            }
445            ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. } => {
446                self.runtime_for_process_env(registration).await
447            }
448            ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
449        }
450    }
451
452    async fn runtime_for_session_turn(
453        &self,
454        registration: &ProcessRegistration,
455        create_request: &crate::SessionCreateRequest,
456    ) -> Result<LashRuntime, PluginError> {
457        let mut policy = create_request
458            .policy
459            .clone()
460            .unwrap_or_else(|| self.config.session_policy.clone());
461        if policy.recorded_provider_id().is_empty() {
462            policy.provider_id = self.config.session_policy.provider_id.clone();
463        }
464        self.build_ephemeral_runtime(
465            format!("process-session-turn:{}", registration.id),
466            policy,
467            create_request.plugin_options.clone(),
468            "session turn request",
469        )
470        .await
471    }
472
473    async fn runtime_for_process_env(
474        &self,
475        registration: &ProcessRegistration,
476    ) -> Result<LashRuntime, PluginError> {
477        let Some(env_ref) = registration.env_ref.as_ref() else {
478            return Err(PluginError::Session(format!(
479                "process `{}` is missing a captured execution env",
480                registration.id
481            )));
482        };
483        let env = crate::load_process_execution_env(
484            self.config
485                .runtime_host
486                .durability
487                .lashlang_artifact_store
488                .as_ref(),
489            env_ref,
490        )
491        .await?;
492        self.build_ephemeral_runtime(
493            format!("process-env:{}", registration.id),
494            env.policy,
495            env.plugin_options,
496            env_ref.as_str(),
497        )
498        .await
499    }
500
501    async fn build_ephemeral_runtime(
502        &self,
503        session_id: String,
504        policy: crate::SessionPolicy,
505        plugin_options: crate::PluginOptions,
506        source_label: &str,
507    ) -> Result<LashRuntime, PluginError> {
508        let store = Arc::new(InMemorySessionStore::default());
509        EmbeddedRuntimeBuilder::new()
510            .with_session_id(session_id.to_string())
511            .with_plugin_host(self.config.plugin_host.as_ref().clone())
512            .with_runtime_host(self.config.runtime_host.clone())
513            .with_policy(policy)
514            .with_plugin_options(plugin_options)
515            .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
516            .with_host_event_store(Arc::clone(&self.config.host_event_store))
517            .with_process_registry(Arc::clone(&self.config.process_registry))
518            .with_residency(self.config.residency)
519            .with_store(store)
520            .build()
521            .await
522            .map_err(|err| {
523                PluginError::Session(format!(
524                    "failed to build process worker runtime for {source_label}: {err}"
525                ))
526            })
527    }
528
529    /// Enforce the durable-first wiring invariant at the worker process-run
530    /// boundary: when the worker was wired with a durable effect host, every
531    /// store it will execute against must also be durable. A durable host
532    /// running against any ephemeral store fails loudly here rather than
533    /// silently re-executing a process against non-durable state.
534    ///
535    /// Inline controllers (the default tier) impose no requirement, so
536    /// inline/in-memory workers pass unchanged.
537    fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
538        if self
539            .config
540            .runtime_host
541            .control
542            .effect_host
543            .durability_tier()
544            != crate::DurabilityTier::Durable
545        {
546            return Ok(());
547        }
548        let require = |facet: crate::DurableStoreFacet| {
549            PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
550        };
551        if self
552            .config
553            .runtime_host
554            .durability
555            .attachment_store
556            .persistence()
557            .durability_tier()
558            != crate::DurabilityTier::Durable
559        {
560            return Err(require(crate::DurableStoreFacet::AttachmentStore));
561        }
562        if self
563            .config
564            .runtime_host
565            .durability
566            .lashlang_artifact_store
567            .durability_tier()
568            != crate::DurabilityTier::Durable
569        {
570            return Err(require(crate::DurableStoreFacet::ArtifactStore));
571        }
572        if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
573            return Err(require(crate::DurableStoreFacet::SessionStore));
574        }
575        if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
576            return Err(require(crate::DurableStoreFacet::ProcessRegistry));
577        }
578        if self.config.host_event_store.durability_tier() != crate::DurabilityTier::Durable {
579            return Err(require(crate::DurableStoreFacet::HostEventStore));
580        }
581        Ok(())
582    }
583
584    /// Enforce the stable-process-id invariant at every (re-)execution: process
585    /// execution identity is the persisted `process_id`, so a retry — a Restate
586    /// `run` re-invocation (keyed `LashProcessWorkflow/{process_id}`) or a
587    /// recovery sweep re-running a non-terminal row — must present that stable
588    /// id. An empty/fresh id has lost its idempotency anchor and is rejected
589    /// loudly here, mirroring how `EffectScope` rejects an
590    /// empty turn id at the durable-effect boundary.
591    fn ensure_stable_process_id(
592        &self,
593        registration: &ProcessRegistration,
594    ) -> Result<(), PluginError> {
595        if registration.id.trim().is_empty() {
596            return Err(PluginError::Session(
597                crate::RuntimeError::missing_process_execution_id().to_string(),
598            ));
599        }
600        Ok(())
601    }
602
603    fn ensure_host_profile_matches(
604        &self,
605        registration: &ProcessRegistration,
606    ) -> Result<(), PluginError> {
607        let actual = registration.provenance.host_profile_id.as_str();
608        let expected = self.config.runtime_host.profile.host_profile_id.as_str();
609        if actual.is_empty() || actual == expected {
610            return Ok(());
611        }
612        Err(PluginError::Session(format!(
613            "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
614            registration.id
615        )))
616    }
617}
618
/// How long a recovery run waits between lease renewals.
///
/// NOTE(review): this is assumed to stay comfortably below
/// `RUNTIME_TURN_LEASE_TTL_MS` so a healthy run never lets its lease expire.
/// Confirm the TTL value.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Renewal interval in milliseconds.
///
/// Test builds use 25 ms so renewal paths are exercised quickly; every other
/// build renews every 30 seconds.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) { 25 } else { 30_000 }
}
632
633#[cfg(test)]
634mod boundary_tests {
635    use super::*;
636    use crate::{
637        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
638        DurableStoreFacet, HostEventStore, InMemoryAttachmentStore, LashlangArtifactStore,
639        ProcessInput, ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment,
640    };
641    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
642
643    /// Effect controller that reports the durable tier; the worker boundary
644    /// only reads the tier, so the effect path is never exercised here.
645    #[derive(Default)]
646    struct DurableController;
647
648    #[async_trait::async_trait]
649    impl RuntimeEffectController for DurableController {
650        fn durability_tier(&self) -> DurabilityTier {
651            DurabilityTier::Durable
652        }
653
654        async fn execute_effect(
655            &self,
656            _envelope: crate::RuntimeEffectEnvelope,
657            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
658        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
659            unreachable!("worker boundary rejects before executing any effect")
660        }
661    }
662
663    /// Attachment store reporting a durable tier over in-memory storage.
664    #[derive(Default)]
665    struct DurableAttachmentStore {
666        inner: InMemoryAttachmentStore,
667    }
668
669    #[async_trait::async_trait]
670    impl AttachmentStore for DurableAttachmentStore {
671        fn persistence(&self) -> AttachmentStorePersistence {
672            AttachmentStorePersistence::Durable
673        }
674
675        async fn put(
676            &self,
677            bytes: Vec<u8>,
678            meta: AttachmentCreateMeta,
679        ) -> Result<AttachmentRef, AttachmentStoreError> {
680            self.inner.put(bytes, meta).await
681        }
682
683        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
684            self.inner.get(id).await
685        }
686    }
687
688    /// Lashlang artifact store reporting a durable tier over in-memory storage.
689    #[derive(Default)]
690    struct DurableArtifactStore {
691        inner: lashlang::InMemoryLashlangArtifactStore,
692    }
693
694    #[async_trait::async_trait]
695    impl LashlangArtifactStore for DurableArtifactStore {
696        fn durability_tier(&self) -> DurabilityTier {
697            DurabilityTier::Durable
698        }
699
700        async fn put_module_artifact(
701            &self,
702            artifact: &lashlang::ModuleArtifact,
703        ) -> Result<(), lashlang::ArtifactStoreError> {
704            self.inner.put_module_artifact(artifact).await
705        }
706
707        async fn get_module_artifact(
708            &self,
709            module_ref: &lashlang::ModuleRef,
710        ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
711            self.inner.get_module_artifact(module_ref).await
712        }
713
714        async fn put_artifact_bytes(
715            &self,
716            artifact_ref: &str,
717            descriptor: &str,
718            bytes: &[u8],
719        ) -> Result<(), lashlang::ArtifactStoreError> {
720            self.inner
721                .put_artifact_bytes(artifact_ref, descriptor, bytes)
722                .await
723        }
724
725        async fn get_artifact_bytes(
726            &self,
727            artifact_ref: &str,
728        ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
729            self.inner.get_artifact_bytes(artifact_ref).await
730        }
731    }
732
733    /// Session store factory whose declared tier is configurable; it never has
734    /// to create a store because the worker boundary rejects first.
735    struct TierSessionStoreFactory {
736        tier: DurabilityTier,
737    }
738
739    #[async_trait::async_trait]
740    impl SessionStoreFactory for TierSessionStoreFactory {
741        fn durability_tier(&self) -> DurabilityTier {
742            self.tier
743        }
744
745        async fn create_store(
746            &self,
747            _request: &crate::SessionStoreCreateRequest,
748        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
749            unreachable!("worker boundary rejects before creating a session store")
750        }
751
752        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
753            Ok(())
754        }
755    }
756
757    struct TierHostEventStore {
758        tier: DurabilityTier,
759        inner: crate::InMemoryHostEventStore,
760    }
761
762    impl TierHostEventStore {
763        fn new(tier: DurabilityTier) -> Self {
764            Self {
765                tier,
766                inner: crate::InMemoryHostEventStore::default(),
767            }
768        }
769    }
770
    // Each method below other than `durability_tier` forwards its arguments
    // unchanged to `inner`. The double therefore behaves like the in-memory
    // store while reporting whatever tier the test chose.
    // NOTE(review): any `HostEventStore` methods with default implementations
    // that are not overridden here use the trait default rather than `inner`.
    // Confirm that is intended.
    #[async_trait::async_trait]
    impl HostEventStore for TierHostEventStore {
        /// Report the tier this double was constructed with.
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        /// Delegates to `inner`.
        async fn register_subscription(
            &self,
            draft: crate::TriggerSubscriptionDraft,
        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
            self.inner.register_subscription(draft).await
        }

        /// Delegates to `inner`.
        async fn list_subscriptions(
            &self,
            filter: crate::TriggerSubscriptionFilter,
        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
            self.inner.list_subscriptions(filter).await
        }

        /// Delegates to `inner`.
        async fn cancel_subscription(
            &self,
            session_id: &str,
            handle: &str,
        ) -> Result<bool, PluginError> {
            self.inner.cancel_subscription(session_id, handle).await
        }

        /// Delegates to `inner`.
        async fn delete_session_subscriptions(
            &self,
            session_id: &str,
        ) -> Result<usize, PluginError> {
            self.inner.delete_session_subscriptions(session_id).await
        }

        /// Delegates to `inner`.
        async fn record_occurrence(
            &self,
            request: crate::HostEventOccurrenceRequest,
        ) -> Result<crate::HostEventOccurrenceRecord, PluginError> {
            self.inner.record_occurrence(request).await
        }

        /// Delegates to `inner`.
        async fn reserve_matching_deliveries(
            &self,
            occurrence_id: &str,
        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
            self.inner.reserve_matching_deliveries(occurrence_id).await
        }
    }
820
821    /// Build a worker whose controller is durable but whose stores can be set
822    /// per-facet to durable/ephemeral, so each facet's loud rejection can be
823    /// exercised independently.
824    fn worker(
825        attachment: Arc<dyn AttachmentStore>,
826        artifact: Arc<dyn LashlangArtifactStore>,
827        session_store_tier: DurabilityTier,
828    ) -> DurableProcessWorker {
829        worker_with_store_tiers(
830            attachment,
831            artifact,
832            session_store_tier,
833            DurabilityTier::Durable,
834            DurabilityTier::Durable,
835        )
836    }
837
838    fn worker_with_store_tiers(
839        attachment: Arc<dyn AttachmentStore>,
840        artifact: Arc<dyn LashlangArtifactStore>,
841        session_store_tier: DurabilityTier,
842        process_registry_tier: DurabilityTier,
843        host_event_store_tier: DurabilityTier,
844    ) -> DurableProcessWorker {
845        let mut runtime_host = RuntimeHostConfig::in_memory();
846        runtime_host.control.effect_host =
847            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
848        runtime_host.durability.attachment_store = attachment;
849        runtime_host.durability.lashlang_artifact_store = artifact;
850        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
851        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
852            tier: session_store_tier,
853        });
854        let registry: Arc<dyn ProcessRegistry> = Arc::new(
855            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
856        );
857        let host_event_store: Arc<dyn HostEventStore> =
858            Arc::new(TierHostEventStore::new(host_event_store_tier));
859        DurableProcessWorker::new(
860            DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
861                .with_host_event_store(host_event_store),
862        )
863    }
864
865    fn external_registration() -> ProcessRegistration {
866        ProcessRegistration::new(
867            "worker-boundary-process",
868            ProcessInput::External {
869                metadata: serde_json::json!({}),
870            },
871            crate::ProcessProvenance::host("default"),
872        )
873    }
874
875    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
876        worker
877            .run_process(
878                external_registration(),
879                ProcessExecutionContext::default(),
880                CancellationToken::new(),
881            )
882            .await
883    }
884
885    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
886        let PluginError::Session(message) = err else {
887            panic!("expected PluginError::Session, got {err:?}");
888        };
889        let expected = RuntimeError::durable_store_required(facet).to_string();
890        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
891    }
892
893    #[tokio::test]
894    async fn durable_worker_rejects_ephemeral_attachment_store() {
895        let worker = worker(
896            Arc::new(InMemoryAttachmentStore::new()),
897            Arc::new(DurableArtifactStore::default()),
898            DurabilityTier::Durable,
899        );
900        let err = run(&worker)
901            .await
902            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
903        assert_facet(err, DurableStoreFacet::AttachmentStore);
904    }
905
906    #[tokio::test]
907    async fn durable_worker_rejects_ephemeral_artifact_store() {
908        let worker = worker(
909            Arc::new(DurableAttachmentStore::default()),
910            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
911            DurabilityTier::Durable,
912        );
913        let err = run(&worker)
914            .await
915            .expect_err("ephemeral artifact store must be rejected at the worker boundary");
916        assert_facet(err, DurableStoreFacet::ArtifactStore);
917    }
918
919    #[tokio::test]
920    async fn durable_worker_rejects_ephemeral_session_store_factory() {
921        let worker = worker(
922            Arc::new(DurableAttachmentStore::default()),
923            Arc::new(DurableArtifactStore::default()),
924            DurabilityTier::Inline,
925        );
926        let err = run(&worker)
927            .await
928            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
929        assert_facet(err, DurableStoreFacet::SessionStore);
930    }
931
932    #[tokio::test]
933    async fn durable_worker_rejects_ephemeral_process_registry() {
934        let worker = worker_with_store_tiers(
935            Arc::new(DurableAttachmentStore::default()),
936            Arc::new(DurableArtifactStore::default()),
937            DurabilityTier::Durable,
938            DurabilityTier::Inline,
939            DurabilityTier::Durable,
940        );
941        let err = run(&worker)
942            .await
943            .expect_err("ephemeral process registry must be rejected at the worker boundary");
944        assert_facet(err, DurableStoreFacet::ProcessRegistry);
945    }
946
947    #[tokio::test]
948    async fn durable_worker_rejects_ephemeral_host_event_store() {
949        let worker = worker_with_store_tiers(
950            Arc::new(DurableAttachmentStore::default()),
951            Arc::new(DurableArtifactStore::default()),
952            DurabilityTier::Durable,
953            DurabilityTier::Durable,
954            DurabilityTier::Inline,
955        );
956        let err = run(&worker)
957            .await
958            .expect_err("ephemeral host event store must be rejected at the worker boundary");
959        assert_facet(err, DurableStoreFacet::HostEventStore);
960    }
961
962    #[tokio::test]
963    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
964        // Positive control: a durable worker wired against fully durable stores
965        // clears the store-facet guard and proceeds to run the (External)
966        // process.
967        let worker = worker(
968            Arc::new(DurableAttachmentStore::default()),
969            Arc::new(DurableArtifactStore::default()),
970            DurabilityTier::Durable,
971        );
972        let output = run(&worker)
973            .await
974            .expect("all-durable worker should pass the store-facet guard");
975        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
976    }
977
978    #[tokio::test]
979    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
980        // Inline controllers impose no requirement, so an in-memory worker runs
981        // unchanged — the durable-first guard must not regress inline hosts.
982        let runtime_host = RuntimeHostConfig::in_memory();
983        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
984        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
985            tier: DurabilityTier::Inline,
986        });
987        let registry: Arc<dyn ProcessRegistry> =
988            Arc::new(crate::TestLocalProcessRegistry::default());
989        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
990            plugin_host,
991            runtime_host,
992            factory,
993            registry,
994        ));
995        let output = run(&worker)
996            .await
997            .expect("inline worker should pass the store-facet guard");
998        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
999    }
1000}