1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10 InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
11 ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
12 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
13 SessionStoreFactory,
14};
15
16#[derive(Clone)]
22pub struct DurableProcessWorkerConfig {
23 pub plugin_host: Arc<PluginHost>,
24 pub runtime_host: RuntimeHostConfig,
25 pub session_policy: crate::SessionPolicy,
26 pub session_store_factory: Arc<dyn SessionStoreFactory>,
27 pub process_registry: Arc<dyn ProcessRegistry>,
28 pub trigger_store: Arc<dyn crate::TriggerStore>,
29 #[doc(hidden)]
30 pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
31 pub residency: crate::Residency,
37}
38
39impl DurableProcessWorkerConfig {
40 pub fn new(
41 plugin_host: Arc<PluginHost>,
42 runtime_host: RuntimeHostConfig,
43 session_store_factory: Arc<dyn SessionStoreFactory>,
44 process_registry: Arc<dyn ProcessRegistry>,
45 ) -> Self {
46 Self {
47 plugin_host,
48 runtime_host,
49 session_policy: crate::SessionPolicy::default(),
50 session_store_factory,
51 process_registry,
52 trigger_store: Arc::new(crate::InMemoryTriggerStore::default()),
53 turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
54 residency: crate::Residency::default(),
55 }
56 }
57
58 pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
59 self.trigger_store = store;
60 self
61 }
62
63 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
64 self.session_policy = policy;
65 self
66 }
67
68 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
69 self.residency = residency;
70 self
71 }
72
73 #[doc(hidden)]
74 pub fn with_turn_phase_probe_slot(
75 mut self,
76 slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
77 ) -> Self {
78 self.turn_phase_probe_slot = slot;
79 self
80 }
81
82 pub fn from_plugin_factories(
83 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
84 runtime_host: RuntimeHostConfig,
85 session_store_factory: Arc<dyn SessionStoreFactory>,
86 process_registry: Arc<dyn ProcessRegistry>,
87 ) -> Self {
88 Self::new(
89 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
90 runtime_host,
91 session_store_factory,
92 process_registry,
93 )
94 }
95
96 pub fn from_plugin_stack(
97 plugin_stack: PluginStack,
98 runtime_host: RuntimeHostConfig,
99 session_store_factory: Arc<dyn SessionStoreFactory>,
100 process_registry: Arc<dyn ProcessRegistry>,
101 ) -> Self {
102 Self::from_plugin_factories(
103 plugin_stack.into_factories(),
104 runtime_host,
105 session_store_factory,
106 process_registry,
107 )
108 }
109}
110
111#[derive(Clone)]
113pub struct DurableProcessWorker {
114 config: Arc<DurableProcessWorkerConfig>,
115}
116
117enum RecoverFailure {
119 LeaseLost(PluginError),
123 Run(PluginError),
126}
127
128impl DurableProcessWorker {
129 pub fn new(config: DurableProcessWorkerConfig) -> Self {
130 Self {
131 config: Arc::new(config),
132 }
133 }
134
135 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
136 Self { config }
137 }
138
139 pub fn config(&self) -> &DurableProcessWorkerConfig {
140 &self.config
141 }
142
143 pub async fn run_process(
144 &self,
145 registration: ProcessRegistration,
146 execution_context: ProcessExecutionContext,
147 cancellation: CancellationToken,
148 ) -> Result<ProcessAwaitOutput, PluginError> {
149 let scoped_effect_controller = self
150 .config
151 .runtime_host
152 .control
153 .effect_host
154 .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
155 .map_err(|err| PluginError::Session(err.to_string()))?
156 .ok_or_else(|| {
157 PluginError::Session(
158 "process worker effect host must provide a static process scope".to_string(),
159 )
160 })?;
161 self.run_process_with_scoped_effect_controller(
162 registration,
163 execution_context,
164 scoped_effect_controller,
165 cancellation,
166 )
167 .await
168 }
169
170 pub async fn run_process_with_scoped_effect_controller(
171 &self,
172 registration: ProcessRegistration,
173 execution_context: ProcessExecutionContext,
174 scoped_effect_controller: crate::ScopedEffectController<'_>,
175 cancellation: CancellationToken,
176 ) -> Result<ProcessAwaitOutput, PluginError> {
177 self.ensure_stable_process_id(®istration)?;
178 self.ensure_host_profile_matches(®istration)?;
179 self.ensure_durable_store_facets()?;
180 if let ProcessInput::External { metadata } = registration.input.as_ref() {
181 return Ok(ProcessAwaitOutput::Success {
182 value: serde_json::json!({ "metadata": metadata.clone() }),
183 control: None,
184 });
185 }
186 let mut runtime = self.runtime_for_registration(®istration).await?;
187 let originator_scope = if let crate::ProcessOriginator::Session { scope } =
188 ®istration.provenance.originator
189 {
190 Some(scope)
191 } else {
192 None
193 };
194 let probe_scope = registration.wake_target.as_ref().or(originator_scope);
195 if let Some(probe) =
196 probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
197 {
198 runtime.set_turn_phase_probe(probe);
199 }
200 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
201 PluginError::Session(format!(
202 "failed to build runtime env for process `{}`: {err}",
203 registration.id
204 ))
205 })?;
206 Ok(manager
207 .run_process(
208 registration,
209 execution_context,
210 Arc::clone(&self.config.process_registry),
211 scoped_effect_controller,
212 cancellation,
213 )
214 .await)
215 }
216
217 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
240 let records = self.config.process_registry.list_non_terminal().await?;
241 for record in records {
242 let worker = self.clone();
253 tokio::spawn(async move { worker.recover_process(record).await });
254 }
255 Ok(())
256 }
257
258 async fn recover_process(&self, record: ProcessRecord) {
259 let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
260 let process_id = record.id.clone();
261 let Ok(lease) = self
265 .config
266 .process_registry
267 .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
268 .await
269 else {
270 return;
271 };
272 if self
275 .config
276 .process_registry
277 .get_process(&process_id)
278 .await
279 .is_some_and(|current| current.is_terminal())
280 {
281 self.release_or_log(&lease).await;
282 return;
283 }
284 let registration = ProcessRegistration {
285 id: record.id,
286 input: record.input,
287 event_types: record.event_types,
288 provenance: record.provenance.clone(),
289 env_ref: record.env_ref.clone(),
290 wake_target: record.wake_target.clone(),
291 };
292 let execution_context = ProcessExecutionContext::default();
293 match self
294 .run_process_with_lease_renewal(registration, execution_context, lease.clone())
295 .await
296 {
297 Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
300 Err(RecoverFailure::LeaseLost(err)) => {
306 tracing::warn!(
307 process_id = %process_id,
308 error = %err,
309 "process recovery lost its lease mid-run; deferring to the new owner",
310 );
311 }
312 Err(RecoverFailure::Run(err)) => {
315 let output = ProcessAwaitOutput::Failure {
316 class: crate::ToolFailureClass::Execution,
317 code: "process_recovery_failed".to_string(),
318 message: err.to_string(),
319 raw: None,
320 control: None,
321 };
322 self.complete_and_release(&lease, &process_id, output).await;
323 }
324 }
325 }
326
327 async fn complete_and_release(
331 &self,
332 lease: &ProcessLease,
333 process_id: &str,
334 output: ProcessAwaitOutput,
335 ) {
336 let fenced = match self
343 .config
344 .process_registry
345 .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
346 .await
347 {
348 Ok(renewed) => renewed,
349 Err(err) => {
350 tracing::warn!(
351 process_id = %process_id,
352 error = %err,
353 "lost process lease before terminal write; deferring to the new owner",
354 );
355 return;
356 }
357 };
358 if let Err(err) = self
359 .config
360 .process_registry
361 .complete_process(process_id, output)
362 .await
363 {
364 tracing::warn!(
365 process_id = %process_id,
366 error = %err,
367 "failed to write recovered process terminal outcome",
368 );
369 }
370 self.release_or_log(&fenced).await;
371 }
372
373 async fn release_or_log(&self, lease: &ProcessLease) {
374 if let Err(err) = self.release_process_lease(lease).await {
375 tracing::warn!(
376 process_id = %lease.process_id,
377 error = %err,
378 "failed to release recovered process lease",
379 );
380 }
381 }
382
383 async fn run_process_with_lease_renewal(
387 &self,
388 registration: ProcessRegistration,
389 execution_context: ProcessExecutionContext,
390 mut lease: ProcessLease,
391 ) -> Result<ProcessAwaitOutput, RecoverFailure> {
392 let process_id = registration.id.clone();
393 let cancellation = CancellationToken::new();
394 let cancel_watcher = {
395 let registry = Arc::clone(&self.config.process_registry);
396 let process_id = process_id.clone();
397 let cancellation = cancellation.clone();
398 tokio::spawn(async move {
399 match registry
400 .wait_event_after(&process_id, "process.cancel_requested", 0)
401 .await
402 {
403 Ok(_) => cancellation.cancel(),
404 Err(err) => tracing::warn!(
405 process_id = %process_id,
406 error = %err,
407 "process cancel watcher stopped before observing cancellation",
408 ),
409 }
410 })
411 };
412 let pending = self.run_process(registration, execution_context, cancellation.clone());
413 tokio::pin!(pending);
414 loop {
415 tokio::select! {
416 outcome = &mut pending => {
417 cancel_watcher.abort();
418 return outcome.map_err(RecoverFailure::Run);
419 }
420 _ = tokio::time::sleep(process_lease_renew_interval()) => {
421 match self
422 .config
423 .process_registry
424 .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
425 .await
426 {
427 Ok(renewed) => lease = renewed,
428 Err(err) => {
429 cancellation.cancel();
430 cancel_watcher.abort();
431 return Err(RecoverFailure::LeaseLost(err));
432 }
433 }
434 }
435 }
436 }
437 }
438
439 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
440 self.config
441 .process_registry
442 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
443 .await
444 }
445
446 pub async fn request_process_cancel(
447 &self,
448 process_id: &str,
449 reason: Option<String>,
450 ) -> Result<(), PluginError> {
451 self.config
452 .process_registry
453 .append_event(
454 process_id,
455 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
456 )
457 .await
458 .map(|_| ())
459 }
460
461 async fn runtime_for_registration(
462 &self,
463 registration: &ProcessRegistration,
464 ) -> Result<LashRuntime, PluginError> {
465 match registration.input.as_ref() {
466 ProcessInput::SessionTurn { create_request, .. } => {
467 self.runtime_for_session_turn(registration, create_request.as_ref())
468 .await
469 }
470 ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. } => {
471 self.runtime_for_process_env(registration).await
472 }
473 ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
474 }
475 }
476
477 async fn runtime_for_session_turn(
478 &self,
479 registration: &ProcessRegistration,
480 create_request: &crate::SessionCreateRequest,
481 ) -> Result<LashRuntime, PluginError> {
482 let mut policy = create_request
483 .policy
484 .clone()
485 .unwrap_or_else(|| self.config.session_policy.clone());
486 if policy.recorded_provider_id().is_empty() {
487 policy.provider_id = self.config.session_policy.provider_id.clone();
488 }
489 self.build_ephemeral_runtime(
490 format!("process-session-turn:{}", registration.id),
491 policy,
492 create_request.plugin_options.clone(),
493 "session turn request",
494 )
495 .await
496 }
497
498 async fn runtime_for_process_env(
499 &self,
500 registration: &ProcessRegistration,
501 ) -> Result<LashRuntime, PluginError> {
502 let Some(env_ref) = registration.env_ref.as_ref() else {
503 return Err(PluginError::Session(format!(
504 "process `{}` is missing a captured execution env",
505 registration.id
506 )));
507 };
508 let env = crate::load_process_execution_env(
509 self.config
510 .runtime_host
511 .durability
512 .lashlang_artifact_store
513 .as_ref(),
514 env_ref,
515 )
516 .await?;
517 self.build_ephemeral_runtime(
518 format!("process-env:{}", registration.id),
519 env.policy,
520 env.plugin_options,
521 env_ref.as_str(),
522 )
523 .await
524 }
525
526 async fn build_ephemeral_runtime(
527 &self,
528 session_id: String,
529 policy: crate::SessionPolicy,
530 plugin_options: crate::PluginOptions,
531 source_label: &str,
532 ) -> Result<LashRuntime, PluginError> {
533 let store = Arc::new(InMemorySessionStore::default());
534 EmbeddedRuntimeBuilder::new()
535 .with_session_id(session_id.to_string())
536 .with_plugin_host(self.config.plugin_host.as_ref().clone())
537 .with_runtime_host(self.config.runtime_host.clone())
538 .with_policy(policy)
539 .with_plugin_options(plugin_options)
540 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
541 .with_trigger_store(Arc::clone(&self.config.trigger_store))
542 .with_process_registry(Arc::clone(&self.config.process_registry))
543 .with_residency(self.config.residency)
544 .with_store(store)
545 .build()
546 .await
547 .map_err(|err| {
548 PluginError::Session(format!(
549 "failed to build process worker runtime for {source_label}: {err}"
550 ))
551 })
552 }
553
554 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
563 if self
564 .config
565 .runtime_host
566 .control
567 .effect_host
568 .durability_tier()
569 != crate::DurabilityTier::Durable
570 {
571 return Ok(());
572 }
573 let require = |facet: crate::DurableStoreFacet| {
574 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
575 };
576 if self
577 .config
578 .runtime_host
579 .durability
580 .attachment_store
581 .persistence()
582 .durability_tier()
583 != crate::DurabilityTier::Durable
584 {
585 return Err(require(crate::DurableStoreFacet::AttachmentStore));
586 }
587 if self
588 .config
589 .runtime_host
590 .durability
591 .lashlang_artifact_store
592 .durability_tier()
593 != crate::DurabilityTier::Durable
594 {
595 return Err(require(crate::DurableStoreFacet::ArtifactStore));
596 }
597 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
598 return Err(require(crate::DurableStoreFacet::SessionStore));
599 }
600 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
601 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
602 }
603 if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
604 return Err(require(crate::DurableStoreFacet::TriggerStore));
605 }
606 Ok(())
607 }
608
609 fn ensure_stable_process_id(
617 &self,
618 registration: &ProcessRegistration,
619 ) -> Result<(), PluginError> {
620 if registration.id.trim().is_empty() {
621 return Err(PluginError::Session(
622 crate::RuntimeError::missing_process_execution_id().to_string(),
623 ));
624 }
625 Ok(())
626 }
627
628 fn ensure_host_profile_matches(
629 &self,
630 registration: &ProcessRegistration,
631 ) -> Result<(), PluginError> {
632 let actual = registration.provenance.host_profile_id.as_str();
633 let expected = self.config.runtime_host.profile.host_profile_id.as_str();
634 if actual.is_empty() || actual == expected {
635 return Ok(());
636 }
637 Err(PluginError::Session(format!(
638 "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
639 registration.id
640 )))
641 }
642}
643
644fn process_lease_renew_interval() -> Duration {
645 Duration::from_millis(process_lease_renew_interval_ms())
646}
647
648#[cfg(test)]
649fn process_lease_renew_interval_ms() -> u64 {
650 25
651}
652
653#[cfg(not(test))]
654fn process_lease_renew_interval_ms() -> u64 {
655 30_000
656}
657
658#[cfg(test)]
659mod boundary_tests {
660 use super::*;
661 use crate::{
662 AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
663 DurableStoreFacet, InMemoryAttachmentStore, LashlangArtifactStore, ProcessInput,
664 ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment, TriggerStore,
665 };
666 use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
667
668 #[derive(Default)]
671 struct DurableController;
672
673 #[async_trait::async_trait]
674 impl RuntimeEffectController for DurableController {
675 fn durability_tier(&self) -> DurabilityTier {
676 DurabilityTier::Durable
677 }
678
679 async fn execute_effect(
680 &self,
681 _envelope: crate::RuntimeEffectEnvelope,
682 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
683 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
684 unreachable!("worker boundary rejects before executing any effect")
685 }
686 }
687
688 #[derive(Default)]
690 struct DurableAttachmentStore {
691 inner: InMemoryAttachmentStore,
692 }
693
694 #[async_trait::async_trait]
695 impl AttachmentStore for DurableAttachmentStore {
696 fn persistence(&self) -> AttachmentStorePersistence {
697 AttachmentStorePersistence::Durable
698 }
699
700 async fn put(
701 &self,
702 bytes: Vec<u8>,
703 meta: AttachmentCreateMeta,
704 ) -> Result<AttachmentRef, AttachmentStoreError> {
705 self.inner.put(bytes, meta).await
706 }
707
708 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
709 self.inner.get(id).await
710 }
711 }
712
713 #[derive(Default)]
715 struct DurableArtifactStore {
716 inner: lashlang::InMemoryLashlangArtifactStore,
717 }
718
719 #[async_trait::async_trait]
720 impl LashlangArtifactStore for DurableArtifactStore {
721 fn durability_tier(&self) -> DurabilityTier {
722 DurabilityTier::Durable
723 }
724
725 async fn put_module_artifact(
726 &self,
727 artifact: &lashlang::ModuleArtifact,
728 ) -> Result<(), lashlang::ArtifactStoreError> {
729 self.inner.put_module_artifact(artifact).await
730 }
731
732 async fn get_module_artifact(
733 &self,
734 module_ref: &lashlang::ModuleRef,
735 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
736 self.inner.get_module_artifact(module_ref).await
737 }
738
739 async fn put_artifact_bytes(
740 &self,
741 artifact_ref: &str,
742 descriptor: &str,
743 bytes: &[u8],
744 ) -> Result<(), lashlang::ArtifactStoreError> {
745 self.inner
746 .put_artifact_bytes(artifact_ref, descriptor, bytes)
747 .await
748 }
749
750 async fn get_artifact_bytes(
751 &self,
752 artifact_ref: &str,
753 ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
754 self.inner.get_artifact_bytes(artifact_ref).await
755 }
756 }
757
758 struct TierSessionStoreFactory {
761 tier: DurabilityTier,
762 }
763
764 #[async_trait::async_trait]
765 impl SessionStoreFactory for TierSessionStoreFactory {
766 fn durability_tier(&self) -> DurabilityTier {
767 self.tier
768 }
769
770 async fn create_store(
771 &self,
772 _request: &crate::SessionStoreCreateRequest,
773 ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
774 unreachable!("worker boundary rejects before creating a session store")
775 }
776
777 async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
778 Ok(())
779 }
780 }
781
782 struct TierTriggerStore {
783 tier: DurabilityTier,
784 inner: crate::InMemoryTriggerStore,
785 }
786
787 impl TierTriggerStore {
788 fn new(tier: DurabilityTier) -> Self {
789 Self {
790 tier,
791 inner: crate::InMemoryTriggerStore::default(),
792 }
793 }
794 }
795
796 #[async_trait::async_trait]
797 impl TriggerStore for TierTriggerStore {
798 fn durability_tier(&self) -> DurabilityTier {
799 self.tier
800 }
801
802 async fn register_subscription(
803 &self,
804 draft: crate::TriggerSubscriptionDraft,
805 ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
806 self.inner.register_subscription(draft).await
807 }
808
809 async fn list_subscriptions(
810 &self,
811 filter: crate::TriggerSubscriptionFilter,
812 ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
813 self.inner.list_subscriptions(filter).await
814 }
815
816 async fn cancel_subscription(
817 &self,
818 session_id: &str,
819 handle: &str,
820 ) -> Result<bool, PluginError> {
821 self.inner.cancel_subscription(session_id, handle).await
822 }
823
824 async fn delete_session_subscriptions(
825 &self,
826 session_id: &str,
827 ) -> Result<usize, PluginError> {
828 self.inner.delete_session_subscriptions(session_id).await
829 }
830
831 async fn record_occurrence(
832 &self,
833 request: crate::TriggerOccurrenceRequest,
834 ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
835 self.inner.record_occurrence(request).await
836 }
837
838 async fn reserve_matching_deliveries(
839 &self,
840 occurrence_id: &str,
841 ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
842 self.inner.reserve_matching_deliveries(occurrence_id).await
843 }
844 }
845
846 fn worker(
850 attachment: Arc<dyn AttachmentStore>,
851 artifact: Arc<dyn LashlangArtifactStore>,
852 session_store_tier: DurabilityTier,
853 ) -> DurableProcessWorker {
854 worker_with_store_tiers(
855 attachment,
856 artifact,
857 session_store_tier,
858 DurabilityTier::Durable,
859 DurabilityTier::Durable,
860 )
861 }
862
863 fn worker_with_store_tiers(
864 attachment: Arc<dyn AttachmentStore>,
865 artifact: Arc<dyn LashlangArtifactStore>,
866 session_store_tier: DurabilityTier,
867 process_registry_tier: DurabilityTier,
868 trigger_store_tier: DurabilityTier,
869 ) -> DurableProcessWorker {
870 let mut runtime_host = RuntimeHostConfig::in_memory();
871 runtime_host.control.effect_host =
872 Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
873 runtime_host.durability.attachment_store = attachment;
874 runtime_host.durability.lashlang_artifact_store = artifact;
875 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
876 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
877 tier: session_store_tier,
878 });
879 let registry: Arc<dyn ProcessRegistry> = Arc::new(
880 crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
881 );
882 let trigger_store: Arc<dyn TriggerStore> =
883 Arc::new(TierTriggerStore::new(trigger_store_tier));
884 DurableProcessWorker::new(
885 DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
886 .with_trigger_store(trigger_store),
887 )
888 }
889
890 fn external_registration() -> ProcessRegistration {
891 ProcessRegistration::new(
892 "worker-boundary-process",
893 ProcessInput::External {
894 metadata: serde_json::json!({}),
895 },
896 crate::ProcessProvenance::host("default"),
897 )
898 }
899
900 async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
901 worker
902 .run_process(
903 external_registration(),
904 ProcessExecutionContext::default(),
905 CancellationToken::new(),
906 )
907 .await
908 }
909
910 fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
911 let PluginError::Session(message) = err else {
912 panic!("expected PluginError::Session, got {err:?}");
913 };
914 let expected = RuntimeError::durable_store_required(facet).to_string();
915 assert_eq!(message, expected, "worker must reject the {facet:?} facet");
916 }
917
918 #[tokio::test]
919 async fn durable_worker_rejects_ephemeral_attachment_store() {
920 let worker = worker(
921 Arc::new(InMemoryAttachmentStore::new()),
922 Arc::new(DurableArtifactStore::default()),
923 DurabilityTier::Durable,
924 );
925 let err = run(&worker)
926 .await
927 .expect_err("ephemeral attachment store must be rejected at the worker boundary");
928 assert_facet(err, DurableStoreFacet::AttachmentStore);
929 }
930
931 #[tokio::test]
932 async fn durable_worker_rejects_ephemeral_artifact_store() {
933 let worker = worker(
934 Arc::new(DurableAttachmentStore::default()),
935 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
936 DurabilityTier::Durable,
937 );
938 let err = run(&worker)
939 .await
940 .expect_err("ephemeral artifact store must be rejected at the worker boundary");
941 assert_facet(err, DurableStoreFacet::ArtifactStore);
942 }
943
944 #[tokio::test]
945 async fn durable_worker_rejects_ephemeral_session_store_factory() {
946 let worker = worker(
947 Arc::new(DurableAttachmentStore::default()),
948 Arc::new(DurableArtifactStore::default()),
949 DurabilityTier::Inline,
950 );
951 let err = run(&worker)
952 .await
953 .expect_err("ephemeral session store factory must be rejected at the worker boundary");
954 assert_facet(err, DurableStoreFacet::SessionStore);
955 }
956
957 #[tokio::test]
958 async fn durable_worker_rejects_ephemeral_process_registry() {
959 let worker = worker_with_store_tiers(
960 Arc::new(DurableAttachmentStore::default()),
961 Arc::new(DurableArtifactStore::default()),
962 DurabilityTier::Durable,
963 DurabilityTier::Inline,
964 DurabilityTier::Durable,
965 );
966 let err = run(&worker)
967 .await
968 .expect_err("ephemeral process registry must be rejected at the worker boundary");
969 assert_facet(err, DurableStoreFacet::ProcessRegistry);
970 }
971
972 #[tokio::test]
973 async fn durable_worker_rejects_ephemeral_trigger_store() {
974 let worker = worker_with_store_tiers(
975 Arc::new(DurableAttachmentStore::default()),
976 Arc::new(DurableArtifactStore::default()),
977 DurabilityTier::Durable,
978 DurabilityTier::Durable,
979 DurabilityTier::Inline,
980 );
981 let err = run(&worker)
982 .await
983 .expect_err("ephemeral trigger store must be rejected at the worker boundary");
984 assert_facet(err, DurableStoreFacet::TriggerStore);
985 }
986
987 #[tokio::test]
988 async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
989 let worker = worker(
993 Arc::new(DurableAttachmentStore::default()),
994 Arc::new(DurableArtifactStore::default()),
995 DurabilityTier::Durable,
996 );
997 let output = run(&worker)
998 .await
999 .expect("all-durable worker should pass the store-facet guard");
1000 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1001 }
1002
1003 #[tokio::test]
1004 async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
1005 let runtime_host = RuntimeHostConfig::in_memory();
1008 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
1009 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
1010 tier: DurabilityTier::Inline,
1011 });
1012 let registry: Arc<dyn ProcessRegistry> =
1013 Arc::new(crate::TestLocalProcessRegistry::default());
1014 let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
1015 plugin_host,
1016 runtime_host,
1017 factory,
1018 registry,
1019 ));
1020 let output = run(&worker)
1021 .await
1022 .expect("inline worker should pass the store-facet guard");
1023 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1024 }
1025}