1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{
9 EmbeddedRuntimeBuilder, ProcessWorkDriver, QueuedWorkDriver, RUNTIME_TURN_LEASE_TTL_MS,
10 RuntimeHostConfig,
11};
12use crate::{
13 InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
14 ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
15 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
16 SessionStoreFactory,
17};
18
/// Configuration consumed by [`DurableProcessWorker`].
///
/// Bundles the plugin host, the runtime host wiring, and the stores that the
/// worker passes to every runtime it builds for a process.
#[derive(Clone)]
pub struct DurableProcessWorkerConfig {
    /// Plugin host cloned into every runtime the worker builds.
    pub plugin_host: Arc<PluginHost>,
    /// Runtime host wiring. The worker reads its effect host, clock, and
    /// durability stores, and passes a clone to every runtime it builds.
    pub runtime_host: RuntimeHostConfig,
    /// Fallback policy for session-turn processes whose create request
    /// carries no policy; its `provider_id` also fills in a request policy
    /// whose recorded provider id is empty.
    pub session_policy: crate::SessionPolicy,
    /// Session store factory passed to every runtime the worker builds.
    pub session_store_factory: Arc<dyn SessionStoreFactory>,
    /// Registry used to list, lease, complete, and append events to processes.
    pub process_registry: Arc<dyn ProcessRegistry>,
    /// Trigger store passed to every runtime the worker builds.
    pub trigger_store: Arc<dyn crate::TriggerStore>,
    /// Optional process work driver. When `None`, the worker builds an inline
    /// driver bound to itself and the process registry.
    pub process_work_driver: Option<ProcessWorkDriver>,
    /// Optional queued work driver; installed on built runtimes only when set.
    pub queued_work_driver: Option<QueuedWorkDriver>,
    /// Slot consulted per scope for a turn-phase probe to install on the
    /// runtime before a process runs.
    #[doc(hidden)]
    pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
    /// Residency setting passed to every runtime the worker builds.
    pub residency: crate::Residency,
}
43
44impl DurableProcessWorkerConfig {
45 pub fn new(
46 plugin_host: Arc<PluginHost>,
47 runtime_host: RuntimeHostConfig,
48 session_store_factory: Arc<dyn SessionStoreFactory>,
49 process_registry: Arc<dyn ProcessRegistry>,
50 ) -> Self {
51 let clock = Arc::clone(&runtime_host.clock);
52 Self {
53 plugin_host,
54 runtime_host,
55 session_policy: crate::SessionPolicy::default(),
56 session_store_factory,
57 process_registry,
58 trigger_store: Arc::new(crate::InMemoryTriggerStore::with_clock(clock)),
59 process_work_driver: None,
60 queued_work_driver: None,
61 turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
62 residency: crate::Residency::default(),
63 }
64 }
65
66 pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
67 self.trigger_store = store;
68 self
69 }
70
71 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
72 self.session_policy = policy;
73 self
74 }
75
76 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
77 self.residency = residency;
78 self
79 }
80
81 pub fn with_process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
82 self.process_work_driver = Some(driver);
83 self
84 }
85
86 pub fn with_queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
87 self.queued_work_driver = Some(driver);
88 self
89 }
90
91 #[doc(hidden)]
92 pub fn with_turn_phase_probe_slot(
93 mut self,
94 slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
95 ) -> Self {
96 self.turn_phase_probe_slot = slot;
97 self
98 }
99
100 pub fn from_plugin_factories(
101 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
102 runtime_host: RuntimeHostConfig,
103 session_store_factory: Arc<dyn SessionStoreFactory>,
104 process_registry: Arc<dyn ProcessRegistry>,
105 ) -> Self {
106 Self::new(
107 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
108 runtime_host,
109 session_store_factory,
110 process_registry,
111 )
112 }
113
114 pub fn from_plugin_stack(
115 plugin_stack: PluginStack,
116 runtime_host: RuntimeHostConfig,
117 session_store_factory: Arc<dyn SessionStoreFactory>,
118 process_registry: Arc<dyn ProcessRegistry>,
119 ) -> Self {
120 Self::from_plugin_factories(
121 plugin_stack.into_factories(),
122 runtime_host,
123 session_store_factory,
124 process_registry,
125 )
126 }
127}
128
/// Worker that runs registered processes and recovers non-terminal ones.
///
/// Cloning is cheap: every clone shares the same
/// [`DurableProcessWorkerConfig`] through an `Arc`.
#[derive(Clone)]
pub struct DurableProcessWorker {
    /// Shared, immutable worker configuration.
    config: Arc<DurableProcessWorkerConfig>,
}
134
/// Why a lease-renewing recovery run ended without a process output.
enum RecoverFailure {
    /// Renewing the process lease failed while the process was running; the
    /// caller logs this and writes no terminal outcome.
    LeaseLost(PluginError),
    /// Running the process itself returned an error; the caller records it as
    /// a `process_recovery_failed` outcome.
    Run(PluginError),
}
145
146impl DurableProcessWorker {
147 pub fn new(config: DurableProcessWorkerConfig) -> Self {
148 Self {
149 config: Arc::new(config),
150 }
151 }
152
153 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
154 Self { config }
155 }
156
157 pub fn config(&self) -> &DurableProcessWorkerConfig {
158 &self.config
159 }
160
161 pub async fn run_process(
162 &self,
163 registration: ProcessRegistration,
164 execution_context: ProcessExecutionContext,
165 cancellation: CancellationToken,
166 ) -> Result<ProcessAwaitOutput, PluginError> {
167 let scoped_effect_controller = self
168 .config
169 .runtime_host
170 .control
171 .effect_host
172 .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
173 .map_err(|err| PluginError::Session(err.to_string()))?
174 .ok_or_else(|| {
175 PluginError::Session(
176 "process worker effect host must provide a static process scope".to_string(),
177 )
178 })?;
179 self.run_process_with_scoped_effect_controller(
180 registration,
181 execution_context,
182 scoped_effect_controller,
183 cancellation,
184 )
185 .await
186 }
187
188 pub async fn run_process_with_scoped_effect_controller(
189 &self,
190 registration: ProcessRegistration,
191 execution_context: ProcessExecutionContext,
192 scoped_effect_controller: crate::ScopedEffectController<'_>,
193 cancellation: CancellationToken,
194 ) -> Result<ProcessAwaitOutput, PluginError> {
195 self.ensure_stable_process_id(®istration)?;
196 self.ensure_durable_store_facets()?;
197 if let ProcessInput::External { metadata } = registration.input.as_ref() {
198 return Ok(ProcessAwaitOutput::Success {
199 value: serde_json::json!({ "metadata": metadata.clone() }),
200 control: None,
201 });
202 }
203 let mut runtime = self.runtime_for_registration(®istration).await?;
204 let originator_scope = if let crate::ProcessOriginator::Session { scope } =
205 ®istration.provenance.originator
206 {
207 Some(scope)
208 } else {
209 None
210 };
211 let probe_scope = registration.wake_target.as_ref().or(originator_scope);
212 if let Some(probe) =
213 probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
214 {
215 runtime.set_turn_phase_probe(probe);
216 }
217 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
218 PluginError::Session(format!(
219 "failed to build runtime env for process `{}`: {err}",
220 registration.id
221 ))
222 })?;
223 Ok(manager
224 .run_process(
225 registration,
226 execution_context,
227 Arc::clone(&self.config.process_registry),
228 scoped_effect_controller,
229 cancellation,
230 )
231 .await)
232 }
233
234 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
257 let records = self.config.process_registry.list_non_terminal().await?;
258 for record in records {
259 let worker = self.clone();
270 tokio::spawn(async move { worker.recover_process(record).await });
271 }
272 Ok(())
273 }
274
    /// Attempts to take over and finish one non-terminal process record.
    ///
    /// Claims the process lease under a fresh `process-recovery-<uuid>` owner
    /// id, re-runs the process with periodic lease renewal, and writes the
    /// terminal outcome. Nothing is returned: failures are either logged or
    /// recorded as a `process_recovery_failed` outcome.
    async fn recover_process(&self, record: ProcessRecord) {
        let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
        let process_id = record.id.clone();
        // A failed claim ends recovery silently.
        // NOTE(review): presumably another owner already holds the lease — confirm.
        let Ok(lease) = self
            .config
            .process_registry
            .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
            .await
        else {
            return;
        };
        // Re-read the process after claiming: if it is already terminal, give
        // the lease back and do nothing else.
        if self
            .config
            .process_registry
            .get_process(&process_id)
            .await
            .is_some_and(|current| current.is_terminal())
        {
            self.release_or_log(&lease).await;
            return;
        }
        // Rebuild the registration from the stored record.
        let registration = ProcessRegistration {
            id: record.id,
            input: record.input,
            identity: record.identity,
            event_types: record.event_types,
            provenance: record.provenance.clone(),
            env_ref: record.env_ref.clone(),
            wake_target: record.wake_target.clone(),
        };
        let execution_context = ProcessExecutionContext::default();
        match self
            .run_process_with_lease_renewal(registration, execution_context, lease.clone())
            .await
        {
            Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
            // Lease lost: write no terminal outcome and do not release the lease.
            Err(RecoverFailure::LeaseLost(err)) => {
                tracing::warn!(
                    process_id = %process_id,
                    error = %err,
                    "process recovery lost its lease mid-run; deferring to the new owner",
                );
            }
            // A run error becomes a terminal failure outcome for the process.
            Err(RecoverFailure::Run(err)) => {
                let output = ProcessAwaitOutput::Failure {
                    class: crate::ToolFailureClass::Execution,
                    code: "process_recovery_failed".to_string(),
                    message: err.to_string(),
                    raw: None,
                    control: None,
                };
                self.complete_and_release(&lease, &process_id, output).await;
            }
        }
    }
