use std::collections::HashMap;
use std::sync::Arc;
use aion_core::{
Event, Payload, RunId, SearchAttributeSchema, SearchAttributeValue, WorkflowId, WorkflowStatus,
};
use aion_package::ContentHash;
use aion_store::EventStore;
use aion_store::visibility::VisibilityStore;
use chrono::Utc;
use super::completion::{ProcessExitContext, handle_process_exit};
use super::visibility::upsert_workflow_visibility;
use crate::durability::Recorder;
use crate::loader::LoadedWorkflows;
use crate::registry::{
CompletionNotifier, HandleResidency, Registry, WorkflowHandle, WorkflowHandleParts,
};
use crate::runtime::{RuntimeHandle, RuntimeInput};
use crate::supervision::{SupervisionTree, spawn_workflow_with_policy};
use crate::{
EngineError,
engine_seam::{
ChildWorkflowSpawnRequest, ChildWorkflowSpawnResult, EngineHandle, EngineSeamError,
TimerWheelEntry, WorkflowMailboxMessage, WorkflowProcessHandle, WorkflowResidency,
},
signal::SignalResumeHandoff,
};
pub struct StartWorkflowContext<'a> {
pub store: Arc<dyn EventStore>,
pub visibility_store: Arc<dyn VisibilityStore>,
pub loaded_workflows: &'a LoadedWorkflows,
pub runtime: Arc<RuntimeHandle>,
pub supervision: Arc<SupervisionTree>,
pub registry: Arc<Registry>,
pub signal_handoff: Option<Arc<SignalResumeHandoff>>,
pub search_attribute_schema: Arc<SearchAttributeSchema>,
}
#[derive(Clone, Debug, Default)]
pub struct StartWorkflowOptions {
pub workflow_id: Option<WorkflowId>,
pub parent_run_id: Option<RunId>,
pub loaded_version: Option<ContentHash>,
pub search_attributes: HashMap<String, SearchAttributeValue>,
}
pub async fn start_workflow(
context: StartWorkflowContext<'_>,
workflow_type: &str,
input: Payload,
) -> Result<WorkflowHandle, EngineError> {
start_workflow_with_options(
context,
workflow_type,
input,
StartWorkflowOptions::default(),
)
.await
}
pub async fn start_workflow_with_options(
context: StartWorkflowContext<'_>,
workflow_type: &str,
input: Payload,
options: StartWorkflowOptions,
) -> Result<WorkflowHandle, EngineError> {
let loaded = match &options.loaded_version {
Some(version) => context.loaded_workflows.get(workflow_type, version),
None => context.loaded_workflows.latest(workflow_type),
}
.ok_or_else(|| EngineError::WorkflowNotFound {
workflow_type: workflow_type.to_owned(),
})?;
let supplied_workflow_id = options.workflow_id.is_some();
let workflow_id = options.workflow_id.unwrap_or_else(WorkflowId::new_v4);
let run_id = RunId::new_v4();
let initial_head = if supplied_workflow_id {
context
.store
.read_history(&workflow_id)
.await?
.iter()
.map(Event::seq)
.max()
.unwrap_or_default()
} else {
0
};
let mut recorder = Recorder::resume_at(
workflow_id.clone(),
Arc::clone(&context.store),
initial_head,
)
.with_visibility(run_id.clone(), Arc::clone(&context.visibility_store));
recorder
.record_workflow_started_with_attributes(
Utc::now(),
crate::durability::WorkflowStartRecord {
workflow_type: workflow_type.to_owned(),
input: input.clone(),
run_id: run_id.clone(),
parent_run_id: options.parent_run_id,
},
options.search_attributes,
&context.search_attribute_schema,
)
.await?;
upsert_workflow_visibility(
Arc::clone(&context.store),
Arc::clone(&context.visibility_store),
&workflow_id,
&run_id,
)
.await?;
context
.supervision
.ensure_type_supervisor(loaded.workflow_type())?;
let runtime_input = RuntimeInput::from_payload(&input)?;
let pid = spawn_workflow_with_policy(
&context.runtime,
loaded.deployed_entry_module(),
loaded.entry_function(),
runtime_input,
)?;
if let Err(error) = context
.supervision
.place_workflow(loaded.workflow_type(), pid)
{
let _ = context.runtime.cancel_pid(pid);
return Err(error);
}
let completion = CompletionNotifier::new();
let handle = WorkflowHandle::new(WorkflowHandleParts {
workflow_id: workflow_id.clone(),
run_id: run_id.clone(),
pid,
workflow_type: loaded.workflow_type().to_owned(),
loaded_version: loaded.version().clone(),
cached_status: WorkflowStatus::Running,
residency: HandleResidency::Resident,
recorder,
completion,
});
if let Err(error) = context
.registry
.insert((workflow_id, run_id), handle.clone())
.map(|_| ())
{
let _ = context.runtime.cancel_pid(pid);
return Err(error);
}
if let Err(error) = install_completion_monitor(&context, pid, &handle) {
let _ = context
.registry
.remove(handle.workflow_id(), handle.run_id());
let _ = context.runtime.cancel_pid(pid);
return Err(error);
}
deliver_deferred_signals(&context, &handle);
Ok(handle)
}
fn install_completion_monitor(
context: &StartWorkflowContext<'_>,
pid: crate::Pid,
handle: &WorkflowHandle,
) -> Result<(), EngineError> {
let completion_context = ProcessExitContext {
store: Arc::clone(&context.store),
visibility_store: Arc::clone(&context.visibility_store),
registry: Arc::clone(&context.registry),
loaded_workflows: Arc::new(context.loaded_workflows.clone()),
runtime: Arc::clone(&context.runtime),
supervision: Arc::clone(&context.supervision),
tokio_handle: tokio::runtime::Handle::current(),
search_attribute_schema: Arc::clone(&context.search_attribute_schema),
};
let completion_handle = handle.clone();
context.runtime.monitor_process(pid, move |outcome| {
if let Err(error) = handle_process_exit(completion_context, completion_handle, outcome) {
tracing::error!(workflow_pid = pid, error = %error, "workflow process monitor completion failed");
}
})?;
Ok(())
}
fn deliver_deferred_signals(context: &StartWorkflowContext<'_>, handle: &WorkflowHandle) {
let Some(handoff) = &context.signal_handoff else {
return;
};
let adapter = StartResumeEngineHandle {
runtime: &context.runtime,
registry: &context.registry,
};
if let Err(error) = handoff.deliver_deferred(&adapter, handle.workflow_id()) {
tracing::warn!(
workflow_id = %handle.workflow_id(),
error = %error,
"failed to flush deferred signals after workflow became resident"
);
}
}
struct StartResumeEngineHandle<'a> {
runtime: &'a RuntimeHandle,
registry: &'a Registry,
}
impl EngineHandle for StartResumeEngineHandle<'_> {
fn resolve_workflow(
&self,
workflow_id: &WorkflowId,
) -> Result<WorkflowResidency, EngineSeamError> {
let handle = self
.registry
.list()
.map_err(|error| EngineSeamError::Delivery {
reason: error.to_string(),
})?
.into_iter()
.find(|handle| handle.workflow_id() == workflow_id);
match handle {
Some(handle) if handle.residency() == HandleResidency::Resident => Ok(
WorkflowResidency::Resident(WorkflowProcessHandle::new(handle.pid())),
),
Some(_) => Ok(WorkflowResidency::NonResident),
None => Ok(WorkflowResidency::Unknown),
}
}
fn deliver_workflow_message(
&self,
process: WorkflowProcessHandle,
message: WorkflowMailboxMessage,
) -> Result<(), EngineSeamError> {
match message {
WorkflowMailboxMessage::SignalReceived { .. } => self
.runtime
.deliver_signal_received(process.pid())
.map_err(|error| EngineSeamError::Delivery {
reason: error.to_string(),
}),
other => Err(EngineSeamError::Delivery {
reason: format!("unsupported resume handoff message: {other:?}"),
}),
}
}
fn spawn_child_workflow(
&self,
request: ChildWorkflowSpawnRequest,
) -> Result<ChildWorkflowSpawnResult, EngineSeamError> {
let _ = request;
Err(EngineSeamError::ChildSpawn {
reason: "start resume handoff cannot spawn child workflows".to_owned(),
})
}
fn terminate_linked_child_workflow(
&self,
parent_workflow_id: &WorkflowId,
child_process: WorkflowProcessHandle,
correlation: u64,
) -> Result<(), EngineSeamError> {
let _ = (parent_workflow_id, child_process, correlation);
Err(EngineSeamError::ChildTermination {
reason: "start resume handoff cannot terminate child workflows".to_owned(),
})
}
fn terminate_linked_activity(
&self,
parent_workflow_id: &WorkflowId,
activity_process: crate::Pid,
correlation: u64,
) -> Result<(), EngineSeamError> {
let _ = (parent_workflow_id, activity_process, correlation);
Err(EngineSeamError::ChildTermination {
reason: "start resume handoff cannot terminate activities".to_owned(),
})
}
fn arm_timer(&self, entry: TimerWheelEntry) -> Result<(), EngineSeamError> {
let _ = entry;
Err(EngineSeamError::TimerWheel {
reason: "start resume handoff cannot arm timers".to_owned(),
})
}
fn disarm_timer(
&self,
process: WorkflowProcessHandle,
timer_id: &aion_core::TimerId,
) -> Result<(), EngineSeamError> {
let _ = (process, timer_id);
Err(EngineSeamError::TimerWheel {
reason: "start resume handoff cannot disarm timers".to_owned(),
})
}
fn record_workflow_event(
&self,
workflow_id: &WorkflowId,
event: Event,
) -> Result<(), EngineSeamError> {
let _ = (workflow_id, event);
Err(EngineSeamError::Recorder {
reason: "start resume handoff cannot record workflow events".to_owned(),
})
}
}
#[cfg(test)]
#[path = "start_tests.rs"]
mod start_tests;