1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10 InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
11 ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
12 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
13 SessionStoreFactory,
14};
15
/// Configuration shared by every process run performed by a [`DurableProcessWorker`].
///
/// Cloning is cheap for the `Arc` fields; `runtime_host` and `session_policy` are cloned by value.
#[derive(Clone)]
pub struct DurableProcessWorkerConfig {
    /// Plugin host that is cloned into each runtime the worker builds.
    pub plugin_host: Arc<PluginHost>,
    /// Runtime host configuration: supplies the effect host, the durability stores, and the host
    /// profile id that registrations are checked against.
    pub runtime_host: RuntimeHostConfig,
    /// Fallback session policy for session-turn processes whose create request carries none.
    pub session_policy: crate::SessionPolicy,
    /// Session store factory handed to each runtime the worker builds.
    pub session_store_factory: Arc<dyn SessionStoreFactory>,
    /// Registry used to list, lease, complete, and append events to processes.
    pub process_registry: Arc<dyn ProcessRegistry>,
    /// Host event store handed to each runtime; `new` installs an in-memory store.
    pub host_event_store: Arc<dyn crate::HostEventStore>,
    /// Residency setting handed to each runtime; `new` installs `Residency::default()`.
    pub residency: crate::Residency,
}
36
37impl DurableProcessWorkerConfig {
38 pub fn new(
39 plugin_host: Arc<PluginHost>,
40 runtime_host: RuntimeHostConfig,
41 session_store_factory: Arc<dyn SessionStoreFactory>,
42 process_registry: Arc<dyn ProcessRegistry>,
43 ) -> Self {
44 Self {
45 plugin_host,
46 runtime_host,
47 session_policy: crate::SessionPolicy::default(),
48 session_store_factory,
49 process_registry,
50 host_event_store: Arc::new(crate::InMemoryHostEventStore::default()),
51 residency: crate::Residency::default(),
52 }
53 }
54
55 pub fn with_host_event_store(mut self, store: Arc<dyn crate::HostEventStore>) -> Self {
56 self.host_event_store = store;
57 self
58 }
59
60 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
61 self.session_policy = policy;
62 self
63 }
64
65 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
66 self.residency = residency;
67 self
68 }
69
70 pub fn from_plugin_factories(
71 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
72 runtime_host: RuntimeHostConfig,
73 session_store_factory: Arc<dyn SessionStoreFactory>,
74 process_registry: Arc<dyn ProcessRegistry>,
75 ) -> Self {
76 Self::new(
77 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
78 runtime_host,
79 session_store_factory,
80 process_registry,
81 )
82 }
83
84 pub fn from_plugin_stack(
85 plugin_stack: PluginStack,
86 runtime_host: RuntimeHostConfig,
87 session_store_factory: Arc<dyn SessionStoreFactory>,
88 process_registry: Arc<dyn ProcessRegistry>,
89 ) -> Self {
90 Self::from_plugin_factories(
91 plugin_stack.into_factories(),
92 runtime_host,
93 session_store_factory,
94 process_registry,
95 )
96 }
97}
98
/// Worker that runs registered processes and recovers non-terminal ones.
///
/// The configuration is held behind an `Arc`, so cloning the worker only bumps a reference count.
#[derive(Clone)]
pub struct DurableProcessWorker {
    /// Shared, immutable worker configuration.
    config: Arc<DurableProcessWorkerConfig>,
}
104
/// Why a recovered process run did not produce an output.
enum RecoverFailure {
    /// A periodic lease renewal failed while the process was running; carries the renewal error.
    LeaseLost(PluginError),
    /// The process run itself returned an error; carries that error.
    Run(PluginError),
}
115
116impl DurableProcessWorker {
117 pub fn new(config: DurableProcessWorkerConfig) -> Self {
118 Self {
119 config: Arc::new(config),
120 }
121 }
122
123 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
124 Self { config }
125 }
126
127 pub fn config(&self) -> &DurableProcessWorkerConfig {
128 &self.config
129 }
130
131 pub async fn run_process(
132 &self,
133 registration: ProcessRegistration,
134 execution_context: ProcessExecutionContext,
135 cancellation: CancellationToken,
136 ) -> Result<ProcessAwaitOutput, PluginError> {
137 let scoped_effect_controller = self
138 .config
139 .runtime_host
140 .control
141 .effect_host
142 .scoped_static(crate::EffectScope::process(registration.id.clone()))
143 .map_err(|err| PluginError::Session(err.to_string()))?
144 .ok_or_else(|| {
145 PluginError::Session(
146 "process worker effect host must provide a static process scope".to_string(),
147 )
148 })?;
149 self.run_process_with_scoped_effect_controller(
150 registration,
151 execution_context,
152 scoped_effect_controller,
153 cancellation,
154 )
155 .await
156 }
157
158 pub async fn run_process_with_scoped_effect_controller(
159 &self,
160 registration: ProcessRegistration,
161 execution_context: ProcessExecutionContext,
162 scoped_effect_controller: crate::ScopedEffectController<'_>,
163 cancellation: CancellationToken,
164 ) -> Result<ProcessAwaitOutput, PluginError> {
165 self.ensure_stable_process_id(®istration)?;
166 self.ensure_host_profile_matches(®istration)?;
167 self.ensure_durable_store_facets()?;
168 if let ProcessInput::External { metadata } = registration.input.as_ref() {
169 return Ok(ProcessAwaitOutput::Success {
170 value: serde_json::json!({ "metadata": metadata.clone() }),
171 control: None,
172 });
173 }
174 let runtime = self.runtime_for_registration(®istration).await?;
175 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
176 PluginError::Session(format!(
177 "failed to build runtime env for process `{}`: {err}",
178 registration.id
179 ))
180 })?;
181 Ok(manager
182 .run_process(
183 registration,
184 execution_context,
185 Arc::clone(&self.config.process_registry),
186 scoped_effect_controller,
187 cancellation,
188 )
189 .await)
190 }
191
192 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
215 let records = self.config.process_registry.list_non_terminal().await?;
216 for record in records {
217 let worker = self.clone();
228 tokio::spawn(async move { worker.recover_process(record).await });
229 }
230 Ok(())
231 }
232
233 async fn recover_process(&self, record: ProcessRecord) {
234 let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
235 let process_id = record.id.clone();
236 let Ok(lease) = self
240 .config
241 .process_registry
242 .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
243 .await
244 else {
245 return;
246 };
247 if self
250 .config
251 .process_registry
252 .get_process(&process_id)
253 .await
254 .is_some_and(|current| current.is_terminal())
255 {
256 self.release_or_log(&lease).await;
257 return;
258 }
259 let registration = ProcessRegistration {
260 id: record.id,
261 input: record.input,
262 event_types: record.event_types,
263 provenance: record.provenance.clone(),
264 env_ref: record.env_ref.clone(),
265 wake_target: record.wake_target.clone(),
266 };
267 let execution_context = ProcessExecutionContext::default();
268 match self
269 .run_process_with_lease_renewal(registration, execution_context, lease.clone())
270 .await
271 {
272 Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
275 Err(RecoverFailure::LeaseLost(err)) => {
281 tracing::warn!(
282 process_id = %process_id,
283 error = %err,
284 "process recovery lost its lease mid-run; deferring to the new owner",
285 );
286 }
287 Err(RecoverFailure::Run(err)) => {
290 let output = ProcessAwaitOutput::Failure {
291 class: crate::ToolFailureClass::Execution,
292 code: "process_recovery_failed".to_string(),
293 message: err.to_string(),
294 raw: None,
295 control: None,
296 };
297 self.complete_and_release(&lease, &process_id, output).await;
298 }
299 }
300 }
301
302 async fn complete_and_release(
306 &self,
307 lease: &ProcessLease,
308 process_id: &str,
309 output: ProcessAwaitOutput,
310 ) {
311 let fenced = match self
318 .config
319 .process_registry
320 .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
321 .await
322 {
323 Ok(renewed) => renewed,
324 Err(err) => {
325 tracing::warn!(
326 process_id = %process_id,
327 error = %err,
328 "lost process lease before terminal write; deferring to the new owner",
329 );
330 return;
331 }
332 };
333 if let Err(err) = self
334 .config
335 .process_registry
336 .complete_process(process_id, output)
337 .await
338 {
339 tracing::warn!(
340 process_id = %process_id,
341 error = %err,
342 "failed to write recovered process terminal outcome",
343 );
344 }
345 self.release_or_log(&fenced).await;
346 }
347
348 async fn release_or_log(&self, lease: &ProcessLease) {
349 if let Err(err) = self.release_process_lease(lease).await {
350 tracing::warn!(
351 process_id = %lease.process_id,
352 error = %err,
353 "failed to release recovered process lease",
354 );
355 }
356 }
357
    /// Runs `registration` while keeping `lease` alive and watching for cancel requests.
    ///
    /// A background task waits for a `process.cancel_requested` event on the process and cancels
    /// the run's token when one arrives. The run itself is raced against a renewal timer: each
    /// time the timer elapses the lease is renewed and the loop continues.
    ///
    /// # Errors
    ///
    /// * `RecoverFailure::Run` when the process run returns an error.
    /// * `RecoverFailure::LeaseLost` when a lease renewal fails; the token is cancelled first and
    ///   the in-flight run is dropped when this function returns.
    async fn run_process_with_lease_renewal(
        &self,
        registration: ProcessRegistration,
        execution_context: ProcessExecutionContext,
        mut lease: ProcessLease,
    ) -> Result<ProcessAwaitOutput, RecoverFailure> {
        let process_id = registration.id.clone();
        let cancellation = CancellationToken::new();
        // Detached watcher: turns a registry cancel event into a token cancellation.
        let cancel_watcher = {
            let registry = Arc::clone(&self.config.process_registry);
            let process_id = process_id.clone();
            let cancellation = cancellation.clone();
            tokio::spawn(async move {
                match registry
                    // NOTE(review): the trailing `0` is presumably the event cursor to wait
                    // after (i.e. "from the start") — confirm against `wait_event_after`.
                    .wait_event_after(&process_id, "process.cancel_requested", 0)
                    .await
                {
                    Ok(_) => cancellation.cancel(),
                    Err(err) => tracing::warn!(
                        process_id = %process_id,
                        error = %err,
                        "process cancel watcher stopped before observing cancellation",
                    ),
                }
            })
        };
        let pending = self.run_process(registration, execution_context, cancellation.clone());
        // Pin once so the same future can be polled across loop iterations.
        tokio::pin!(pending);
        loop {
            tokio::select! {
                outcome = &mut pending => {
                    // The run is over, so the watcher has nothing left to cancel.
                    cancel_watcher.abort();
                    return outcome.map_err(RecoverFailure::Run);
                }
                // A fresh sleep is created on every iteration, i.e. after every renewal.
                _ = tokio::time::sleep(process_lease_renew_interval()) => {
                    match self
                        .config
                        .process_registry
                        .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
                        .await
                    {
                        // Keep the renewed lease so the next renewal uses it.
                        Ok(renewed) => lease = renewed,
                        Err(err) => {
                            // Signal the run to stop before abandoning it.
                            cancellation.cancel();
                            cancel_watcher.abort();
                            return Err(RecoverFailure::LeaseLost(err));
                        }
                    }
                }
            }
        }
    }