344
    /// Writes the terminal `output` for `process_id` and then releases the lease.
    ///
    /// The lease is renewed once more before the write. If that renewal fails,
    /// the method logs a warning and returns without writing the outcome or
    /// releasing anything. A failed terminal write is only logged; the renewed
    /// lease is released afterwards either way.
    async fn complete_and_release(
        &self,
        lease: &ProcessLease,
        process_id: &str,
        output: ProcessAwaitOutput,
    ) {
        // Renew first so the terminal write only happens while the lease is
        // still ours.
        let fenced = match self
            .config
            .process_registry
            .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
            .await
        {
            Ok(renewed) => renewed,
            Err(err) => {
                tracing::warn!(
                    process_id = %process_id,
                    error = %err,
                    "lost process lease before terminal write; deferring to the new owner",
                );
                return;
            }
        };
        if let Err(err) = self
            .config
            .process_registry
            .complete_process(process_id, output)
            .await
        {
            tracing::warn!(
                process_id = %process_id,
                error = %err,
                "failed to write recovered process terminal outcome",
            );
        }
        // Release the renewed lease, not the one passed in.
        self.release_or_log(&fenced).await;
    }
390
391 async fn release_or_log(&self, lease: &ProcessLease) {
392 if let Err(err) = self.release_process_lease(lease).await {
393 tracing::warn!(
394 process_id = %lease.process_id,
395 error = %err,
396 "failed to release recovered process lease",
397 );
398 }
399 }
400
    /// Runs a process while periodically renewing its lease.
    ///
    /// A background task waits for a `process.cancel_requested` event and
    /// cancels the run when one arrives. The run is raced against a renewal
    /// timer: each time the timer fires the lease is renewed, and the loop
    /// continues until the run finishes or a renewal fails.
    ///
    /// # Errors
    ///
    /// Returns [`RecoverFailure::Run`] when the process run itself fails, and
    /// [`RecoverFailure::LeaseLost`] when a lease renewal fails (the run is
    /// cancelled first).
    async fn run_process_with_lease_renewal(
        &self,
        registration: ProcessRegistration,
        execution_context: ProcessExecutionContext,
        mut lease: ProcessLease,
    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
        let process_id = registration.id.clone();
        let cancellation = CancellationToken::new();
        // Watcher task: turns a recorded cancel request into token cancellation.
        let cancel_watcher = {
            let registry = Arc::clone(&self.config.process_registry);
            let process_id = process_id.clone();
            let cancellation = cancellation.clone();
            tokio::spawn(async move {
                // NOTE(review): the trailing `0` looks like a starting event
                // offset — confirm against `wait_event_after`.
                match registry
                    .wait_event_after(&process_id, "process.cancel_requested", 0)
                    .await
                {
                    Ok(_) => cancellation.cancel(),
                    Err(err) => tracing::warn!(
                        process_id = %process_id,
                        error = %err,
                        "process cancel watcher stopped before observing cancellation",
                    ),
                }
            })
        };
        let pending = self.run_process(registration, execution_context, cancellation.clone());
        // Pinned so the same future can be polled across loop iterations.
        tokio::pin!(pending);
        loop {
            tokio::select! {
                outcome = &mut pending => {
                    // The run finished; the watcher is no longer needed.
                    cancel_watcher.abort();
                    return outcome.map_err(RecoverFailure::Run);
                }
                _ = self.config.runtime_host.clock.sleep(process_lease_renew_interval()) => {
                    match self
                        .config
                        .process_registry
                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
                        .await
                    {
                        // Keep the renewed lease for the next renewal round.
                        Ok(renewed) => lease = renewed,
                        Err(err) => {
                            // Renewal failed: cancel the run and stop the watcher.
                            cancellation.cancel();
                            cancel_watcher.abort();
                            return Err(RecoverFailure::LeaseLost(err));
                        }
                    }
                }
            }
        }
    }
