use std::ops::ControlFlow;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use sayiir_core::error::WorkflowError;
use sayiir_core::snapshot::ExecutionPosition;
use sayiir_persistence::SnapshotStore;
use crate::error::RuntimeError;
pub(crate) enum ParkReason {
Delay {
delay_id: String,
wake_at: DateTime<Utc>,
next_task_id: Option<String>,
passthrough: Bytes,
},
AwaitingSignal {
signal_id: String,
signal_name: String,
timeout: Option<DateTime<Utc>>,
next_task_id: Option<String>,
},
}
pub(crate) enum StepOutcome {
Done(Bytes),
Park(ParkReason),
}
pub(crate) type StepResult = Result<ControlFlow<StepOutcome, Bytes>, RuntimeError>;
pub(crate) fn compute_wake_at(
duration: &std::time::Duration,
) -> Result<DateTime<Utc>, RuntimeError> {
let now = Utc::now();
chrono::Duration::from_std(*duration)
.map(|d| now + d)
.map_err(|e| RuntimeError::from(WorkflowError::ResumeError(e.to_string())))
}
pub(crate) fn compute_signal_timeout(
timeout: Option<&std::time::Duration>,
) -> Option<DateTime<Utc>> {
timeout.and_then(|d| {
chrono::Duration::from_std(*d)
.ok()
.map(|cd| Utc::now() + cd)
})
}
pub(crate) async fn save_park_checkpoint<B: SnapshotStore>(
reason: ParkReason,
snapshot: &mut sayiir_core::snapshot::WorkflowSnapshot,
backend: &B,
) -> RuntimeError {
match reason {
ParkReason::Delay {
delay_id,
wake_at,
next_task_id,
passthrough,
} => {
let now = Utc::now();
snapshot.update_position(ExecutionPosition::AtDelay {
delay_id: delay_id.clone(),
entered_at: now,
wake_at,
next_task_id,
});
snapshot.mark_task_completed(delay_id, passthrough);
if let Err(e) = backend.save_snapshot(snapshot).await {
return RuntimeError::from(e);
}
WorkflowError::Waiting { wake_at }.into()
}
ParkReason::AwaitingSignal {
signal_id,
signal_name,
timeout,
next_task_id,
} => {
snapshot.update_position(ExecutionPosition::AtSignal {
signal_id: signal_id.clone(),
signal_name: signal_name.clone(),
wake_at: timeout,
next_task_id,
});
if let Err(e) = backend.save_snapshot(snapshot).await {
return RuntimeError::from(e);
}
WorkflowError::AwaitingSignal {
signal_id,
signal_name,
wake_at: timeout,
}
.into()
}
}
}
pub(crate) async fn save_branch_park_checkpoint<B: SnapshotStore>(
reason: ParkReason,
instance_id: &str,
backend: &B,
) -> RuntimeError {
match reason {
ParkReason::Delay {
delay_id,
wake_at,
passthrough,
..
} => {
tracing::info!(delay_id = %delay_id, "parking branch at delay");
if let Err(e) = backend
.save_task_result(instance_id, &delay_id, passthrough)
.await
{
return RuntimeError::from(e);
}
WorkflowError::Waiting { wake_at }.into()
}
ParkReason::AwaitingSignal {
signal_id,
signal_name,
timeout,
..
} => {
tracing::info!(signal_id = %signal_id, %signal_name, "parking branch at signal");
WorkflowError::AwaitingSignal {
signal_id,
signal_name,
wake_at: timeout,
}
.into()
}
}
}