1use std::sync::Arc;
2
3use tokio_util::sync::CancellationToken;
4
5use super::effect::ProcessRunner;
6use super::session_manager::RuntimeSessionServices;
7use super::{EmbeddedRuntimeBuilder, ProcessWorkDriver, QueuedWorkDriver, RuntimeHostConfig};
8use crate::{
9 AbandonEvidence, AbandonWriter, InMemorySessionStore, LashRuntime, PluginError, PluginFactory,
10 PluginHost, PluginStack, ProcessAwaitOutput, ProcessExecutionContext, ProcessInput,
11 ProcessLease, ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
12 RecoveryDisposition, SessionStoreFactory,
13};
14
15#[derive(Clone)]
21pub struct DurableProcessWorkerConfig {
22 pub plugin_host: Arc<PluginHost>,
23 pub runtime_host: RuntimeHostConfig,
24 pub session_policy: crate::SessionPolicy,
25 pub session_store_factory: Arc<dyn SessionStoreFactory>,
26 pub process_registry: Arc<dyn ProcessRegistry>,
27 pub process_change_hub: Option<crate::ProcessChangeHub>,
28 pub trigger_store: Arc<dyn crate::TriggerStore>,
29 pub process_work_driver: Option<ProcessWorkDriver>,
30 pub queued_work_driver: Option<QueuedWorkDriver>,
31 #[doc(hidden)]
32 pub turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
33 pub residency: crate::Residency,
39 pub lease_owner: crate::LeaseOwnerIdentity,
52}
53
54impl DurableProcessWorkerConfig {
55 pub fn new(
56 plugin_host: Arc<PluginHost>,
57 runtime_host: RuntimeHostConfig,
58 session_store_factory: Arc<dyn SessionStoreFactory>,
59 process_registry: Arc<dyn ProcessRegistry>,
60 ) -> Self {
61 let clock = Arc::clone(&runtime_host.clock);
62 Self {
63 plugin_host,
64 runtime_host,
65 session_policy: crate::SessionPolicy::default(),
66 session_store_factory,
67 process_registry,
68 process_change_hub: None,
69 trigger_store: Arc::new(crate::InMemoryTriggerStore::with_clock(clock)),
70 process_work_driver: None,
71 queued_work_driver: None,
72 turn_phase_probe_slot: crate::runtime::RuntimeTurnPhaseProbeSlot::default(),
73 residency: crate::Residency::default(),
74 lease_owner: crate::LeaseOwnerIdentity::opaque(
75 format!("durable-process-worker:{}", uuid::Uuid::new_v4()),
76 uuid::Uuid::new_v4().to_string(),
77 ),
78 }
79 }
80
81 pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
82 self.trigger_store = store;
83 self
84 }
85
86 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
87 self.session_policy = policy;
88 self
89 }
90
91 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
92 self.residency = residency;
93 self
94 }
95
96 pub fn with_process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
97 self.process_work_driver = Some(driver);
98 self
99 }
100
101 pub fn with_change_hub(mut self, hub: crate::ProcessChangeHub) -> Self {
102 self.process_change_hub = Some(hub);
103 self
104 }
105
106 pub fn with_lease_owner(mut self, lease_owner: crate::LeaseOwnerIdentity) -> Self {
109 self.lease_owner = lease_owner;
110 self
111 }
112
113 pub fn with_queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
114 self.queued_work_driver = Some(driver);
115 self
116 }
117
118 #[doc(hidden)]
119 pub fn with_turn_phase_probe_slot(
120 mut self,
121 slot: crate::runtime::RuntimeTurnPhaseProbeSlot,
122 ) -> Self {
123 self.turn_phase_probe_slot = slot;
124 self
125 }
126
127 pub fn from_plugin_factories(
128 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
129 runtime_host: RuntimeHostConfig,
130 session_store_factory: Arc<dyn SessionStoreFactory>,
131 process_registry: Arc<dyn ProcessRegistry>,
132 ) -> Self {
133 Self::new(
134 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
135 runtime_host,
136 session_store_factory,
137 process_registry,
138 )
139 }
140
141 pub fn from_plugin_stack(
142 plugin_stack: PluginStack,
143 runtime_host: RuntimeHostConfig,
144 session_store_factory: Arc<dyn SessionStoreFactory>,
145 process_registry: Arc<dyn ProcessRegistry>,
146 ) -> Self {
147 Self::from_plugin_factories(
148 plugin_stack.into_factories(),
149 runtime_host,
150 session_store_factory,
151 process_registry,
152 )
153 }
154}
155
156#[derive(Clone)]
158pub struct DurableProcessWorker {
159 config: Arc<DurableProcessWorkerConfig>,
160}
161
162#[derive(Clone, Debug, Default, PartialEq, Eq)]
164pub struct ProcessDrainReport {
165 pub abandoned: Vec<String>,
168}
169
170enum RecoverFailure {
172 LeaseLost(PluginError),
176 Run(PluginError),
179}
180
181impl DurableProcessWorker {
182 pub fn new(config: DurableProcessWorkerConfig) -> Self {
183 Self {
184 config: Arc::new(config),
185 }
186 }
187
188 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
189 Self { config }
190 }
191
192 pub fn config(&self) -> &DurableProcessWorkerConfig {
193 &self.config
194 }
195
196 pub async fn run_process(
197 &self,
198 registration: ProcessRegistration,
199 execution_context: ProcessExecutionContext,
200 cancellation: CancellationToken,
201 ) -> Result<ProcessAwaitOutput, PluginError> {
202 let scoped_effect_controller = self
203 .config
204 .runtime_host
205 .control
206 .effect_host
207 .scoped_static(crate::ExecutionScope::process(registration.id.clone()))
208 .map_err(|err| PluginError::Session(err.to_string()))?
209 .ok_or_else(|| {
210 PluginError::Session(
211 "process worker effect host must provide a static process scope".to_string(),
212 )
213 })?;
214 self.run_process_with_scoped_effect_controller(
215 registration,
216 execution_context,
217 scoped_effect_controller,
218 cancellation,
219 )
220 .await
221 }
222
223 pub async fn run_process_with_scoped_effect_controller(
224 &self,
225 registration: ProcessRegistration,
226 execution_context: ProcessExecutionContext,
227 scoped_effect_controller: crate::ScopedEffectController<'_>,
228 cancellation: CancellationToken,
229 ) -> Result<ProcessAwaitOutput, PluginError> {
230 self.ensure_stable_process_id(®istration)?;
231 self.ensure_durable_store_facets()?;
232 if registration.disposition == RecoveryDisposition::ExternallyOwned {
236 return Err(PluginError::Session(format!(
237 "process `{}` is externally-owned and must not be executed by lash",
238 registration.id
239 )));
240 }
241 self.config
247 .process_registry
248 .record_first_started(
249 ®istration.id,
250 crate::ProcessStarted {
251 owner: self.config.lease_owner.clone(),
252 started_at_ms: self.now_ms(),
253 },
254 )
255 .await?;
256 let mut runtime = self.runtime_for_registration(®istration).await?;
257 let originator_scope = if let crate::ProcessOriginator::Session { scope } =
258 ®istration.provenance.originator
259 {
260 Some(scope)
261 } else {
262 None
263 };
264 let probe_scope = registration.wake_target.as_ref().or(originator_scope);
265 if let Some(probe) =
266 probe_scope.and_then(|scope| self.config.turn_phase_probe_slot.get_for_scope(scope))
267 {
268 runtime.set_turn_phase_probe(probe);
269 }
270 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
271 PluginError::Session(format!(
272 "failed to build runtime env for process `{}`: {err}",
273 registration.id
274 ))
275 })?;
276 Ok(manager
277 .run_process(
278 registration,
279 execution_context,
280 Arc::clone(&self.config.process_registry),
281 scoped_effect_controller,
282 cancellation,
283 )
284 .await)
285 }
286
287 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
313 let records = self.config.process_registry.list_non_terminal().await?;
314 for record in records {
315 let worker = self.clone();
326 tokio::spawn(async move { worker.recover_process(record).await });
327 }
328 Ok(())
329 }
330
331 pub async fn drain_owner_bound_work(&self) -> Result<ProcessDrainReport, PluginError> {
363 let mut abandoned = Vec::new();
364 for record in self.config.process_registry.list_non_terminal().await? {
365 if record.disposition != RecoveryDisposition::OwnerBound {
366 continue;
367 }
368 let Some(first_started) = record.first_started.as_ref() else {
369 continue;
373 };
374 if first_started.owner != self.config.lease_owner {
375 continue;
377 }
378 let owner = first_started.owner.clone();
379 if self.drain_one_owner_bound(&record.id, owner).await {
380 abandoned.push(record.id);
381 }
382 }
383 Ok(ProcessDrainReport { abandoned })
384 }
385
386 async fn drain_one_owner_bound(
391 &self,
392 process_id: &str,
393 owner: crate::LeaseOwnerIdentity,
394 ) -> bool {
395 let lease_ttl_ms = self.lease_timings().ttl_ms();
396 let drain_owner = self.recovery_lease_owner();
397 let lease = match self
398 .config
399 .process_registry
400 .claim_process_lease(process_id, &drain_owner, lease_ttl_ms)
401 .await
402 {
403 Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => lease,
404 Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => return false,
406 };
407 if self
408 .config
409 .process_registry
410 .get_process(process_id)
411 .await
412 .is_some_and(|current| current.is_terminal())
413 {
414 self.release_or_log(&lease).await;
415 return false;
416 }
417 let evidence = AbandonEvidence {
418 writer: AbandonWriter::OwnerDrain,
419 owner: Some(owner),
420 epoch_ms: self.now_ms(),
421 };
422 self.complete_and_release(
423 &lease,
424 process_id,
425 ProcessAwaitOutput::Abandoned {
426 evidence: Box::new(evidence),
427 control: None,
428 },
429 )
430 .await;
431 true
432 }
433
434 fn recovery_lease_owner(&self) -> crate::LeaseOwnerIdentity {
442 let attempt = uuid::Uuid::new_v4();
443 crate::LeaseOwnerIdentity {
444 owner_id: format!("{}:recovery:{attempt}", self.config.lease_owner.owner_id),
445 incarnation_id: attempt.to_string(),
446 liveness: self.config.lease_owner.liveness.clone(),
447 }
448 }
449
450 async fn recover_process(&self, record: ProcessRecord) {
468 let process_id = record.id.clone();
469 if record.disposition == RecoveryDisposition::ExternallyOwned {
473 if record.abandon_request.is_some() {
474 self.reconcile_externally_owned_abandon(&process_id).await;
475 }
476 return;
477 }
478
479 let lease_ttl_ms = self.lease_timings().ttl_ms();
480 let owner = self.recovery_lease_owner();
481 let Some((lease, dead_holder)) = self
486 .claim_for_recovery(&process_id, &owner, lease_ttl_ms)
487 .await
488 else {
489 return;
490 };
491 if self
494 .config
495 .process_registry
496 .get_process(&process_id)
497 .await
498 .is_some_and(|current| current.is_terminal())
499 {
500 self.release_or_log(&lease).await;
501 return;
502 }
503
504 match record.disposition {
505 RecoveryDisposition::Rerunnable => self.run_and_complete(record, lease).await,
507 RecoveryDisposition::OwnerBound if record.first_started.is_some() => {
508 let lapsed_owner = record
512 .first_started
513 .as_ref()
514 .map(|started| started.owner.clone());
515 let evidence = if let Some(dead_holder) = dead_holder {
516 Some(AbandonEvidence {
518 writer: AbandonWriter::Sweep,
519 owner: Some(dead_holder.owner),
520 epoch_ms: self.now_ms(),
521 })
522 } else if record.abandon_request.is_some() {
523 Some(AbandonEvidence {
527 writer: AbandonWriter::ReconciledRequest,
528 owner: lapsed_owner,
529 epoch_ms: self.now_ms(),
530 })
531 } else {
532 None
535 };
536 match evidence {
537 Some(evidence) => {
538 self.complete_and_release(
539 &lease,
540 &process_id,
541 ProcessAwaitOutput::Abandoned {
542 evidence: Box::new(evidence),
543 control: None,
544 },
545 )
546 .await;
547 }
548 None => self.release_or_log(&lease).await,
549 }
550 }
551 RecoveryDisposition::OwnerBound => self.run_and_complete(record, lease).await,
554 RecoveryDisposition::ExternallyOwned => self.release_or_log(&lease).await,
556 }
557 }
558
559 fn now_ms(&self) -> u64 {
561 self.config.runtime_host.clock.timestamp_ms()
562 }
563
564 async fn claim_for_recovery(
569 &self,
570 process_id: &str,
571 owner: &crate::LeaseOwnerIdentity,
572 lease_ttl_ms: u64,
573 ) -> Option<(ProcessLease, Option<ProcessLease>)> {
574 match self
575 .config
576 .process_registry
577 .claim_process_lease(process_id, owner, lease_ttl_ms)
578 .await
579 {
580 Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => Some((lease, None)),
581 Ok(crate::ProcessLeaseClaimOutcome::Busy { holder })
582 if holder.owner.is_definitely_dead_for_claimant(owner) =>
583 {
584 match self
585 .config
586 .process_registry
587 .reclaim_process_lease(process_id, owner, &holder, lease_ttl_ms)
588 .await
589 {
590 Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => {
591 Some((lease, Some(holder)))
592 }
593 Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => None,
594 }
595 }
596 Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => None,
597 }
598 }
599
600 async fn reconcile_externally_owned_abandon(&self, process_id: &str) {
605 let lease_ttl_ms = self.lease_timings().ttl_ms();
606 let owner = self.recovery_lease_owner();
607 let lease = match self
608 .config
609 .process_registry
610 .claim_process_lease(process_id, &owner, lease_ttl_ms)
611 .await
612 {
613 Ok(crate::ProcessLeaseClaimOutcome::Acquired(lease)) => lease,
614 Ok(crate::ProcessLeaseClaimOutcome::Busy { .. }) | Err(_) => return,
616 };
617 if self
618 .config
619 .process_registry
620 .get_process(process_id)
621 .await
622 .is_some_and(|current| current.is_terminal())
623 {
624 self.release_or_log(&lease).await;
625 return;
626 }
627 let evidence = AbandonEvidence {
628 writer: AbandonWriter::ReconciledRequest,
629 owner: None,
631 epoch_ms: self.now_ms(),
632 };
633 self.complete_and_release(
634 &lease,
635 process_id,
636 ProcessAwaitOutput::Abandoned {
637 evidence: Box::new(evidence),
638 control: None,
639 },
640 )
641 .await;
642 }
643
644 async fn run_and_complete(&self, record: ProcessRecord, lease: ProcessLease) {
647 let process_id = record.id.clone();
648 let registration = registration_from_record(record);
649 let execution_context = ProcessExecutionContext::default();
650 match self
651 .run_process_with_lease_renewal(registration, execution_context, lease.clone())
652 .await
653 {
654 Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
657 Err(RecoverFailure::LeaseLost(err)) => {
663 tracing::warn!(
664 process_id = %process_id,
665 error = %err,
666 "process recovery lost its lease mid-run; deferring to the new owner",
667 );
668 }
669 Err(RecoverFailure::Run(err)) => {
672 let output = ProcessAwaitOutput::Failure {
673 class: crate::ToolFailureClass::Execution,
674 code: "process_recovery_failed".to_string(),
675 message: err.to_string(),
676 raw: None,
677 control: None,
678 };
679 self.complete_and_release(&lease, &process_id, output).await;
680 }
681 }
682 }
683
684 async fn complete_and_release(
688 &self,
689 lease: &ProcessLease,
690 process_id: &str,
691 output: ProcessAwaitOutput,
692 ) {
693 let fenced = match self
700 .config
701 .process_registry
702 .renew_process_lease(lease, self.lease_timings().ttl_ms())
703 .await
704 {
705 Ok(renewed) => renewed,
706 Err(err) => {
707 tracing::warn!(
708 process_id = %process_id,
709 error = %err,
710 "lost process lease before terminal write; deferring to the new owner",
711 );
712 return;
713 }
714 };
715 if let Err(err) = self
716 .config
717 .process_registry
718 .complete_process(process_id, output)
719 .await
720 {
721 tracing::warn!(
722 process_id = %process_id,
723 error = %err,
724 "failed to write recovered process terminal outcome",
725 );
726 }
727 self.release_or_log(&fenced).await;
728 }
729
730 async fn release_or_log(&self, lease: &ProcessLease) {
731 if let Err(err) = self.release_process_lease(lease).await {
732 tracing::warn!(
733 process_id = %lease.process_id,
734 error = %err,
735 "failed to release recovered process lease",
736 );
737 }
738 }
739
740 async fn run_process_with_lease_renewal(
744 &self,
745 registration: ProcessRegistration,
746 execution_context: ProcessExecutionContext,
747 mut lease: ProcessLease,
748 ) -> Result<ProcessAwaitOutput, RecoverFailure> {
749 let process_id = registration.id.clone();
750 let cancellation = CancellationToken::new();
751 let cancel_watcher = {
752 let awaiter = self
753 .config
754 .process_change_hub
755 .clone()
756 .map(|hub| {
757 crate::ProcessAwaiter::new(Arc::clone(&self.config.process_registry), hub)
758 })
759 .unwrap_or_else(|| {
760 crate::ProcessAwaiter::polling(Arc::clone(&self.config.process_registry))
761 });
762 let process_id = process_id.clone();
763 let cancellation = cancellation.clone();
764 tokio::spawn(async move {
765 match awaiter
766 .await_event(&process_id, "process.cancel_requested", 0)
767 .await
768 {
769 Ok(_) => cancellation.cancel(),
770 Err(err) => tracing::warn!(
771 process_id = %process_id,
772 error = %err,
773 "process cancel watcher stopped before observing cancellation",
774 ),
775 }
776 })
777 };
778 let pending = self.run_process(registration, execution_context, cancellation.clone());
779 tokio::pin!(pending);
780 loop {
781 tokio::select! {
782 outcome = &mut pending => {
783 cancel_watcher.abort();
784 return outcome.map_err(RecoverFailure::Run);
785 }
786 _ = self.config.runtime_host.clock.sleep(self.lease_timings().renew_interval()) => {
787 match self
788 .config
789 .process_registry
790 .renew_process_lease(&lease, self.lease_timings().ttl_ms())
791 .await
792 {
793 Ok(renewed) => lease = renewed,
794 Err(err) => {
795 cancellation.cancel();
796 cancel_watcher.abort();
797 return Err(RecoverFailure::LeaseLost(err));
798 }
799 }
800 }
801 }
802 }
803 }
804
805 fn lease_timings(&self) -> crate::LeaseTimings {
806 self.config.runtime_host.control.lease_timings
807 }
808
809 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
810 self.config
811 .process_registry
812 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
813 .await
814 }
815
816 pub async fn request_process_cancel(
817 &self,
818 process_id: &str,
819 reason: Option<String>,
820 ) -> Result<(), PluginError> {
821 self.config
822 .process_registry
823 .append_event(
824 process_id,
825 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
826 )
827 .await
828 .map(|_| ())
829 }
830
831 async fn runtime_for_registration(
832 &self,
833 registration: &ProcessRegistration,
834 ) -> Result<LashRuntime, PluginError> {
835 match registration.input.as_ref() {
836 ProcessInput::SessionTurn { create_request, .. } => {
837 self.runtime_for_session_turn(registration, create_request.as_ref())
838 .await
839 }
840 ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. } => {
841 self.runtime_for_process_env(registration).await
842 }
843 ProcessInput::External { .. } => Err(PluginError::Session(format!(
847 "process `{}` is externally-owned and has no execution runtime",
848 registration.id
849 ))),
850 }
851 }
852
853 async fn runtime_for_session_turn(
854 &self,
855 registration: &ProcessRegistration,
856 create_request: &crate::SessionCreateRequest,
857 ) -> Result<LashRuntime, PluginError> {
858 let mut policy = create_request
859 .policy
860 .clone()
861 .unwrap_or_else(|| self.config.session_policy.clone());
862 if policy.recorded_provider_id().is_empty() {
863 policy.provider_id = self.config.session_policy.provider_id.clone();
864 }
865 self.build_ephemeral_runtime(
866 format!("process-session-turn:{}", registration.id),
867 policy,
868 create_request.plugin_options.clone(),
869 "session turn request",
870 )
871 .await
872 }
873
874 async fn runtime_for_process_env(
875 &self,
876 registration: &ProcessRegistration,
877 ) -> Result<LashRuntime, PluginError> {
878 let Some(env_ref) = registration.env_ref.as_ref() else {
879 return Err(PluginError::Session(format!(
880 "process `{}` is missing a captured execution env",
881 registration.id
882 )));
883 };
884 let env = crate::load_process_execution_env(
885 self.config
886 .runtime_host
887 .durability
888 .process_env_store
889 .as_ref(),
890 env_ref,
891 )
892 .await?;
893 self.build_ephemeral_runtime(
894 format!("process-env:{}", registration.id),
895 env.policy,
896 env.plugin_options,
897 env_ref.as_str(),
898 )
899 .await
900 }
901
902 async fn build_ephemeral_runtime(
903 &self,
904 session_id: String,
905 policy: crate::SessionPolicy,
906 plugin_options: crate::PluginOptions,
907 source_label: &str,
908 ) -> Result<LashRuntime, PluginError> {
909 let store = Arc::new(InMemorySessionStore::default());
910 let process_work_driver = self.config.process_work_driver.clone().unwrap_or_else(|| {
911 if let Some(hub) = self.config.process_change_hub.clone() {
912 ProcessWorkDriver::from_watched(
913 Arc::clone(&self.config.process_registry),
914 hub,
915 Arc::new(crate::InlineProcessRunHandle::new(self.clone())),
916 )
917 } else {
918 ProcessWorkDriver::inline(Arc::clone(&self.config.process_registry), self.clone())
919 }
920 });
921 let mut builder = EmbeddedRuntimeBuilder::new()
922 .with_session_id(session_id.to_string())
923 .with_plugin_host(self.config.plugin_host.as_ref().clone())
924 .with_runtime_host(self.config.runtime_host.clone())
925 .with_policy(policy)
926 .with_plugin_options(plugin_options)
927 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
928 .with_trigger_store(Arc::clone(&self.config.trigger_store))
929 .with_process_registry(Arc::clone(&self.config.process_registry))
930 .with_process_work_driver(process_work_driver)
931 .with_residency(self.config.residency)
932 .with_store(store);
933 if let Some(driver) = self.config.queued_work_driver.clone() {
934 builder = builder.with_queued_work_driver(driver);
935 }
936 builder.build().await.map_err(|err| {
937 PluginError::Session(format!(
938 "failed to build process worker runtime for {source_label}: {err}"
939 ))
940 })
941 }
942
943 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
952 if self
953 .config
954 .runtime_host
955 .control
956 .effect_host
957 .durability_tier()
958 != crate::DurabilityTier::Durable
959 {
960 return Ok(());
961 }
962 let require = |facet: crate::DurableStoreFacet| {
963 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
964 };
965 if self
966 .config
967 .runtime_host
968 .durability
969 .attachment_store
970 .persistence()
971 .durability_tier()
972 != crate::DurabilityTier::Durable
973 {
974 return Err(require(crate::DurableStoreFacet::AttachmentStore));
975 }
976 if self
977 .config
978 .runtime_host
979 .durability
980 .process_env_store
981 .durability_tier()
982 != crate::DurabilityTier::Durable
983 {
984 return Err(require(crate::DurableStoreFacet::ProcessEnvStore));
985 }
986 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
987 return Err(require(crate::DurableStoreFacet::SessionStore));
988 }
989 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
990 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
991 }
992 if self.config.trigger_store.durability_tier() != crate::DurabilityTier::Durable {
993 return Err(require(crate::DurableStoreFacet::TriggerStore));
994 }
995 Ok(())
996 }
997
998 fn ensure_stable_process_id(
1006 &self,
1007 registration: &ProcessRegistration,
1008 ) -> Result<(), PluginError> {
1009 if registration.id.trim().is_empty() {
1010 return Err(PluginError::Session(
1011 crate::RuntimeError::missing_process_execution_id().to_string(),
1012 ));
1013 }
1014 Ok(())
1015 }
1016}
1017
1018fn registration_from_record(record: ProcessRecord) -> ProcessRegistration {
1021 ProcessRegistration {
1022 id: record.id,
1023 input: record.input,
1024 disposition: record.disposition,
1025 identity: record.identity,
1026 event_types: record.event_types,
1027 provenance: record.provenance,
1028 env_ref: record.env_ref,
1029 wake_target: record.wake_target,
1030 }
1031}
1032
1033#[cfg(test)]
1034mod boundary_tests;
1035#[cfg(test)]
1036mod recovery_tests;