1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio_util::sync::CancellationToken;
5
6use super::effect::ProcessRunner;
7use super::session_manager::RuntimeSessionServices;
8use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
9use crate::{
10 LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack, ProcessAwaitOutput,
11 ProcessExecutionContext, ProcessInput, ProcessLease, ProcessLeaseCompletion, ProcessRecord,
12 ProcessRegistration, ProcessRegistry, SessionStoreCreateRequest, SessionStoreFactory,
13};
14
15#[derive(Clone)]
21pub struct DurableProcessWorkerConfig {
22 pub plugin_host: Arc<PluginHost>,
23 pub runtime_host: RuntimeHostConfig,
24 pub session_policy: crate::SessionPolicy,
25 pub session_store_factory: Arc<dyn SessionStoreFactory>,
26 pub process_registry: Arc<dyn ProcessRegistry>,
27 pub residency: crate::Residency,
33}
34
35impl DurableProcessWorkerConfig {
36 pub fn new(
37 plugin_host: Arc<PluginHost>,
38 runtime_host: RuntimeHostConfig,
39 session_store_factory: Arc<dyn SessionStoreFactory>,
40 process_registry: Arc<dyn ProcessRegistry>,
41 ) -> Self {
42 Self {
43 plugin_host,
44 runtime_host,
45 session_policy: crate::SessionPolicy::default(),
46 session_store_factory,
47 process_registry,
48 residency: crate::Residency::default(),
49 }
50 }
51
52 pub fn with_session_policy(mut self, policy: crate::SessionPolicy) -> Self {
53 self.session_policy = policy;
54 self
55 }
56
57 pub fn with_residency(mut self, residency: crate::Residency) -> Self {
58 self.residency = residency;
59 self
60 }
61
62 pub fn from_plugin_factories(
63 plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
64 runtime_host: RuntimeHostConfig,
65 session_store_factory: Arc<dyn SessionStoreFactory>,
66 process_registry: Arc<dyn ProcessRegistry>,
67 ) -> Self {
68 Self::new(
69 Arc::new(PluginHost::new(plugin_factories.into_iter().collect())),
70 runtime_host,
71 session_store_factory,
72 process_registry,
73 )
74 }
75
76 pub fn from_plugin_stack(
77 plugin_stack: PluginStack,
78 runtime_host: RuntimeHostConfig,
79 session_store_factory: Arc<dyn SessionStoreFactory>,
80 process_registry: Arc<dyn ProcessRegistry>,
81 ) -> Self {
82 Self::from_plugin_factories(
83 plugin_stack.into_factories(),
84 runtime_host,
85 session_store_factory,
86 process_registry,
87 )
88 }
89}
90
91#[derive(Clone)]
93pub struct DurableProcessWorker {
94 config: Arc<DurableProcessWorkerConfig>,
95}
96
97enum RecoverFailure {
99 LeaseLost(PluginError),
103 Run(PluginError),
106}
107
108impl DurableProcessWorker {
109 pub fn new(config: DurableProcessWorkerConfig) -> Self {
110 Self {
111 config: Arc::new(config),
112 }
113 }
114
115 pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
116 Self { config }
117 }
118
119 pub fn config(&self) -> &DurableProcessWorkerConfig {
120 &self.config
121 }
122
123 pub async fn run_process(
124 &self,
125 registration: ProcessRegistration,
126 execution_context: ProcessExecutionContext,
127 cancellation: CancellationToken,
128 ) -> Result<ProcessAwaitOutput, PluginError> {
129 let scoped_effect_controller = self
130 .config
131 .runtime_host
132 .control
133 .effect_host
134 .scoped_static(crate::EffectScope::process(registration.id.clone()))
135 .map_err(|err| PluginError::Session(err.to_string()))?
136 .ok_or_else(|| {
137 PluginError::Session(
138 "process worker effect host must provide a static process scope".to_string(),
139 )
140 })?;
141 self.run_process_with_scoped_effect_controller(
142 registration,
143 execution_context,
144 scoped_effect_controller,
145 cancellation,
146 )
147 .await
148 }
149
150 pub async fn run_process_with_scoped_effect_controller(
151 &self,
152 registration: ProcessRegistration,
153 execution_context: ProcessExecutionContext,
154 scoped_effect_controller: crate::ScopedEffectController<'_>,
155 cancellation: CancellationToken,
156 ) -> Result<ProcessAwaitOutput, PluginError> {
157 self.ensure_stable_process_id(®istration)?;
158 self.ensure_host_profile_matches(®istration)?;
159 self.ensure_durable_store_facets()?;
160 if let ProcessInput::External { metadata } = registration.input.as_ref() {
161 return Ok(ProcessAwaitOutput::Success {
162 value: serde_json::json!({ "metadata": metadata.clone() }),
163 control: None,
164 });
165 }
166 let session_id = registration.provenance.owner_scope.session_id.as_str();
167 if session_id.is_empty() {
168 return Err(PluginError::Session(format!(
169 "process `{}` is missing a structured owner scope",
170 registration.id
171 )));
172 }
173 let runtime = self.rebuild_runtime(session_id).await?;
174 let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
175 PluginError::Session(format!(
176 "failed to rebuild runtime session `{session_id}` for process `{}`: {err}",
177 registration.id
178 ))
179 })?;
180 Ok(manager
181 .run_process(
182 registration,
183 execution_context,
184 Arc::clone(&self.config.process_registry),
185 scoped_effect_controller,
186 cancellation,
187 )
188 .await)
189 }
190
191 pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
214 let records = self.config.process_registry.list_non_terminal().await?;
215 for record in records {
216 let worker = self.clone();
227 tokio::spawn(async move { worker.recover_process(record).await });
228 }
229 Ok(())
230 }
231
232 async fn recover_process(&self, record: ProcessRecord) {
233 let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
234 let process_id = record.id.clone();
235 let Ok(lease) = self
239 .config
240 .process_registry
241 .claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
242 .await
243 else {
244 return;
245 };
246 if self
249 .config
250 .process_registry
251 .get_process(&process_id)
252 .await
253 .is_some_and(|current| current.is_terminal())
254 {
255 self.release_or_log(&lease).await;
256 return;
257 }
258 let registration = ProcessRegistration {
259 id: record.id,
260 input: record.input,
261 event_types: record.event_types,
262 provenance: record.provenance.clone(),
263 };
264 let execution_context = ProcessExecutionContext::default()
267 .with_wake_target_scope(record.provenance.owner_scope);
268 match self
269 .run_process_with_lease_renewal(registration, execution_context, lease.clone())
270 .await
271 {
272 Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
275 Err(RecoverFailure::LeaseLost(err)) => {
281 tracing::warn!(
282 process_id = %process_id,
283 error = %err,
284 "process recovery lost its lease mid-run; deferring to the new owner",
285 );
286 }
287 Err(RecoverFailure::Run(err)) => {
290 let output = ProcessAwaitOutput::Failure {
291 class: crate::ToolFailureClass::Execution,
292 code: "process_recovery_failed".to_string(),
293 message: err.to_string(),
294 raw: None,
295 control: None,
296 };
297 self.complete_and_release(&lease, &process_id, output).await;
298 }
299 }
300 }
301
302 async fn complete_and_release(
306 &self,
307 lease: &ProcessLease,
308 process_id: &str,
309 output: ProcessAwaitOutput,
310 ) {
311 let fenced = match self
318 .config
319 .process_registry
320 .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
321 .await
322 {
323 Ok(renewed) => renewed,
324 Err(err) => {
325 tracing::warn!(
326 process_id = %process_id,
327 error = %err,
328 "lost process lease before terminal write; deferring to the new owner",
329 );
330 return;
331 }
332 };
333 if let Err(err) = self
334 .config
335 .process_registry
336 .complete_process(process_id, output)
337 .await
338 {
339 tracing::warn!(
340 process_id = %process_id,
341 error = %err,
342 "failed to write recovered process terminal outcome",
343 );
344 }
345 self.release_or_log(&fenced).await;
346 }
347
348 async fn release_or_log(&self, lease: &ProcessLease) {
349 if let Err(err) = self.release_process_lease(lease).await {
350 tracing::warn!(
351 process_id = %lease.process_id,
352 error = %err,
353 "failed to release recovered process lease",
354 );
355 }
356 }
357
358 async fn run_process_with_lease_renewal(
362 &self,
363 registration: ProcessRegistration,
364 execution_context: ProcessExecutionContext,
365 mut lease: ProcessLease,
366 ) -> Result<ProcessAwaitOutput, RecoverFailure> {
367 let process_id = registration.id.clone();
368 let cancellation = CancellationToken::new();
369 let cancel_watcher = {
370 let registry = Arc::clone(&self.config.process_registry);
371 let process_id = process_id.clone();
372 let cancellation = cancellation.clone();
373 tokio::spawn(async move {
374 match registry
375 .wait_event_after(&process_id, "process.cancel_requested", 0)
376 .await
377 {
378 Ok(_) => cancellation.cancel(),
379 Err(err) => tracing::warn!(
380 process_id = %process_id,
381 error = %err,
382 "process cancel watcher stopped before observing cancellation",
383 ),
384 }
385 })
386 };
387 let pending = self.run_process(registration, execution_context, cancellation.clone());
388 tokio::pin!(pending);
389 loop {
390 tokio::select! {
391 outcome = &mut pending => {
392 cancel_watcher.abort();
393 return outcome.map_err(RecoverFailure::Run);
394 }
395 _ = tokio::time::sleep(process_lease_renew_interval()) => {
396 match self
397 .config
398 .process_registry
399 .renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
400 .await
401 {
402 Ok(renewed) => lease = renewed,
403 Err(err) => {
404 cancellation.cancel();
405 cancel_watcher.abort();
406 return Err(RecoverFailure::LeaseLost(err));
407 }
408 }
409 }
410 }
411 }
412 }
413
414 async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
415 self.config
416 .process_registry
417 .complete_process_lease(&ProcessLeaseCompletion::from_lease(lease))
418 .await
419 }
420
421 pub async fn request_process_cancel(
422 &self,
423 process_id: &str,
424 reason: Option<String>,
425 ) -> Result<(), PluginError> {
426 self.config
427 .process_registry
428 .append_event(
429 process_id,
430 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason),
431 )
432 .await
433 .map(|_| ())
434 }
435
436 async fn rebuild_runtime(&self, session_id: &str) -> Result<LashRuntime, PluginError> {
437 let store = self
438 .config
439 .session_store_factory
440 .create_store(&SessionStoreCreateRequest {
441 session_id: session_id.to_string(),
442 relation: crate::SessionRelation::Root,
443 policy: self.config.session_policy.clone(),
444 })
445 .await
446 .map_err(|err| {
447 PluginError::Session(format!(
448 "failed to open session store for process worker session `{session_id}`: {err}"
449 ))
450 })?;
451 EmbeddedRuntimeBuilder::new()
452 .with_session_id(session_id.to_string())
453 .with_plugin_host(self.config.plugin_host.as_ref().clone())
454 .with_runtime_host(self.config.runtime_host.clone())
455 .with_policy(self.config.session_policy.clone())
456 .with_session_store_factory(Arc::clone(&self.config.session_store_factory))
457 .with_process_registry(Arc::clone(&self.config.process_registry))
458 .with_residency(self.config.residency)
459 .with_store(store)
460 .build()
461 .await
462 .map_err(|err| {
463 PluginError::Session(format!(
464 "failed to rebuild process worker runtime for session `{session_id}`: {err}"
465 ))
466 })
467 }
468
469 fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
478 if self
479 .config
480 .runtime_host
481 .control
482 .effect_host
483 .durability_tier()
484 != crate::DurabilityTier::Durable
485 {
486 return Ok(());
487 }
488 let require = |facet: crate::DurableStoreFacet| {
489 PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
490 };
491 if self
492 .config
493 .runtime_host
494 .durability
495 .attachment_store
496 .persistence()
497 .durability_tier()
498 != crate::DurabilityTier::Durable
499 {
500 return Err(require(crate::DurableStoreFacet::AttachmentStore));
501 }
502 if self
503 .config
504 .runtime_host
505 .durability
506 .lashlang_artifact_store
507 .durability_tier()
508 != crate::DurabilityTier::Durable
509 {
510 return Err(require(crate::DurableStoreFacet::ArtifactStore));
511 }
512 if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
513 return Err(require(crate::DurableStoreFacet::SessionStore));
514 }
515 if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
516 return Err(require(crate::DurableStoreFacet::ProcessRegistry));
517 }
518 Ok(())
519 }
520
521 fn ensure_stable_process_id(
529 &self,
530 registration: &ProcessRegistration,
531 ) -> Result<(), PluginError> {
532 if registration.id.trim().is_empty() {
533 return Err(PluginError::Session(
534 crate::RuntimeError::missing_process_execution_id().to_string(),
535 ));
536 }
537 Ok(())
538 }
539
540 fn ensure_host_profile_matches(
541 &self,
542 registration: &ProcessRegistration,
543 ) -> Result<(), PluginError> {
544 let actual = registration.provenance.host_profile_id.as_str();
545 let expected = self.config.runtime_host.profile.host_profile_id.as_str();
546 if actual.is_empty() || actual == expected {
547 return Ok(());
548 }
549 Err(PluginError::Session(format!(
550 "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
551 registration.id
552 )))
553 }
554}
555
/// Delay between lease renewals while a recovered process is running.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Renewal cadence in milliseconds: 25 ms in test builds so renewal paths run quickly,
/// 30 000 ms otherwise.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) { 25 } else { 30_000 }
}
569
#[cfg(test)]
mod boundary_tests {
    //! Boundary checks for [`DurableProcessWorker`].
    //!
    //! A worker whose effect host is durable must refuse to run unless every store facet is
    //! durable as well. Registrations must carry a non-blank id.

