1use std::sync::Arc;
2
3use tokio_util::sync::CancellationToken;
4
5use super::effect::ProcessRunner;
6use super::session_manager::RuntimeSessionServices;
7use super::{EmbeddedRuntimeBuilder, ProcessWorkDriver, QueuedWorkDriver, RuntimeHostConfig};
8use crate::{
9 InMemorySessionStore, LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack,
10 ProcessAwaitOutput, ProcessExecutionContext, ProcessInput, ProcessLease,
11 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
12 SessionStoreFactory,
13};
14
15#[derive(Clone)]
21pub struct DurableProcessWorkerConfig {
22 pub plugin_host: Arc<PluginHost>,
23 pub runtime_host: RuntimeHostConfig,
24 pub session_policy: crate::SessionPolicy,
25 pub session_store_factory: Arc<dyn SessionStoreFactory>,
26 pub process_registry: Arc<dyn ProcessRegistry>,
27 pub process_change_hub: Option<crate::ProcessChangeHub>,
28 pub trigger_store: Arc<dyn crate::TriggerStore>,
29 pub process_work_driver: Option<ProcessWorkDriver>,
30 pub queued_work_driver: Option<QueuedWorkDriver>,
31 #[doc(hidden)]
32 pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
33 pub residency: crate::Residency,
39 pub lease_owner: crate::LeaseOwnerIdentity,
52}
53
54impl DurableProcessWorkerConfig {
55 pub fn new(
56 plugin_host: Arc<PluginHost>,
57 runtime_host: RuntimeHostConfig,
58 session_store_factory: Arc<dyn SessionStoreFactory>,
59 process_registry: Arc<dyn ProcessRegistry>,
60 ) -> Self {
61 let clock = Arc::clone(&runtime_host.clock);
62 Self {
63 plugin_host,
64 runtime_host,
65 session_policy: crate::SessionPolicy::default(),
66 session_store_factory,
67 process_registry,
68 process_change_hub: None,
69 trigger_store: Arc::new(crate::InMemoryTriggerStore::with_clock(clock)),
70 process_work_driver: None,
71 queued_work_driver: None,
72 turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
73 residency: crate::Residency::default(),
74 lease_owner: crate::LeaseOwnerIdentity::opaque(
75 format!("durable-process-worker:{}", uuid::Uuid::new_v4()),
76 uuid::Uuid::new_v4().to_string(),
77 ),
78 }
79 }
80
81 pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
82 self.trigger_store = store;
83 self
84 }
85
86 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
87 self.session_policy = policy;
88 self
89 }
90
91 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
92 self.residency = residency;
93 self
94 }
95
96 pub fn with_process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
97 self.process_work_driver = Some(driver);
98 self
99 }
100
101 pub fn with_change_hub(mut self, hub: crate::ProcessChangeHub) -> Self {
102 self.process_change_hub = Some(hub);
103 self
104 }
105
106 pub fn with_lease_owner(mut self, lease_owner: crate::LeaseOwnerIdentity) -> Self {
109 self.lease_owner = lease_owner;
110 self
111 }
112
113 pub fn with_queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
114 self.queued_work_driver = Some(driver);
115 self
116 }
117
118 #[doc(hidden)]
119 pub fn with_turn_phase_probe_slot(
120 mut self,
121 slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
122 ) -> Self {
123 self.turn_phase_probe_slot = slot;
124 self
125 }
126
127 pub fn from_plugin_factories(
128 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
129 runtime_host: RuntimeHostConfig,
130 session_store_factory: Arc<dyn SessionStoreFactory>,
131 process_registry: Arc<dyn ProcessRegistry>,
132 ) -> Self {
133 Self::new(
134 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
135 runtime_host,
136 session_store_factory,
137 process_registry,
138 )
139 }
140
141 pub fn from_plugin_stack(
142 plugin_stack: PluginStack,
143 runtime_host: RuntimeHostConfig,
144 session_store_factory: Arc<dyn SessionStoreFactory>,
145 process_registry: Arc<dyn ProcessRegistry>,
146 ) -> Self {
147 Self::from_plugin_factories(
148 plugin_stack.into_factories(),
149 runtime_host,
150 session_store_factory,
151 process_registry,
152 )
153 }
154}
155
156#[derive(Clone)]
158pub struct DurableProcessWorker {
159 config: Arc<DurableProcessWorkerConfig>,
160}
161
162enum RecoverFailure {
164 LeaseLost(PluginError),
168 Run(PluginError),
171}
172
173impl DurableProcessWorker {
174 pub fn new(config: DurableProcessWorkerConfig) -> Self {
175 Self {
176 config: Arc::new(config),
177 }
178 }
179
180 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
181 Self { config }
182 }
183
184 pub fn config(&self) -> &DurableProcessWorkerConfig {
185 &self.config
186 }
187
188 pub async fn run_process(
189 &self,
190 registration: ProcessRegistration,
191 execution_context: ProcessExecutionContext,
192 cancellation: CancellationToken,
193 ) -> Result<ProcessAwaitOutput, PluginError> {
194 let scoped_effect_controller = self
195 .config
196 .runtime_host
197 .control
198 .effect_host
199 .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
200 .map_err(|err| PluginError::Session(err.to_string()))?
201 .ok_or_else(|| {
202 PluginError::Session(
203 "process worker effect host must provide a static process scope".to_string(),
204 )
205 })?;
206 self.run_process_with_scoped_effect_controller(
207 registration,
208 execution_context,
209 scoped_effect_controller,
210 cancellation,
211 )
212 .await
213 }
214
215 pub async fn run_process_with_scoped_effect_controller(
216 &self,
217 registration: ProcessRegistration,
218 execution_context: ProcessExecutionContext,
219 scoped_effect_controller: crate::ScopedEffectController<'_>,
220 cancellation: CancellationToken,
221 ) -> Result<ProcessAwaitOutput, PluginError> {
222 self.ensure_stable_process_id(®istration)?;
223 self.ensure_durable_store_facets()?;
224 if let ProcessInput::External { metadata } = registration.input.as_ref() {
225 return Ok(ProcessAwaitOutput::Success {
226 value: serde_json::json!({ "metadata": metadata.clone() }),
227 control: None,
228 });
229 }
230 let mut runtime = self.runtime_for_registration(®istration).await?;
231 let originator_scope = if let crate::ProcessOriginator::Session { scope } =
232 ®istration.provenance.originator
233 {
234 Some(scope)
235 } else {
236 None
237 };
238 let probe_scope = registration.wake_target.as_ref().or(originator_scope);
239 if let Some(probe) =
240 probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
241 {
242 runtime.set_turn_phase_probe(probe);
243 }
244 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
245 PluginError::Session(format!(
246 "failed to build runtime env for process `{}`: {err}",
247 registration.id
248 ))
249 })?;
250 Ok(manager
251 .run_process(
252 registration,
253 execution_context,
254 Arc::clone(&self.config.process_registry),
255 scoped_effect_controller,
256 cancellation,
257 )
258 .await)
259 }
260
261 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
287 let records = self.config.process_registry.list_non_terminal().await?;
288 for record in records {
289 let worker = self.clone();
300 tokio::spawn(async move { worker.recover_process(record).await });
301 }
302 Ok(())
303 }
304
305 fn recovery_lease_owner(&self) -> crate::LeaseOwnerIdentity {
313 let attempt = uuid::Uuid::new_v4();
314 crate::LeaseOwnerIdentity {
315 owner_id: format!("{}:recovery:{attempt}", self.config.lease_owner.owner_id),
316 incarnation_id: attempt.to_string(),
317 liveness: self.config.lease_owner.liveness.clone(),
318 }
319 }
320
321 async fn recover_process(&self, record: ProcessRecord) {
322 let process_id = record.id.clone();
323 let lease_ttl_ms = self.lease_timings().ttl_ms();
324 let owner = &self.recovery_lease_owner();
325 let lease = match self
333 .config
334 .process_registry
335 .claim_process_lease(&process_id, owner, lease_ttl_ms)
336 .await
337 {
338 Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => lease,
339 Ok(crate::ProcessLeaseClaimOutcome::Busy { holder })
340 if holder.owner.is_definitely_dead_for_claimant(owner) =>
341 {
342 match self
343 .config
344 .process_registry
345 .reclaim_process_lease(&process_id, owner, &holder, lease_ttl_ms)
346 .await
347 {
348 Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => lease,
349 Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => return,
350 }
351 }
352 Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => return,
353 };
354 if self
357 .config
358 .process_registry
359 .get_process(&process_id)
360 .await
361 .is_some_and(|current| current.is_terminal())
362 {
363 self.release_or_log(&lease).await;
364 return;
365 }
366 let registration = ProcessRegistration {
367 id: record.id,
368 input: record.input,
369 identity: record.identity,
370 event_types: record.event_types,
371 provenance: record.provenance.clone(),
372 env_ref: record.env_ref.clone(),
373 wake_target: record.wake_target.clone(),
374 };
375 let execution_context = ProcessExecutionContext::default();
376 match self
377 .run_process_with_lease_renewal(registration, execution_context, lease.clone())
378 .await
379 {
380 Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
383 Err(RecoverFailure::LeaseLost(err)) => {
389 tracing::warn!(
390 process_id = %process_id,
391 error = %err,
392 "process recovery lost its lease mid-run; deferring to the new owner",
393 );
394 }
395 Err(RecoverFailure::Run(err)) => {
398 let output = ProcessAwaitOutput::Failure {
399 class: crate::ToolFailureClass::Execution,
400 code: "process_recovery_failed".to_string(),
401 message: err.to_string(),
402 raw: None,
403 control: None,
404 };
405 self.complete_and_release(&lease, &process_id, output).await;
406 }
407 }
408 }
409
410 async fn complete_and_release(
414 &self,
415 lease: &ProcessLease,
416 process_id: &str,
417 output: ProcessAwaitOutput,
418 ) {
419 let fenced = match self
426 .config
427 .process_registry
428 .renew_process_lease(lease, self.lease_timings().ttl_ms())
429 .await
430 {
431 Ok(renewed) => renewed,
432 Err(err) => {
433 tracing::warn!(
434 process_id = %process_id,
435 error = %err,
436 "lost process lease before terminal write; deferring to the new owner",
437 );
438 return;
439 }
440 };
441 if let Err(err) = self
442 .config
443 .process_registry
444 .complete_process(process_id, output)
445 .await
446 {
447 tracing::warn!(
448 process_id = %process_id,
449 error = %err,
450 "failed to write recovered process terminal outcome",
451 );
452 }
453 self.release_or_log(&fenced).await;
454 }
455
456 async fn release_or_log(&self, lease: &ProcessLease) {
457 if let Err(err) = self.release_process_lease(lease).await {
458 tracing::warn!(
459 process_id = %lease.process_id,
460 error = %err,
461 "failed to release recovered process lease",
462 );
463 }
464 }
465
466 async fn run_process_with_lease_renewal(
470 &self,
471 registration: ProcessRegistration,
472 execution_context: ProcessExecutionContext,
473 mut lease: ProcessLease,
474 ) -> Result<ProcessAwaitOutput, RecoverFailure> {
475 let process_id = registration.id.clone();
476 let cancellation = CancellationToken::new();
477 let cancel_watcher = {
478 let awaiter = self
479 .config
480 .process_change_hub
481 .clone()
482 .map(|hub| {
483 crate::ProcessAwaiter::new(Arc::clone(&self.config.process_registry), hub)
484 })
485 .unwrap_or_else(|| {
486 crate::ProcessAwaiter::polling(Arc::clone(&self.config.process_registry))
487 });
488 let process_id = process_id.clone();
489 let cancellation = cancellation.clone();
490 tokio::spawn(async move {
491 match awaiter
492 .await_event(&process_id, "process.cancel_requested", 0)
493 .await
494 {
495 Ok(_) => cancellation.cancel(),
496 Err(err) => tracing::warn!(
497 process_id = %process_id,
498 error = %err,
499 "process cancel watcher stopped before observing cancellation",
500 ),
501 }
502 })
503 };
504 let pending = self.run_process(registration, execution_context, cancellation.clone());
505 tokio::pin!(pending);
506 loop {
507 tokio::select! {
508 outcome = &mut pending => {
509 cancel_watcher.abort();
510 return outcome.map_err(RecoverFailure::Run);
511 }
512 _ = self.config.runtime_host.clock.sleep(self.lease_timings().renew_interval()) => {
513 match self
514 .config
515 .process_registry
516 .renew_process_lease(&lease, self.lease_timings().ttl_ms())
517 .await
518 {
519 Ok(renewed) => lease = renewed,
520 Err(err) => {
521 cancellation.cancel();
522 cancel_watcher.abort();
523 return Err(RecoverFailure::LeaseLost(err));
524 }
525 }
526 }
527 }
528 }
529 }
530
531 fn lease_timings(&self) -> crate::LeaseTimings {
532 self.config.runtime_host.control.lease_timings
533 }
534
535 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
536 self.config
537 .process_registry
538 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
539 .await
540 }
541
542 pub async fn request_process_cancel(
543 &self,
544 process_id: &str,
545 reason: Option<String>,
546 ) -> Result<(), PluginError> {
547 self.config
548 .process_registry
549 .append_event(
550 process_id,
551 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
552 )
553 .await
554 .map(|_| ())
555 }
556
557 async fn runtime_for_registration(
558 &self,
559 registration: &ProcessRegistration,
560 ) -> Result<LashRuntime, PluginError> {
561 match registration.input.as_ref() {
562 ProcessInput::SessionTurn { create_request, .. } => {
563 self.runtime_for_session_turn(registration, create_request.as_ref())
564 .await
565 }
566 ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. } => {
567 self.runtime_for_process_env(registration).await
568 }
569 ProcessInput::External { .. } => unreachable!("external processes short-circuit"),
570 }
571 }
572
573 async fn runtime_for_session_turn(
574 &self,
575 registration: &ProcessRegistration,
576 create_request: &crate::SessionCreateRequest,
577 ) -> Result<LashRuntime, PluginError> {
578 let mut policy = create_request
579 .policy
580 .clone()
581 .unwrap_or_else(|| self.config.session_policy.clone());
582 if policy.recorded_provider_id().is_empty() {
583 policy.provider_id = self.config.session_policy.provider_id.clone();
584 }
585 self.build_ephemeral_runtime(
586 format!("process-session-turn:{}", registration.id),
587 policy,
588 create_request.plugin_options.clone(),
589 "session turn request",
590 )
591 .await
592 }
593
594 async fn runtime_for_process_env(
595 &self,
596 registration: &ProcessRegistration,
597 ) -> Result<LashRuntime, PluginError> {
598 let Some(env_ref) = registration.env_ref.as_ref() else {
599 return Err(PluginError::Session(format!(
600 "process `{}` is missing a captured execution env",
601 registration.id
602 )));
603 };
604 let env = crate::load_process_execution_env(
605 self.config
606 .runtime_host
607 .durability
608 .process_env_store
609 .as_ref(),
610 env_ref,
611 )
612 .await?;
613 self.build_ephemeral_runtime(
614 format!("process-env:{}", registration.id),
615 env.policy,
616 env.plugin_options,
617 env_ref.as_str(),
618 )
619 .await
620 }
621
622 async fn build_ephemeral_runtime(
623 &self,
624 session_id: String,
625 policy: crate::SessionPolicy,
626 plugin_options: crate::PluginOptions,
627 source_label: &str,
628 ) -> Result<LashRuntime, PluginError> {
629 let store = Arc::new(InMemorySessionStore::default());
630 let process_work_driver = self.config.process_work_driver.clone().unwrap_or_else(|| {
631 if let Some(hub) = self.config.process_change_hub.clone() {
632 ProcessWorkDriver::from_watched(
633 Arc::clone(&self.config.process_registry),
634 hub,
635 Arc::new(crate::InlineProcessRunHandle::new(self.clone())),
636 )
637 } else {
638 ProcessWorkDriver::inline(Arc::clone(&self.config.process_registry), self.clone())
639 }
640 });
641 let mut builder = EmbeddedRuntimeBuilder::new()
642 .with_session_id(session_id.to_string())
643 .with_plugin_host(self.config.plugin_host.as_ref().clone())
644 .with_runtime_host(self.config.runtime_host.clone())
645 .with_policy(policy)
646 .with_plugin_options(plugin_options)
647 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
648 .with_trigger_store(Arc::clone(&self.config.trigger_store))
649 .with_process_registry(Arc::clone(&self.config.process_registry))
650 .with_process_work_driver(process_work_driver)
651 .with_residency(self.config.residency)
652 .with_store(store);
653 if let Some(driver) = self.config.queued_work_driver.clone() {
654 builder = builder.with_queued_work_driver(driver);
655 }
656 builder.build().await.map_err(|err| {
657 PluginError::Session(format!(
658 "failed to build process worker runtime for {source_label}: {err}"
659 ))
660 })
661 }
662
663 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
672 if self
673 .config
674 .runtime_host
675 .control
676 .effect_host
677 .durability_tier()
678 != crate::DurabilityTier::Durable
679 {
680 return Ok(());
681 }
682 let require = |facet: crate::DurableStoreFacet| {
683 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
684 };
685 if self
686 .config
687 .runtime_host
688 .durability
689 .attachment_store
690 .persistence()
691 .durability_tier()
692 != crate::DurabilityTier::Durable
693 {
694 return Err(require(crate::DurableStoreFacet::AttachmentStore));
695 }
696 if self
697 .config
698 .runtime_host
699 .durability
700 .process_env_store
701 .durability_tier()
702 != crate::DurabilityTier::Durable
703 {
704 return Err(require(crate::DurableStoreFacet::ProcessEnvStore));
705 }
706 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
707 return Err(require(crate::DurableStoreFacet::SessionStore));
708 }
709 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
710 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
711 }
712 if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
713 return Err(require(crate::DurableStoreFacet::TriggerStore));
714 }
715 Ok(())
716 }
717
718 fn ensure_stable_process_id(
726 &self,
727 registration: &ProcessRegistration,
728 ) -> Result<(), PluginError> {
729 if registration.id.trim().is_empty() {
730 return Err(PluginError::Session(
731 crate::RuntimeError::missing_process_execution_id().to_string(),
732 ));
733 }
734 Ok(())
735 }
736}
737
#[cfg(test)]
mod recovery_tests {
    //! Recovery-sweep tests.
    //!
    //! The sweep must reclaim a lease whose holder is definitely dead and finish the process.
    //! It must leave alone rows leased by a holder that is not provably dead.
    use super::*;
    use crate::{
        DurabilityTier, LeaseOwnerIdentity, LeaseOwnerLiveness, ProcessInput, ProcessRegistration,
    };

