1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10 LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack, ProcessAwaitOutput,
11 ProcessExecutionContext, ProcessInput, ProcessLease, ProcessLeaseCompletion, ProcessRecord,
12 ProcessRegistration, ProcessRegistry, SessionStoreCreateRequest, SessionStoreFactory,
13};
14
15#[derive(Clone)]
21pub struct DurableProcessWorkerConfig {
22 pub plugin_host: Arc<PluginHost>,
23 pub runtime_host: RuntimeHostConfig,
24 pub session_policy: crate::SessionPolicy,
25 pub session_store_factory: Arc<dyn SessionStoreFactory>,
26 pub process_registry: Arc<dyn ProcessRegistry>,
27 pub host_event_store: Arc<dyn crate::HostEventStore>,
28 pub residency: crate::Residency,
34}
35
36impl DurableProcessWorkerConfig {
37 pub fn new(
38 plugin_host: Arc<PluginHost>,
39 runtime_host: RuntimeHostConfig,
40 session_store_factory: Arc<dyn SessionStoreFactory>,
41 process_registry: Arc<dyn ProcessRegistry>,
42 ) -> Self {
43 Self {
44 plugin_host,
45 runtime_host,
46 session_policy: crate::SessionPolicy::default(),
47 session_store_factory,
48 process_registry,
49 host_event_store: Arc::new(crate::InMemoryHostEventStore::default()),
50 residency: crate::Residency::default(),
51 }
52 }
53
54 pub fn with_host_event_store(mut self, store: Arc<dyn crate::HostEventStore>) -> Self {
55 self.host_event_store = store;
56 self
57 }
58
59 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
60 self.session_policy = policy;
61 self
62 }
63
64 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
65 self.residency = residency;
66 self
67 }
68
69 pub fn from_plugin_factories(
70 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
71 runtime_host: RuntimeHostConfig,
72 session_store_factory: Arc<dyn SessionStoreFactory>,
73 process_registry: Arc<dyn ProcessRegistry>,
74 ) -> Self {
75 Self::new(
76 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
77 runtime_host,
78 session_store_factory,
79 process_registry,
80 )
81 }
82
83 pub fn from_plugin_stack(
84 plugin_stack: PluginStack,
85 runtime_host: RuntimeHostConfig,
86 session_store_factory: Arc<dyn SessionStoreFactory>,
87 process_registry: Arc<dyn ProcessRegistry>,
88 ) -> Self {
89 Self::from_plugin_factories(
90 plugin_stack.into_factories(),
91 runtime_host,
92 session_store_factory,
93 process_registry,
94 )
95 }
96}
97
98#[derive(Clone)]
100pub struct DurableProcessWorker {
101 config: Arc<DurableProcessWorkerConfig>,
102}
103
104enum RecoverFailure {
106 LeaseLost(PluginError),
110 Run(PluginError),
113}
114
115impl DurableProcessWorker {
116 pub fn new(config: DurableProcessWorkerConfig) -> Self {
117 Self {
118 config: Arc::new(config),
119 }
120 }
121
122 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
123 Self { config }
124 }
125
126 pub fn config(&self) -> &DurableProcessWorkerConfig {
127 &self.config
128 }
129
130 pub async fn run_process(
131 &self,
132 registration: ProcessRegistration,
133 execution_context: ProcessExecutionContext,
134 cancellation: CancellationToken,
135 ) -> Result<ProcessAwaitOutput, PluginError> {
136 let scoped_effect_controller = self
137 .config
138 .runtime_host
139 .control
140 .effect_host
141 .scoped_static(crate::EffectScope::process(registration.id.clone()))
142 .map_err(|err| PluginError::Session(err.to_string()))?
143 .ok_or_else(|| {
144 PluginError::Session(
145 "process worker effect host must provide a static process scope".to_string(),
146 )
147 })?;
148 self.run_process_with_scoped_effect_controller(
149 registration,
150 execution_context,
151 scoped_effect_controller,
152 cancellation,
153 )
154 .await
155 }
156
157 pub async fn run_process_with_scoped_effect_controller(
158 &self,
159 registration: ProcessRegistration,
160 execution_context: ProcessExecutionContext,
161 scoped_effect_controller: crate::ScopedEffectController<'_>,
162 cancellation: CancellationToken,
163 ) -> Result<ProcessAwaitOutput, PluginError> {
164 self.ensure_stable_process_id(®istration)?;
165 self.ensure_host_profile_matches(®istration)?;
166 self.ensure_durable_store_facets()?;
167 if let ProcessInput::External { metadata } = registration.input.as_ref() {
168 return Ok(ProcessAwaitOutput::Success {
169 value: serde_json::json!({ "metadata": metadata.clone() }),
170 control: None,
171 });
172 }
173 let session_id = registration.provenance.owner_scope.session_id.as_str();
174 if session_id.is_empty() {
175 return Err(PluginError::Session(format!(
176 "process `{}` is missing a structured owner scope",
177 registration.id
178 )));
179 }
180 let runtime = self.rebuild_runtime(session_id).await?;
181 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
182 PluginError::Session(format!(
183 "failed to rebuild runtime session `{session_id}` for process `{}`: {err}",
184 registration.id
185 ))
186 })?;
187 Ok(manager
188 .run_process(
189 registration,
190 execution_context,
191 Arc::clone(&self.config.process_registry),
192 scoped_effect_controller,
193 cancellation,
194 )
195 .await)
196 }
197
198 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
221 let records = self.config.process_registry.list_non_terminal().await?;
222 for record in records {
223 let worker = self.clone();
234 tokio::spawn(async move { worker.recover_process(record).await });
235 }
236 Ok(())
237 }
238
239 async fn recover_process(&self, record: ProcessRecord) {
240 let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
241 let process_id = record.id.clone();
242 let Ok(lease) = self
246 .config
247 .process_registry
248 .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
249 .await
250 else {
251 return;
252 };
253 if self
256 .config
257 .process_registry
258 .get_process(&process_id)
259 .await
260 .is_some_and(|current| current.is_terminal())
261 {
262 self.release_or_log(&lease).await;
263 return;
264 }
265 let registration = ProcessRegistration {
266 id: record.id,
267 input: record.input,
268 event_types: record.event_types,
269 provenance: record.provenance.clone(),
270 };
271 let execution_context = ProcessExecutionContext::default()
274 .with_wake_target_scope(record.provenance.owner_scope);
275 match self
276 .run_process_with_lease_renewal(registration, execution_context, lease.clone())
277 .await
278 {
279 Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
282 Err(RecoverFailure::LeaseLost(err)) => {
288 tracing::warn!(
289 process_id = %process_id,
290 error = %err,
291 "process recovery lost its lease mid-run; deferring to the new owner",
292 );
293 }
294 Err(RecoverFailure::Run(err)) => {
297 let output = ProcessAwaitOutput::Failure {
298 class: crate::ToolFailureClass::Execution,
299 code: "process_recovery_failed".to_string(),
300 message: err.to_string(),
301 raw: None,
302 control: None,
303 };
304 self.complete_and_release(&lease, &process_id, output).await;
305 }
306 }
307 }
308
309 async fn complete_and_release(
313 &self,
314 lease: &ProcessLease,
315 process_id: &str,
316 output: ProcessAwaitOutput,
317 ) {
318 let fenced = match self
325 .config
326 .process_registry
327 .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
328 .await
329 {
330 Ok(renewed) => renewed,
331 Err(err) => {
332 tracing::warn!(
333 process_id = %process_id,
334 error = %err,
335 "lost process lease before terminal write; deferring to the new owner",
336 );
337 return;
338 }
339 };
340 if let Err(err) = self
341 .config
342 .process_registry
343 .complete_process(process_id, output)
344 .await
345 {
346 tracing::warn!(
347 process_id = %process_id,
348 error = %err,
349 "failed to write recovered process terminal outcome",
350 );
351 }
352 self.release_or_log(&fenced).await;
353 }
354
355 async fn release_or_log(&self, lease: &ProcessLease) {
356 if let Err(err) = self.release_process_lease(lease).await {
357 tracing::warn!(
358 process_id = %lease.process_id,
359 error = %err,
360 "failed to release recovered process lease",
361 );
362 }
363 }
364
365 async fn run_process_with_lease_renewal(
369 &self,
370 registration: ProcessRegistration,
371 execution_context: ProcessExecutionContext,
372 mut lease: ProcessLease,
373 ) -> Result<ProcessAwaitOutput, RecoverFailure> {
374 let process_id = registration.id.clone();
375 let cancellation = CancellationToken::new();
376 let cancel_watcher = {
377 let registry = Arc::clone(&self.config.process_registry);
378 let process_id = process_id.clone();
379 let cancellation = cancellation.clone();
380 tokio::spawn(async move {
381 match registry
382 .wait_event_after(&process_id, "process.cancel_requested", 0)
383 .await
384 {
385 Ok(_) => cancellation.cancel(),
386 Err(err) => tracing::warn!(
387 process_id = %process_id,
388 error = %err,
389 "process cancel watcher stopped before observing cancellation",
390 ),
391 }
392 })
393 };
394 let pending = self.run_process(registration, execution_context, cancellation.clone());
395 tokio::pin!(pending);
396 loop {
397 tokio::select! {
398 outcome = &mut pending => {
399 cancel_watcher.abort();
400 return outcome.map_err(RecoverFailure::Run);
401 }
402 _ = tokio::time::sleep(process_lease_renew_interval()) => {
403 match self
404 .config
405 .process_registry
406 .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
407 .await
408 {
409 Ok(renewed) => lease = renewed,
410 Err(err) => {
411 cancellation.cancel();
412 cancel_watcher.abort();
413 return Err(RecoverFailure::LeaseLost(err));
414 }
415 }
416 }
417 }
418 }
419 }
420
421 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
422 self.config
423 .process_registry
424 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
425 .await
426 }
427
428 pub async fn request_process_cancel(
429 &self,
430 process_id: &str,
431 reason: Option<String>,
432 ) -> Result<(), PluginError> {
433 self.config
434 .process_registry
435 .append_event(
436 process_id,
437 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
438 )
439 .await
440 .map(|_| ())
441 }
442
443 async fn rebuild_runtime(&self, session_id: &str) -> Result<LashRuntime, PluginError> {
444 let store = self
445 .config
446 .session_store_factory
447 .create_store(&SessionStoreCreateRequest {
448 session_id: session_id.to_string(),
449 relation: crate::SessionRelation::Root,
450 policy: self.config.session_policy.clone(),
451 })
452 .await
453 .map_err(|err| {
454 PluginError::Session(format!(
455 "failed to open session store for process worker session `{session_id}`: {err}"
456 ))
457 })?;
458 EmbeddedRuntimeBuilder::new()
459 .with_session_id(session_id.to_string())
460 .with_plugin_host(self.config.plugin_host.as_ref().clone())
461 .with_runtime_host(self.config.runtime_host.clone())
462 .with_policy(self.config.session_policy.clone())
463 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
464 .with_host_event_store(Arc::clone(&self.config.host_event_store))
465 .with_process_registry(Arc::clone(&self.config.process_registry))
466 .with_residency(self.config.residency)
467 .with_store(store)
468 .build()
469 .await
470 .map_err(|err| {
471 PluginError::Session(format!(
472 "failed to rebuild process worker runtime for session `{session_id}`: {err}"
473 ))
474 })
475 }
476
477 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
486 if self
487 .config
488 .runtime_host
489 .control
490 .effect_host
491 .durability_tier()
492 != crate::DurabilityTier::Durable
493 {
494 return Ok(());
495 }
496 let require = |facet: crate::DurableStoreFacet| {
497 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
498 };
499 if self
500 .config
501 .runtime_host
502 .durability
503 .attachment_store
504 .persistence()
505 .durability_tier()
506 != crate::DurabilityTier::Durable
507 {
508 return Err(require(crate::DurableStoreFacet::AttachmentStore));
509 }
510 if self
511 .config
512 .runtime_host
513 .durability
514 .lashlang_artifact_store
515 .durability_tier()
516 != crate::DurabilityTier::Durable
517 {
518 return Err(require(crate::DurableStoreFacet::ArtifactStore));
519 }
520 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
521 return Err(require(crate::DurableStoreFacet::SessionStore));
522 }
523 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
524 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
525 }
526 if self.config.host_event_store.durability_tier() != crate::DurabilityTier::Durable {
527 return Err(require(crate::DurableStoreFacet::HostEventStore));
528 }
529 Ok(())
530 }
531
532 fn ensure_stable_process_id(
540 &self,
541 registration: &ProcessRegistration,
542 ) -> Result<(), PluginError> {
543 if registration.id.trim().is_empty() {
544 return Err(PluginError::Session(
545 crate::RuntimeError::missing_process_execution_id().to_string(),
546 ));
547 }
548 Ok(())
549 }
550
551 fn ensure_host_profile_matches(
552 &self,
553 registration: &ProcessRegistration,
554 ) -> Result<(), PluginError> {
555 let actual = registration.provenance.host_profile_id.as_str();
556 let expected = self.config.runtime_host.profile.host_profile_id.as_str();
557 if actual.is_empty() || actual == expected {
558 return Ok(());
559 }
560 Err(PluginError::Session(format!(
561 "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
562 registration.id
563 )))
564 }
565}
566
567fn process_lease_renew_interval() -> Duration {
568 Duration::from_millis(process_lease_renew_interval_ms())
569}
570
571#[cfg(test)]
572fn process_lease_renew_interval_ms() -> u64 {
573 25
574}
575
576#[cfg(not(test))]
577fn process_lease_renew_interval_ms() -> u64 {
578 30_000
579}
580
581#[cfg(test)]
582mod boundary_tests {
583 use super::*;
584 use crate::{
585 AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
586 DurableStoreFacet, HostEventStore, InMemoryAttachmentStore, LashlangArtifactStore,
587 ProcessInput, ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment,
588 };
589 use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
590
591 #[derive(Default)]
594 struct DurableController;
595
596 #[async_trait::async_trait]
597 impl RuntimeEffectController for DurableController {
598 fn durability_tier(&self) -> DurabilityTier {
599 DurabilityTier::Durable
600 }
601
602 async fn execute_effect(
603 &self,
604 _envelope: crate::RuntimeEffectEnvelope,
605 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
606 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
607 unreachable!("worker boundary rejects before executing any effect")
608 }
609 }
610
611 #[derive(Default)]
613 struct DurableAttachmentStore {
614 inner: InMemoryAttachmentStore,
615 }
616
617 impl AttachmentStore for DurableAttachmentStore {
618 fn persistence(&self) -> AttachmentStorePersistence {
619 AttachmentStorePersistence::Durable
620 }
621
622 fn put(
623 &self,
624 bytes: Vec<u8>,
625 meta: AttachmentCreateMeta,
626 ) -> Result<AttachmentRef, AttachmentStoreError> {
627 self.inner.put(bytes, meta)
628 }
629
630 fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
631 self.inner.get(id)
632 }
633 }
634
635 #[derive(Default)]
637 struct DurableArtifactStore {
638 inner: lashlang::InMemoryLashlangArtifactStore,
639 }
640
641 #[async_trait::async_trait]
642 impl LashlangArtifactStore for DurableArtifactStore {
643 fn durability_tier(&self) -> DurabilityTier {
644 DurabilityTier::Durable
645 }
646
647 async fn put_module_artifact(
648 &self,
649 artifact: &lashlang::ModuleArtifact,
650 ) -> Result<(), lashlang::ArtifactStoreError> {
651 self.inner.put_module_artifact(artifact).await
652 }
653
654 async fn get_module_artifact(
655 &self,
656 module_ref: &lashlang::ModuleRef,
657 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
658 self.inner.get_module_artifact(module_ref).await
659 }
660 }
661
662 struct TierSessionStoreFactory {
665 tier: DurabilityTier,
666 }
667
668 #[async_trait::async_trait]
669 impl SessionStoreFactory for TierSessionStoreFactory {
670 fn durability_tier(&self) -> DurabilityTier {
671 self.tier
672 }
673
674 async fn create_store(
675 &self,
676 _request: &crate::SessionStoreCreateRequest,
677 ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
678 unreachable!("worker boundary rejects before creating a session store")
679 }
680
681 async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
682 Ok(())
683 }
684 }
685
686 struct TierHostEventStore {
687 tier: DurabilityTier,
688 inner: crate::InMemoryHostEventStore,
689 }
690
691 impl TierHostEventStore {
692 fn new(tier: DurabilityTier) -> Self {
693 Self {
694 tier,
695 inner: crate::InMemoryHostEventStore::default(),
696 }
697 }
698 }
699
700 #[async_trait::async_trait]
701 impl HostEventStore for TierHostEventStore {
702 fn durability_tier(&self) -> DurabilityTier {
703 self.tier
704 }
705
706 async fn register_subscription(
707 &self,
708 draft: crate::TriggerSubscriptionDraft,
709 ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
710 self.inner.register_subscription(draft).await
711 }
712
713 async fn list_subscriptions(
714 &self,
715 filter: crate::TriggerSubscriptionFilter,
716 ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
717 self.inner.list_subscriptions(filter).await
718 }
719
720 async fn cancel_subscription(
721 &self,
722 session_id: &str,
723 handle: &str,
724 ) -> Result<bool, PluginError> {
725 self.inner.cancel_subscription(session_id, handle).await
726 }
727
728 async fn record_occurrence(
729 &self,
730 request: crate::HostEventOccurrenceRequest,
731 ) -> Result<crate::HostEventOccurrenceRecord, PluginError> {
732 self.inner.record_occurrence(request).await
733 }
734
735 async fn reserve_matching_deliveries(
736 &self,
737 occurrence_id: &str,
738 ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
739 self.inner.reserve_matching_deliveries(occurrence_id).await
740 }
741 }
742
743 fn worker(
747 attachment: Arc<dyn AttachmentStore>,
748 artifact: Arc<dyn LashlangArtifactStore>,
749 session_store_tier: DurabilityTier,
750 ) -> DurableProcessWorker {
751 worker_with_store_tiers(
752 attachment,
753 artifact,
754 session_store_tier,
755 DurabilityTier::Durable,
756 DurabilityTier::Durable,
757 )
758 }
759
760 fn worker_with_store_tiers(
761 attachment: Arc<dyn AttachmentStore>,
762 artifact: Arc<dyn LashlangArtifactStore>,
763 session_store_tier: DurabilityTier,
764 process_registry_tier: DurabilityTier,
765 host_event_store_tier: DurabilityTier,
766 ) -> DurableProcessWorker {
767 let mut runtime_host = RuntimeHostConfig::in_memory();
768 runtime_host.control.effect_host =
769 Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
770 runtime_host.durability.attachment_store = attachment;
771 runtime_host.durability.lashlang_artifact_store = artifact;
772 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
773 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
774 tier: session_store_tier,
775 });
776 let registry: Arc<dyn ProcessRegistry> = Arc::new(
777 crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
778 );
779 let host_event_store: Arc<dyn HostEventStore> =
780 Arc::new(TierHostEventStore::new(host_event_store_tier));
781 DurableProcessWorker::new(
782 DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
783 .with_host_event_store(host_event_store),
784 )
785 }
786
787 fn external_registration() -> ProcessRegistration {
788 ProcessRegistration::new(
789 "worker-boundary-process",
790 ProcessInput::External {
791 metadata: serde_json::json!({}),
792 },
793 )
794 }
795
796 async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
797 worker
798 .run_process(
799 external_registration(),
800 ProcessExecutionContext::default(),
801 CancellationToken::new(),
802 )
803 .await
804 }
805
806 fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
807 let PluginError::Session(message) = err else {
808 panic!("expected PluginError::Session, got {err:?}");
809 };
810 let expected = RuntimeError::durable_store_required(facet).to_string();
811 assert_eq!(message, expected, "worker must reject the {facet:?} facet");
812 }
813
814 #[tokio::test]
815 async fn durable_worker_rejects_ephemeral_attachment_store() {
816 let worker = worker(
817 Arc::new(InMemoryAttachmentStore::new()),
818 Arc::new(DurableArtifactStore::default()),
819 DurabilityTier::Durable,
820 );
821 let err = run(&worker)
822 .await
823 .expect_err("ephemeral attachment store must be rejected at the worker boundary");
824 assert_facet(err, DurableStoreFacet::AttachmentStore);
825 }
826
827 #[tokio::test]
828 async fn durable_worker_rejects_ephemeral_artifact_store() {
829 let worker = worker(
830 Arc::new(DurableAttachmentStore::default()),
831 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
832 DurabilityTier::Durable,
833 );
834 let err = run(&worker)
835 .await
836 .expect_err("ephemeral artifact store must be rejected at the worker boundary");
837 assert_facet(err, DurableStoreFacet::ArtifactStore);
838 }
839
840 #[tokio::test]
841 async fn durable_worker_rejects_ephemeral_session_store_factory() {
842 let worker = worker(
843 Arc::new(DurableAttachmentStore::default()),
844 Arc::new(DurableArtifactStore::default()),
845 DurabilityTier::Inline,
846 );
847 let err = run(&worker)
848 .await
849 .expect_err("ephemeral session store factory must be rejected at the worker boundary");
850 assert_facet(err, DurableStoreFacet::SessionStore);
851 }
852
853 #[tokio::test]
854 async fn durable_worker_rejects_ephemeral_process_registry() {
855 let worker = worker_with_store_tiers(
856 Arc::new(DurableAttachmentStore::default()),
857 Arc::new(DurableArtifactStore::default()),
858 DurabilityTier::Durable,
859 DurabilityTier::Inline,
860 DurabilityTier::Durable,
861 );
862 let err = run(&worker)
863 .await
864 .expect_err("ephemeral process registry must be rejected at the worker boundary");
865 assert_facet(err, DurableStoreFacet::ProcessRegistry);
866 }
867
868 #[tokio::test]
869 async fn durable_worker_rejects_ephemeral_host_event_store() {
870 let worker = worker_with_store_tiers(
871 Arc::new(DurableAttachmentStore::default()),
872 Arc::new(DurableArtifactStore::default()),
873 DurabilityTier::Durable,
874 DurabilityTier::Durable,
875 DurabilityTier::Inline,
876 );
877 let err = run(&worker)
878 .await
879 .expect_err("ephemeral host event store must be rejected at the worker boundary");
880 assert_facet(err, DurableStoreFacet::HostEventStore);
881 }
882
883 #[tokio::test]
884 async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
885 let worker = worker(
889 Arc::new(DurableAttachmentStore::default()),
890 Arc::new(DurableArtifactStore::default()),
891 DurabilityTier::Durable,
892 );
893 let output = run(&worker)
894 .await
895 .expect("all-durable worker should pass the store-facet guard");
896 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
897 }
898
899 #[tokio::test]
900 async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
901 let runtime_host = RuntimeHostConfig::in_memory();
904 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
905 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
906 tier: DurabilityTier::Inline,
907 });
908 let registry: Arc<dyn ProcessRegistry> =
909 Arc::new(crate::TestLocalProcessRegistry::default());
910 let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
911 plugin_host,
912 runtime_host,
913 factory,
914 registry,
915 ));
916 let output = run(&worker)
917 .await
918 .expect("inline worker should pass the store-facet guard");
919 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
920 }
921}