413
414 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
415 self.config
416 .process_registry
417 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
418 .await
419 }
420
421 pub async fn request_process_cancel(
422 &self,
423 process_id: &str,
424 reason: Option<String>,
425 ) -> Result<(), PluginError> {
426 self.config
427 .process_registry
428 .append_event(
429 process_id,
430 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
431 )
432 .await
433 .map(|_| ())
434 }
435
436 async fn runtime_for_registration(
437 &self,
438 registration: &ProcessRegistration,
439 ) -> Result<LashRuntime, PluginError> {
440 match registration.input.as_ref() {
441 ProcessInput::SessionTurn { create_request, .. } => {
442 self.runtime_for_session_turn(registration, create_request.as_ref())
443 .await
444 }
445 ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. } => {
446 self.runtime_for_process_env(registration).await
447 }
448 ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
449 }
450 }
451
452 async fn runtime_for_session_turn(
453 &self,
454 registration: &ProcessRegistration,
455 create_request: &crate::SessionCreateRequest,
456 ) -> Result<LashRuntime, PluginError> {
457 let mut policy = create_request
458 .policy
459 .clone()
460 .unwrap_or_else(|| self.config.session_policy.clone());
461 if policy.recorded_provider_id().is_empty() {
462 policy.provider_id = self.config.session_policy.provider_id.clone();
463 }
464 self.build_ephemeral_runtime(
465 format!("process-session-turn:{}", registration.id),
466 policy,
467 create_request.plugin_options.clone(),
468 "session turn request",
469 )
470 .await
471 }
472
473 async fn runtime_for_process_env(
474 &self,
475 registration: &ProcessRegistration,
476 ) -> Result<LashRuntime, PluginError> {
477 let Some(env_ref) = registration.env_ref.as_ref() else {
478 return Err(PluginError::Session(format!(
479 "process `{}` is missing a captured execution env",
480 registration.id
481 )));
482 };
483 let env = crate::load_process_execution_env(
484 self.config
485 .runtime_host
486 .durability
487 .lashlang_artifact_store
488 .as_ref(),
489 env_ref,
490 )
491 .await?;
492 self.build_ephemeral_runtime(
493 format!("process-env:{}", registration.id),
494 env.policy,
495 env.plugin_options,
496 env_ref.as_str(),
497 )
498 .await
499 }
500
501 async fn build_ephemeral_runtime(
502 &self,
503 session_id: String,
504 policy: crate::SessionPolicy,
505 plugin_options: crate::PluginOptions,
506 source_label: &str,
507 ) -> Result<LashRuntime, PluginError> {
508 let store = Arc::new(InMemorySessionStore::default());
509 EmbeddedRuntimeBuilder::new()
510 .with_session_id(session_id.to_string())
511 .with_plugin_host(self.config.plugin_host.as_ref().clone())
512 .with_runtime_host(self.config.runtime_host.clone())
513 .with_policy(policy)
514 .with_plugin_options(plugin_options)
515 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
516 .with_host_event_store(Arc::clone(&self.config.host_event_store))
517 .with_process_registry(Arc::clone(&self.config.process_registry))
518 .with_residency(self.config.residency)
519 .with_store(store)
520 .build()
521 .await
522 .map_err(|err| {
523 PluginError::Session(format!(
524 "failed to build process worker runtime for {source_label}: {err}"
525 ))
526 })
527 }
528
529 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
538 if self
539 .config
540 .runtime_host
541 .control
542 .effect_host
543 .durability_tier()
544 != crate::DurabilityTier::Durable
545 {
546 return Ok(());
547 }
548 let require = |facet: crate::DurableStoreFacet| {
549 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
550 };
551 if self
552 .config
553 .runtime_host
554 .durability
555 .attachment_store
556 .persistence()
557 .durability_tier()
558 != crate::DurabilityTier::Durable
559 {
560 return Err(require(crate::DurableStoreFacet::AttachmentStore));
561 }
562 if self
563 .config
564 .runtime_host
565 .durability
566 .lashlang_artifact_store
567 .durability_tier()
568 != crate::DurabilityTier::Durable
569 {
570 return Err(require(crate::DurableStoreFacet::ArtifactStore));
571 }
572 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
573 return Err(require(crate::DurableStoreFacet::SessionStore));
574 }
575 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
576 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
577 }
578 if self.config.host_event_store.durability_tier() != crate::DurabilityTier::Durable {
579 return Err(require(crate::DurableStoreFacet::HostEventStore));
580 }
581 Ok(())
582 }
583
584 fn ensure_stable_process_id(
592 &self,
593 registration: &ProcessRegistration,
594 ) -> Result<(), PluginError> {
595 if registration.id.trim().is_empty() {
596 return Err(PluginError::Session(
597 crate::RuntimeError::missing_process_execution_id().to_string(),
598 ));
599 }
600 Ok(())
601 }
602
603 fn ensure_host_profile_matches(
604 &self,
605 registration: &ProcessRegistration,
606 ) -> Result<(), PluginError> {
607 let actual = registration.provenance.host_profile_id.as_str();
608 let expected = self.config.runtime_host.profile.host_profile_id.as_str();
609 if actual.is_empty() || actual == expected {
610 return Ok(());
611 }
612 Err(PluginError::Session(format!(
613 "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
614 registration.id
615 )))
616 }
617}
618
/// Time to wait between lease renewals while a recovered process is running.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Renewal interval in milliseconds: 25 in test builds, 30 000 otherwise.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) {
        25
    } else {
        30_000
    }
}
632
633#[cfg(test)]
634mod boundary_tests {
635 use super::*;
636 use crate::{
637 AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
638 DurableStoreFacet, HostEventStore, InMemoryAttachmentStore, LashlangArtifactStore,
639 ProcessInput, ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment,
640 };
641 use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
642
    /// Effect controller stub that only reports a durable tier.
    ///
    /// NOTE(review): presumably `InlineEffectHost` forwards this tier, which is what switches on
    /// the worker's store-facet check in these tests — confirm against `InlineEffectHost`.
    #[derive(Default)]
    struct DurableController;

    #[async_trait::async_trait]
    impl RuntimeEffectController for DurableController {
        /// Always reports `DurabilityTier::Durable`.
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        /// Never expected to run: the tests in this module stop before any effect executes.
        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            unreachable!("worker boundary rejects before executing any effect")
        }
    }
