1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10 InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
11 ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
12 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
13 SessionStoreFactory,
14};
15
16#[derive(Clone)]
22pub struct DurableProcessWorkerConfig {
23 pub plugin_host: Arc<PluginHost>,
24 pub runtime_host: RuntimeHostConfig,
25 pub session_policy: crate::SessionPolicy,
26 pub session_store_factory: Arc<dyn SessionStoreFactory>,
27 pub process_registry: Arc<dyn ProcessRegistry>,
28 pub trigger_store: Arc<dyn crate::TriggerStore>,
29 #[doc(hidden)]
30 pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
31 pub residency: crate::Residency,
37}
38
39impl DurableProcessWorkerConfig {
40 pub fn new(
41 plugin_host: Arc<PluginHost>,
42 runtime_host: RuntimeHostConfig,
43 session_store_factory: Arc<dyn SessionStoreFactory>,
44 process_registry: Arc<dyn ProcessRegistry>,
45 ) -> Self {
46 Self {
47 plugin_host,
48 runtime_host,
49 session_policy: crate::SessionPolicy::default(),
50 session_store_factory,
51 process_registry,
52 trigger_store: Arc::new(crate::InMemoryTriggerStore::default()),
53 turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
54 residency: crate::Residency::default(),
55 }
56 }
57
58 pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
59 self.trigger_store = store;
60 self
61 }
62
63 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
64 self.session_policy = policy;
65 self
66 }
67
68 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
69 self.residency = residency;
70 self
71 }
72
73 #[doc(hidden)]
74 pub fn with_turn_phase_probe_slot(
75 mut self,
76 slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
77 ) -> Self {
78 self.turn_phase_probe_slot = slot;
79 self
80 }
81
82 pub fn from_plugin_factories(
83 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
84 runtime_host: RuntimeHostConfig,
85 session_store_factory: Arc<dyn SessionStoreFactory>,
86 process_registry: Arc<dyn ProcessRegistry>,
87 ) -> Self {
88 Self::new(
89 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
90 runtime_host,
91 session_store_factory,
92 process_registry,
93 )
94 }
95
96 pub fn from_plugin_stack(
97 plugin_stack: PluginStack,
98 runtime_host: RuntimeHostConfig,
99 session_store_factory: Arc<dyn SessionStoreFactory>,
100 process_registry: Arc<dyn ProcessRegistry>,
101 ) -> Self {
102 Self::from_plugin_factories(
103 plugin_stack.into_factories(),
104 runtime_host,
105 session_store_factory,
106 process_registry,
107 )
108 }
109}
110
111#[derive(Clone)]
113pub struct DurableProcessWorker {
114 config: Arc<DurableProcessWorkerConfig>,
115}
116
117enum RecoverFailure {
119 LeaseLost(PluginError),
123 Run(PluginError),
126}
127
128impl DurableProcessWorker {
129 pub fn new(config: DurableProcessWorkerConfig) -> Self {
130 Self {
131 config: Arc::new(config),
132 }
133 }
134
135 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
136 Self { config }
137 }
138
139 pub fn config(&self) -> &DurableProcessWorkerConfig {
140 &self.config
141 }
142
143 pub async fn run_process(
144 &self,
145 registration: ProcessRegistration,
146 execution_context: ProcessExecutionContext,
147 cancellation: CancellationToken,
148 ) -> Result<ProcessAwaitOutput, PluginError> {
149 let scoped_effect_controller = self
150 .config
151 .runtime_host
152 .control
153 .effect_host
154 .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
155 .map_err(|err| PluginError::Session(err.to_string()))?
156 .ok_or_else(|| {
157 PluginError::Session(
158 "process worker effect host must provide a static process scope".to_string(),
159 )
160 })?;
161 self.run_process_with_scoped_effect_controller(
162 registration,
163 execution_context,
164 scoped_effect_controller,
165 cancellation,
166 )
167 .await
168 }
169
170 pub async fn run_process_with_scoped_effect_controller(
171 &self,
172 registration: ProcessRegistration,
173 execution_context: ProcessExecutionContext,
174 scoped_effect_controller: crate::ScopedEffectController<'_>,
175 cancellation: CancellationToken,
176 ) -> Result<ProcessAwaitOutput, PluginError> {
177 self.ensure_stable_process_id(®istration)?;
178 self.ensure_durable_store_facets()?;
179 if let ProcessInput::External { metadata } = registration.input.as_ref() {
180 return Ok(ProcessAwaitOutput::Success {
181 value: serde_json::json!({ "metadata": metadata.clone() }),
182 control: None,
183 });
184 }
185 let mut runtime = self.runtime_for_registration(®istration).await?;
186 let originator_scope = if let crate::ProcessOriginator::Session { scope } =
187 ®istration.provenance.originator
188 {
189 Some(scope)
190 } else {
191 None
192 };
193 let probe_scope = registration.wake_target.as_ref().or(originator_scope);
194 if let Some(probe) =
195 probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
196 {
197 runtime.set_turn_phase_probe(probe);
198 }
199 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
200 PluginError::Session(format!(
201 "failed to build runtime env for process `{}`: {err}",
202 registration.id
203 ))
204 })?;
205 Ok(manager
206 .run_process(
207 registration,
208 execution_context,
209 Arc::clone(&self.config.process_registry),
210 scoped_effect_controller,
211 cancellation,
212 )
213 .await)
214 }
215
216 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
239 let records = self.config.process_registry.list_non_terminal().await?;
240 for record in records {
241 let worker = self.clone();
252 tokio::spawn(async move { worker.recover_process(record).await });
253 }
254 Ok(())
255 }
256
257 async fn recover_process(&self, record: ProcessRecord) {
258 let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
259 let process_id = record.id.clone();
260 let Ok(lease) = self
264 .config
265 .process_registry
266 .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
267 .await
268 else {
269 return;
270 };
271 if self
274 .config
275 .process_registry
276 .get_process(&process_id)
277 .await
278 .is_some_and(|current| current.is_terminal())
279 {
280 self.release_or_log(&lease).await;
281 return;
282 }
283 let registration = ProcessRegistration {
284 id: record.id,
285 input: record.input,
286 event_types: record.event_types,
287 provenance: record.provenance.clone(),
288 env_ref: record.env_ref.clone(),
289 wake_target: record.wake_target.clone(),
290 };
291 let execution_context = ProcessExecutionContext::default();
292 match self
293 .run_process_with_lease_renewal(registration, execution_context, lease.clone())
294 .await
295 {
296 Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
299 Err(RecoverFailure::LeaseLost(err)) => {
305 tracing::warn!(
306 process_id = %process_id,
307 error = %err,
308 "process recovery lost its lease mid-run; deferring to the new owner",
309 );
310 }
311 Err(RecoverFailure::Run(err)) => {
314 let output = ProcessAwaitOutput::Failure {
315 class: crate::ToolFailureClass::Execution,
316 code: "process_recovery_failed".to_string(),
317 message: err.to_string(),
318 raw: None,
319 control: None,
320 };
321 self.complete_and_release(&lease, &process_id, output).await;
322 }
323 }
324 }
325
326 async fn complete_and_release(
330 &self,
331 lease: &ProcessLease,
332 process_id: &str,
333 output: ProcessAwaitOutput,
334 ) {
335 let fenced = match self
342 .config
343 .process_registry
344 .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
345 .await
346 {
347 Ok(renewed) => renewed,
348 Err(err) => {
349 tracing::warn!(
350 process_id = %process_id,
351 error = %err,
352 "lost process lease before terminal write; deferring to the new owner",
353 );
354 return;
355 }
356 };
357 if let Err(err) = self
358 .config
359 .process_registry
360 .complete_process(process_id, output)
361 .await
362 {
363 tracing::warn!(
364 process_id = %process_id,
365 error = %err,
366 "failed to write recovered process terminal outcome",
367 );
368 }
369 self.release_or_log(&fenced).await;
370 }
371
372 async fn release_or_log(&self, lease: &ProcessLease) {
373 if let Err(err) = self.release_process_lease(lease).await {
374 tracing::warn!(
375 process_id = %lease.process_id,
376 error = %err,
377 "failed to release recovered process lease",
378 );
379 }
380 }
381
382 async fn run_process_with_lease_renewal(
386 &self,
387 registration: ProcessRegistration,
388 execution_context: ProcessExecutionContext,
389 mut lease: ProcessLease,
390 ) -> Result<ProcessAwaitOutput, RecoverFailure> {
391 let process_id = registration.id.clone();
392 let cancellation = CancellationToken::new();
393 let cancel_watcher = {
394 let registry = Arc::clone(&self.config.process_registry);
395 let process_id = process_id.clone();
396 let cancellation = cancellation.clone();
397 tokio::spawn(async move {
398 match registry
399 .wait_event_after(&process_id, "process.cancel_requested", 0)
400 .await
401 {
402 Ok(_) => cancellation.cancel(),
403 Err(err) => tracing::warn!(
404 process_id = %process_id,
405 error = %err,
406 "process cancel watcher stopped before observing cancellation",
407 ),
408 }
409 })
410 };
411 let pending = self.run_process(registration, execution_context, cancellation.clone());
412 tokio::pin!(pending);
413 loop {
414 tokio::select! {
415 outcome = &mut pending => {
416 cancel_watcher.abort();
417 return outcome.map_err(RecoverFailure::Run);
418 }
419 _ = tokio::time::sleep(process_lease_renew_interval()) => {
420 match self
421 .config
422 .process_registry
423 .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
424 .await
425 {
426 Ok(renewed) => lease = renewed,
427 Err(err) => {
428 cancellation.cancel();
429 cancel_watcher.abort();
430 return Err(RecoverFailure::LeaseLost(err));
431 }
432 }
433 }
434 }
435 }
436 }
437
438 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
439 self.config
440 .process_registry
441 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
442 .await
443 }
444
445 pub async fn request_process_cancel(
446 &self,
447 process_id: &str,
448 reason: Option<String>,
449 ) -> Result<(), PluginError> {
450 self.config
451 .process_registry
452 .append_event(
453 process_id,
454 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
455 )
456 .await
457 .map(|_| ())
458 }
459
460 async fn runtime_for_registration(
461 &self,
462 registration: &ProcessRegistration,
463 ) -> Result<LashRuntime, PluginError> {
464 match registration.input.as_ref() {
465 ProcessInput::SessionTurn { create_request, .. } => {
466 self.runtime_for_session_turn(registration, create_request.as_ref())
467 .await
468 }
469 ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. } => {
470 self.runtime_for_process_env(registration).await
471 }
472 ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
473 }
474 }
475
476 async fn runtime_for_session_turn(
477 &self,
478 registration: &ProcessRegistration,
479 create_request: &crate::SessionCreateRequest,
480 ) -> Result<LashRuntime, PluginError> {
481 let mut policy = create_request
482 .policy
483 .clone()
484 .unwrap_or_else(|| self.config.session_policy.clone());
485 if policy.recorded_provider_id().is_empty() {
486 policy.provider_id = self.config.session_policy.provider_id.clone();
487 }
488 self.build_ephemeral_runtime(
489 format!("process-session-turn:{}", registration.id),
490 policy,
491 create_request.plugin_options.clone(),
492 "session turn request",
493 )
494 .await
495 }
496
497 async fn runtime_for_process_env(
498 &self,
499 registration: &ProcessRegistration,
500 ) -> Result<LashRuntime, PluginError> {
501 let Some(env_ref) = registration.env_ref.as_ref() else {
502 return Err(PluginError::Session(format!(
503 "process `{}` is missing a captured execution env",
504 registration.id
505 )));
506 };
507 let env = crate::load_process_execution_env(
508 self.config
509 .runtime_host
510 .durability
511 .lashlang_artifact_store
512 .as_ref(),
513 env_ref,
514 )
515 .await?;
516 self.build_ephemeral_runtime(
517 format!("process-env:{}", registration.id),
518 env.policy,
519 env.plugin_options,
520 env_ref.as_str(),
521 )
522 .await
523 }
524
525 async fn build_ephemeral_runtime(
526 &self,
527 session_id: String,
528 policy: crate::SessionPolicy,
529 plugin_options: crate::PluginOptions,
530 source_label: &str,
531 ) -> Result<LashRuntime, PluginError> {
532 let store = Arc::new(InMemorySessionStore::default());
533 EmbeddedRuntimeBuilder::new()
534 .with_session_id(session_id.to_string())
535 .with_plugin_host(self.config.plugin_host.as_ref().clone())
536 .with_runtime_host(self.config.runtime_host.clone())
537 .with_policy(policy)
538 .with_plugin_options(plugin_options)
539 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
540 .with_trigger_store(Arc::clone(&self.config.trigger_store))
541 .with_process_registry(Arc::clone(&self.config.process_registry))
542 .with_residency(self.config.residency)
543 .with_store(store)
544 .build()
545 .await
546 .map_err(|err| {
547 PluginError::Session(format!(
548 "failed to build process worker runtime for {source_label}: {err}"
549 ))
550 })
551 }
552
553 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
562 if self
563 .config
564 .runtime_host
565 .control
566 .effect_host
567 .durability_tier()
568 != crate::DurabilityTier::Durable
569 {
570 return Ok(());
571 }
572 let require = |facet: crate::DurableStoreFacet| {
573 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
574 };
575 if self
576 .config
577 .runtime_host
578 .durability
579 .attachment_store
580 .persistence()
581 .durability_tier()
582 != crate::DurabilityTier::Durable
583 {
584 return Err(require(crate::DurableStoreFacet::AttachmentStore));
585 }
586 if self
587 .config
588 .runtime_host
589 .durability
590 .lashlang_artifact_store
591 .durability_tier()
592 != crate::DurabilityTier::Durable
593 {
594 return Err(require(crate::DurableStoreFacet::ArtifactStore));
595 }
596 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
597 return Err(require(crate::DurableStoreFacet::SessionStore));
598 }
599 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
600 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
601 }
602 if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
603 return Err(require(crate::DurableStoreFacet::TriggerStore));
604 }
605 Ok(())
606 }
607
608 fn ensure_stable_process_id(
616 &self,
617 registration: &ProcessRegistration,
618 ) -> Result<(), PluginError> {
619 if registration.id.trim().is_empty() {
620 return Err(PluginError::Session(
621 crate::RuntimeError::missing_process_execution_id().to_string(),
622 ));
623 }
624 Ok(())
625 }
626}
627
/// Delay between lease renewals while a recovered process is running.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Renewal delay in milliseconds: 25 ms in test builds, 30 s otherwise.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) { 25 } else { 30_000 }
}
641
642#[cfg(test)]
643mod boundary_tests {
644 use super::*;
645 use crate::{
646 AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
647 DurableStoreFacet, InMemoryAttachmentStore, LashlangArtifactStore, ProcessInput,
648 ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment, TriggerStore,
649 };
650 use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
651
652 #[derive(Default)]
655 struct DurableController;
656
657 #[async_trait::async_trait]
658 impl RuntimeEffectController for DurableController {
659 fn durability_tier(&self) -> DurabilityTier {
660 DurabilityTier::Durable
661 }
662
663 async fn execute_effect(
664 &self,
665 _envelope: crate::RuntimeEffectEnvelope,
666 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
667 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
668 unreachable!("worker boundary rejects before executing any effect")
669 }
670 }
671
672 #[derive(Default)]
674 struct DurableAttachmentStore {
675 inner: InMemoryAttachmentStore,
676 }
677
678 #[async_trait::async_trait]
679 impl AttachmentStore for DurableAttachmentStore {
680 fn persistence(&self) -> AttachmentStorePersistence {
681 AttachmentStorePersistence::Durable
682 }
683
684 async fn put(
685 &self,
686 bytes: Vec<u8>,
687 meta: AttachmentCreateMeta,
688 ) -> Result<AttachmentRef, AttachmentStoreError> {
689 self.inner.put(bytes, meta).await
690 }
691
692 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
693 self.inner.get(id).await
694 }
695 }
696
697 #[derive(Default)]
699 struct DurableArtifactStore {
700 inner: lashlang::InMemoryLashlangArtifactStore,
701 }
702
703 #[async_trait::async_trait]
704 impl LashlangArtifactStore for DurableArtifactStore {
705 fn durability_tier(&self) -> DurabilityTier {
706 DurabilityTier::Durable
707 }
708
709 async fn put_module_artifact(
710 &self,
711 artifact: &lashlang::ModuleArtifact,
712 ) -> Result<(), lashlang::ArtifactStoreError> {
713 self.inner.put_module_artifact(artifact).await
714 }
715
716 async fn get_module_artifact(
717 &self,
718 module_ref: &lashlang::ModuleRef,
719 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
720 self.inner.get_module_artifact(module_ref).await
721 }
722
723 async fn put_artifact_bytes(
724 &self,
725 artifact_ref: &str,
726 descriptor: &str,
727 bytes: &[u8],
728 ) -> Result<(), lashlang::ArtifactStoreError> {
729 self.inner
730 .put_artifact_bytes(artifact_ref, descriptor, bytes)
731 .await
732 }
733
734 async fn get_artifact_bytes(
735 &self,
736 artifact_ref: &str,
737 ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
738 self.inner.get_artifact_bytes(artifact_ref).await
739 }
740 }
741
742 struct TierSessionStoreFactory {
745 tier: DurabilityTier,
746 }
747
748 #[async_trait::async_trait]
749 impl SessionStoreFactory for TierSessionStoreFactory {
750 fn durability_tier(&self) -> DurabilityTier {
751 self.tier
752 }
753
754 async fn create_store(
755 &self,
756 _request: &crate::SessionStoreCreateRequest,
757 ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
758 unreachable!("worker boundary rejects before creating a session store")
759 }
760
761 async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
762 Ok(())
763 }
764 }
765
766 struct TierTriggerStore {
767 tier: DurabilityTier,
768 inner: crate::InMemoryTriggerStore,
769 }
770
771 impl TierTriggerStore {
772 fn new(tier: DurabilityTier) -> Self {
773 Self {
774 tier,
775 inner: crate::InMemoryTriggerStore::default(),
776 }
777 }
778 }
779
780 #[async_trait::async_trait]
781 impl TriggerStore for TierTriggerStore {
782 fn durability_tier(&self) -> DurabilityTier {
783 self.tier
784 }
785
786 async fn register_subscription(
787 &self,
788 draft: crate::TriggerSubscriptionDraft,
789 ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
790 self.inner.register_subscription(draft).await
791 }
792
793 async fn list_subscriptions(
794 &self,
795 filter: crate::TriggerSubscriptionFilter,
796 ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
797 self.inner.list_subscriptions(filter).await
798 }
799
800 async fn cancel_subscription(
801 &self,
802 session_id: &str,
803 handle: &str,
804 ) -> Result<bool, PluginError> {
805 self.inner.cancel_subscription(session_id, handle).await
806 }
807
808 async fn delete_session_subscriptions(
809 &self,
810 session_id: &str,
811 ) -> Result<usize, PluginError> {
812 self.inner.delete_session_subscriptions(session_id).await
813 }
814
815 async fn record_occurrence(
816 &self,
817 request: crate::TriggerOccurrenceRequest,
818 ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
819 self.inner.record_occurrence(request).await
820 }
821
822 async fn reserve_matching_deliveries(
823 &self,
824 occurrence_id: &str,
825 ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
826 self.inner.reserve_matching_deliveries(occurrence_id).await
827 }
828 }
829
830 fn worker(
834 attachment: Arc<dyn AttachmentStore>,
835 artifact: Arc<dyn LashlangArtifactStore>,
836 session_store_tier: DurabilityTier,
837 ) -> DurableProcessWorker {
838 worker_with_store_tiers(
839 attachment,
840 artifact,
841 session_store_tier,
842 DurabilityTier::Durable,
843 DurabilityTier::Durable,
844 )
845 }
846
847 fn worker_with_store_tiers(
848 attachment: Arc<dyn AttachmentStore>,
849 artifact: Arc<dyn LashlangArtifactStore>,
850 session_store_tier: DurabilityTier,
851 process_registry_tier: DurabilityTier,
852 trigger_store_tier: DurabilityTier,
853 ) -> DurableProcessWorker {
854 let mut runtime_host = RuntimeHostConfig::in_memory();
855 runtime_host.control.effect_host =
856 Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
857 runtime_host.durability.attachment_store = attachment;
858 runtime_host.durability.lashlang_artifact_store = artifact;
859 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
860 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
861 tier: session_store_tier,
862 });
863 let registry: Arc<dyn ProcessRegistry> = Arc::new(
864 crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
865 );
866 let trigger_store: Arc<dyn TriggerStore> =
867 Arc::new(TierTriggerStore::new(trigger_store_tier));
868 DurableProcessWorker::new(
869 DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
870 .with_trigger_store(trigger_store),
871 )
872 }
873
874 fn external_registration() -> ProcessRegistration {
875 ProcessRegistration::new(
876 "worker-boundary-process",
877 ProcessInput::External {
878 metadata: serde_json::json!({}),
879 },
880 crate::ProcessProvenance::host(),
881 )
882 }
883
884 async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
885 worker
886 .run_process(
887 external_registration(),
888 ProcessExecutionContext::default(),
889 CancellationToken::new(),
890 )
891 .await
892 }
893
894 fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
895 let PluginError::Session(message) = err else {
896 panic!("expected PluginError::Session, got {err:?}");
897 };
898 let expected = RuntimeError::durable_store_required(facet).to_string();
899 assert_eq!(message, expected, "worker must reject the {facet:?} facet");
900 }
901
902 #[tokio::test]
903 async fn durable_worker_rejects_ephemeral_attachment_store() {
904 let worker = worker(
905 Arc::new(InMemoryAttachmentStore::new()),
906 Arc::new(DurableArtifactStore::default()),
907 DurabilityTier::Durable,
908 );
909 let err = run(&worker)
910 .await
911 .expect_err("ephemeral attachment store must be rejected at the worker boundary");
912 assert_facet(err, DurableStoreFacet::AttachmentStore);
913 }
914
915 #[tokio::test]
916 async fn durable_worker_rejects_ephemeral_artifact_store() {
917 let worker = worker(
918 Arc::new(DurableAttachmentStore::default()),
919 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
920 DurabilityTier::Durable,
921 );
922 let err = run(&worker)
923 .await
924 .expect_err("ephemeral artifact store must be rejected at the worker boundary");
925 assert_facet(err, DurableStoreFacet::ArtifactStore);
926 }
927
928 #[tokio::test]
929 async fn durable_worker_rejects_ephemeral_session_store_factory() {
930 let worker = worker(
931 Arc::new(DurableAttachmentStore::default()),
932 Arc::new(DurableArtifactStore::default()),
933 DurabilityTier::Inline,
934 );
935 let err = run(&worker)
936 .await
937 .expect_err("ephemeral session store factory must be rejected at the worker boundary");
938 assert_facet(err, DurableStoreFacet::SessionStore);
939 }
940
941 #[tokio::test]
942 async fn durable_worker_rejects_ephemeral_process_registry() {
943 let worker = worker_with_store_tiers(
944 Arc::new(DurableAttachmentStore::default()),
945 Arc::new(DurableArtifactStore::default()),
946 DurabilityTier::Durable,
947 DurabilityTier::Inline,
948 DurabilityTier::Durable,
949 );
950 let err = run(&worker)
951 .await
952 .expect_err("ephemeral process registry must be rejected at the worker boundary");
953 assert_facet(err, DurableStoreFacet::ProcessRegistry);
954 }
955
956 #[tokio::test]
957 async fn durable_worker_rejects_ephemeral_trigger_store() {
958 let worker = worker_with_store_tiers(
959 Arc::new(DurableAttachmentStore::default()),
960 Arc::new(DurableArtifactStore::default()),
961 DurabilityTier::Durable,
962 DurabilityTier::Durable,
963 DurabilityTier::Inline,
964 );
965 let err = run(&worker)
966 .await
967 .expect_err("ephemeral trigger store must be rejected at the worker boundary");
968 assert_facet(err, DurableStoreFacet::TriggerStore);
969 }
970
971 #[tokio::test]
972 async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
973 let worker = worker(
977 Arc::new(DurableAttachmentStore::default()),
978 Arc::new(DurableArtifactStore::default()),
979 DurabilityTier::Durable,
980 );
981 let output = run(&worker)
982 .await
983 .expect("all-durable worker should pass the store-facet guard");
984 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
985 }
986
987 #[tokio::test]
988 async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
989 let runtime_host = RuntimeHostConfig::in_memory();
992 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
993 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
994 tier: DurabilityTier::Inline,
995 });
996 let registry: Arc<dyn ProcessRegistry> =
997 Arc::new(crate::TestLocalProcessRegistry::default());
998 let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
999 plugin_host,
1000 runtime_host,
1001 factory,
1002 registry,
1003 ));
1004 let output = run(&worker)
1005 .await
1006 .expect("inline worker should pass the store-facet guard");
1007 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
1008 }
1009}