use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::db::{DbPool, DbPoolMap};
use crate::error::AppResult;
/// Optional upper bounds that limit which events a replay loads.
///
/// Every field is optional and omitted from serialized output when unset.
/// `ReplayService::load_events` applies at most one bound: `as_of_event_id`
/// if set, otherwise `as_of_position`, otherwise `as_of_time`. Use
/// [`ReplayCutoff::set_count`] to detect callers that supplied more than one.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayCutoff {
    /// Inclusive upper bound on `event_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub as_of_event_id: Option<i64>,
    /// Used by `load_events` as an inclusive `event_id` bound when
    /// `as_of_event_id` is unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub as_of_position: Option<i64>,
    /// Inclusive upper bound on the event's `created_at`; only consulted
    /// when neither id-based bound is set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub as_of_time: Option<DateTime<Utc>>,
}
impl ReplayCutoff {
    /// Returns `true` when none of the three cutoff fields is set.
    pub fn is_empty(&self) -> bool {
        self.set_count() == 0
    }

    /// Returns how many of the three cutoff fields are set (0 to 3).
    pub fn set_count(&self) -> usize {
        let flags = [
            self.as_of_event_id.is_some(),
            self.as_of_position.is_some(),
            self.as_of_time.is_some(),
        ];
        flags.iter().filter(|is_set| **is_set).count()
    }
}
/// Names the slice of replayed state a caller asked for.
///
/// Serialized in `snake_case` (for example `BusinessObject` is
/// `"business_object"`). In this file the value is only recorded on
/// `ReplayState::projection`; `fold_replay_state` does not filter by it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayProjection {
    /// Wire name `"execution"`.
    Execution,
    /// Wire name `"stage"`.
    Stage,
    /// Wire name `"frame"`.
    Frame,
    /// Wire name `"command"`.
    Command,
    /// Wire name `"business_object"`.
    BusinessObject,
    /// Wire name `"loop"`.
    Loop,
    /// Wire name `"all"`; this is the `Default` value.
    All,
}
impl Default for ReplayProjection {
fn default() -> Self {
Self::All
}
}
impl ReplayProjection {
pub fn as_str(&self) -> &'static str {
match self {
Self::Execution => "execution",
Self::Stage => "stage",
Self::Frame => "frame",
Self::Command => "command",
Self::BusinessObject => "business_object",
Self::Loop => "loop",
Self::All => "all",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"execution" => Some(Self::Execution),
"stage" => Some(Self::Stage),
"frame" => Some(Self::Frame),
"command" => Some(Self::Command),
"business_object" => Some(Self::BusinessObject),
"loop" => Some(Self::Loop),
"all" => Some(Self::All),
_ => None,
}
}
}
/// Snapshot of an execution's state produced by folding its event log.
///
/// Built by `fold_replay_state`. The identity fields echo the caller's
/// arguments; the remaining fields summarize the folded events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayState {
    /// Tenant identifier, copied from the caller.
    pub tenant_id: String,
    /// Organization identifier, copied from the caller.
    pub organization_id: String,
    /// Execution whose events were folded.
    pub execution_id: i64,
    /// Canonical name of the requested projection (`ReplayProjection::as_str`).
    pub projection: String,
    /// Number of events folded into this state.
    pub event_count: u64,
    /// `event_id` of the last folded event; omitted when no events were folded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_event_id: Option<i64>,
    /// `event_type` of the last folded event; omitted when no events were folded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_event_type: Option<String>,
    /// Execution-level status summary.
    pub execution: ReplayExecutionState,
    // The maps below default to empty on deserialization. The fold in this
    // file creates them empty and never inserts into them.
    /// Per-stage state.
    #[serde(default)]
    pub stages: serde_json::Map<String, serde_json::Value>,
    /// Per-frame state.
    #[serde(default)]
    pub frames: serde_json::Map<String, serde_json::Value>,
    /// Per-command state.
    #[serde(default)]
    pub commands: serde_json::Map<String, serde_json::Value>,
    /// Per-business-object state.
    #[serde(default)]
    pub business_objects: serde_json::Map<String, serde_json::Value>,
    /// Per-loop state.
    #[serde(default)]
    pub loops: serde_json::Map<String, serde_json::Value>,
}
/// Execution-level summary derived from the event log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayExecutionState {
    /// Status string. Starts as `"UNKNOWN"`; `fold_replay_state` sets it to
    /// `"RUNNING"`, `"COMPLETED"`, `"FAILED"` or `"CANCELLED"`.
    pub status: String,
    /// `node_name` of the most recent step enter/exit or command-completed
    /// event that carried one; omitted from serialized output when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_node_name: Option<String>,
}
impl Default for ReplayExecutionState {
fn default() -> Self {
Self {
status: "UNKNOWN".to_string(),
last_node_name: None,
}
}
}
/// One row of `noetl.event` as selected by `ReplayService::load_events`.
///
/// Field names must match the selected column names because the row is
/// decoded through `sqlx::FromRow`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ReplayEventRow {
    /// Event identifier; replay folds events in ascending order of this value.
    pub event_id: i64,
    /// Event type string, e.g. `"step.enter"` or `"playbook.completed"`.
    pub event_type: String,
    /// Node the event refers to, when the row has one.
    pub node_name: Option<String>,
    /// Status column of the event row; not read by `fold_replay_state`.
    pub status: String,
    /// Creation time; the query converts the column with `AT TIME ZONE 'UTC'`.
    /// Not read by `fold_replay_state`.
    pub created_at: DateTime<Utc>,
}
/// Loads an execution's events and folds them into a [`ReplayState`].
#[derive(Clone)]
pub struct ReplayService {
    /// Pool map; the pool for a query is looked up by `execution_id`.
    pools: DbPoolMap,
}
impl ReplayService {
    /// Creates a service backed by the given pool map.
    pub fn new(pools: DbPoolMap) -> Self {
        Self { pools }
    }