662
    /// In-memory attachment store that claims durable persistence.
    #[derive(Default)]
    struct DurableAttachmentStore {
        /// Backing store that actually holds the attachments.
        inner: InMemoryAttachmentStore,
    }

    #[async_trait::async_trait]
    impl AttachmentStore for DurableAttachmentStore {
        /// Always reports `AttachmentStorePersistence::Durable`.
        fn persistence(&self) -> AttachmentStorePersistence {
            AttachmentStorePersistence::Durable
        }

        /// Stores the attachment in the backing in-memory store.
        async fn put(
            &self,
            bytes: Vec<u8>,
            meta: AttachmentCreateMeta,
        ) -> Result<AttachmentRef, AttachmentStoreError> {
            self.inner.put(bytes, meta).await
        }

        /// Reads the attachment from the backing in-memory store.
        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
            self.inner.get(id).await
        }
    }
687
    /// In-memory lashlang artifact store that claims the durable tier.
    #[derive(Default)]
    struct DurableArtifactStore {
        /// Backing store that actually holds the artifacts.
        inner: lashlang::InMemoryLashlangArtifactStore,
    }

    #[async_trait::async_trait]
    impl LashlangArtifactStore for DurableArtifactStore {
        /// Always reports `DurabilityTier::Durable`.
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        /// Delegates to the backing store.
        async fn put_module_artifact(
            &self,
            artifact: &lashlang::ModuleArtifact,
        ) -> Result<(), lashlang::ArtifactStoreError> {
            self.inner.put_module_artifact(artifact).await
        }

        /// Delegates to the backing store.
        async fn get_module_artifact(
            &self,
            module_ref: &lashlang::ModuleRef,
        ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
            self.inner.get_module_artifact(module_ref).await
        }

        /// Delegates to the backing store.
        async fn put_artifact_bytes(
            &self,
            artifact_ref: &str,
            descriptor: &str,
            bytes: &[u8],
        ) -> Result<(), lashlang::ArtifactStoreError> {
            self.inner
                .put_artifact_bytes(artifact_ref, descriptor, bytes)
                .await
        }

        /// Delegates to the backing store.
        async fn get_artifact_bytes(
            &self,
            artifact_ref: &str,
        ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
            self.inner.get_artifact_bytes(artifact_ref).await
        }
    }
