1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10 InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
11 ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
12 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
13 SessionStoreFactory,
14};
15
16#[derive(Clone)]
22pub struct DurableProcessWorkerConfig {
23 pub plugin_host: Arc<PluginHost>,
24 pub runtime_host: RuntimeHostConfig,
25 pub session_policy: crate::SessionPolicy,
26 pub session_store_factory: Arc<dyn SessionStoreFactory>,
27 pub process_registry: Arc<dyn ProcessRegistry>,
28 pub trigger_store: Arc<dyn crate::TriggerStore>,
29 #[doc(hidden)]
30 pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
31 pub residency: crate::Residency,
37}
38
39impl DurableProcessWorkerConfig {
40 pub fn new(
41 plugin_host: Arc<PluginHost>,
42 runtime_host: RuntimeHostConfig,
43 session_store_factory: Arc<dyn SessionStoreFactory>,
44 process_registry: Arc<dyn ProcessRegistry>,
45 ) -> Self {
46 Self {
47 plugin_host,
48 runtime_host,
49 session_policy: crate::SessionPolicy::default(),
50 session_store_factory,
51 process_registry,
52 trigger_store: Arc::new(crate::InMemoryTriggerStore::default()),
53 turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
54 residency: crate::Residency::default(),
55 }
56 }
57
58 pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
59 self.trigger_store = store;
60 self
61 }
62
63 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
64 self.session_policy = policy;
65 self
66 }
67
68 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
69 self.residency = residency;
70 self
71 }
72
73 #[doc(hidden)]
74 pub fn with_turn_phase_probe_slot(
75 mut self,
76 slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
77 ) -> Self {
78 self.turn_phase_probe_slot = slot;
79 self
80 }
81
82 pub fn from_plugin_factories(
83 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
84 runtime_host: RuntimeHostConfig,
85 session_store_factory: Arc<dyn SessionStoreFactory>,
86 process_registry: Arc<dyn ProcessRegistry>,
87 ) -> Self {
88 Self::new(
89 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
90 runtime_host,
91 session_store_factory,
92 process_registry,
93 )
94 }
95
96 pub fn from_plugin_stack(
97 plugin_stack: PluginStack,
98 runtime_host: RuntimeHostConfig,
99 session_store_factory: Arc<dyn SessionStoreFactory>,
100 process_registry: Arc<dyn ProcessRegistry>,
101 ) -> Self {
102 Self::from_plugin_factories(
103 plugin_stack.into_factories(),
104 runtime_host,
105 session_store_factory,
106 process_registry,
107 )
108 }
109}
110
111#[derive(Clone)]
113pub struct DurableProcessWorker {
114 config: Arc<DurableProcessWorkerConfig>,
115}
116
117enum RecoverFailure {
119 LeaseLost(PluginError),
123 Run(PluginError),
126}
127
128impl DurableProcessWorker {
129 pub fn new(config: DurableProcessWorkerConfig) -> Self {
130 Self {
131 config: Arc::new(config),
132 }
133 }
134
135 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
136 Self { config }
137 }
138
139 pub fn config(&self) -> &DurableProcessWorkerConfig {
140 &self.config
141 }
142
143 pub async fn run_process(
144 &self,
145 registration: ProcessRegistration,
146 execution_context: ProcessExecutionContext,
147 cancellation: CancellationToken,
148 ) -> Result<ProcessAwaitOutput, PluginError> {
149 let scoped_effect_controller = self
150 .config
151 .runtime_host
152 .control
153 .effect_host
154 .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
155 .map_err(|err| PluginError::Session(err.to_string()))?
156 .ok_or_else(|| {
157 PluginError::Session(
158 "process worker effect host must provide a static process scope".to_string(),
159 )
160 })?;
161 self.run_process_with_scoped_effect_controller(
162 registration,
163 execution_context,
164 scoped_effect_controller,
165 cancellation,
166 )
167 .await
168 }
169
170 pub async fn run_process_with_scoped_effect_controller(
171 &self,
172 registration: ProcessRegistration,
173 execution_context: ProcessExecutionContext,
174 scoped_effect_controller: crate::ScopedEffectController<'_>,
175 cancellation: CancellationToken,
176 ) -> Result<ProcessAwaitOutput, PluginError> {
177 self.ensure_stable_process_id(®istration)?;
178 self.ensure_durable_store_facets()?;
179 if let ProcessInput::External { metadata } = registration.input.as_ref() {
180 return Ok(ProcessAwaitOutput::Success {
181 value: serde_json::json!({ "metadata": metadata.clone() }),
182 control: None,
183 });
184 }
185 let mut runtime = self.runtime_for_registration(®istration).await?;
186 let originator_scope = if let crate::ProcessOriginator::Session { scope } =
187 ®istration.provenance.originator
188 {
189 Some(scope)
190 } else {
191 None
192 };
193 let probe_scope = registration.wake_target.as_ref().or(originator_scope);
194 if let Some(probe) =
195 probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
196 {
197 runtime.set_turn_phase_probe(probe);
198 }
199 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
200 PluginError::Session(format!(
201 "failed to build runtime env for process `{}`: {err}",
202 registration.id
203 ))
204 })?;
205 Ok(manager
206 .run_process(
207 registration,
208 execution_context,
209 Arc::clone(&self.config.process_registry),
210 scoped_effect_controller,
211 cancellation,
212 )
213 .await)
214 }
215
216 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
239 let records = self.config.process_registry.list_non_terminal().await?;
240 for record in records {
241 let worker = self.clone();
252 tokio::spawn(async move { worker.recover_process(record).await });
253 }
254 Ok(())
255 }
256
257 async fn recover_process(&self, record: ProcessRecord) {
258 let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
259 let process_id = record.id.clone();
260 let Ok(lease) = self
264 .config
265 .process_registry
266 .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
267 .await
268 else {
269 return;
270 };
271 if self
274 .config
275 .process_registry
276 .get_process(&process_id)
277 .await
278 .is_some_and(|current| current.is_terminal())
279 {
280 self.release_or_log(&lease).await;
281 return;
282 }
283 let registration = ProcessRegistration {
284 id: record.id,
285 input: record.input,
286 identity: record.identity,
287 event_types: record.event_types,
288 provenance: record.provenance.clone(),
289 env_ref: record.env_ref.clone(),
290 wake_target: record.wake_target.clone(),
291 };
292 let execution_context = ProcessExecutionContext::default();
293 match self
294 .run_process_with_lease_renewal(registration, execution_context, lease.clone())
295 .await
296 {
297 Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
300 Err(RecoverFailure::LeaseLost(err)) => {
306 tracing::warn!(
307 process_id = %process_id,
308 error = %err,
309 "process recovery lost its lease mid-run; deferring to the new owner",
310 );
311 }
312 Err(RecoverFailure::Run(err)) => {
315 let output = ProcessAwaitOutput::Failure {
316 class: crate::ToolFailureClass::Execution,
317 code: "process_recovery_failed".to_string(),
318 message: err.to_string(),
319 raw: None,
320 control: None,
321 };
322 self.complete_and_release(&lease, &process_id, output).await;
323 }
324 }
325 }
326
327 async fn complete_and_release(
331 &self,
332 lease: &ProcessLease,
333 process_id: &str,
334 output: ProcessAwaitOutput,
335 ) {
336 let fenced = match self
343 .config
344 .process_registry
345 .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
346 .await
347 {
348 Ok(renewed) => renewed,
349 Err(err) => {
350 tracing::warn!(
351 process_id = %process_id,
352 error = %err,
353 "lost process lease before terminal write; deferring to the new owner",
354 );
355 return;
356 }
357 };
358 if let Err(err) = self
359 .config
360 .process_registry
361 .complete_process(process_id, output)
362 .await
363 {
364 tracing::warn!(
365 process_id = %process_id,
366 error = %err,
367 "failed to write recovered process terminal outcome",
368 );
369 }
370 self.release_or_log(&fenced).await;
371 }
372
373 async fn release_or_log(&self, lease: &ProcessLease) {
374 if let Err(err) = self.release_process_lease(lease).await {
375 tracing::warn!(
376 process_id = %lease.process_id,
377 error = %err,
378 "failed to release recovered process lease",
379 );
380 }
381 }
382
383 async fn run_process_with_lease_renewal(
387 &self,
388 registration: ProcessRegistration,
389 execution_context: ProcessExecutionContext,
390 mut lease: ProcessLease,
391 ) -> Result<ProcessAwaitOutput, RecoverFailure> {
392 let process_id = registration.id.clone();
393 let cancellation = CancellationToken::new();
394 let cancel_watcher = {
395 let registry = Arc::clone(&self.config.process_registry);
396 let process_id = process_id.clone();
397 let cancellation = cancellation.clone();
398 tokio::spawn(async move {
399 match registry
400 .wait_event_after(&process_id, "process.cancel_requested", 0)
401 .await
402 {
403 Ok(_) => cancellation.cancel(),
404 Err(err) => tracing::warn!(
405 process_id = %process_id,
406 error = %err,
407 "process cancel watcher stopped before observing cancellation",
408 ),
409 }
410 })
411 };
412 let pending = self.run_process(registration, execution_context, cancellation.clone());
413 tokio::pin!(pending);
414 loop {
415 tokio::select! {
416 outcome = &mut pending => {
417 cancel_watcher.abort();
418 return outcome.map_err(RecoverFailure::Run);
419 }
420 _ = tokio::time::sleep(process_lease_renew_interval()) => {
421 match self
422 .config
423 .process_registry
424 .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
425 .await
426 {
427 Ok(renewed) => lease = renewed,
428 Err(err) => {
429 cancellation.cancel();
430 cancel_watcher.abort();
431 return Err(RecoverFailure::LeaseLost(err));
432 }
433 }
434 }
435 }
436 }
437 }
438
439 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
440 self.config
441 .process_registry
442 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
443 .await
444 }
445
446 pub async fn request_process_cancel(
447 &self,
448 process_id: &str,
449 reason: Option<String>,
450 ) -> Result<(), PluginError> {
451 self.config
452 .process_registry
453 .append_event(
454 process_id,
455 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
456 )
457 .await
458 .map(|_| ())
459 }
460
461 async fn runtime_for_registration(
462 &self,
463 registration: &ProcessRegistration,
464 ) -> Result<LashRuntime, PluginError> {
465 match registration.input.as_ref() {
466 ProcessInput::SessionTurn { create_request, .. } => {
467 self.runtime_for_session_turn(registration, create_request.as_ref())
468 .await
469 }
470 ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. } => {
471 self.runtime_for_process_env(registration).await
472 }
473 ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
474 }
475 }
476
477 async fn runtime_for_session_turn(
478 &self,
479 registration: &ProcessRegistration,
480 create_request: &crate::SessionCreateRequest,
481 ) -> Result<LashRuntime, PluginError> {
482 let mut policy = create_request
483 .policy
484 .clone()
485 .unwrap_or_else(|| self.config.session_policy.clone());
486 if policy.recorded_provider_id().is_empty() {
487 policy.provider_id = self.config.session_policy.provider_id.clone();
488 }
489 self.build_ephemeral_runtime(
490 format!("process-session-turn:{}", registration.id),
491 policy,
492 create_request.plugin_options.clone(),
493 "session turn request",
494 )
495 .await
496 }
497
498 async fn runtime_for_process_env(
499 &self,
500 registration: &ProcessRegistration,
501 ) -> Result<LashRuntime, PluginError> {
502 let Some(env_ref) = registration.env_ref.as_ref() else {
503 return Err(PluginError::Session(format!(
504 "process `{}` is missing a captured execution env",
505 registration.id
506 )));
507 };
508 let env = crate::load_process_execution_env(
509 self.config
510 .runtime_host
511 .durability
512 .process_env_store
513 .as_ref(),
514 env_ref,
515 )
516 .await?;
517 self.build_ephemeral_runtime(
518 format!("process-env:{}", registration.id),
519 env.policy,
520 env.plugin_options,
521 env_ref.as_str(),
522 )
523 .await
524 }
525
526 async fn build_ephemeral_runtime(
527 &self,
528 session_id: String,
529 policy: crate::SessionPolicy,
530 plugin_options: crate::PluginOptions,
531 source_label: &str,
532 ) -> Result<LashRuntime, PluginError> {
533 let store = Arc::new(InMemorySessionStore::default());
534 EmbeddedRuntimeBuilder::new()
535 .with_session_id(session_id.to_string())
536 .with_plugin_host(self.config.plugin_host.as_ref().clone())
537 .with_runtime_host(self.config.runtime_host.clone())
538 .with_policy(policy)
539 .with_plugin_options(plugin_options)
540 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
541 .with_trigger_store(Arc::clone(&self.config.trigger_store))
542 .with_process_registry(Arc::clone(&self.config.process_registry))
543 .with_residency(self.config.residency)
544 .with_store(store)
545 .build()
546 .await
547 .map_err(|err| {
548 PluginError::Session(format!(
549 "failed to build process worker runtime for {source_label}: {err}"
550 ))
551 })
552 }
553
554 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
563 if self
564 .config
565 .runtime_host
566 .control
567 .effect_host
568 .durability_tier()
569 != crate::DurabilityTier::Durable
570 {
571 return Ok(());
572 }
573 let require = |facet: crate::DurableStoreFacet| {
574 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
575 };
576 if self
577 .config
578 .runtime_host
579 .durability
580 .attachment_store
581 .persistence()
582 .durability_tier()
583 != crate::DurabilityTier::Durable
584 {
585 return Err(require(crate::DurableStoreFacet::AttachmentStore));
586 }
587 if self
588 .config
589 .runtime_host
590 .durability
591 .process_env_store
592 .durability_tier()
593 != crate::DurabilityTier::Durable
594 {
595 return Err(require(crate::DurableStoreFacet::ProcessEnvStore));
596 }
597 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
598 return Err(require(crate::DurableStoreFacet::SessionStore));
599 }
600 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
601 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
602 }
603 if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
604 return Err(require(crate::DurableStoreFacet::TriggerStore));
605 }
606 Ok(())
607 }
608
609 fn ensure_stable_process_id(
617 &self,
618 registration: &ProcessRegistration,
619 ) -> Result<(), PluginError> {
620 if registration.id.trim().is_empty() {
621 return Err(PluginError::Session(
622 crate::RuntimeError::missing_process_execution_id().to_string(),
623 ));
624 }
625 Ok(())
626 }
627}
628
629fn process_lease_renew_interval() -> Duration {
630 Duration::from_millis(process_lease_renew_interval_ms())
631}
632
633#[cfg(test)]
634fn process_lease_renew_interval_ms() -> u64 {
635 25
636}
637
638#[cfg(not(test))]
639fn process_lease_renew_interval_ms() -> u64 {
640 30_000
641}
642
643#[cfg(test)]
644mod boundary_tests {
645 use super::*;
646 use crate::{
647 AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
648 DurableStoreFacet, InMemoryAttachmentStore, ProcessExecutionEnvRef,
649 ProcessExecutionEnvStore, ProcessInput, ProcessRegistration, RuntimeEffectController,
650 RuntimeError, StoredAttachment, TriggerStore,
651 };
652 use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
653
654 #[derive(Default)]
657 struct DurableController;
658
659 #[async_trait::async_trait]
660 impl RuntimeEffectController for DurableController {
661 fn durability_tier(&self) -> DurabilityTier {
662 DurabilityTier::Durable
663 }
664
665 async fn execute_effect(
666 &self,
667 _envelope: crate::RuntimeEffectEnvelope,
668 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
669 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
670 unreachable!("worker boundary rejects before executing any effect")
671 }
672 }
673
674 #[derive(Default)]
676 struct DurableAttachmentStore {
677 inner: InMemoryAttachmentStore,
678 }
679
680 #[async_trait::async_trait]
681 impl AttachmentStore for DurableAttachmentStore {
682 fn persistence(&self) -> AttachmentStorePersistence {
683 AttachmentStorePersistence::Durable
684 }
685
686 async fn put(
687 &self,
688 bytes: Vec<u8>,
689 meta: AttachmentCreateMeta,
690 ) -> Result<AttachmentRef, AttachmentStoreError> {
691 self.inner.put(bytes, meta).await
692 }
693
694 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
695 self.inner.get(id).await
696 }
697 }
698
699 #[derive(Default)]
701 struct DurableProcessEnvStore {
702 inner: crate::InMemoryProcessExecutionEnvStore,
703 }
704
705 #[async_trait::async_trait]
706 impl ProcessExecutionEnvStore for DurableProcessEnvStore {
707 fn durability_tier(&self) -> DurabilityTier {
708 DurabilityTier::Durable
709 }
710
711 async fn put_process_execution_env(
712 &self,
713 env_ref: &ProcessExecutionEnvRef,
714 bytes: &[u8],
715 ) -> Result<(), PluginError> {
716 self.inner.put_process_execution_env(env_ref, bytes).await
717 }
718
719 async fn get_process_execution_env(
720 &self,
721 env_ref: &ProcessExecutionEnvRef,
722 ) -> Result<Option<Vec<u8>>, PluginError> {
723 self.inner.get_process_execution_env(env_ref).await
724 }
725 }
726
727 struct TierSessionStoreFactory {
730 tier: DurabilityTier,
731 }
732
733 #[async_trait::async_trait]
734 impl SessionStoreFactory for TierSessionStoreFactory {
735 fn durability_tier(&self) -> DurabilityTier {
736 self.tier
737 }
738
739 async fn create_store(
740 &self,
741 _request: &crate::SessionStoreCreateRequest,
742 ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
743 unreachable!("worker boundary rejects before creating a session store")
744 }
745
746 async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
747 Ok(())
748 }
749 }
750
751 struct TierTriggerStore {
752 tier: DurabilityTier,
753 inner: crate::InMemoryTriggerStore,
754 }
755
756 impl TierTriggerStore {
757 fn new(tier: DurabilityTier) -> Self {
758 Self {
759 tier,
760 inner: crate::InMemoryTriggerStore::default(),
761 }
762 }
763 }
764
765 #[async_trait::async_trait]
766 impl TriggerStore for TierTriggerStore {
767 fn durability_tier(&self) -> DurabilityTier {
768 self.tier
769 }
770
771 async fn register_subscription(
772 &self,
773 draft: crate::TriggerSubscriptionDraft,
774 ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
775 self.inner.register_subscription(draft).await
776 }
777
778 async fn list_subscriptions(
779 &self,
780 filter: crate::TriggerSubscriptionFilter,
781 ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
782 self.inner.list_subscriptions(filter).await
783 }
784
785 async fn cancel_subscription(
786 &self,
787 session_id: &str,
788 handle: &str,
789 ) -> Result<bool, PluginError> {
790 self.inner.cancel_subscription(session_id, handle).await
791 }
792
793 async fn delete_session_subscriptions(
794 &self,
795 session_id: &str,
796 ) -> Result<usize, PluginError> {
797 self.inner.delete_session_subscriptions(session_id).await
798 }
799
800 async fn record_occurrence(
801 &self,
802 request: crate::TriggerOccurrenceRequest,
803 ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
804 self.inner.record_occurrence(request).await
805 }
806
807 async fn reserve_matching_deliveries(
808 &self,
809 occurrence_id: &str,
810 ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
811 self.inner.reserve_matching_deliveries(occurrence_id).await
812 }
813 }
814
815 fn worker(
819 attachment: Arc<dyn AttachmentStore>,
820 process_env_store: Arc<dyn ProcessExecutionEnvStore>,
821 session_store_tier: DurabilityTier,
822 ) -> DurableProcessWorker {
823 worker_with_store_tiers(
824 attachment,
825 process_env_store,
826 session_store_tier,
827 DurabilityTier::Durable,
828 DurabilityTier::Durable,
829 )
830 }
831
832 fn worker_with_store_tiers(
833 attachment: Arc<dyn AttachmentStore>,
834 process_env_store: Arc<dyn ProcessExecutionEnvStore>,
835 session_store_tier: DurabilityTier,
836 process_registry_tier: DurabilityTier,
837 trigger_store_tier: DurabilityTier,
838 ) -> DurableProcessWorker {
839 let mut runtime_host = RuntimeHostConfig::in_memory();
840 runtime_host.control.effect_host =
841 Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
842 runtime_host.durability.attachment_store = attachment;
843 runtime_host.durability.process_env_store = process_env_store;
844 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
845 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
846 tier: session_store_tier,
847 });
848 let registry: Arc<dyn ProcessRegistry> = Arc::new(
849 crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
850 );
851 let trigger_store: Arc<dyn TriggerStore> =
852 Arc::new(TierTriggerStore::new(trigger_store_tier));
853 DurableProcessWorker::new(
854 DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
855 .with_trigger_store(trigger_store),
856 )
857 }
858
859 fn external_registration() -> ProcessRegistration {
860 ProcessRegistration::new(
861 "worker-boundary-process",
862 ProcessInput::External {
863 metadata: serde_json::json!({}),
864 },
865 crate::ProcessProvenance::host(),
866 )
867 }
868
869 async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
870 worker
871 .run_process(
872 external_registration(),
873 ProcessExecutionContext::default(),
874 CancellationToken::new(),
875 )
876 .await
877 }
878
879 fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
880 let PluginError::Session(message) = err else {
881 panic!("expected PluginError::Session, got {err:?}");
882 };
883 let expected = RuntimeError::durable_store_required(facet).to_string();
884 assert_eq!(message, expected, "worker must reject the {facet:?} facet");
885 }
886
887 #[tokio::test]
888 async fn durable_worker_rejects_ephemeral_attachment_store() {
889 let worker = worker(
890 Arc::new(InMemoryAttachmentStore::new()),
891 Arc::new(DurableProcessEnvStore::default()),
892 DurabilityTier::Durable,
893 );
894 let err = run(&worker)
895 .await
896 .expect_err("ephemeral attachment store must be rejected at the worker boundary");
897 assert_facet(err, DurableStoreFacet::AttachmentStore);
898 }
899
900 #[tokio::test]
901 async fn durable_worker_rejects_ephemeral_process_env_store() {
902 let worker = worker(
903 Arc::new(DurableAttachmentStore::default()),
904 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
905 DurabilityTier::Durable,
906 );
907 let err = run(&worker)
908 .await
909 .expect_err("ephemeral process env store must be rejected at the worker boundary");
910 assert_facet(err, DurableStoreFacet::ProcessEnvStore);
911 }
912
913 #[tokio::test]
914 async fn durable_worker_rejects_ephemeral_session_store_factory() {
915 let worker = worker(
916 Arc::new(DurableAttachmentStore::default()),
917 Arc::new(DurableProcessEnvStore::default()),
918 DurabilityTier::Inline,
919 );
920 let err = run(&worker)
921 .await
922 .expect_err("ephemeral session store factory must be rejected at the worker boundary");
923 assert_facet(err, DurableStoreFacet::SessionStore);
924 }
925
926 #[tokio::test]
927 async fn durable_worker_rejects_ephemeral_process_registry() {
928 let worker = worker_with_store_tiers(
929 Arc::new(DurableAttachmentStore::default()),
930 Arc::new(DurableProcessEnvStore::default()),
931 DurabilityTier::Durable,
932 DurabilityTier::Inline,
933 DurabilityTier::Durable,
934 );
935 let err = run(&worker)
936 .await
937 .expect_err("ephemeral process registry must be rejected at the worker boundary");
938 assert_facet(err, DurableStoreFacet::ProcessRegistry);
939 }
940
941 #[tokio::test]
942 async fn durable_worker_rejects_ephemeral_trigger_store() {
943 let worker = worker_with_store_tiers(
944 Arc::new(DurableAttachmentStore::default()),
945 Arc::new(DurableProcessEnvStore::default()),
946 DurabilityTier::Durable,
947 DurabilityTier::Durable,
948 DurabilityTier::Inline,
949 );
950 let err = run(&worker)
951 .await
952 .expect_err("ephemeral trigger store must be rejected at the worker boundary");
953 assert_facet(err, DurableStoreFacet::TriggerStore);
954 }
955
956 #[tokio::test]
957 async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
958 let worker = worker(
962 Arc::new(DurableAttachmentStore::default()),
963 Arc::new(DurableProcessEnvStore::default()),
964 DurabilityTier::Durable,
965 );
966 let output = run(&worker)
967 .await
968 .expect("all-durable worker should pass the store-facet guard");
969 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
970 }
971
972 #[tokio::test]
973 async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
974 let runtime_host = RuntimeHostConfig::in_memory();
977 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
978 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
979 tier: DurabilityTier::Inline,
980 });
981 let registry: Arc<dyn ProcessRegistry> =
982 Arc::new(crate::TestLocalProcessRegistry::default());
983 let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
984 plugin_host,
985 runtime_host,
986 factory,
987 registry,
988 ));
989 let output = run(&worker)
990 .await
991 .expect("inline worker should pass the store-facet guard");
992 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
993 }
994}