use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use super::effect::ProcessRunner;
use super::session_manager::RuntimeSessionServices;
use super::{EmbeddedRuntimeBuilder, RUNTIME_TURN_LEASE_TTL_MS, RuntimeHostConfig};
use crate::{
LashRuntime, PluginError, PluginFactory, PluginHost, PluginStack, ProcessAwaitOutput,
ProcessExecutionContext, ProcessInput, ProcessLease, ProcessLeaseCompletion, ProcessRecord,
ProcessRegistration, ProcessRegistry, SessionStoreCreateRequest, SessionStoreFactory,
};
/// Shared configuration for a [`DurableProcessWorker`].
///
/// Bundles the plugin host, runtime host, stores and registry the worker uses
/// to rebuild a session runtime and run a registered process against it.
#[derive(Clone)]
pub struct DurableProcessWorkerConfig {
/// Plugin host cloned into every runtime the worker rebuilds.
pub plugin_host: Arc<PluginHost>,
/// Runtime host settings; the worker reads its effect host, its host profile
/// id and its attachment/artifact stores before running a process.
pub runtime_host: RuntimeHostConfig,
/// Policy passed to both the session store request and the rebuilt runtime.
pub session_policy: crate::SessionPolicy,
/// Factory used to open the session store for the process's owning session.
pub session_store_factory: Arc<dyn SessionStoreFactory>,
/// Registry used to list, lease, cancel and complete processes.
pub process_registry: Arc<dyn ProcessRegistry>,
/// Residency setting forwarded to the rebuilt runtime.
pub residency: crate::Residency,
}
impl DurableProcessWorkerConfig {
    /// Builds a configuration from already-constructed shared components.
    ///
    /// `session_policy` and `residency` start at their `Default` values; use
    /// [`Self::with_session_policy`] and [`Self::with_residency`] to override.
    pub fn new(
        plugin_host: Arc<PluginHost>,
        runtime_host: RuntimeHostConfig,
        session_store_factory: Arc<dyn SessionStoreFactory>,
        process_registry: Arc<dyn ProcessRegistry>,
    ) -> Self {
        let session_policy = crate::SessionPolicy::default();
        let residency = crate::Residency::default();
        Self {
            plugin_host,
            runtime_host,
            session_policy,
            session_store_factory,
            process_registry,
            residency,
        }
    }

    /// Returns the configuration with `policy` as its session policy.
    pub fn with_session_policy(self, policy: crate::SessionPolicy) -> Self {
        Self {
            session_policy: policy,
            ..self
        }
    }

    /// Returns the configuration with `residency` as its residency setting.
    pub fn with_residency(self, residency: crate::Residency) -> Self {
        Self { residency, ..self }
    }

    /// Builds a configuration whose plugin host is created from the given
    /// plugin factories.
    pub fn from_plugin_factories(
        plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
        runtime_host: RuntimeHostConfig,
        session_store_factory: Arc<dyn SessionStoreFactory>,
        process_registry: Arc<dyn ProcessRegistry>,
    ) -> Self {
        let host = PluginHost::new(plugin_factories.into_iter().collect());
        Self::new(
            Arc::new(host),
            runtime_host,
            session_store_factory,
            process_registry,
        )
    }