732
    /// Session store factory stub whose reported durability tier is chosen by the test.
    struct TierSessionStoreFactory {
        /// Tier returned from `durability_tier`.
        tier: DurabilityTier,
    }

    #[async_trait::async_trait]
    impl SessionStoreFactory for TierSessionStoreFactory {
        /// Reports the tier the stub was constructed with.
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        /// Never expected to run: the tests in this module stop before a store is created.
        async fn create_store(
            &self,
            _request: &crate::SessionStoreCreateRequest,
        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
            unreachable!("worker boundary rejects before creating a session store")
        }

        /// No-op that always succeeds.
        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
            Ok(())
        }
    }
756
    /// Host event store whose reported durability tier is chosen by the test; every operation is
    /// served by an in-memory store.
    struct TierHostEventStore {
        /// Tier returned from `durability_tier`.
        tier: DurabilityTier,
        /// Backing store that actually handles subscriptions and occurrences.
        inner: crate::InMemoryHostEventStore,
    }

    impl TierHostEventStore {
        /// Creates a store that reports `tier` and starts with an empty in-memory backing store.
        fn new(tier: DurabilityTier) -> Self {
            Self {
                tier,
                inner: crate::InMemoryHostEventStore::default(),
            }
        }
    }

    #[async_trait::async_trait]
    impl HostEventStore for TierHostEventStore {
        /// Reports the tier the store was constructed with.
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        /// Delegates to the backing store.
        async fn register_subscription(
            &self,
            draft: crate::TriggerSubscriptionDraft,
        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
            self.inner.register_subscription(draft).await
        }

        /// Delegates to the backing store.
        async fn list_subscriptions(
            &self,
            filter: crate::TriggerSubscriptionFilter,
        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
            self.inner.list_subscriptions(filter).await
        }

        /// Delegates to the backing store.
        async fn cancel_subscription(
            &self,
            session_id: &str,
            handle: &str,
        ) -> Result<bool, PluginError> {
            self.inner.cancel_subscription(session_id, handle).await
        }

        /// Delegates to the backing store.
        async fn delete_session_subscriptions(
            &self,
            session_id: &str,
        ) -> Result<usize, PluginError> {
            self.inner.delete_session_subscriptions(session_id).await
        }

        /// Delegates to the backing store.
        async fn record_occurrence(
            &self,
            request: crate::HostEventOccurrenceRequest,
        ) -> Result<crate::HostEventOccurrenceRecord, PluginError> {
            self.inner.record_occurrence(request).await
        }

        /// Delegates to the backing store.
        async fn reserve_matching_deliveries(
            &self,
            occurrence_id: &str,
        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
            self.inner.reserve_matching_deliveries(occurrence_id).await
        }
    }