    use super::*;
    use crate::{
        AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
        DurableStoreFacet, InMemoryAttachmentStore, LashlangArtifactStore, ProcessInput,
        ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment,
    };
    use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};

    /// Effect controller that reports the durable tier. These tests never let it execute.
    #[derive(Default)]
    struct DurableController;

    #[async_trait::async_trait]
    impl RuntimeEffectController for DurableController {
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            unreachable!("worker boundary rejects before executing any effect")
        }
    }

    /// In-memory attachment store that reports durable persistence.
    #[derive(Default)]
    struct DurableAttachmentStore {
        inner: InMemoryAttachmentStore,
    }

    impl AttachmentStore for DurableAttachmentStore {
        fn persistence(&self) -> AttachmentStorePersistence {
            AttachmentStorePersistence::Durable
        }

        fn put(
            &self,
            bytes: Vec<u8>,
            meta: AttachmentCreateMeta,
        ) -> Result<AttachmentRef, AttachmentStoreError> {
            self.inner.put(bytes, meta)
        }

        fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
            self.inner.get(id)
        }
    }

    /// In-memory artifact store that reports the durable tier.
    #[derive(Default)]
    struct DurableArtifactStore {
        inner: lashlang::InMemoryLashlangArtifactStore,
    }

    #[async_trait::async_trait]
    impl LashlangArtifactStore for DurableArtifactStore {
        fn durability_tier(&self) -> DurabilityTier {
            DurabilityTier::Durable
        }

        async fn put_module_artifact(
            &self,
            artifact: &lashlang::ModuleArtifact,
        ) -> Result<(), lashlang::ArtifactStoreError> {
            self.inner.put_module_artifact(artifact).await
        }

        async fn get_module_artifact(
            &self,
            module_ref: &lashlang::ModuleRef,
        ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
            self.inner.get_module_artifact(module_ref).await
        }
    }

    /// Session store factory with a configurable tier. It must never be asked for a store.
    struct TierSessionStoreFactory {
        tier: DurabilityTier,
    }

    #[async_trait::async_trait]
    impl SessionStoreFactory for TierSessionStoreFactory {
        fn durability_tier(&self) -> DurabilityTier {
            self.tier
        }

        async fn create_store(
            &self,
            _request: &crate::SessionStoreCreateRequest,
        ) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
            unreachable!("worker boundary rejects before creating a session store")
        }

        async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
            Ok(())
        }
    }

    /// Durable-effect worker with the given stores and a durable process registry.
    fn worker(
        attachment: Arc<dyn AttachmentStore>,
        artifact: Arc<dyn LashlangArtifactStore>,
        session_store_tier: DurabilityTier,
    ) -> DurableProcessWorker {
        worker_with_process_registry_tier(
            attachment,
            artifact,
            session_store_tier,
            DurabilityTier::Durable,
        )
    }

    /// Durable-effect worker with every store facet and its tier chosen by the caller.
    fn worker_with_process_registry_tier(
        attachment: Arc<dyn AttachmentStore>,
        artifact: Arc<dyn LashlangArtifactStore>,
        session_store_tier: DurabilityTier,
        process_registry_tier: DurabilityTier,
    ) -> DurableProcessWorker {
        let mut runtime_host = RuntimeHostConfig::in_memory();
        runtime_host.control.effect_host =
            Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
        runtime_host.durability.attachment_store = attachment;
        runtime_host.durability.lashlang_artifact_store = artifact;
        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
            tier: session_store_tier,
        });
        let registry: Arc<dyn ProcessRegistry> = Arc::new(
            crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
        );
        DurableProcessWorker::new(DurableProcessWorkerConfig::new(
            plugin_host,
            runtime_host,
            factory,
            registry,
        ))
    }

    /// External registration: the worker answers it without touching any session store.
    fn external_registration() -> ProcessRegistration {
        ProcessRegistration::new(
            "worker-boundary-process",
            ProcessInput::External {
                metadata: serde_json::json!({}),
            },
        )
    }

    /// Runs the external registration through the public entry point.
    async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
        worker
            .run_process(
                external_registration(),
                ProcessExecutionContext::default(),
                CancellationToken::new(),
            )
            .await
    }

    /// Asserts `err` is the durable-store-required error for `facet`.
    fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
        let PluginError::Session(message) = err else {
            panic!("expected PluginError::Session, got {err:?}");
        };
        let expected = RuntimeError::durable_store_required(facet).to_string();
        assert_eq!(message, expected, "worker must reject the {facet:?} facet");
    }

    /// Asserts the worker short-circuited the external registration by echoing its metadata.
    fn assert_external_echo(output: ProcessAwaitOutput) {
        let ProcessAwaitOutput::Success { value, .. } = output else {
            panic!("external process must short-circuit with a Success output");
        };
        assert_eq!(value, serde_json::json!({ "metadata": {} }));
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_attachment_store() {
        let worker = worker(
            Arc::new(InMemoryAttachmentStore::new()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Durable,
        );
        let err = run(&worker)
            .await
            .expect_err("ephemeral attachment store must be rejected at the worker boundary");
        assert_facet(err, DurableStoreFacet::AttachmentStore);
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_artifact_store() {
        let worker = worker(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
            DurabilityTier::Durable,
        );
        let err = run(&worker)
            .await
            .expect_err("ephemeral artifact store must be rejected at the worker boundary");
        assert_facet(err, DurableStoreFacet::ArtifactStore);
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_session_store_factory() {
        let worker = worker(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Inline,
        );
        let err = run(&worker)
            .await
            .expect_err("ephemeral session store factory must be rejected at the worker boundary");
        assert_facet(err, DurableStoreFacet::SessionStore);
    }

    #[tokio::test]
    async fn durable_worker_rejects_ephemeral_process_registry() {
        let worker = worker_with_process_registry_tier(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Durable,
            DurabilityTier::Inline,
        );
        let err = run(&worker)
            .await
            .expect_err("ephemeral process registry must be rejected at the worker boundary");
        assert_facet(err, DurableStoreFacet::ProcessRegistry);
    }

    #[tokio::test]
    async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
        let worker = worker(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Durable,
        );
        let output = run(&worker)
            .await
            .expect("all-durable worker should pass the store-facet guard");
        assert_external_echo(output);
    }

    #[tokio::test]
    async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
        let runtime_host = RuntimeHostConfig::in_memory();
        let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
        let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
            tier: DurabilityTier::Inline,
        });
        let registry: Arc<dyn ProcessRegistry> =
            Arc::new(crate::TestLocalProcessRegistry::default());
        let worker = DurableProcessWorker::new(DurableProcessWorkerConfig::new(
            plugin_host,
            runtime_host,
            factory,
            registry,
        ));
        let output = run(&worker)
            .await
            .expect("inline worker should pass the store-facet guard");
        assert_external_echo(output);
    }

    #[tokio::test]
    async fn worker_rejects_whitespace_only_process_id() {
        let worker = worker(
            Arc::new(DurableAttachmentStore::default()),
            Arc::new(DurableArtifactStore::default()),
            DurabilityTier::Durable,
        );
        let registration = ProcessRegistration::new(
            "   ",
            ProcessInput::External {
                metadata: serde_json::json!({}),
            },
        );
        let err = worker
            .ensure_stable_process_id(&registration)
            .expect_err("a whitespace-only process id must be rejected");
        let PluginError::Session(message) = err else {
            panic!("expected PluginError::Session, got {err:?}");
        };
        assert_eq!(
            message,
            RuntimeError::missing_process_execution_id().to_string()
        );
    }
}