use aion_core::{ActivityError, ActivityId, Payload, TimerId, WorkflowError, WorkflowId};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use crate::durability::{
Command, CorrelationKey, DurabilityError, Recorder, Resolution, ResolveOutcome, Resolver,
};
#[derive(Clone, Debug, PartialEq)]
pub enum LiveActivityOutcome {
Completed(Payload),
Failed(ActivityError),
}
#[derive(Clone, Debug, PartialEq)]
pub enum LiveChildOutcome {
Completed {
child_workflow_id: WorkflowId,
result: Payload,
},
Failed {
child_workflow_id: WorkflowId,
error: WorkflowError,
},
}
impl LiveChildOutcome {
fn child_workflow_id(&self) -> WorkflowId {
match self {
Self::Completed {
child_workflow_id, ..
}
| Self::Failed {
child_workflow_id, ..
} => child_workflow_id.clone(),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum HandoffOutcome {
Resolved(Resolution),
WorkflowCompleted,
}
#[async_trait]
pub trait LiveExecutor: Send + Sync {
async fn run_activity(
&self,
activity_type: String,
input: Payload,
) -> Result<LiveActivityOutcome, DurabilityError>;
async fn start_timer(
&self,
timer_id: TimerId,
fire_at: DateTime<Utc>,
) -> Result<(), DurabilityError>;
async fn await_signal(&self, name: String, index: usize) -> Result<Payload, DurabilityError>;
async fn spawn_child(
&self,
workflow_type: String,
input: Payload,
) -> Result<LiveChildOutcome, DurabilityError>;
}
pub async fn resolve_or_execute_live(
resolver: &mut Resolver,
recorder: &mut Recorder,
executor: &dyn LiveExecutor,
command: Command,
recorded_at: DateTime<Utc>,
) -> Result<HandoffOutcome, DurabilityError> {
match resolver.resolve(command.clone())? {
ResolveOutcome::Recorded(resolution) => Ok(HandoffOutcome::Resolved(resolution)),
ResolveOutcome::ResumeLive => {
execute_live_and_record(recorder, executor, command, recorded_at).await
}
}
}
async fn execute_live_and_record(
recorder: &mut Recorder,
executor: &dyn LiveExecutor,
command: Command,
recorded_at: DateTime<Utc>,
) -> Result<HandoffOutcome, DurabilityError> {
match command {
Command::RunActivity {
key,
activity_type,
input,
} => {
let activity_id = activity_id_from_key(&key)?;
recorder
.record_activity_scheduled(
recorded_at,
activity_id.clone(),
activity_type.clone(),
input.clone(),
)
.await?;
recorder
.record_activity_started(recorded_at, activity_id.clone())
.await?;
let outcome = executor.run_activity(activity_type, input).await?;
match outcome {
LiveActivityOutcome::Completed(result) => {
recorder
.record_activity_completed(recorded_at, activity_id, result.clone())
.await?;
Ok(HandoffOutcome::Resolved(Resolution::ActivityCompleted(
result,
)))
}
LiveActivityOutcome::Failed(error) => {
ensure_terminal_activity_error(&error)?;
recorder
.record_activity_failed(recorded_at, activity_id, error.clone(), 1)
.await?;
Ok(HandoffOutcome::Resolved(
Resolution::ActivityFailedTerminal(error),
))
}
}
}
Command::StartTimer { key, fire_at } => {
let timer_id = timer_id_from_key(&key)?;
recorder
.record_timer_started(recorded_at, timer_id.clone(), fire_at)
.await?;
executor.start_timer(timer_id, fire_at).await?;
Ok(HandoffOutcome::Resolved(Resolution::TimerFired))
}
Command::AwaitSignal { key } => {
let (name, index) = signal_from_key(&key)?;
let payload = executor.await_signal(name, index).await?;
Ok(HandoffOutcome::Resolved(Resolution::SignalDelivered(
payload,
)))
}
Command::SendSignal { .. } => Err(DurabilityError::HistoryShape {
reason: "send-signal live execution is owned by the NIF signal bridge".to_owned(),
}),
Command::AwaitChild { .. } => Err(DurabilityError::HistoryShape {
reason: "await-child live execution is owned by the NIF child bridge".to_owned(),
}),
Command::SpawnChild {
key,
workflow_type,
input,
} => {
child_from_key(&key)?;
let outcome = executor
.spawn_child(workflow_type.clone(), input.clone())
.await?;
recorder
.record_child_workflow_started(
recorded_at,
outcome.child_workflow_id(),
workflow_type,
input,
)
.await?;
match outcome {
LiveChildOutcome::Completed { result, .. } => {
Ok(HandoffOutcome::Resolved(Resolution::ChildCompleted(result)))
}
LiveChildOutcome::Failed { error, .. } => {
Ok(HandoffOutcome::Resolved(Resolution::ChildFailed(error)))
}
}
}
Command::CompleteWorkflow { result } => {
recorder
.record_workflow_completed(recorded_at, result)
.await?;
Ok(HandoffOutcome::WorkflowCompleted)
}
}
}
fn ensure_terminal_activity_error(error: &ActivityError) -> Result<(), DurabilityError> {
if error.is_retryable() {
return Err(DurabilityError::HistoryShape {
reason: "live activity failure must be terminal before AD can record a terminal \
resolution"
.to_owned(),
});
}
Ok(())
}
fn activity_id_from_key(key: &CorrelationKey) -> Result<ActivityId, DurabilityError> {
match key {
CorrelationKey::Activity(ordinal) => Ok(ActivityId::from_sequence_position(*ordinal)),
other => Err(DurabilityError::HistoryShape {
reason: format!("RunActivity requires an activity correlation key, got {other:?}"),
}),
}
}
fn timer_id_from_key(key: &CorrelationKey) -> Result<TimerId, DurabilityError> {
match key {
CorrelationKey::Timer(timer_id) => Ok(timer_id.clone()),
other => Err(DurabilityError::HistoryShape {
reason: format!("StartTimer requires a timer correlation key, got {other:?}"),
}),
}
}
fn signal_from_key(key: &CorrelationKey) -> Result<(String, usize), DurabilityError> {
match key {
CorrelationKey::Signal { name, index } => Ok((name.clone(), *index)),
other => Err(DurabilityError::HistoryShape {
reason: format!("AwaitSignal requires a signal correlation key, got {other:?}"),
}),
}
}
fn child_from_key(key: &CorrelationKey) -> Result<u64, DurabilityError> {
match key {
CorrelationKey::Child(ordinal) => Ok(*ordinal),
other => Err(DurabilityError::HistoryShape {
reason: format!("SpawnChild requires a child correlation key, got {other:?}"),
}),
}
}
#[cfg(test)]
#[path = "executor_tests.rs"]
mod executor_tests;