820
821 fn worker(
825 attachment: Arc<dyn AttachmentStore>,
826 artifact: Arc<dyn LashlangArtifactStore>,
827 session_store_tier: DurabilityTier,
828 ) -> DurableProcessWorker {
829 worker_with_store_tiers(
830 attachment,
831 artifact,
832 session_store_tier,
833 DurabilityTier::Durable,
834 DurabilityTier::Durable,
835 )
836 }
837
838 fn worker_with_store_tiers(
839 attachment: Arc<dyn AttachmentStore>,
840 artifact: Arc<dyn LashlangArtifactStore>,
841 session_store_tier: DurabilityTier,
842 process_registry_tier: DurabilityTier,
843 host_event_store_tier: DurabilityTier,
844 ) -> DurableProcessWorker {
845 let mut runtime_host = RuntimeHostConfig::in_memory();
846 runtime_host.control.effect_host =
847 Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
848 runtime_host.durability.attachment_store = attachment;
849 runtime_host.durability.lashlang_artifact_store = artifact;
850 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
851 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
852 tier: session_store_tier,
853 });
854 let registry: Arc<dyn ProcessRegistry> = Arc::new(
855 crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
856 );
857 let host_event_store: Arc<dyn HostEventStore> =
858 Arc::new(TierHostEventStore::new(host_event_store_tier));
859 DurableProcessWorker::new(
860 DurableProcessWorkerConfig::new(plugin_host, runtime_host, factory, registry)
861 .with_host_event_store(host_event_store),
862 )
863 }
864
865 fn external_registration() -> ProcessRegistration {
866 ProcessRegistration::new(
867 "worker-boundary-process",
868 ProcessInput::External {
869 metadata: serde_json::json!({}),
870 },
871 crate::ProcessProvenance::host("default"),
872 )
873 }
874
875 async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
876 worker
877 .run_process(
878 external_registration(),
879 ProcessExecutionContext::default(),
880 CancellationToken::new(),
881 )
882 .await
883 }
884
885 fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
886 let PluginError::Session(message) = err else {
887 panic!("expected PluginError::Session, got {err:?}");
888 };
889 let expected = RuntimeError::durable_store_required(facet).to_string();
890 assert_eq!(message, expected, "worker must reject the {facet:?} facet");
891 }
892
893 #[tokio::test]
894 async fn durable_worker_rejects_ephemeral_attachment_store() {
895 let worker = worker(
896 Arc::new(InMemoryAttachmentStore::new()),
897 Arc::new(DurableArtifactStore::default()),
898 DurabilityTier::Durable,
899 );
900 let err = run(&worker)
901 .await
902 .expect_err("ephemeral attachment store must be rejected at the worker boundary");
903 assert_facet(err, DurableStoreFacet::AttachmentStore);
904 }
905
906 #[tokio::test]
907 async fn durable_worker_rejects_ephemeral_artifact_store() {
908 let worker = worker(
909 Arc::new(DurableAttachmentStore::default()),
910 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
911 DurabilityTier::Durable,
912 );
913 let err = run(&worker)
914 .await
915 .expect_err("ephemeral artifact store must be rejected at the worker boundary");
916 assert_facet(err, DurableStoreFacet::ArtifactStore);
917 }
918
919 #[tokio::test]
920 async fn durable_worker_rejects_ephemeral_session_store_factory() {
921 let worker = worker(
922 Arc::new(DurableAttachmentStore::default()),
923 Arc::new(DurableArtifactStore::default()),
924 DurabilityTier::Inline,
925 );
926 let err = run(&worker)
927 .await
928 .expect_err("ephemeral session store factory must be rejected at the worker boundary");
929 assert_facet(err, DurableStoreFacet::SessionStore);
930 }
931
932 #[tokio::test]
933 async fn durable_worker_rejects_ephemeral_process_registry() {
934 let worker = worker_with_store_tiers(
935 Arc::new(DurableAttachmentStore::default()),
936 Arc::new(DurableArtifactStore::default()),
937 DurabilityTier::Durable,
938 DurabilityTier::Inline,
939 DurabilityTier::Durable,
940 );
941 let err = run(&worker)
942 .await
943 .expect_err("ephemeral process registry must be rejected at the worker boundary");
944 assert_facet(err, DurableStoreFacet::ProcessRegistry);
945 }
946
947 #[tokio::test]
948 async fn durable_worker_rejects_ephemeral_host_event_store() {
949 let worker = worker_with_store_tiers(
950 Arc::new(DurableAttachmentStore::default()),
951 Arc::new(DurableArtifactStore::default()),
952 DurabilityTier::Durable,
953 DurabilityTier::Durable,
954 DurabilityTier::Inline,
955 );
956 let err = run(&worker)
957 .await
958 .expect_err("ephemeral host event store must be rejected at the worker boundary");
959 assert_facet(err, DurableStoreFacet::HostEventStore);
960 }
961
962 #[tokio::test]
963 async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
964 let worker = worker(
968 Arc::new(DurableAttachmentStore::default()),
969 Arc::new(DurableArtifactStore::default()),
970 DurabilityTier::Durable,
971 );
972 let output = run(&worker)
973 .await
974 .expect("all-durable worker should pass the store-facet guard");
975 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
976 }
977
978 #[tokio::test]
979 async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
980 let runtime_host = RuntimeHostConfig::in_memory();
983 let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
984 let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
985 tier: DurabilityTier::Inline,
986 });
987 let registry: Arc<dyn ProcessRegistry> =
988 Arc::new(crate::TestLocalProcessRegistry::default());
989 let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
990 plugin_host,
991 runtime_host,
992 factory,
993 registry,
994 ));
995 let output = run(&worker)
996 .await
997 .expect("inline worker should pass the store-facet guard");
998 assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
999 }
1000}