456
457 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
458 self.config
459 .process_registry
460 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
461 .await
462 }
463
464 pub async fn request_process_cancel(
465 &self,
466 process_id: &str,
467 reason: Option<String>,
468 ) -> Result<(), PluginError> {
469 self.config
470 .process_registry
471 .append_event(
472 process_id,
473 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
474 )
475 .await
476 .map(|_| ())
477 }
478
479 async fn runtime_for_registration(
480 &self,
481 registration: &ProcessRegistration,
482 ) -> Result<LashRuntime, PluginError> {
483 match registration.input.as_ref() {
484 ProcessInput::SessionTurn { create_request, .. } => {
485 self.runtime_for_session_turn(registration, create_request.as_ref())
486 .await
487 }
488 ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. } => {
489 self.runtime_for_process_env(registration).await
490 }
491 ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
492 }
493 }
494
495 async fn runtime_for_session_turn(
496 &self,
497 registration: &ProcessRegistration,
498 create_request: &crate::SessionCreateRequest,
499 ) -> Result<LashRuntime, PluginError> {
500 let mut policy = create_request
501 .policy
502 .clone()
503 .unwrap_or_else(|| self.config.session_policy.clone());
504 if policy.recorded_provider_id().is_empty() {
505 policy.provider_id = self.config.session_policy.provider_id.clone();
506 }
507 self.build_ephemeral_runtime(
508 format!("process-session-turn:{}", registration.id),
509 policy,
510 create_request.plugin_options.clone(),
511 "session turn request",
512 )
513 .await
514 }
515
516 async fn runtime_for_process_env(
517 &self,
518 registration: &ProcessRegistration,
519 ) -> Result<LashRuntime, PluginError> {
520 let Some(env_ref) = registration.env_ref.as_ref() else {
521 return Err(PluginError::Session(format!(
522 "process `{}` is missing a captured execution env",
523 registration.id
524 )));
525 };
526 let env = crate::load_process_execution_env(
527 self.config
528 .runtime_host
529 .durability
530 .process_env_store
531 .as_ref(),
532 env_ref,
533 )
534 .await?;
535 self.build_ephemeral_runtime(
536 format!("process-env:{}", registration.id),
537 env.policy,
538 env.plugin_options,
539 env_ref.as_str(),
540 )
541 .await
542 }
543
544 async fn build_ephemeral_runtime(
545 &self,
546 session_id: String,
547 policy: crate::SessionPolicy,
548 plugin_options: crate::PluginOptions,
549 source_label: &str,
550 ) -> Result<LashRuntime, PluginError> {
551 let store = Arc::new(InMemorySessionStore::default());
552 let process_work_driver = self.config.process_work_driver.clone().unwrap_or_else(|| {
553 ProcessWorkDriver::inline(Arc::clone(&self.config.process_registry), self.clone())
554 });
555 let mut builder = EmbeddedRuntimeBuilder::new()
556 .with_session_id(session_id.to_string())
557 .with_plugin_host(self.config.plugin_host.as_ref().clone())
558 .with_runtime_host(self.config.runtime_host.clone())
559 .with_policy(policy)
560 .with_plugin_options(plugin_options)
561 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
562 .with_trigger_store(Arc::clone(&self.config.trigger_store))
563 .with_process_registry(Arc::clone(&self.config.process_registry))
564 .with_process_work_driver(process_work_driver)
565 .with_residency(self.config.residency)
566 .with_store(store);
567 if let Some(driver) = self.config.queued_work_driver.clone() {
568 builder = builder.with_queued_work_driver(driver);
569 }
570 builder.build().await.map_err(|err| {
571 PluginError::Session(format!(
572 "failed to build process worker runtime for {source_label}: {err}"
573 ))
574 })
575 }
576
577 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
586 if self
587 .config
588 .runtime_host
589 .control
590 .effect_host
591 .durability_tier()
592 != crate::DurabilityTier::Durable
593 {
594 return Ok(());
595 }
596 let require = |facet: crate::DurableStoreFacet| {
597 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
598 };
599 if self
600 .config
601 .runtime_host
602 .durability
603 .attachment_store
604 .persistence()
605 .durability_tier()
606 != crate::DurabilityTier::Durable
607 {
608 return Err(require(crate::DurableStoreFacet::AttachmentStore));
609 }
610 if self
611 .config
612 .runtime_host
613 .durability
614 .process_env_store
615 .durability_tier()
616 != crate::DurabilityTier::Durable
617 {
618 return Err(require(crate::DurableStoreFacet::ProcessEnvStore));
619 }
620 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
621 return Err(require(crate::DurableStoreFacet::SessionStore));
622 }
623 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
624 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
625 }
626 if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
627 return Err(require(crate::DurableStoreFacet::TriggerStore));
628 }
629 Ok(())
630 }
631
632 fn ensure_stable_process_id(
640 &self,
641 registration: &ProcessRegistration,
642 ) -> Result<(), PluginError> {
643 if registration.id.trim().is_empty() {
644 return Err(PluginError::Session(
645 crate::RuntimeError::missing_process_execution_id().to_string(),
646 ));
647 }
648 Ok(())
649 }
650}
651
/// Interval between process lease renewals, as a [`Duration`].
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Interval between process lease renewals, in milliseconds.
///
/// Test builds use 25 ms; every other build uses 30 000 ms.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) { 25 } else { 30_000 }
}
665
666#[cfg(test)]
667mod boundary_tests {
668 use super::*;
669 use crate::{
670 AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
671 DurableStoreFacet, InMemoryAttachmentStore, ProcessExecutionEnvRef,
672 ProcessExecutionEnvStore, ProcessInput, ProcessRegistration, RuntimeEffectController,
673 RuntimeError, StoredAttachment, TriggerStore,
674 };
675 use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
676
677 #[derive(Default)]
680 struct DurableController;
681
682 #[async_trait::async_trait]
683 impl RuntimeEffectController for DurableController {
684 fn durability_tier(&self) -> DurabilityTier {
685 DurabilityTier::Durable
686 }
687
688 async fn execute_effect(
689 &self,
690 _envelope: crate::RuntimeEffectEnvelope,
691 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
692 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
693 unreachable!("worker boundary rejects before executing any effect")
694 }
695 }
696
697 #[derive(Default)]
699 struct DurableAttachmentStore {
700 inner: InMemoryAttachmentStore,
701 }
702
703 #[async_trait::async_trait]
704 impl AttachmentStore for DurableAttachmentStore {
705 fn persistence(&self) -> AttachmentStorePersistence {
706 AttachmentStorePersistence::Durable
707 }
708
709 async fn put(
710 &self,
711 bytes: Vec<u8>,
712 meta: AttachmentCreateMeta,
713 ) -> Result<AttachmentRef, AttachmentStoreError> {
714 self.inner.put(bytes, meta).await
715 }
716
717 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
718 self.inner.get(id).await
719 }
720 }
721
722 #[derive(Default)]
724 struct DurableProcessEnvStore {
725 inner: crate::InMemoryProcessExecutionEnvStore,
726 }
727
728 #[async_trait::async_trait]
729 impl ProcessExecutionEnvStore for DurableProcessEnvStore {
730 fn durability_tier(&self) -> DurabilityTier {
731 DurabilityTier::Durable
732 }
733
734 async fn put_process_execution_env(
735 &self,
736 env_ref: &ProcessExecutionEnvRef,
737 bytes: &[u8],
738 ) -> Result<(), PluginError> {
739 self.inner.put_process_execution_env(env_ref, bytes).await
740 }
741
742 async fn get_process_execution_env(
743 &self,
744 env_ref: &ProcessExecutionEnvRef,
745 ) -> Result<Option<Vec<u8>>, PluginError> {
746 self.inner.get_process_execution_env(env_ref).await
747 }
748 }
749
750 struct TierSessionStoreFactory {
753 tier: DurabilityTier,
754 }
755
756 #[async_trait::async_trait]
757 impl SessionStoreFactory for TierSessionStoreFactory {
758 fn durability_tier(&self) -> DurabilityTier {
759 self.tier
760 }
761
762 async fn create_store(
763 &self,
764 _request: &crate::SessionStoreCreateRequest,
765 ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
766 unreachable!("worker boundary rejects before creating a session store")
767 }
768
769 async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
770 Ok(())
771 }
772 }
773
774 struct TierTriggerStore {
775 tier: DurabilityTier,
776 inner: crate::InMemoryTriggerStore,
777 }
778
779 impl TierTriggerStore {
780 fn new(tier: DurabilityTier) -> Self {
781 Self {
782 tier,
783 inner: crate::InMemoryTriggerStore::default(),
784 }
785 }
786 }
787
788 #[async_trait::async_trait]
789 impl TriggerStore for TierTriggerStore {
790 fn durability_tier(&self) -> DurabilityTier {
791 self.tier
792 }
793
794 async fn register_subscription(
795 &self,
796 draft: crate::TriggerSubscriptionDraft,
797 ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
798 self.inner.register_subscription(draft).await
799 }
800
801 async fn list_subscriptions(
802 &self,
803 filter: crate::TriggerSubscriptionFilter,
804 ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
805 self.inner.list_subscriptions(filter).await
806 }
807
808 async fn cancel_subscription(
809 &self,
810 session_id: &str,
811 handle: &str,
812 ) -> Result<bool, PluginError> {
813 self.inner.cancel_subscription(session_id, handle).await
814 }
815
816 async fn delete_session_subscriptions(
817 &self,
818 session_id: &str,
819 ) -> Result<usize, PluginError> {
820 self.inner.delete_session_subscriptions(session_id).await
821 }
822
823 async fn record_occurrence(
824 &self,
825 request: crate::TriggerOccurrenceRequest,
826 ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
827 self.inner.record_occurrence(request).await
828 }
829
830 async fn reserve_matching_deliveries(
831 &self,
832 occurrence_id: &str,
833 ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
834 self.inner.reserve_matching_deliveries(occurrence_id).await
835 }
836 }
837
838 fn worker(
842 attachment: Arc<dyn AttachmentStore>,
843 process_env_store: Arc<dyn ProcessExecutionEnvStore>,
844 session_store_tier: DurabilityTier,
845 ) -> DurableProcessWorker {
846 worker_with_store_tiers(
847 attachment,
848 process_env_store,
849 session_store_tier,
850 DurabilityTier::Durable,
851 DurabilityTier::Durable,
852 )
853 }
854
855 fn worker_with_store_tiers(
856 attachment: Arc<dyn AttachmentStore>,
857 process_env_store: Arc<dyn ProcessExecutionEnvStore>,
858 session_store_tier: DurabilityTier,
859 process_registry_tier: DurabilityTier,
860 trigger_store_tier: DurabilityTier,
861 ) -> DurableProcessWorker {
862 let mut runtime_host = RuntimeHostConfig::in_memory();
863 runtime_host.control.effect_host =
864 Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
865 runtime_host.durability.attachment_store = attachment;
866 runtime_host.durability.process_env_store = process_env_store;
867 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
868 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
869 tier: session_store_tier,
870 });
871 let registry: Arc<dyn ProcessRegistry> = Arc::new(
872 crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
873 );
874 let trigger_store: Arc<dyn TriggerStore> =
875 Arc::new(TierTriggerStore::new(trigger_store_tier));
876 DurableProcessWorker::new(
877 DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
878 .with_trigger_store(trigger_store),
879 )
880 }
881
882 fn external_registration() -> ProcessRegistration {
883 ProcessRegistration::new(
884 "worker-boundary-process",
885 ProcessInput::External {
886 metadata: serde_json::json!({}),
887 },
888 crate::ProcessProvenance::host(),
889 )
890 }
891
892 async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
893 worker
894 .run_process(
895 external_registration(),
896 ProcessExecutionContext::default(),
897 CancellationToken::new(),
898 )
899 .await
900 }
901
902 fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
903 let PluginError::Session(message) = err else {
904 panic!("expected PluginError::Session, got {err:?}");
905 };
906 let expected = RuntimeError::durable_store_required(facet).to_string();
907 assert_eq!(message, expected, "worker must reject the {facet:?} facet");
908 }
909
910 #[tokio::test]
911 async fn durable_worker_rejects_ephemeral_attachment_store() {
912 let worker = worker(
913 Arc::new(InMemoryAttachmentStore::new()),
914 Arc::new(DurableProcessEnvStore::default()),
915 DurabilityTier::Durable,
916 );
917 let err = run(&worker)
918 .await
919 .expect_err("ephemeral attachment store must be rejected at the worker boundary");
920 assert_facet(err, DurableStoreFacet::AttachmentStore);
921 }
922
923 #[tokio::test]
924 async fn durable_worker_rejects_ephemeral_process_env_store() {
925 let worker = worker(
926 Arc::new(DurableAttachmentStore::default()),
927 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
928 DurabilityTier::Durable,
929 );
930 let err = run(&worker)
931 .await
932 .expect_err("ephemeral process env store must be rejected at the worker boundary");
933 assert_facet(err, DurableStoreFacet::ProcessEnvStore);
934 }
935
936 #[tokio::test]
937 async fn durable_worker_rejects_ephemeral_session_store_factory() {
938 let worker = worker(
939 Arc::new(DurableAttachmentStore::default()),
940 Arc::new(DurableProcessEnvStore::default()),
941 DurabilityTier::Inline,
942 );
943 let err = run(&worker)
944 .await
945 .expect_err("ephemeral session store factory must be rejected at the worker boundary");
946 assert_facet(err, DurableStoreFacet::SessionStore);
947 }
948
949 #[tokio::test]
950 async fn durable_worker_rejects_ephemeral_process_registry() {
951 let worker = worker_with_store_tiers(
952 Arc::new(DurableAttachmentStore::default()),
953 Arc::new(DurableProcessEnvStore::default()),
954 DurabilityTier::Durable,
955 DurabilityTier::Inline,
956 DurabilityTier::Durable,
957 );
958 let err = run(&worker)
959 .await
960 .expect_err("ephemeral process registry must be rejected at the worker boundary");
961 assert_facet(err, DurableStoreFacet::ProcessRegistry);
962 }
963
964 #[tokio::test]
965 async fn durable_worker_rejects_ephemeral_trigger_store() {
966 let worker = worker_with_store_tiers(
967 Arc::new(DurableAttachmentStore::default()),
968 Arc::new(DurableProcessEnvStore::default()),
969 DurabilityTier::Durable,
970 DurabilityTier::Durable,
971 DurabilityTier::Inline,
972 );
973 let err = run(&worker)
974 .await
975 .expect_err("ephemeral trigger store must be rejected at the worker boundary");
976 assert_facet(err, DurableStoreFacet::TriggerStore);
977 }
978
979 #[tokio::test]
980 async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
981 let worker = worker(
985 Arc::new(DurableAttachmentStore::default()),
986 Arc::new(DurableProcessEnvStore::default()),
987 DurabilityTier::Durable,
988 );
989 let output = run(&worker)
990 .await
991 .expect("all-durable worker should pass the store-facet guard");
992 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
993 }
994
995 #[tokio::test]
996 async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
997 let runtime_host = RuntimeHostConfig::in_memory();
1000 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
1001 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
1002 tier: DurabilityTier::Inline,
1003 });
1004 let registry: Arc<dyn ProcessRegistry> =
1005 Arc::new(crate::TestLocalProcessRegistry::default());
1006 let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
1007 plugin_host,
1008 runtime_host,
1009 factory,
1010 registry,
1011 ));
1012 let output = run(&worker)
1013 .await
1014 .expect("inline worker should pass the store-facet guard");
1015 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1016 }
1017}