    /// Creates a service from a single pool by wrapping it with
    /// `DbPoolMap::from_single_pool`.
    pub fn new_legacy(db: DbPool) -> Self {
        Self::new(DbPoolMap::from_single_pool(db))
    }

    /// Returns the pool the pool map selects for `execution_id`.
    #[inline]
    fn pool_for(&self, execution_id: i64) -> &DbPool {
        self.pools.pool_for(execution_id)
    }

    /// Loads the events of `execution_id` up to `cutoff` and folds them into
    /// a [`ReplayState`].
    ///
    /// `tenant_id`, `organization_id` and `projection` are recorded on the
    /// returned state; they do not filter the query. `limit` is clamped as
    /// described on [`ReplayService::load_events`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`ReplayService::load_events`].
    pub async fn replay_state(
        &self,
        tenant_id: &str,
        organization_id: &str,
        execution_id: i64,
        cutoff: ReplayCutoff,
        projection: ReplayProjection,
        limit: i64,
    ) -> AppResult<ReplayState> {
        let events = self.load_events(execution_id, &cutoff, limit).await?;
        Ok(fold_replay_state(
            &events,
            tenant_id,
            organization_id,
            execution_id,
            projection,
        ))
    }

    /// Fetches the events of `execution_id` in ascending `event_id` order.
    ///
    /// At most one cutoff is applied, chosen in this order:
    /// 1. `as_of_event_id`, else `as_of_position`: `event_id <= bound`;
    /// 2. `as_of_time`: `created_at <= bound`, compared in UTC;
    /// 3. no cutoff: every event of the execution.
    ///
    /// `limit` is clamped to `1..=100_000`. Because rows are ordered
    /// ascending, a result truncated by the limit holds the oldest matching
    /// events.
    ///
    /// # Errors
    ///
    /// Returns an error when the database query fails.
    pub async fn load_events(
        &self,
        execution_id: i64,
        cutoff: &ReplayCutoff,
        limit: i64,
    ) -> AppResult<Vec<ReplayEventRow>> {
        let limit = limit.clamp(1, 100_000);
        let pool = self.pool_for(execution_id);

        let rows = if let Some(event_id) = cutoff.as_of_event_id.or(cutoff.as_of_position) {
            sqlx::query_as::<_, ReplayEventRow>(
                r#"
SELECT
event_id,
event_type,
node_name,
status,
-- `noetl.event.created_at` is `TIMESTAMP` (no tz);
-- coerce to `TIMESTAMPTZ` so sqlx decodes into
-- `DateTime<Utc>` directly. Matches the cast the
-- existing services::execution queries use for the
-- same column.
created_at AT TIME ZONE 'UTC' AS created_at
FROM noetl.event
WHERE execution_id = $1
AND event_id <= $2
ORDER BY event_id ASC
LIMIT $3
"#,
            )
            .bind(execution_id)
            .bind(event_id)
            .bind(limit)
            .fetch_all(pool)
            .await?
        } else if let Some(t) = cutoff.as_of_time {
            // `created_at` in the WHERE clause is the raw `TIMESTAMP` column,
            // not the SELECT alias. Comparing it directly to a `timestamptz`
            // parameter makes Postgres interpret the column in the session
            // time zone, which shifts the cutoff whenever the session is not
            // UTC. Converting the parameter to a naive UTC timestamp keeps the
            // comparison in UTC and leaves the column bare for index use.
            sqlx::query_as::<_, ReplayEventRow>(
                r#"
SELECT
event_id,
event_type,
node_name,
status,
-- `noetl.event.created_at` is `TIMESTAMP` (no tz);
-- coerce to `TIMESTAMPTZ` so sqlx decodes into
-- `DateTime<Utc>` directly. Matches the cast the
-- existing services::execution queries use for the
-- same column.
created_at AT TIME ZONE 'UTC' AS created_at
FROM noetl.event
WHERE execution_id = $1
AND created_at <= ($2::timestamptz AT TIME ZONE 'UTC')
ORDER BY event_id ASC
LIMIT $3
"#,
            )
            .bind(execution_id)
            .bind(t)
            .bind(limit)
            .fetch_all(pool)
            .await?
        } else {
            sqlx::query_as::<_, ReplayEventRow>(
                r#"
SELECT
event_id,
event_type,
node_name,
status,
-- `noetl.event.created_at` is `TIMESTAMP` (no tz);
-- coerce to `TIMESTAMPTZ` so sqlx decodes into
-- `DateTime<Utc>` directly. Matches the cast the
-- existing services::execution queries use for the
-- same column.
created_at AT TIME ZONE 'UTC' AS created_at
FROM noetl.event
WHERE execution_id = $1
ORDER BY event_id ASC
LIMIT $2
"#,
            )
            .bind(execution_id)
            .bind(limit)
            .fetch_all(pool)
            .await?
        };
        Ok(rows)
    }
}
/// Folds an event log into a [`ReplayState`].
///
/// Events are processed in ascending `event_id` order regardless of the order
/// of `events`; ties keep their input order. The fold is pure: it reads only
/// `event_id`, `event_type` and `node_name` from each row.
///
/// Status rules:
/// - playbook completed / failed / cancelled events set the status to
///   `"COMPLETED"` / `"FAILED"` / `"CANCELLED"`;
/// - a step-enter event moves `"UNKNOWN"` to `"RUNNING"` and leaves any other
///   status untouched;
/// - step-enter, step-exit and command-completed events update
///   `last_node_name` when the event carries a node name.
///
/// `tenant_id`, `organization_id`, `execution_id` and `projection` are copied
/// onto the result; the per-entity maps are returned empty.
pub fn fold_replay_state(
    events: &[ReplayEventRow],
    tenant_id: &str,
    organization_id: &str,
    execution_id: i64,
    projection: ReplayProjection,
) -> ReplayState {
    // Stable sort over references so equal ids keep their input order.
    let mut by_id: Vec<&ReplayEventRow> = events.iter().collect();
    by_id.sort_by(|a, b| a.event_id.cmp(&b.event_id));

    let mut execution = ReplayExecutionState::default();
    for row in &by_id {
        let kind = row.event_type.as_str();

        let terminal = match kind {
            "playbook.completed" | "playbook_completed" => Some("COMPLETED"),
            "playbook.failed" | "playbook_failed" => Some("FAILED"),
            "playbook.cancelled" | "playbook_cancelled" => Some("CANCELLED"),
            _ => None,
        };
        if let Some(status) = terminal {
            execution.status = status.to_string();
            continue;
        }

        let entering = matches!(kind, "step.enter" | "step_enter" | "step_started");
        let leaving = matches!(kind, "step.exit" | "step_completed" | "command.completed");

        // Only the first step-enter promotes the status; later ones must not
        // override a status that was already set.
        if entering && execution.status == "UNKNOWN" {
            execution.status = "RUNNING".to_string();
        }
        if entering || leaving {
            if let Some(node) = row.node_name.as_deref() {
                execution.last_node_name = Some(node.to_owned());
            }
        }
    }

    let newest = by_id.last();
    ReplayState {
        tenant_id: tenant_id.to_owned(),
        organization_id: organization_id.to_owned(),
        execution_id,
        projection: projection.as_str().to_owned(),
        // usize -> u64 is a widening (or same-width) conversion.
        event_count: by_id.len() as u64,
        last_event_id: newest.map(|row| row.event_id),
        last_event_type: newest.map(|row| row.event_type.clone()),
        execution,
        stages: serde_json::Map::new(),
        frames: serde_json::Map::new(),
        commands: serde_json::Map::new(),
        business_objects: serde_json::Map::new(),
        loops: serde_json::Map::new(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every projection variant, used by the name round-trip tests.
    const ALL_PROJECTIONS: [ReplayProjection; 7] = [
        ReplayProjection::Execution,
        ReplayProjection::Stage,
        ReplayProjection::Frame,
        ReplayProjection::Command,
        ReplayProjection::BusinessObject,
        ReplayProjection::Loop,
        ReplayProjection::All,
    ];

    /// Builds an event row; `created_at` is irrelevant to the fold.
    fn ev(event_id: i64, event_type: &str, node_name: Option<&str>, status: &str) -> ReplayEventRow {
        ReplayEventRow {
            event_id,
            event_type: event_type.to_string(),
            node_name: node_name.map(|s| s.to_string()),
            status: status.to_string(),
            created_at: Utc::now(),
        }
    }

    /// Folds `events` with fixed identity arguments and the `All` projection.
    fn fold(events: &[ReplayEventRow]) -> ReplayState {
        fold_replay_state(events, "default", "default", 42, ReplayProjection::All)
    }

    #[test]
    fn fold_empty_event_log_returns_unknown_status() {
        let state = fold_replay_state(&[], "default", "default", 1, ReplayProjection::All);
        assert_eq!(state.event_count, 0);
        assert!(state.last_event_id.is_none());
        assert!(state.last_event_type.is_none());
        assert_eq!(state.execution.status, "UNKNOWN");
        assert!(state.execution.last_node_name.is_none());
        assert!(state.stages.is_empty());
        assert!(state.frames.is_empty());
        assert!(state.commands.is_empty());
        assert!(state.business_objects.is_empty());
        assert!(state.loops.is_empty());
    }

    #[test]
    fn fold_records_identity_and_projection_name() {
        let state = fold_replay_state(&[], "tenant-a", "org-b", 7, ReplayProjection::Stage);
        assert_eq!(state.tenant_id, "tenant-a");
        assert_eq!(state.organization_id, "org-b");
        assert_eq!(state.execution_id, 7);
        assert_eq!(state.projection, "stage");
    }

    #[test]
    fn fold_step_enter_flips_status_to_running_and_tracks_node_name() {
        let events = vec![
            ev(1, "playbook_started", None, "RUNNING"),
            ev(2, "step.enter", Some("start"), "ENTERED"),
        ];
        let state = fold(&events);
        assert_eq!(state.event_count, 2);
        assert_eq!(state.last_event_id, Some(2));
        assert_eq!(state.last_event_type.as_deref(), Some("step.enter"));
        assert_eq!(state.execution.status, "RUNNING");
        assert_eq!(state.execution.last_node_name.as_deref(), Some("start"));
    }

    #[test]
    fn fold_playbook_completed_short_circuits_status() {
        let events = vec![
            ev(1, "step.enter", Some("start"), "ENTERED"),
            ev(2, "command.completed", Some("start"), "success"),
            ev(3, "playbook.completed", None, "COMPLETED"),
        ];
        let state = fold(&events);
        assert_eq!(state.execution.status, "COMPLETED");
        assert_eq!(state.event_count, 3);
        assert_eq!(state.last_event_id, Some(3));
        assert_eq!(state.execution.last_node_name.as_deref(), Some("start"));
    }

    #[test]
    fn fold_playbook_failed_short_circuits_status() {
        let events = vec![
            ev(1, "step.enter", Some("start"), "ENTERED"),
            ev(2, "playbook.failed", None, "FAILED"),
        ];
        let state = fold(&events);
        assert_eq!(state.execution.status, "FAILED");
    }

    #[test]
    fn fold_playbook_cancelled_sets_cancelled_status() {
        for event_type in ["playbook.cancelled", "playbook_cancelled"] {
            let events = vec![
                ev(1, "step.enter", Some("start"), "ENTERED"),
                ev(2, event_type, None, "CANCELLED"),
            ];
            let state = fold(&events);
            assert_eq!(state.execution.status, "CANCELLED", "event type {event_type}");
        }
    }

    #[test]
    fn fold_terminal_status_is_not_downgraded_by_later_step_events() {
        let events = vec![
            ev(1, "step.enter", Some("start"), "ENTERED"),
            ev(2, "playbook.failed", None, "FAILED"),
            ev(3, "step.enter", Some("late"), "ENTERED"),
        ];
        let state = fold(&events);
        // The status stays terminal, but the node name still follows the log.
        assert_eq!(state.execution.status, "FAILED");
        assert_eq!(state.execution.last_node_name.as_deref(), Some("late"));
        assert_eq!(state.last_event_id, Some(3));
    }

    #[test]
    fn fold_underscore_aliases_recognised() {
        let events = vec![
            ev(1, "step_started", Some("alpha"), "ENTERED"),
            ev(2, "playbook_completed", None, "COMPLETED"),
        ];
        let state = fold(&events);
        assert_eq!(state.execution.status, "COMPLETED");
        assert_eq!(state.execution.last_node_name.as_deref(), Some("alpha"));
    }

    #[test]
    fn fold_is_order_deterministic_when_input_unsorted() {
        let events = vec![
            ev(3, "playbook.completed", None, "COMPLETED"),
            ev(2, "command.completed", Some("start"), "success"),
            ev(1, "step.enter", Some("start"), "ENTERED"),
        ];
        let state = fold(&events);
        assert_eq!(state.execution.status, "COMPLETED");
        assert_eq!(state.last_event_id, Some(3));
        assert_eq!(state.last_event_type.as_deref(), Some("playbook.completed"));
    }

    #[test]
    fn projection_from_str_accepts_canonical_names() {
        assert_eq!(
            ReplayProjection::from_str("execution"),
            Some(ReplayProjection::Execution)
        );
        assert_eq!(
            ReplayProjection::from_str("business_object"),
            Some(ReplayProjection::BusinessObject)
        );
        assert_eq!(
            ReplayProjection::from_str("loop"),
            Some(ReplayProjection::Loop)
        );
        assert_eq!(
            ReplayProjection::from_str("all"),
            Some(ReplayProjection::All)
        );
        assert!(ReplayProjection::from_str("garbage").is_none());
        // Matching is exact: no case folding, no trimming.
        assert!(ReplayProjection::from_str("All").is_none());
        assert!(ReplayProjection::from_str("").is_none());
    }

    #[test]
    fn projection_names_round_trip_and_match_serde() {
        for projection in ALL_PROJECTIONS {
            let name = projection.as_str();
            assert_eq!(ReplayProjection::from_str(name), Some(projection));
            // The hand-written names must agree with `rename_all = "snake_case"`.
            let wire = serde_json::to_value(projection).expect("projection serializes");
            assert_eq!(wire, serde_json::Value::String(name.to_string()));
        }
    }

    #[test]
    fn projection_default_is_all() {
        assert_eq!(ReplayProjection::default(), ReplayProjection::All);
    }

    #[test]
    fn cutoff_set_count_and_is_empty() {
        let empty = ReplayCutoff::default();
        assert!(empty.is_empty());
        assert_eq!(empty.set_count(), 0);

        let one = ReplayCutoff {
            as_of_event_id: Some(100),
            ..Default::default()
        };
        assert!(!one.is_empty());
        assert_eq!(one.set_count(), 1);

        let time_only = ReplayCutoff {
            as_of_time: Some(Utc::now()),
            ..Default::default()
        };
        assert!(!time_only.is_empty());
        assert_eq!(time_only.set_count(), 1);

        let three = ReplayCutoff {
            as_of_event_id: Some(100),
            as_of_position: Some(200),
            as_of_time: Some(Utc::now()),
        };
        assert!(!three.is_empty());
        assert_eq!(three.set_count(), 3);
    }
}