    /// Builds a configuration from a plugin stack by unpacking it into its
    /// factories and delegating to [`Self::from_plugin_factories`].
    pub fn from_plugin_stack(
        plugin_stack: PluginStack,
        runtime_host: RuntimeHostConfig,
        session_store_factory: Arc<dyn SessionStoreFactory>,
        process_registry: Arc<dyn ProcessRegistry>,
    ) -> Self {
        let factories = plugin_stack.into_factories();
        Self::from_plugin_factories(
            factories,
            runtime_host,
            session_store_factory,
            process_registry,
        )
    }
}
/// Worker that runs registered processes against a runtime rebuilt from shared
/// configuration, and recovers non-terminal processes found in the registry.
///
/// Clones share the same `Arc`-wrapped configuration.
#[derive(Clone)]
pub struct DurableProcessWorker {
config: Arc<DurableProcessWorkerConfig>,
}
/// Why a recovery run stopped without producing a process output.
enum RecoverFailure {
/// Renewing the process lease failed while the process was still running.
LeaseLost(PluginError),
/// The process run itself returned an error.
Run(PluginError),
}
impl DurableProcessWorker {
pub fn new(config: DurableProcessWorkerConfig) -> Self {
Self {
config: Arc::new(config),
}
}
pub fn from_shared_config(config: Arc<DurableProcessWorkerConfig>) -> Self {
Self { config }
}
/// Borrows the configuration this worker was built with.
pub fn config(&self) -> &DurableProcessWorkerConfig {
    self.config.as_ref()
}
/// Runs `registration` using an effect controller scoped to that process.
///
/// The controller is obtained from the configured effect host via
/// `scoped_static`; the actual run is delegated to
/// [`Self::run_process_with_scoped_effect_controller`].
///
/// # Errors
///
/// Returns `PluginError::Session` when the effect host fails to produce the
/// scope, or reports that it has no static scope for the process.
pub async fn run_process(
    &self,
    registration: ProcessRegistration,
    execution_context: ProcessExecutionContext,
    cancellation: CancellationToken,
) -> Result<ProcessAwaitOutput, PluginError> {
    let scope = crate::EffectScope::process(registration.id.clone());
    let scoped_effect_controller = match self
        .config
        .runtime_host
        .control
        .effect_host
        .scoped_static(scope)
    {
        Ok(Some(controller)) => controller,
        Ok(None) => {
            return Err(PluginError::Session(
                "process worker effect host must provide a static process scope".to_string(),
            ));
        }
        Err(err) => return Err(PluginError::Session(err.to_string())),
    };
    self.run_process_with_scoped_effect_controller(
        registration,
        execution_context,
        scoped_effect_controller,
        cancellation,
    )
    .await
}
/// Runs `registration` with a caller-supplied scoped effect controller.
///
/// Steps, in order:
/// 1. Validate the registration (non-blank id, matching host profile) and
///    the durability of the configured stores.
/// 2. For `ProcessInput::External`, return success immediately, echoing the
///    input metadata under the `"metadata"` key; no runtime is rebuilt.
/// 3. Otherwise rebuild the runtime for the owning session and run the
///    process through `RuntimeSessionServices`.
///
/// # Errors
///
/// Returns `PluginError::Session` when validation fails, when the owner scope
/// carries an empty session id, or when the runtime or its session services
/// cannot be rebuilt.
pub async fn run_process_with_scoped_effect_controller(
    &self,
    registration: ProcessRegistration,
    execution_context: ProcessExecutionContext,
    scoped_effect_controller: crate::ScopedEffectController<'_>,
    cancellation: CancellationToken,
) -> Result<ProcessAwaitOutput, PluginError> {
    // The registration is only inspected here, so it is passed by reference.
    // (The source had `&registration` corrupted into `®istration` by an
    // HTML-entity decode of `&reg`.)
    self.ensure_stable_process_id(&registration)?;
    self.ensure_host_profile_matches(&registration)?;
    self.ensure_durable_store_facets()?;
    if let ProcessInput::External { metadata } = registration.input.as_ref() {
        return Ok(ProcessAwaitOutput::Success {
            value: serde_json::json!({ "metadata": metadata.clone() }),
            control: None,
        });
    }
    let session_id = registration.provenance.owner_scope.session_id.as_str();
    if session_id.is_empty() {
        return Err(PluginError::Session(format!(
            "process `{}` is missing a structured owner scope",
            registration.id
        )));
    }
    let runtime = self.rebuild_runtime(session_id).await?;
    let manager = RuntimeSessionServices::new(&runtime, true, None).map_err(|err| {
        PluginError::Session(format!(
            "failed to rebuild runtime session `{session_id}` for process `{}`: {err}",
            registration.id
        ))
    })?;
    // `session_id` borrows from `registration`; its last use is above, so the
    // registration can be moved into the run below.
    Ok(manager
        .run_process(
            registration,
            execution_context,
            Arc::clone(&self.config.process_registry),
            scoped_effect_controller,
            cancellation,
        )
        .await)
}
/// Starts recovery for every non-terminal process in the registry.
///
/// Each record is handed to its own spawned task running on a clone of this
/// worker. The tasks are not awaited, so this returns once they are spawned.
///
/// # Errors
///
/// Propagates the registry error if listing non-terminal processes fails.
pub async fn drive_pending_processes(&self) -> Result<(), PluginError> {
    let pending = self.config.process_registry.list_non_terminal().await?;
    pending.into_iter().for_each(|record| {
        let worker = self.clone();
        tokio::spawn(async move { worker.recover_process(record).await });
    });
    Ok(())
}
async fn recover_process(&self, record: ProcessRecord) {
let owner_id = format!("process-recovery-{}", uuid::Uuid::new_v4());
let process_id = record.id.clone();
let Ok(lease) = self
.config
.process_registry
.claim_process_lease(&process_id, &owner_id, RUNTIME_TURN_LEASE_TTL_MS)
.await
else {
return;
};
if self
.config
.process_registry
.get_process(&process_id)
.await
.is_some_and(|current| current.is_terminal())
{
self.release_or_log(&lease).await;
return;
}
let registration = ProcessRegistration {
id: record.id,
input: record.input,
event_types: record.event_types,
provenance: record.provenance.clone(),
};
let execution_context = ProcessExecutionContext::default()
.with_wake_target_scope(record.provenance.owner_scope);
match self
.run_process_with_lease_renewal(registration, execution_context, lease.clone())
.await
{
Ok(output) => self.complete_and_release(&lease, &process_id, output).await,
Err(RecoverFailure::LeaseLost(err)) => {
tracing::warn!(
process_id = %process_id,
error = %err,
"process recovery lost its lease mid-run; deferring to the new owner",
);
}
Err(RecoverFailure::Run(err)) => {
let output = ProcessAwaitOutput::Failure {
class: crate::ToolFailureClass::Execution,
code: "process_recovery_failed".to_string(),
message: err.to_string(),
raw: None,
control: None,
};
self.complete_and_release(&lease, &process_id, output).await;
}
}
}
/// Writes the terminal `output` for `process_id` and then releases the lease.
///
/// The lease is renewed first. If that renewal fails, a warning is logged and
/// nothing is written. A failed terminal write is logged, and the renewed
/// lease is released either way.
async fn complete_and_release(
    &self,
    lease: &ProcessLease,
    process_id: &str,
    output: ProcessAwaitOutput,
) {
    let registry = &self.config.process_registry;

    let renewal = registry
        .renew_process_lease(lease, RUNTIME_TURN_LEASE_TTL_MS)
        .await;
    let fenced = match renewal {
        Err(err) => {
            tracing::warn!(
                process_id = %process_id,
                error = %err,
                "lost process lease before terminal write; deferring to the new owner",
            );
            return;
        }
        Ok(renewed) => renewed,
    };

    let write = registry.complete_process(process_id, output).await;
    if let Err(err) = write {
        tracing::warn!(
            process_id = %process_id,
            error = %err,
            "failed to write recovered process terminal outcome",
        );
    }

    self.release_or_log(&fenced).await;
}
/// Releases `lease`, logging a warning instead of returning an error when
/// the release fails.
async fn release_or_log(&self, lease: &ProcessLease) {
    let Err(err) = self.release_process_lease(lease).await else {
        return;
    };
    tracing::warn!(
        process_id = %lease.process_id,
        error = %err,
        "failed to release recovered process lease",
    );
}
/// Runs a process while periodically renewing its lease and watching for a
/// cancel request.
///
/// Returns the process output on success, `RecoverFailure::Run` when the run
/// itself errors, and `RecoverFailure::LeaseLost` when a lease renewal fails
/// before the run finishes.
///
/// NOTE(review): the renewed lease is kept only in this function's local
/// `lease`; the caller keeps whatever value it passed in. If the registry
/// changes lease identity on renewal, the caller's copy may be stale; confirm
/// against the `ProcessRegistry` contract.
async fn run_process_with_lease_renewal(
&self,
registration: ProcessRegistration,
execution_context: ProcessExecutionContext,
mut lease: ProcessLease,
) -> Result<ProcessAwaitOutput, RecoverFailure> {
let process_id = registration.id.clone();
let cancellation = CancellationToken::new();
// Background task: cancels the run's token once the registry reports a
// "process.cancel_requested" event for this process.
let cancel_watcher = {
let registry = Arc::clone(&self.config.process_registry);
let process_id = process_id.clone();
let cancellation = cancellation.clone();
tokio::spawn(async move {
match registry
.wait_event_after(&process_id, "process.cancel_requested", 0)
.await
{
Ok(_) => cancellation.cancel(),
Err(err) => tracing::warn!(
process_id = %process_id,
error = %err,
"process cancel watcher stopped before observing cancellation",
),
}
})
};
// Pinned so the same future can be polled again on every loop iteration.
let pending = self.run_process(registration, execution_context, cancellation.clone());
tokio::pin!(pending);
loop {
tokio::select! {
// The run finished: stop the watcher and surface the outcome.
outcome = &mut pending => {
cancel_watcher.abort();
return outcome.map_err(RecoverFailure::Run);
}
// The renew interval elapsed first: extend the lease and keep polling.
_ = tokio::time::sleep(process_lease_renew_interval()) => {
match self
.config
.process_registry
.renew_process_lease(&lease, RUNTIME_TURN_LEASE_TTL_MS)
.await
{
Ok(renewed) => lease = renewed,
Err(err) => {
// Renewal failed: signal cancellation to the run, stop the
// watcher, and report the loss to the caller.
cancellation.cancel();
cancel_watcher.abort();
return Err(RecoverFailure::LeaseLost(err));
}
}
}
}
}
}
/// Releases `lease` by submitting a lease completion built from it to the
/// process registry.
async fn release_process_lease(&self, lease: &ProcessLease) -> Result<(), PluginError> {
    let completion = ProcessLeaseCompletion::from_lease(lease);
    let registry = &self.config.process_registry;
    registry.complete_process_lease(&completion).await
}
/// Requests cancellation of `process_id` by appending a cancel-requested
/// event, with the optional `reason`, to the process registry.
///
/// # Errors
///
/// Propagates the registry error if the event cannot be appended.
pub async fn request_process_cancel(
    &self,
    process_id: &str,
    reason: Option<String>,
) -> Result<(), PluginError> {
    let request = crate::ProcessEventAppendRequest::cancel_requested(process_id, reason);
    self.config
        .process_registry
        .append_event(process_id, request)
        .await?;
    Ok(())
}
async fn rebuild_runtime(&self, session_id: &str) -> Result<LashRuntime, PluginError> {
let store = self
.config
.session_store_factory
.create_store(&SessionStoreCreateRequest {
session_id: session_id.to_string(),
relation: crate::SessionRelation::Root,
policy: self.config.session_policy.clone(),
})
.await
.map_err(|err| {
PluginError::Session(format!(
"failed to open session store for process worker session `{session_id}`: {err}"
))
})?;
EmbeddedRuntimeBuilder::new()
.with_session_id(session_id.to_string())
.with_plugin_host(self.config.plugin_host.as_ref().clone())
.with_runtime_host(self.config.runtime_host.clone())
.with_policy(self.config.session_policy.clone())
.with_session_store_factory(Arc::clone(&self.config.session_store_factory))
.with_process_registry(Arc::clone(&self.config.process_registry))
.with_residency(self.config.residency)
.with_store(store)
.build()
.await
.map_err(|err| {
PluginError::Session(format!(
"failed to rebuild process worker runtime for session `{session_id}`: {err}"
))
})
}
fn ensure_durable_store_facets(&self) -> Result<(), PluginError> {
if self
.config
.runtime_host
.control
.effect_host
.durability_tier()
!= crate::DurabilityTier::Durable
{
return Ok(());
}
let require = |facet: crate::DurableStoreFacet| {
PluginError::Session(crate::RuntimeError::durable_store_required(facet).to_string())
};
if self
.config
.runtime_host
.durability
.attachment_store
.persistence()
.durability_tier()
!= crate::DurabilityTier::Durable
{
return Err(require(crate::DurableStoreFacet::AttachmentStore));
}
if self
.config
.runtime_host
.durability
.lashlang_artifact_store
.durability_tier()
!= crate::DurabilityTier::Durable
{
return Err(require(crate::DurableStoreFacet::ArtifactStore));
}
if self.config.session_store_factory.durability_tier() != crate::DurabilityTier::Durable {
return Err(require(crate::DurableStoreFacet::SessionStore));
}
if self.config.process_registry.durability_tier() != crate::DurabilityTier::Durable {
return Err(require(crate::DurableStoreFacet::ProcessRegistry));
}
Ok(())
}
/// Rejects registrations whose id is empty or whitespace-only.
///
/// # Errors
///
/// Returns `PluginError::Session` carrying the `missing_process_execution_id`
/// message when the trimmed id is empty.
fn ensure_stable_process_id(
    &self,
    registration: &ProcessRegistration,
) -> Result<(), PluginError> {
    let has_id = !registration.id.trim().is_empty();
    if has_id {
        return Ok(());
    }
    Err(PluginError::Session(
        crate::RuntimeError::missing_process_execution_id().to_string(),
    ))
}
/// Rejects registrations created for a different host profile.
///
/// A registration with an empty host profile id is accepted as-is.
///
/// # Errors
///
/// Returns `PluginError::Session` naming both profile ids when the
/// registration's non-empty id differs from this worker's.
fn ensure_host_profile_matches(
    &self,
    registration: &ProcessRegistration,
) -> Result<(), PluginError> {
    let expected = self.config.runtime_host.profile.host_profile_id.as_str();
    let actual = registration.provenance.host_profile_id.as_str();
    let mismatched = !actual.is_empty() && actual != expected;
    if mismatched {
        return Err(PluginError::Session(format!(
            "process `{}` was created for host profile `{actual}` but this worker is `{expected}`",
            registration.id
        )));
    }
    Ok(())
}
}
/// How long the recovery loop waits between lease renewals.
fn process_lease_renew_interval() -> Duration {
    let millis = process_lease_renew_interval_ms();
    Duration::from_millis(millis)
}

