use std::sync::Arc;
use aion_core::{Event, RunId, WorkflowFilter, WorkflowStatus};
use crate::{
EngineError,
durability::current_run_segment,
lifecycle::start::{StartWorkflowContext, StartWorkflowOptions, start_workflow_with_options},
};
use super::startup::StartupRecoveryContext;
/// Starts child workflows that the parent's current run recorded but for which
/// the store holds no history.
///
/// The parent's history is narrowed to the segment belonging to
/// `parent_run_id`. Every `ChildWorkflowStarted` event in that segment is
/// considered, and a child is started only when both of the following hold:
///
/// * the segment contains no `ChildWorkflowCompleted` or `ChildWorkflowFailed`
///   event for the same child id, and
/// * reading the child's history from the store yields no events.
///
/// The child is started under its recorded workflow id, with the recorded
/// workflow type and input, pinned to the recorded package version.
///
/// # Errors
///
/// Returns the first error produced by `current_run_segment`, by reading a
/// child's history, by `parse_package_version`, or by
/// `start_workflow_with_options`. The sweep stops at that point.
pub(super) async fn sweep_recorded_children(
    context: &StartupRecoveryContext,
    parent_workflow_id: &aion_core::WorkflowId,
    parent_run_id: &RunId,
    parent_history: &[Event],
) -> Result<(), EngineError> {
    let segment = current_run_segment(parent_history.to_vec(), parent_run_id)?;

    // Only child-start records are of interest; pull out the fields needed to
    // re-issue the start.
    let recorded_children = segment.iter().filter_map(|event| match event {
        Event::ChildWorkflowStarted {
            child_workflow_id,
            workflow_type,
            input,
            package_version,
            ..
        } => Some((child_workflow_id, workflow_type, input, package_version)),
        _ => None,
    });

    for (child_workflow_id, workflow_type, input, package_version) in recorded_children {
        // A completion or failure event for this child already sits in the
        // parent's segment, so there is nothing left to start.
        let outcome_recorded = segment.iter().any(|candidate| match candidate {
            Event::ChildWorkflowCompleted {
                child_workflow_id: recorded,
                ..
            }
            | Event::ChildWorkflowFailed {
                child_workflow_id: recorded,
                ..
            } => recorded == child_workflow_id,
            _ => false,
        });
        if outcome_recorded {
            continue;
        }

        // Any stored event for the child means it does not need to be started here.
        let existing_history = context
            .store
            .as_ref()
            .read_history(child_workflow_id)
            .await?;
        if !existing_history.is_empty() {
            continue;
        }

        tracing::info!(
            parent_workflow_id = %parent_workflow_id,
            child_workflow_id = %child_workflow_id,
            workflow_type = %workflow_type,
            "starting recorded-but-never-spawned child found by the recovery sweep"
        );

        let start_context = StartWorkflowContext {
            store: Arc::clone(&context.store),
            visibility_store: Arc::clone(&context.visibility_store),
            catalog: Arc::clone(&context.catalog),
            runtime: Arc::clone(&context.runtime),
            supervision: Arc::clone(&context.supervision),
            registry: Arc::clone(&context.registry),
            signal_handoff: None,
            search_attribute_schema: Arc::clone(&context.search_attribute_schema),
            monitor_tokio_handle: tokio::runtime::Handle::current(),
        };
        let child_input = input.clone();
        let pinned_version = crate::loader::parse_package_version(workflow_type, package_version)?;
        let start_options = StartWorkflowOptions {
            workflow_id: Some(child_workflow_id.clone()),
            loaded_version: Some(pinned_version),
            ..StartWorkflowOptions::default()
        };

        start_workflow_with_options(start_context, workflow_type, child_input, start_options)
            .await?;
    }

    Ok(())
}
/// Starts the successor run for workflows that continued-as-new but have no
/// follow-up run recorded in their history.
///
/// The store is queried for workflows whose status is
/// `WorkflowStatus::ContinuedAsNew`. For each result the most recent
/// `WorkflowContinuedAsNew` event is located; its `parent_run_id` field is used
/// as the id of the run that continued. If no `WorkflowStarted` event names
/// that run as its parent, a new run is started under the same workflow id
/// with the recorded input. The workflow type is the one carried by the
/// continue-as-new event when present, otherwise the type recorded on the
/// continued run's own `WorkflowStarted` event.
///
/// # Errors
///
/// Propagates store query/read errors and errors from resolving the workflow
/// type. A failed start is also returned as an error, unless a fresh read of
/// the history shows that a successor run has been recorded in the meantime;
/// that case is logged and the sweep moves on.
pub(super) async fn sweep_continued_as_new_replacements(
    context: &StartupRecoveryContext,
) -> Result<(), EngineError> {
    let continued_filter = WorkflowFilter {
        status: Some(WorkflowStatus::ContinuedAsNew),
        ..WorkflowFilter::default()
    };
    let stranded = context.store.as_ref().query(&continued_filter).await?;

    for summary in stranded {
        let workflow_id = summary.workflow_id;
        let history = context.store.as_ref().read_history(&workflow_id).await?;

        // Walk the history backwards so the latest continue-as-new record wins.
        let latest_continuation = history.iter().rev().find_map(|event| {
            if let Event::WorkflowContinuedAsNew {
                input,
                workflow_type,
                parent_run_id,
                ..
            } = event
            {
                Some((input.clone(), workflow_type.clone(), parent_run_id.clone()))
            } else {
                None
            }
        });
        let Some((input, type_override, continued_run_id)) = latest_continuation else {
            continue;
        };

        // A run whose parent is the continued run is already in the history.
        let successor_recorded = history.iter().any(|event| match event {
            Event::WorkflowStarted {
                parent_run_id: Some(existing),
                ..
            } => existing == &continued_run_id,
            _ => false,
        });
        if successor_recorded {
            continue;
        }

        let replacement_type = if let Some(workflow_type) = type_override {
            workflow_type
        } else {
            continued_run_workflow_type(&workflow_id, &history, &continued_run_id)?
        };

        tracing::info!(
            workflow_id = %workflow_id,
            continued_run_id = %continued_run_id,
            workflow_type = %replacement_type,
            "starting continue-as-new successor run found by the recovery sweep"
        );

        let start_context = StartWorkflowContext {
            store: Arc::clone(&context.store),
            visibility_store: Arc::clone(&context.visibility_store),
            catalog: Arc::clone(&context.catalog),
            runtime: Arc::clone(&context.runtime),
            supervision: Arc::clone(&context.supervision),
            registry: Arc::clone(&context.registry),
            signal_handoff: None,
            search_attribute_schema: Arc::clone(&context.search_attribute_schema),
            monitor_tokio_handle: tokio::runtime::Handle::current(),
        };
        let start_options = StartWorkflowOptions {
            workflow_id: Some(workflow_id.clone()),
            parent_run_id: Some(continued_run_id.clone()),
            ..StartWorkflowOptions::default()
        };
        let outcome =
            start_workflow_with_options(start_context, &replacement_type, input, start_options)
                .await;

        let Err(error) = outcome else {
            continue;
        };

        // The start failed. Re-read the history: if a successor run is now
        // recorded, the failure is treated as a lost race rather than an error.
        if !successor_started(context, &workflow_id, &continued_run_id).await? {
            return Err(error);
        }
        tracing::info!(
            workflow_id = %workflow_id,
            continued_run_id = %continued_run_id,
            error = %error,
            "continue-as-new sweep lost the start race to the exit monitor; \
             successor run is durable"
        );
    }

    Ok(())
}
/// Reports whether the stored history of `workflow_id` contains a
/// `WorkflowStarted` event whose `parent_run_id` equals `continued_run_id`.
///
/// The history is read from the store on every call.
///
/// # Errors
///
/// Returns the error from the store if the history cannot be read.
async fn successor_started(
    context: &StartupRecoveryContext,
    workflow_id: &aion_core::WorkflowId,
    continued_run_id: &RunId,
) -> Result<bool, EngineError> {
    let history = context.store.as_ref().read_history(workflow_id).await?;

    let successor_present = history.iter().any(|event| match event {
        Event::WorkflowStarted {
            parent_run_id: Some(existing),
            ..
        } => existing == continued_run_id,
        _ => false,
    });

    Ok(successor_present)
}
/// Looks up the workflow type recorded on the `WorkflowStarted` event of the
/// run identified by `continued_run_id`.
///
/// `history` is scanned front to back and the first `WorkflowStarted` event
/// whose `run_id` matches supplies the result.
///
/// # Errors
///
/// Returns `EngineError::Load` when no `WorkflowStarted` event for that run is
/// present in `history`; `workflow_id` is only used to build that message.
fn continued_run_workflow_type(
    workflow_id: &aion_core::WorkflowId,
    history: &[Event],
    continued_run_id: &RunId,
) -> Result<String, EngineError> {
    for event in history {
        match event {
            Event::WorkflowStarted {
                run_id,
                workflow_type,
                ..
            } if run_id == continued_run_id => return Ok(workflow_type.clone()),
            _ => {}
        }
    }

    Err(EngineError::Load {
        reason: format!(
            "workflow `{workflow_id}` continued from run `{continued_run_id}` \
             but that run has no WorkflowStarted event in durable history"
        ),
    })
}