    /// Inline-tier session store factory that hands out fresh in-memory stores.
    struct InlineSessionStoreFactory;

    #[async_trait::async_trait]
    impl SessionStoreFactory for InlineSessionStoreFactory {
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Inline
        }

        async fn create_store(
            &self,
            _request: &crate::SessionStoreCreateRequest,
        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
            Ok(Arc::new(InMemorySessionStore::default()))
        }

        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
            Ok(())
        }
    }

    /// Builds an in-memory worker over `registry` that claims leases as `lease_owner`.
    fn worker_owned_by(
        registry: Arc<dyn ProcessRegistry>,
        lease_owner: LeaseOwnerIdentity,
    ) -> DurableProcessWorker {
        let config = DurableProcessWorkerConfig::new(
            Arc::new(PluginHost::new(Vec::new())),
            RuntimeHostConfig::in_memory(),
            Arc::new(InlineSessionStoreFactory),
            registry,
        )
        .with_lease_owner(lease_owner);
        DurableProcessWorker::new(config)
    }

    /// An `External` registration with empty metadata and host provenance.
    fn external_process(id: &str) -> ProcessRegistration {
        let input = ProcessInput::External {
            metadata: serde_json::json!({}),
        };
        ProcessRegistration::new(id, input, crate::ProcessProvenance::host())
    }

    /// A lease owner whose liveness is a local process on `host_id`. It uses boot `boot-a`
    /// and the current pid, and records `process_start` as its start marker.
    fn local_lease_owner(owner_id: &str, host_id: &str, process_start: &str) -> LeaseOwnerIdentity {
        let liveness = LeaseOwnerLiveness::local_process_for_test(
            host_id,
            "boot-a",
            std::process::id(),
            process_start,
        );
        LeaseOwnerIdentity {
            owner_id: owner_id.to_string(),
            incarnation_id: format!("{owner_id}:incarnation"),
            liveness,
        }
    }

    /// Polls `registry` until `process_id` is terminal. Fails the test after five seconds.
    async fn wait_until_terminal(registry: &Arc<dyn ProcessRegistry>, process_id: &str) {
        let awaiter = crate::ProcessAwaiter::polling(Arc::clone(registry));
        let deadline = std::time::Duration::from_secs(5);
        let terminal = tokio::time::timeout(deadline, awaiter.await_terminal(process_id)).await;
        terminal
            .expect("recovered process reaches terminal within the sweep")
            .expect("recovered process terminal output");
    }

    #[tokio::test]
    async fn sweep_reclaims_dead_holder_lease_and_recovers_the_process() {
        let process_id = "proc-sweep-reclaim";
        let registry: Arc<dyn ProcessRegistry> =
            Arc::new(crate::TestLocalProcessRegistry::default());
        registry
            .register_process(external_process(process_id))
            .await
            .expect("register");

        // Same host, boot, and pid as the claimant, but a different process start. The test
        // expects this holder to be judged dead.
        let dead_holder =
            local_lease_owner("dead-worker", "host-a", "not-the-current-process-start");
        registry
            .claim_process_lease(process_id, &dead_holder, 60_000)
            .await
            .expect("dead holder claims")
            .acquired()
            .expect("dead holder lease acquired");

        let worker = worker_owned_by(
            Arc::clone(&registry),
            local_lease_owner("live-worker", "host-a", "claimant-start"),
        );
        worker
            .drive_pending_processes()
            .await
            .expect("sweep dispatches");
        wait_until_terminal(&registry, process_id).await;
    }

    #[tokio::test]
    async fn sweep_skips_rows_whose_holder_is_not_provably_dead() {
        let process_id = "proc-sweep-skip";
        let registry: Arc<dyn ProcessRegistry> =
            Arc::new(crate::TestLocalProcessRegistry::default());
        registry
            .register_process(external_process(process_id))
            .await
            .expect("register");

        // An opaque owner carries no liveness the claimant can disprove.
        let live_holder = LeaseOwnerIdentity::opaque("other-worker", "other-incarnation");
        registry
            .claim_process_lease(process_id, &live_holder, 60_000)
            .await
            .expect("live holder claims")
            .acquired()
            .expect("live holder lease acquired");

        let worker = worker_owned_by(
            Arc::clone(&registry),
            local_lease_owner("live-worker", "host-a", "claimant-start"),
        );
        worker
            .drive_pending_processes()
            .await
            .expect("sweep dispatches");
        // Give any (wrongly) spawned recovery task time to act before checking the row.
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;

        let record = registry
            .get_process(process_id)
            .await
            .expect("process exists");
        assert!(
            !record.is_terminal(),
            "a live-leased process must not be re-run by the sweep"
        );
    }
}
879
#[cfg(test)]
mod boundary_tests {
    //! Worker-boundary tests.
    //!
    //! Under a durable effect host, the worker must refuse to run when any store facet is not
    //! durable. An inline effect host must run fine over ephemeral stores.
    use super::*;
    use crate::{
        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
        DurableStoreFacet, InMemoryAttachmentStore, ProcessExecutionEnvRef,
        ProcessExecutionEnvStore, ProcessInput, ProcessRegistration, RuntimeEffectController,
        RuntimeError, StoredAttachment, TriggerStore,
    };
    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};

    // ----- test doubles -----

    /// Effect controller that reports the durable tier. It must never be asked to run an effect.
    #[derive(Default)]
    struct DurableController;

    impl crate::AwaitEventResolver for DurableController {
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }
    }

    #[async_trait::async_trait]
    impl RuntimeEffectController for DurableController {
        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            unreachable!("worker boundary rejects before executing any effect")
        }
    }

    /// In-memory attachment store that claims durable persistence.
    #[derive(Default)]
    struct DurableAttachmentStore {
        inner: InMemoryAttachmentStore,
    }

    #[async_trait::async_trait]
    impl AttachmentStore for DurableAttachmentStore {
        fn persistence(&self) -> AttachmentStorePersistence {
            AttachmentStorePersistence::Durable
        }

        async fn put(
            &self,
            bytes: Vec<u8>,
            meta: AttachmentCreateMeta,
        ) -> Result<AttachmentRef, AttachmentStoreError> {
            self.inner.put(bytes, meta).await
        }

        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
            self.inner.get(id).await
        }

        async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError> {
            self.inner.delete(id).await
        }
    }

    /// In-memory process env store that claims the durable tier.
    #[derive(Default)]
    struct DurableProcessEnvStore {
        inner: crate::InMemoryProcessExecutionEnvStore,
    }

    #[async_trait::async_trait]
    impl ProcessExecutionEnvStore for DurableProcessEnvStore {
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        async fn put_process_execution_env(
            &self,
            env_ref: &ProcessExecutionEnvRef,
            bytes: &[u8],
        ) -> Result<(), PluginError> {
            self.inner.put_process_execution_env(env_ref, bytes).await
        }

        async fn get_process_execution_env(
            &self,
            env_ref: &ProcessExecutionEnvRef,
        ) -> Result<Option<Vec<u8>>, PluginError> {
            self.inner.get_process_execution_env(env_ref).await
        }
    }

    /// Session store factory that reports `tier` and must never create a store.
    struct TierSessionStoreFactory {
        tier: DurabilityTier,
    }

    #[async_trait::async_trait]
    impl SessionStoreFactory for TierSessionStoreFactory {
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        async fn create_store(
            &self,
            _request: &crate::SessionStoreCreateRequest,
        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
            unreachable!("worker boundary rejects before creating a session store")
        }

        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
            Ok(())
        }
    }

    /// In-memory trigger store that reports `tier`.
    struct TierTriggerStore {
        tier: DurabilityTier,
        inner: crate::InMemoryTriggerStore,
    }

    impl TierTriggerStore {
        fn new(tier: DurabilityTier) -> Self {
            let inner = crate::InMemoryTriggerStore::default();
            Self { tier, inner }
        }
    }

    #[async_trait::async_trait]
    impl TriggerStore for TierTriggerStore {
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        async fn register_subscription(
            &self,
            draft: crate::TriggerSubscriptionDraft,
        ) -> Result<crate::TriggerSubscriptionRecord, PluginError> {
            self.inner.register_subscription(draft).await
        }

        async fn list_subscriptions(
            &self,
            filter: crate::TriggerSubscriptionFilter,
        ) -> Result<Vec<crate::TriggerSubscriptionRecord>, PluginError> {
            self.inner.list_subscriptions(filter).await
        }

        async fn cancel_subscription(
            &self,
            session_id: &str,
            handle: &str,
        ) -> Result<bool, PluginError> {
            self.inner.cancel_subscription(session_id, handle).await
        }

        async fn delete_session_subscriptions(
            &self,
            session_id: &str,
        ) -> Result<usize, PluginError> {
            self.inner.delete_session_subscriptions(session_id).await
        }

        async fn record_occurrence(
            &self,
            request: crate::TriggerOccurrenceRequest,
        ) -> Result<crate::TriggerOccurrenceRecord, PluginError> {
            self.inner.record_occurrence(request).await
        }

        async fn reserve_matching_deliveries(
            &self,
            occurrence_id: &str,
        ) -> Result<Vec<crate::TriggerDeliveryReservation>, PluginError> {
            self.inner.reserve_matching_deliveries(occurrence_id).await
        }
    }

    // ----- fixtures -----

    /// Durable-effect-host worker with a durable registry and trigger store. Only the
    /// attachment store, the env store, and the session tier vary.
    fn worker_with_session_tier(
        attachment: Arc<dyn AttachmentStore>,
        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
        session_store_tier: DurabilityTier,
    ) -> DurableProcessWorker {
        worker_with_store_tiers(
            attachment,
            process_env_store,
            session_store_tier,
            DurabilityTier::Durable,
            DurabilityTier::Durable,
        )
    }

    /// Durable-effect-host worker with every store facet's tier chosen by the caller.
    fn worker_with_store_tiers(
        attachment: Arc<dyn AttachmentStore>,
        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
        session_store_tier: DurabilityTier,
        process_registry_tier: DurabilityTier,
        trigger_store_tier: DurabilityTier,
    ) -> DurableProcessWorker {
        let mut host = RuntimeHostConfig::in_memory();
        host.control.effect_host =
            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
        host.durability.attachment_store = attachment;
        host.durability.process_env_store = process_env_store;

        let session_stores: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
            tier: session_store_tier,
        });
        let registry: Arc<dyn ProcessRegistry> = Arc::new(
            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
        );
        let triggers: Arc<dyn TriggerStore> = Arc::new(TierTriggerStore::new(trigger_store_tier));

        let config = DurableProcessWorkerConfig::new(
            Arc::new(crate::PluginHost::new(Vec::new())),
            host,
            session_stores,
            registry,
        )
        .with_trigger_store(triggers);
        DurableProcessWorker::new(config)
    }

    /// Runs a fixed `External` registration through `worker`.
    async fn run_external(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
        let registration = ProcessRegistration::new(
            "worker-boundary-process",
            ProcessInput::External {
                metadata: serde_json::json!({}),
            },
            crate::ProcessProvenance::host(),
        );
        worker
            .run_process(
                registration,
                ProcessExecutionContext::default(),
                CancellationToken::new(),
            )
            .await
    }

    /// Asserts that `err` is the durable-store-required rejection for `facet`.
    fn assert_rejected_facet(err: PluginError, facet: DurableStoreFacet) {
        let PluginError::Session(message) = err else {
            panic!("expected PluginError::Session, got {err:?}");
        };
        let expected = RuntimeError::durable_store_required(facet).to_string();
        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
    }

    // ----- tests -----

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_attachment_store() {
        let worker = worker_with_session_tier(
            Arc::new(InMemoryAttachmentStore::new()),
            Arc::new(DurableProcessEnvStore::default()),
            DurabilityTier::Durable,
        );
        let err = run_external(&worker)
            .await
            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
        assert_rejected_facet(err, DurableStoreFacet::AttachmentStore);
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_process_env_store() {
        let worker = worker_with_session_tier(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
            DurabilityTier::Durable,
        );
        let err = run_external(&worker)
            .await
            .expect_err("ephemeral process env store must be rejected at the worker boundary");
        assert_rejected_facet(err, DurableStoreFacet::ProcessEnvStore);
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_session_store_factory() {
        let worker = worker_with_session_tier(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableProcessEnvStore::default()),
            DurabilityTier::Inline,
        );
        let err = run_external(&worker)
            .await
            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
        assert_rejected_facet(err, DurableStoreFacet::SessionStore);
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_process_registry() {
        let worker = worker_with_store_tiers(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableProcessEnvStore::default()),
            DurabilityTier::Durable,
            DurabilityTier::Inline,
            DurabilityTier::Durable,
        );
        let err = run_external(&worker)
            .await
            .expect_err("ephemeral process registry must be rejected at the worker boundary");
        assert_rejected_facet(err, DurableStoreFacet::ProcessRegistry);
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_trigger_store() {
        let worker = worker_with_store_tiers(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableProcessEnvStore::default()),
            DurabilityTier::Durable,
            DurabilityTier::Durable,
            DurabilityTier::Inline,
        );
        let err = run_external(&worker)
            .await
            .expect_err("ephemeral trigger store must be rejected at the worker boundary");
        assert_rejected_facet(err, DurableStoreFacet::TriggerStore);
    }

    #[tokio::test]
    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
        let worker = worker_with_session_tier(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableProcessEnvStore::default()),
            DurabilityTier::Durable,
        );
        let output = run_external(&worker)
            .await
            .expect("all-durable worker should pass the store-facet guard");
        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
    }

    #[tokio::test]
    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
        let session_stores: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
            tier: DurabilityTier::Inline,
        });
        let registry: Arc<dyn ProcessRegistry> =
            Arc::new(crate::TestLocalProcessRegistry::default());
        let config = DurableProcessWorkerConfig::new(
            Arc::new(crate::PluginHost::new(Vec::new())),
            RuntimeHostConfig::in_memory(),
            session_stores,
            registry,
        );
        let worker = DurableProcessWorker::new(config);
        let output = run_external(&worker)
            .await
            .expect("inline worker should pass the store-facet guard");
        assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
    }
}