/// Lease renewal interval in milliseconds: 25 in test builds, 30 000 otherwise.
fn process_lease_renew_interval_ms() -> u64 {
    if cfg!(test) {
        25
    } else {
        30_000
    }
}
#[cfg(test)]
mod boundary_tests {
use super::*;
use crate::{
AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, DurabilityTier,
DurableStoreFacet, InMemoryAttachmentStore, LashlangArtifactStore, ProcessInput,
ProcessRegistration, RuntimeEffectController, RuntimeError, StoredAttachment,
};
use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentRef};
/// Effect controller stub that reports the durable tier.
///
/// The tests install it behind an `InlineEffectHost` so the worker's
/// store-facet guard is exercised.
#[derive(Default)]
struct DurableController;
#[async_trait::async_trait]
impl RuntimeEffectController for DurableController {
fn durability_tier(&self) -> DurabilityTier {
DurabilityTier::Durable
}
// Panics if called: these tests are expected to return before any effect is
// executed.
async fn execute_effect(
&self,
_envelope: crate::RuntimeEffectEnvelope,
_local_executor: crate::RuntimeEffectLocalExecutor<'_>,
) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
unreachable!("worker boundary rejects before executing any effect")
}
}
/// Attachment store stub that reports durable persistence while delegating
/// storage to an in-memory store.
#[derive(Default)]
struct DurableAttachmentStore {
inner: InMemoryAttachmentStore,
}
impl AttachmentStore for DurableAttachmentStore {
// The only behaviour that differs from the wrapped store.
fn persistence(&self) -> AttachmentStorePersistence {
AttachmentStorePersistence::Durable
}
fn put(
&self,
bytes: Vec<u8>,
meta: AttachmentCreateMeta,
) -> Result<AttachmentRef, AttachmentStoreError> {
self.inner.put(bytes, meta)
}
fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
self.inner.get(id)
}
}
/// Artifact store stub that reports the durable tier while delegating storage
/// to an in-memory artifact store.
#[derive(Default)]
struct DurableArtifactStore {
inner: lashlang::InMemoryLashlangArtifactStore,
}
#[async_trait::async_trait]
impl LashlangArtifactStore for DurableArtifactStore {
// The only behaviour that differs from the wrapped store.
fn durability_tier(&self) -> DurabilityTier {
DurabilityTier::Durable
}
async fn put_module_artifact(
&self,
artifact: &lashlang::ModuleArtifact,
) -> Result<(), lashlang::ArtifactStoreError> {
self.inner.put_module_artifact(artifact).await
}
async fn get_module_artifact(
&self,
module_ref: &lashlang::ModuleRef,
) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
self.inner.get_module_artifact(module_ref).await
}
}
/// Session store factory stub whose reported durability tier is chosen by the
/// test.
struct TierSessionStoreFactory {
tier: DurabilityTier,
}
#[async_trait::async_trait]
impl SessionStoreFactory for TierSessionStoreFactory {
fn durability_tier(&self) -> DurabilityTier {
self.tier
}
// Panics if called: these tests are expected to return before a session
// store is ever opened.
async fn create_store(
&self,
_request: &crate::SessionStoreCreateRequest,
) -> Result<Arc<dyn crate::RuntimePersistence>, String> {
unreachable!("worker boundary rejects before creating a session store")
}
async fn delete_session(&self, _session_id: &str) -> Result<(), String> {
Ok(())
}
}
/// Builds a worker with the given stores and session-store tier, using a
/// process registry that reports the durable tier.
fn worker(
    attachment: Arc<dyn AttachmentStore>,
    artifact: Arc<dyn LashlangArtifactStore>,
    session_store_tier: DurabilityTier,
) -> DurableProcessWorker {
    let process_registry_tier = DurabilityTier::Durable;
    worker_with_process_registry_tier(
        attachment,
        artifact,
        session_store_tier,
        process_registry_tier,
    )
}
/// Builds a worker whose effect host wraps [`DurableController`] and whose
/// stores, session-store tier and process-registry tier are chosen by the
/// caller.
fn worker_with_process_registry_tier(
    attachment: Arc<dyn AttachmentStore>,
    artifact: Arc<dyn LashlangArtifactStore>,
    session_store_tier: DurabilityTier,
    process_registry_tier: DurabilityTier,
) -> DurableProcessWorker {
    let registry: Arc<dyn ProcessRegistry> = Arc::new(
        crate::TestLocalProcessRegistry::default().with_durability_tier(process_registry_tier),
    );
    let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
        tier: session_store_tier,
    });
    let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));

    // Start from the in-memory host and swap in the pieces under test.
    let mut host_config = RuntimeHostConfig::in_memory();
    host_config.control.effect_host =
        Arc::new(crate::InlineEffectHost::new(Arc::new(DurableController)));
    host_config.durability.attachment_store = attachment;
    host_config.durability.lashlang_artifact_store = artifact;

    let config = DurableProcessWorkerConfig::new(plugin_host, host_config, factory, registry);
    DurableProcessWorker::new(config)
}
/// Registration with an `External` input and empty metadata, so a run that
/// passes the worker's guards returns without rebuilding a runtime.
fn external_registration() -> ProcessRegistration {
    let input = ProcessInput::External {
        metadata: serde_json::json!({}),
    };
    ProcessRegistration::new("worker-boundary-process", input)
}
/// Runs the external registration on `worker` with a default execution
/// context and a fresh cancellation token.
async fn run(worker: &DurableProcessWorker) -> Result<ProcessAwaitOutput, PluginError> {
    let registration = external_registration();
    let context = ProcessExecutionContext::default();
    let token = CancellationToken::new();
    worker.run_process(registration, context, token).await
}
/// Asserts that `err` is a session error whose message is exactly the
/// `durable_store_required` message for `facet`.
fn assert_facet(err: PluginError, facet: DurableStoreFacet) {
    let message = match err {
        PluginError::Session(message) => message,
        err => panic!("expected PluginError::Session, got {err:?}"),
    };
    let expected = RuntimeError::durable_store_required(facet).to_string();
    assert_eq!(message, expected, "worker must reject the {facet:?} facet");
}
/// A durable worker given the plain in-memory attachment store must be
/// rejected with the attachment-store facet.
#[tokio::test]
async fn durable_worker_rejects_ephemeral_attachment_store() {
    let attachment = Arc::new(InMemoryAttachmentStore::new());
    let artifact = Arc::new(DurableArtifactStore::default());
    let subject = worker(attachment, artifact, DurabilityTier::Durable);

    let outcome = run(&subject).await;

    let err =
        outcome.expect_err("ephemeral attachment store must be rejected at the worker boundary");
    assert_facet(err, DurableStoreFacet::AttachmentStore);
}
/// A durable worker given the plain in-memory artifact store must be rejected
/// with the artifact-store facet.
#[tokio::test]
async fn durable_worker_rejects_ephemeral_artifact_store() {
    let attachment = Arc::new(DurableAttachmentStore::default());
    let artifact = Arc::new(lashlang::InMemoryLashlangArtifactStore::new());
    let subject = worker(attachment, artifact, DurabilityTier::Durable);

    let outcome = run(&subject).await;

    let err =
        outcome.expect_err("ephemeral artifact store must be rejected at the worker boundary");
    assert_facet(err, DurableStoreFacet::ArtifactStore);
}
/// A durable worker whose session store factory reports the inline tier must
/// be rejected with the session-store facet.
#[tokio::test]
async fn durable_worker_rejects_ephemeral_session_store_factory() {
    let attachment = Arc::new(DurableAttachmentStore::default());
    let artifact = Arc::new(DurableArtifactStore::default());
    let subject = worker(attachment, artifact, DurabilityTier::Inline);

    let outcome = run(&subject).await;

    let err = outcome
        .expect_err("ephemeral session store factory must be rejected at the worker boundary");
    assert_facet(err, DurableStoreFacet::SessionStore);
}
/// A durable worker whose process registry reports the inline tier must be
/// rejected with the process-registry facet.
#[tokio::test]
async fn durable_worker_rejects_ephemeral_process_registry() {
    let attachment = Arc::new(DurableAttachmentStore::default());
    let artifact = Arc::new(DurableArtifactStore::default());
    let subject = worker_with_process_registry_tier(
        attachment,
        artifact,
        DurabilityTier::Durable,
        DurabilityTier::Inline,
    );

    let outcome = run(&subject).await;

    let err =
        outcome.expect_err("ephemeral process registry must be rejected at the worker boundary");
    assert_facet(err, DurableStoreFacet::ProcessRegistry);
}
/// With every store reporting durable, the guard passes and the external
/// registration returns success.
#[tokio::test]
async fn durable_worker_with_all_durable_stores_passes_store_facet_check() {
    let attachment = Arc::new(DurableAttachmentStore::default());
    let artifact = Arc::new(DurableArtifactStore::default());
    let subject = worker(attachment, artifact, DurabilityTier::Durable);

    let outcome = run(&subject).await;

    let output = outcome.expect("all-durable worker should pass the store-facet guard");
    assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
}
/// A worker built on the unmodified in-memory runtime host passes the guard
/// even though its session store factory reports the inline tier.
#[tokio::test]
async fn inline_worker_passes_store_facet_check_with_ephemeral_stores() {
    let registry: Arc<dyn ProcessRegistry> =
        Arc::new(crate::TestLocalProcessRegistry::default());
    let factory: Arc<dyn SessionStoreFactory> = Arc::new(TierSessionStoreFactory {
        tier: DurabilityTier::Inline,
    });
    let plugin_host = Arc::new(crate::PluginHost::new(Vec::new()));
    let host_config = RuntimeHostConfig::in_memory();
    let config = DurableProcessWorkerConfig::new(plugin_host, host_config, factory, registry);
    let subject = DurableProcessWorker::new(config);

    let outcome = run(&subject).await;

    let output = outcome.expect("inline worker should pass the store-facet guard");
    assert!(matches!(output, ProcessAwaitOutput::Success { .. }));
}
}