use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::db::{DbPool, DbPoolMap};
use crate::error::AppResult;
/// Optional upper bounds that select how much of an execution's event log
/// is replayed.
///
/// Every field is optional, and fields that are `None` are omitted from the
/// serialized form.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayCutoff {
/// Replay only events whose `event_id` is less than or equal to this value.
#[serde(skip_serializing_if = "Option::is_none")]
pub as_of_event_id: Option<i64>,
/// Positional cutoff. `ReplayService::load_events` compares it against
/// `event_id`, and only when `as_of_event_id` is not set.
#[serde(skip_serializing_if = "Option::is_none")]
pub as_of_position: Option<i64>,
/// Replay only events created at or before this instant. Consulted by
/// `ReplayService::load_events` only when neither id-based cutoff is set.
#[serde(skip_serializing_if = "Option::is_none")]
pub as_of_time: Option<DateTime<Utc>>,
}
impl ReplayCutoff {
    /// Returns `true` when none of the cutoff fields is set.
    pub fn is_empty(&self) -> bool {
        let any_set = self.as_of_event_id.is_some()
            || self.as_of_position.is_some()
            || self.as_of_time.is_some();
        !any_set
    }

    /// Returns how many of the three cutoff fields are set (0 to 3).
    pub fn set_count(&self) -> usize {
        let flags = [
            self.as_of_event_id.is_some(),
            self.as_of_position.is_some(),
            self.as_of_time.is_some(),
        ];
        flags.iter().filter(|&&is_set| is_set).count()
    }
}
/// Names the projection a replay request asks for.
///
/// Variants serialize as snake_case strings (for example
/// `BusinessObject` becomes `"business_object"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayProjection {
Execution,
Stage,
Frame,
Command,
BusinessObject,
Loop,
/// Every projection; this is also the `Default` value.
All,
}
impl Default for ReplayProjection {
fn default() -> Self {
Self::All
}
}
impl ReplayProjection {
pub fn as_str(&self) -> &'static str {
match self {
Self::Execution => "execution",
Self::Stage => "stage",
Self::Frame => "frame",
Self::Command => "command",
Self::BusinessObject => "business_object",
Self::Loop => "loop",
Self::All => "all",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"execution" => Some(Self::Execution),
"stage" => Some(Self::Stage),
"frame" => Some(Self::Frame),
"command" => Some(Self::Command),
"business_object" => Some(Self::BusinessObject),
"loop" => Some(Self::Loop),
"all" => Some(Self::All),
_ => None,
}
}
}
/// Result of folding an execution's event log (see `fold_replay_state`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayState {
/// Tenant identifier, copied verbatim from the fold arguments.
pub tenant_id: String,
/// Organization identifier, copied verbatim from the fold arguments.
pub organization_id: String,
/// Execution whose events were folded.
pub execution_id: i64,
/// Canonical name of the requested projection (`ReplayProjection::as_str`).
pub projection: String,
/// Number of events that were folded.
pub event_count: u64,
/// `event_id` of the last folded event; `None` for an empty log.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
/// `event_type` of the last folded event; `None` for an empty log.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_type: Option<String>,
/// Execution-level status and last seen node name.
pub execution: ReplayExecutionState,
/// Per-stage state, keyed by stage id.
#[serde(default)]
pub stages: std::collections::BTreeMap<String, ReplayStageState>,
/// Per-frame state, keyed by frame id.
#[serde(default)]
pub frames: std::collections::BTreeMap<String, ReplayFrameState>,
/// Per-command state, keyed by command id rendered as a string.
#[serde(default)]
pub commands: std::collections::BTreeMap<String, ReplayCommandState>,
/// `fold_replay_state` initializes this map empty and does not write to it.
#[serde(default)]
pub business_objects: serde_json::Map<String, serde_json::Value>,
/// `fold_replay_state` initializes this map empty and does not write to it.
#[serde(default)]
pub loops: serde_json::Map<String, serde_json::Value>,
}
/// Execution-level slice of the replayed state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayExecutionState {
/// Status string. The fold in this file produces `"UNKNOWN"`, `"RUNNING"`,
/// `"COMPLETED"`, `"FAILED"` or `"CANCELLED"`.
pub status: String,
/// `node_name` of the most recent step enter/exit or `command.completed`
/// event that carried one.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_node_name: Option<String>,
}
impl Default for ReplayExecutionState {
fn default() -> Self {
Self {
status: "UNKNOWN".to_string(),
last_node_name: None,
}
}
}
/// Replayed state of a single stage, built by `populate_stage`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayStageState {
/// Stage identifier (also the key in `ReplayState::stages`).
pub stage_id: String,
/// Status string; starts as `"UNKNOWN"` and becomes `"OPEN"` on `stage.opened`.
pub status: String,
/// `meta.kind` of the first event seen for this stage.
#[serde(skip_serializing_if = "Option::is_none")]
pub kind: Option<String>,
/// `node_name` (or `meta.step_name`) of the first event seen for this stage.
#[serde(skip_serializing_if = "Option::is_none")]
pub step_name: Option<String>,
/// Latest `meta.parent_stage_id` seen for this stage.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_stage_id: Option<String>,
/// Latest loop identifier taken from `meta.loop_id`, `meta.loop_event_id`
/// or `meta.__loop_epoch_id` (first one present wins).
#[serde(skip_serializing_if = "Option::is_none")]
pub loop_event_id: Option<String>,
/// `event_id` of the `stage.opened` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub opened_event_id: Option<i64>,
/// `event_id` of the `stage.closed` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub closed_event_id: Option<i64>,
/// Counters copied from the `meta` of the `stage.closed` event when present.
pub frame_count: i64,
pub row_count: i64,
pub events_emitted: i64,
pub failed_count: i64,
/// `event_id` of the last event that referenced this stage.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
}
/// Replayed state of a single frame, built by `populate_frame`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayFrameState {
/// Frame identifier (also the key in `ReplayState::frames`).
pub frame_id: String,
/// Latest stage id seen on an event for this frame.
#[serde(skip_serializing_if = "Option::is_none")]
pub stage_id: Option<String>,
/// Latest `meta.parent_frame_id` seen for this frame.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_frame_id: Option<String>,
/// Latest command id seen on an event for this frame.
#[serde(skip_serializing_if = "Option::is_none")]
pub command_id: Option<String>,
/// `event_id` of the most recent `frame.dispatched` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub claimed_event_id: Option<i64>,
/// `event_id` of the most recent `frame.committed` or `frame.failed` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub terminal_event_id: Option<i64>,
/// Status string; starts as `"UNKNOWN"`.
pub status: String,
/// `meta.row_count` of the `frame.committed` event when present.
pub row_count: i64,
/// Highest `meta.attempt` seen on `frame.dispatched` events (a missing
/// value counts as 1).
pub attempts: i64,
/// `event_id` of the last event that referenced this frame.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
/// `meta.events_emitted` of the terminal event when present.
pub events_emitted: i64,
}
/// Replayed state of a single command, built by `populate_command`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayCommandState {
/// Command identifier rendered as a string (also the key in
/// `ReplayState::commands`).
pub command_id: String,
/// Latest stage id seen on an event for this command.
#[serde(skip_serializing_if = "Option::is_none")]
pub stage_id: Option<String>,
/// Latest frame id seen on an event for this command.
#[serde(skip_serializing_if = "Option::is_none")]
pub frame_id: Option<String>,
/// Latest `meta.parent_command_id` seen for this command.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_command_id: Option<String>,
/// Latest worker id, from the event's `worker_id` column or `meta.worker_id`.
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_id: Option<String>,
/// Latest `meta.worker_locator` seen for this command.
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_locator: Option<String>,
/// Status string; starts as `"UNKNOWN"`.
pub status: String,
/// `event_id` of the `command.issued` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub issued_event_id: Option<i64>,
/// `event_id` of the `command.claimed` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub claimed_event_id: Option<i64>,
/// `event_id` of the `command.started` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub started_event_id: Option<i64>,
/// `event_id` of the `command.completed`, `command.failed` or
/// `command.cancelled` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub terminal_event_id: Option<i64>,
/// `event_id` of the last event that referenced this command.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
}
/// One row of `noetl.event` as selected by `ReplayService::load_events`.
///
/// Fields marked `#[sqlx(default)]` fall back to `None` when the column is
/// not part of the decoded row.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ReplayEventRow {
/// Event identifier; the fold orders events by this value.
pub event_id: i64,
/// Event type name, e.g. `"step.enter"` or `"command.completed"`.
pub event_type: String,
/// Node (step) name attached to the event, if any.
pub node_name: Option<String>,
/// Status string carried by the event; may be empty.
pub status: String,
/// Creation time of the event, decoded as UTC.
pub created_at: DateTime<Utc>,
/// Stage the event belongs to, if recorded in its own column.
#[sqlx(default)]
pub stage_id: Option<String>,
/// Frame the event belongs to, if recorded in its own column.
#[sqlx(default)]
pub frame_id: Option<String>,
/// Command the event belongs to, if recorded in its own column.
#[sqlx(default)]
pub command_id: Option<i64>,
/// Worker that produced the event, if recorded.
#[sqlx(default)]
pub worker_id: Option<String>,
/// Aggregate type; the id extractors look for `"stage"` and `"frame"`.
#[sqlx(default)]
pub aggregate_type: Option<String>,
/// Aggregate id; may carry a `"stage/"` or `"frame/"` prefix.
#[sqlx(default)]
pub aggregate_id: Option<String>,
/// Free-form JSON metadata used as a fallback source for ids and counters.
#[sqlx(default)]
pub meta: Option<serde_json::Value>,
}
/// Loads an execution's events from the database and folds them into a
/// `ReplayState`.
#[derive(Clone)]
pub struct ReplayService {
// Pool map used to pick the database pool for a given execution id.
pools: DbPoolMap,
}
impl ReplayService {
    /// Creates a service backed by the given pool map.
    pub fn new(pools: DbPoolMap) -> Self {
        Self { pools }
    }

    /// Creates a service from a single pool by wrapping it with
    /// `DbPoolMap::from_single_pool`.
    pub fn new_legacy(db: DbPool) -> Self {
        Self::new(DbPoolMap::from_single_pool(db))
    }

    /// Returns the pool the pool map selects for `execution_id`.
    #[inline]
    fn pool_for(&self, execution_id: i64) -> &DbPool {
        self.pools.pool_for(execution_id)
    }

    /// Loads the events of `execution_id` up to `cutoff` and folds them into
    /// a [`ReplayState`].
    ///
    /// `tenant_id` and `organization_id` are only copied into the result;
    /// the underlying query filters by `execution_id` alone. `limit` is
    /// forwarded to [`Self::load_events`].
    ///
    /// # Errors
    ///
    /// Returns an error when loading the events fails.
    pub async fn replay_state(
        &self,
        tenant_id: &str,
        organization_id: &str,
        execution_id: i64,
        cutoff: ReplayCutoff,
        projection: ReplayProjection,
        limit: i64,
    ) -> AppResult<ReplayState> {
        let events = self.load_events(execution_id, &cutoff, limit).await?;
        Ok(fold_replay_state(
            &events,
            tenant_id,
            organization_id,
            execution_id,
            projection,
        ))
    }

    /// Loads the events of `execution_id` in ascending `event_id` order.
    ///
    /// Cutoff precedence:
    /// 1. `as_of_event_id`, else `as_of_position` — both are compared
    ///    against `event_id` (`event_id <= cutoff`);
    /// 2. otherwise `as_of_time` (`created_at <= cutoff`);
    /// 3. otherwise no upper bound.
    ///
    /// `limit` is clamped to `1..=100_000`. Because rows are ordered
    /// ascending, a log longer than the limit is truncated at its tail.
    ///
    /// # Errors
    ///
    /// Returns an error when the database query fails.
    pub async fn load_events(
        &self,
        execution_id: i64,
        cutoff: &ReplayCutoff,
        limit: i64,
    ) -> AppResult<Vec<ReplayEventRow>> {
        let limit = limit.clamp(1, 100_000);
        let rows = if let Some(event_id) = cutoff.as_of_event_id.or(cutoff.as_of_position) {
            sqlx::query_as::<_, ReplayEventRow>(
                r#"
SELECT
event_id,
event_type,
node_name,
status,
-- `noetl.event.created_at` is `TIMESTAMP` (no tz);
-- coerce to `TIMESTAMPTZ` so sqlx decodes into
-- `DateTime<Utc>` directly. Matches the cast the
-- existing services::execution queries use for the
-- same column.
created_at AT TIME ZONE 'UTC' AS created_at,
-- R5 R2 fold inputs. All optional in the DB
-- schema; the fold treats `None` as "this event
-- doesn't participate in that projection".
stage_id,
frame_id,
command_id,
worker_id,
aggregate_type,
aggregate_id,
meta
FROM noetl.event
WHERE execution_id = $1
AND event_id <= $2
ORDER BY event_id ASC
LIMIT $3
"#,
            )
            .bind(execution_id)
            .bind(event_id)
            .bind(limit)
            .fetch_all(self.pool_for(execution_id))
            .await?
        } else if let Some(t) = cutoff.as_of_time {
            // `created_at` is a zone-less TIMESTAMP holding UTC wall-clock
            // values, while `$2` is bound as TIMESTAMPTZ. Comparing them
            // directly makes PostgreSQL convert the column using the
            // *session* time zone, which shifts the cutoff on non-UTC
            // sessions. Converting the parameter to a UTC wall-clock
            // timestamp keeps the comparison zone-independent and leaves
            // the column untouched for index use.
            sqlx::query_as::<_, ReplayEventRow>(
                r#"
SELECT
event_id,
event_type,
node_name,
status,
-- `noetl.event.created_at` is `TIMESTAMP` (no tz);
-- coerce to `TIMESTAMPTZ` so sqlx decodes into
-- `DateTime<Utc>` directly. Matches the cast the
-- existing services::execution queries use for the
-- same column.
created_at AT TIME ZONE 'UTC' AS created_at,
-- R5 R2 fold inputs. All optional in the DB
-- schema; the fold treats `None` as "this event
-- doesn't participate in that projection".
stage_id,
frame_id,
command_id,
worker_id,
aggregate_type,
aggregate_id,
meta
FROM noetl.event
WHERE execution_id = $1
AND created_at <= ($2::timestamptz AT TIME ZONE 'UTC')
ORDER BY event_id ASC
LIMIT $3
"#,
            )
            .bind(execution_id)
            .bind(t)
            .bind(limit)
            .fetch_all(self.pool_for(execution_id))
            .await?
        } else {
            sqlx::query_as::<_, ReplayEventRow>(
                r#"
SELECT
event_id,
event_type,
node_name,
status,
-- `noetl.event.created_at` is `TIMESTAMP` (no tz);
-- coerce to `TIMESTAMPTZ` so sqlx decodes into
-- `DateTime<Utc>` directly. Matches the cast the
-- existing services::execution queries use for the
-- same column.
created_at AT TIME ZONE 'UTC' AS created_at,
-- R5 R2 fold inputs. All optional in the DB
-- schema; the fold treats `None` as "this event
-- doesn't participate in that projection".
stage_id,
frame_id,
command_id,
worker_id,
aggregate_type,
aggregate_id,
meta
FROM noetl.event
WHERE execution_id = $1
ORDER BY event_id ASC
LIMIT $2
"#,
            )
            .bind(execution_id)
            .bind(limit)
            .fetch_all(self.pool_for(execution_id))
            .await?
        };
        Ok(rows)
    }
}
/// Folds an execution's events into a [`ReplayState`].
///
/// Events are processed in ascending `event_id` order regardless of the
/// order of `events` (ties keep their input order). `tenant_id`,
/// `organization_id` and `execution_id` are copied into the result, and
/// `projection` is recorded by name only: the stage, frame and command maps
/// are populated for every projection.
///
/// Execution status rules:
/// * `playbook.completed` / `playbook.failed` / `playbook.cancelled` (and
///   their underscore aliases) set `COMPLETED` / `FAILED` / `CANCELLED`;
/// * `step.enter`, `step_enter` and `step_started` move `UNKNOWN` to
///   `RUNNING` and record the node name;
/// * `step.exit`, `step_completed` and `command.completed` only record the
///   node name.
pub fn fold_replay_state(
    events: &[ReplayEventRow],
    tenant_id: &str,
    organization_id: &str,
    execution_id: i64,
    projection: ReplayProjection,
) -> ReplayState {
    use std::collections::BTreeMap;

    // Stable sort: events sharing an id keep their relative input order.
    let mut timeline: Vec<&ReplayEventRow> = events.iter().collect();
    timeline.sort_by_key(|row| row.event_id);

    let mut state = ReplayState {
        tenant_id: tenant_id.to_string(),
        organization_id: organization_id.to_string(),
        execution_id,
        projection: projection.as_str().to_string(),
        event_count: 0,
        last_event_id: None,
        last_event_type: None,
        execution: ReplayExecutionState::default(),
        stages: BTreeMap::new(),
        frames: BTreeMap::new(),
        commands: BTreeMap::new(),
        business_objects: serde_json::Map::new(),
        loops: serde_json::Map::new(),
    };

    for row in &timeline {
        state.event_count += 1;

        let kind = row.event_type.as_str();
        let terminal_status = match kind {
            "playbook.completed" | "playbook_completed" => Some("COMPLETED"),
            "playbook.failed" | "playbook_failed" => Some("FAILED"),
            "playbook.cancelled" | "playbook_cancelled" => Some("CANCELLED"),
            _ => None,
        };

        if let Some(status) = terminal_status {
            state.execution.status = status.to_string();
        } else {
            let entering = matches!(kind, "step.enter" | "step_enter" | "step_started");
            let leaving = matches!(kind, "step.exit" | "step_completed" | "command.completed");

            if entering && state.execution.status == "UNKNOWN" {
                state.execution.status = "RUNNING".to_string();
            }
            if entering || leaving {
                if let Some(name) = row.node_name.as_ref() {
                    state.execution.last_node_name = Some(name.clone());
                }
            }
        }

        populate_stage(row, &mut state.stages);
        populate_frame(row, &mut state.frames);
        populate_command(row, &mut state.commands);
    }

    // The last element of the sorted timeline is the last event folded.
    if let Some(last) = timeline.last() {
        state.last_event_id = Some(last.event_id);
        state.last_event_type = Some(last.event_type.clone());
    }

    state
}
/// Resolves the stage id an event refers to.
///
/// Sources are tried in order: the `stage_id` column, then `aggregate_id`
/// (with an optional `"stage/"` prefix removed) when `aggregate_type` is
/// `"stage"`, then `meta.stage_id`.
pub fn extract_stage_id(event: &ReplayEventRow) -> Option<String> {
    let from_aggregate = || {
        if event.aggregate_type.as_deref() != Some("stage") {
            return None;
        }
        event
            .aggregate_id
            .as_deref()
            .map(|raw| raw.strip_prefix("stage/").unwrap_or(raw).to_string())
    };

    event
        .stage_id
        .clone()
        .or_else(from_aggregate)
        .or_else(|| meta_str(&event.meta, "stage_id"))
}
/// Resolves the frame id an event refers to.
///
/// Sources are tried in order: the `frame_id` column, then `aggregate_id`
/// (with an optional `"frame/"` prefix removed) when `aggregate_type` is
/// `"frame"`, then `meta.frame_id`.
pub fn extract_frame_id(event: &ReplayEventRow) -> Option<String> {
    let from_aggregate = || {
        if event.aggregate_type.as_deref() != Some("frame") {
            return None;
        }
        event
            .aggregate_id
            .as_deref()
            .map(|raw| raw.strip_prefix("frame/").unwrap_or(raw).to_string())
    };

    event
        .frame_id
        .clone()
        .or_else(from_aggregate)
        .or_else(|| meta_str(&event.meta, "frame_id"))
}
pub fn extract_command_id(event: &ReplayEventRow) -> Option<String> {
if let Some(c) = event.command_id {
return Some(c.to_string());
}
if let Some(m) = &event.meta {
if let Some(v) = m.get("command_id") {
return Some(value_to_string(v));
}
}
None
}
/// Reads `key` from the event's `meta` JSON and renders it as a string.
///
/// Returns `None` when `meta` is absent, the key is missing, or the value is
/// JSON `null`. Treating `null` as absent keeps the literal string `"null"`
/// out of ids and linkage fields, and lets callers' `or_else` fallbacks
/// proceed to the next source.
fn meta_str(meta: &Option<serde_json::Value>, key: &str) -> Option<String> {
    meta.as_ref()?
        .get(key)
        .filter(|value| !value.is_null())
        .map(value_to_string)
}
/// Reads `key` from the event's `meta` JSON as an `i64`.
///
/// Returns `None` when `meta` is absent, the key is missing, or the value is
/// not representable as an `i64`.
fn meta_i64(meta: &Option<serde_json::Value>, key: &str) -> Option<i64> {
    let value = meta.as_ref()?.get(key)?;
    value.as_i64()
}
fn value_to_string(v: &serde_json::Value) -> String {
match v {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Null => "null".to_string(),
other => other.to_string(),
}
}
/// Applies one event to the stage projection.
///
/// Events without a resolvable stage id are ignored. The first event seen
/// for a stage creates its entry (status `"UNKNOWN"`, `kind` from
/// `meta.kind`, `step_name` from `node_name` or `meta.step_name`). Every
/// event then refreshes `last_event_id` and, when present in `meta`, the
/// parent stage and loop identifiers.
///
/// Status handling:
/// * `stage.opened` sets `"OPEN"` and records `opened_event_id`;
/// * `stage.closed` sets the event status (or `"CLOSED"` when empty),
///   records `closed_event_id` and copies the counters found in `meta`;
/// * any other `stage.*` event with a non-empty status sets that status.
///
/// Events of other families (`frame.*`, `command.*`, ...) that merely carry
/// a stage id never change the stage status. This mirrors the guard in
/// `populate_command` and prevents, for example, a frame's `COMPLETED`
/// from being reported as the status of a stage that is still open.
fn populate_stage(
    event: &ReplayEventRow,
    stages: &mut std::collections::BTreeMap<String, ReplayStageState>,
) {
    let stage_id = match extract_stage_id(event) {
        Some(id) => id,
        None => return,
    };

    let stage = stages
        .entry(stage_id)
        .or_insert_with_key(|id| ReplayStageState {
            stage_id: id.clone(),
            status: "UNKNOWN".to_string(),
            kind: meta_str(&event.meta, "kind"),
            step_name: event
                .node_name
                .clone()
                .or_else(|| meta_str(&event.meta, "step_name")),
            ..Default::default()
        });
    stage.last_event_id = Some(event.event_id);

    if let Some(parent) = meta_str(&event.meta, "parent_stage_id") {
        stage.parent_stage_id = Some(parent);
    }
    if let Some(loop_id) = meta_str(&event.meta, "loop_id")
        .or_else(|| meta_str(&event.meta, "loop_event_id"))
        .or_else(|| meta_str(&event.meta, "__loop_epoch_id"))
    {
        stage.loop_event_id = Some(loop_id);
    }

    match event.event_type.as_str() {
        "stage.opened" => {
            stage.status = "OPEN".to_string();
            stage.opened_event_id = Some(event.event_id);
        }
        "stage.closed" => {
            stage.status = if event.status.is_empty() {
                "CLOSED".to_string()
            } else {
                event.status.clone()
            };
            stage.closed_event_id = Some(event.event_id);
            // Counters keep their previous value when the event omits them.
            stage.frame_count = meta_i64(&event.meta, "frame_count").unwrap_or(stage.frame_count);
            stage.row_count = meta_i64(&event.meta, "row_count").unwrap_or(stage.row_count);
            stage.events_emitted =
                meta_i64(&event.meta, "events_emitted").unwrap_or(stage.events_emitted);
            stage.failed_count =
                meta_i64(&event.meta, "failed_count").unwrap_or(stage.failed_count);
        }
        // Only stage-family events may drive the stage status.
        other if other.starts_with("stage.") && !event.status.is_empty() => {
            stage.status = event.status.clone();
        }
        _ => {}
    }
}
/// Applies one event to the frame projection.
///
/// Events without a resolvable frame id are ignored. The first event seen
/// for a frame creates its entry with status `"UNKNOWN"`. Every event then
/// refreshes `last_event_id` and, when the event provides them, the stage
/// id, parent frame id and command id.
///
/// Status handling:
/// * `frame.dispatched` sets `"CLAIMED"`, records `claimed_event_id` and
///   raises `attempts` to `meta.attempt` (a missing value counts as 1);
/// * `frame.started` sets `"RUNNING"`;
/// * `frame.abandoned` / `frame.committed` / `frame.failed` set the event
///   status, or `"ABANDONED"` / `"COMPLETED"` / `"FAILED"` when it is
///   empty; the latter two also record `terminal_event_id` and counters;
/// * any other `frame.*` event with a non-empty status sets that status.
///
/// Events of other families (`command.*`, `stage.*`, ...) that merely carry
/// a frame id never change the frame status. This mirrors the guard in
/// `populate_command`.
fn populate_frame(
    event: &ReplayEventRow,
    frames: &mut std::collections::BTreeMap<String, ReplayFrameState>,
) {
    let frame_id = match extract_frame_id(event) {
        Some(id) => id,
        None => return,
    };
    let stage_id_now = extract_stage_id(event);
    let command_id_now = extract_command_id(event);
    let parent_now = meta_str(&event.meta, "parent_frame_id");

    let frame = frames
        .entry(frame_id)
        .or_insert_with_key(|id| ReplayFrameState {
            frame_id: id.clone(),
            status: "UNKNOWN".to_string(),
            ..Default::default()
        });
    frame.last_event_id = Some(event.event_id);

    // Linkage fields only move forward: an event that lacks a value never
    // clears what an earlier event established.
    if stage_id_now.is_some() {
        frame.stage_id = stage_id_now;
    }
    if parent_now.is_some() {
        frame.parent_frame_id = parent_now;
    }
    if command_id_now.is_some() {
        frame.command_id = command_id_now;
    }

    // The event's own status wins; `fallback` is used only when it is empty.
    let status_or = |fallback: &str| -> String {
        if event.status.is_empty() {
            fallback.to_string()
        } else {
            event.status.clone()
        }
    };

    match event.event_type.as_str() {
        "frame.dispatched" => {
            frame.status = "CLAIMED".to_string();
            frame.claimed_event_id = Some(event.event_id);
            let attempt = meta_i64(&event.meta, "attempt").unwrap_or(1);
            frame.attempts = frame.attempts.max(attempt);
        }
        "frame.started" => {
            frame.status = "RUNNING".to_string();
        }
        "frame.abandoned" => {
            frame.status = status_or("ABANDONED");
        }
        "frame.committed" => {
            frame.status = status_or("COMPLETED");
            frame.row_count = meta_i64(&event.meta, "row_count").unwrap_or(frame.row_count);
            frame.events_emitted =
                meta_i64(&event.meta, "events_emitted").unwrap_or(frame.events_emitted);
            frame.terminal_event_id = Some(event.event_id);
        }
        "frame.failed" => {
            frame.status = status_or("FAILED");
            frame.events_emitted =
                meta_i64(&event.meta, "events_emitted").unwrap_or(frame.events_emitted);
            frame.terminal_event_id = Some(event.event_id);
        }
        // Only frame-family events may drive the frame status.
        other if other.starts_with("frame.") && !event.status.is_empty() => {
            frame.status = event.status.clone();
        }
        _ => {}
    }
}
/// Applies one event to the command projection.
///
/// Events without a resolvable command id are ignored. The first event seen
/// for a command creates its entry with status `"UNKNOWN"`. Every event
/// then refreshes `last_event_id` and, when the event provides them, the
/// stage id, frame id, parent command id, worker id and worker locator.
///
/// Status handling (the event's own non-empty status always wins):
/// * `command.issued` / `command.claimed` / `command.started` default to
///   `"PENDING"` / `"CLAIMED"` / `"RUNNING"` and record the matching
///   event id;
/// * `command.completed` / `command.failed` / `command.cancelled` default
///   to the upper-cased suffix and record `terminal_event_id`;
/// * any other `command.*` event only sets a non-empty event status;
/// * events of other families never change the command status.
fn populate_command(
    event: &ReplayEventRow,
    commands: &mut std::collections::BTreeMap<String, ReplayCommandState>,
) {
    let key = match extract_command_id(event) {
        None => return,
        Some(id) => id,
    };

    let record = commands
        .entry(key.clone())
        .or_insert_with(|| ReplayCommandState {
            command_id: key,
            status: "UNKNOWN".to_string(),
            ..Default::default()
        });
    record.last_event_id = Some(event.event_id);

    // Linkage fields: overwrite only when this event supplies a value.
    if let Some(stage) = extract_stage_id(event) {
        record.stage_id = Some(stage);
    }
    if let Some(frame) = extract_frame_id(event) {
        record.frame_id = Some(frame);
    }
    if let Some(parent) = meta_str(&event.meta, "parent_command_id") {
        record.parent_command_id = Some(parent);
    }
    if let Some(worker) = event
        .worker_id
        .clone()
        .or_else(|| meta_str(&event.meta, "worker_id"))
    {
        record.worker_id = Some(worker);
    }
    if let Some(locator) = meta_str(&event.meta, "worker_locator") {
        record.worker_locator = Some(locator);
    }

    let kind = event.event_type.as_str();
    let this_event = Some(event.event_id);

    // Record lifecycle markers and pick the status used when the event
    // itself carries none. `None` means "no default for this event type".
    let default_status: Option<String> = match kind {
        "command.issued" => {
            record.issued_event_id = this_event;
            Some("PENDING".to_string())
        }
        "command.claimed" => {
            record.claimed_event_id = this_event;
            Some("CLAIMED".to_string())
        }
        "command.started" => {
            record.started_event_id = this_event;
            Some("RUNNING".to_string())
        }
        "command.completed" | "command.failed" | "command.cancelled" => {
            record.terminal_event_id = this_event;
            let derived = kind
                .strip_prefix("command.")
                .map(|suffix| suffix.to_ascii_uppercase())
                .unwrap_or_else(|| kind.to_string());
            Some(derived)
        }
        other if other.starts_with("command.") => None,
        // Not a command-family event: the status is left untouched.
        _ => return,
    };

    let reported = if event.status.is_empty() {
        None
    } else {
        Some(event.status.clone())
    };
    if let Some(status) = reported.or(default_status) {
        record.status = status;
    }
}
// Unit tests for the pure fold and its helpers; none of them touch the database.
#[cfg(test)]
mod tests {
use super::*;
// Builds a minimal event row with no stage/frame/command identity and no meta.
fn ev(event_id: i64, event_type: &str, node_name: Option<&str>, status: &str) -> ReplayEventRow {
ReplayEventRow {
event_id,
event_type: event_type.to_string(),
node_name: node_name.map(|s| s.to_string()),
status: status.to_string(),
created_at: Utc::now(),
stage_id: None,
frame_id: None,
command_id: None,
worker_id: None,
aggregate_type: None,
aggregate_id: None,
meta: None,
}
}
// Builds a row via `ev` (empty status, no node name) and lets `builder`
// customize any field before it is returned.
fn ev_full(
event_id: i64,
event_type: &str,
builder: impl FnOnce(&mut ReplayEventRow),
) -> ReplayEventRow {
let mut row = ev(event_id, event_type, None, "");
builder(&mut row);
row
}
// An empty log folds to the default state: UNKNOWN status, nothing populated.
#[test]
fn fold_empty_event_log_returns_unknown_status() {
let state = fold_replay_state(&[], "default", "default", 1, ReplayProjection::All);
assert_eq!(state.event_count, 0);
assert!(state.last_event_id.is_none());
assert!(state.last_event_type.is_none());
assert_eq!(state.execution.status, "UNKNOWN");
assert!(state.execution.last_node_name.is_none());
assert!(state.stages.is_empty());
assert!(state.frames.is_empty());
assert!(state.commands.is_empty());
}
// `step.enter` moves UNKNOWN to RUNNING and records the node name.
#[test]
fn fold_step_enter_flips_status_to_running_and_tracks_node_name() {
let events = vec![
ev(1, "playbook_started", None, "RUNNING"),
ev(2, "step.enter", Some("start"), "ENTERED"),
];
let state = fold_replay_state(&events, "default", "default", 42, ReplayProjection::All);
assert_eq!(state.event_count, 2);
assert_eq!(state.last_event_id, Some(2));
assert_eq!(state.last_event_type.as_deref(), Some("step.enter"));
assert_eq!(state.execution.status, "RUNNING");
assert_eq!(state.execution.last_node_name.as_deref(), Some("start"));
}
// `playbook.completed` sets COMPLETED while the last node name is kept.
#[test]
fn fold_playbook_completed_short_circuits_status() {
let events = vec![
ev(1, "step.enter", Some("start"), "ENTERED"),
ev(2, "command.completed", Some("start"), "success"),
ev(3, "playbook.completed", None, "COMPLETED"),
];
let state = fold_replay_state(&events, "default", "default", 42, ReplayProjection::All);
assert_eq!(state.execution.status, "COMPLETED");
assert_eq!(state.event_count, 3);
assert_eq!(state.last_event_id, Some(3));
assert_eq!(state.execution.last_node_name.as_deref(), Some("start"));
}
// `playbook.failed` sets FAILED.
#[test]
fn fold_playbook_failed_short_circuits_status() {
let events = vec![
ev(1, "step.enter", Some("start"), "ENTERED"),
ev(2, "playbook.failed", None, "FAILED"),
];
let state = fold_replay_state(&events, "default", "default", 42, ReplayProjection::All);
assert_eq!(state.execution.status, "FAILED");
}
// Underscore-style event names are treated like their dotted counterparts.
#[test]
fn fold_underscore_aliases_recognised() {
let events = vec![
ev(1, "step_started", Some("alpha"), "ENTERED"),
ev(2, "playbook_completed", None, "COMPLETED"),
];
let state = fold_replay_state(&events, "default", "default", 42, ReplayProjection::All);
assert_eq!(state.execution.status, "COMPLETED");
assert_eq!(state.execution.last_node_name.as_deref(), Some("alpha"));
}
// The fold sorts by event_id, so input order does not affect the result.
#[test]
fn fold_is_order_deterministic_when_input_unsorted() {
let events = vec![
ev(3, "playbook.completed", None, "COMPLETED"),
ev(2, "command.completed", Some("start"), "success"),
ev(1, "step.enter", Some("start"), "ENTERED"),
];
let state = fold_replay_state(&events, "default", "default", 42, ReplayProjection::All);
assert_eq!(state.execution.status, "COMPLETED");
assert_eq!(state.last_event_id, Some(3));
assert_eq!(state.last_event_type.as_deref(), Some("playbook.completed"));
}
// `from_str` accepts the canonical snake_case names and rejects anything else.
#[test]
fn projection_from_str_accepts_canonical_names() {
assert_eq!(
ReplayProjection::from_str("execution"),
Some(ReplayProjection::Execution)
);
assert_eq!(
ReplayProjection::from_str("business_object"),
Some(ReplayProjection::BusinessObject)
);
assert_eq!(
ReplayProjection::from_str("loop"),
Some(ReplayProjection::Loop)
);
assert_eq!(
ReplayProjection::from_str("all"),
Some(ReplayProjection::All)
);
assert!(ReplayProjection::from_str("garbage").is_none());
}
// `is_empty` / `set_count` reflect how many cutoff fields are set.
#[test]
fn cutoff_set_count_and_is_empty() {
let empty = ReplayCutoff::default();
assert!(empty.is_empty());
assert_eq!(empty.set_count(), 0);
let one = ReplayCutoff {
as_of_event_id: Some(100),
..Default::default()
};
assert!(!one.is_empty());
assert_eq!(one.set_count(), 1);
let three = ReplayCutoff {
as_of_event_id: Some(100),
as_of_position: Some(200),
as_of_time: Some(Utc::now()),
};
assert_eq!(three.set_count(), 3);
}
// Stage id resolution order: column, then aggregate id, then meta.
#[test]
fn extract_stage_id_prefers_column_then_aggregate_then_meta() {
let row = ev_full(1, "noop", |r| {
r.stage_id = Some("s-from-column".into());
r.aggregate_type = Some("stage".into());
r.aggregate_id = Some("stage/s-aggregate".into());
r.meta = Some(serde_json::json!({"stage_id": "s-from-meta"}));
});
assert_eq!(extract_stage_id(&row).as_deref(), Some("s-from-column"));
let row = ev_full(2, "stage.opened", |r| {
r.aggregate_type = Some("stage".into());
r.aggregate_id = Some("stage/s-aggregate".into());
});
assert_eq!(extract_stage_id(&row).as_deref(), Some("s-aggregate"));
let row = ev_full(3, "noop", |r| {
r.meta = Some(serde_json::json!({"stage_id": "s-from-meta"}));
});
assert_eq!(extract_stage_id(&row).as_deref(), Some("s-from-meta"));
let row = ev(4, "noop", None, "");
assert!(extract_stage_id(&row).is_none());
}
// Frame ids are read from the aggregate id with the "frame/" prefix removed.
#[test]
fn extract_frame_id_mirrors_stage_id_resolution() {
let row = ev_full(1, "frame.dispatched", |r| {
r.aggregate_type = Some("frame".into());
r.aggregate_id = Some("frame/f-1".into());
});
assert_eq!(extract_frame_id(&row).as_deref(), Some("f-1"));
}
// Command ids come from the integer column or from meta (string or number).
#[test]
fn extract_command_id_uses_top_level_bigint_or_meta() {
let row = ev_full(1, "command.issued", |r| {
r.command_id = Some(42);
});
assert_eq!(extract_command_id(&row).as_deref(), Some("42"));
let row = ev_full(2, "command.issued", |r| {
r.meta = Some(serde_json::json!({"command_id": "legacy-cmd"}));
});
assert_eq!(extract_command_id(&row).as_deref(), Some("legacy-cmd"));
let row = ev_full(3, "command.issued", |r| {
r.meta = Some(serde_json::json!({"command_id": 99}));
});
assert_eq!(extract_command_id(&row).as_deref(), Some("99"));
let row = ev(4, "noop", None, "");
assert!(extract_command_id(&row).is_none());
}
// stage.opened followed by stage.closed yields ids, status and counters.
#[test]
fn fold_populates_stage_projection_through_lifecycle() {
let events = vec![
ev_full(1, "stage.opened", |r| {
r.stage_id = Some("s1".into());
r.node_name = Some("normalize".into());
r.meta = Some(serde_json::json!({"kind": "task"}));
}),
ev_full(2, "stage.closed", |r| {
r.stage_id = Some("s1".into());
r.status = "COMPLETED".into();
r.meta = Some(serde_json::json!({
"frame_count": 3,
"row_count": 42,
"events_emitted": 8,
"failed_count": 0,
}));
}),
];
let state = fold_replay_state(&events, "default", "default", 1, ReplayProjection::All);
let stage = state.stages.get("s1").expect("stage s1 must exist");
assert_eq!(stage.stage_id, "s1");
assert_eq!(stage.status, "COMPLETED");
assert_eq!(stage.opened_event_id, Some(1));
assert_eq!(stage.closed_event_id, Some(2));
assert_eq!(stage.frame_count, 3);
assert_eq!(stage.row_count, 42);
assert_eq!(stage.events_emitted, 8);
assert_eq!(stage.last_event_id, Some(2));
assert_eq!(stage.kind.as_deref(), Some("task"));
assert_eq!(stage.step_name.as_deref(), Some("normalize"));
}
// dispatched -> started -> committed yields terminal status, ids and counters.
#[test]
fn fold_populates_frame_projection_with_terminal_status() {
let events = vec![
ev_full(10, "frame.dispatched", |r| {
r.frame_id = Some("f-1".into());
r.stage_id = Some("s-1".into());
r.command_id = Some(7);
r.meta = Some(serde_json::json!({"attempt": 2}));
}),
ev_full(11, "frame.started", |r| {
r.frame_id = Some("f-1".into());
r.stage_id = Some("s-1".into());
}),
ev_full(12, "frame.committed", |r| {
r.frame_id = Some("f-1".into());
r.stage_id = Some("s-1".into());
r.status = "COMPLETED".into();
r.meta = Some(serde_json::json!({
"row_count": 12,
"events_emitted": 4,
}));
}),
];
let state = fold_replay_state(&events, "default", "default", 1, ReplayProjection::All);
let frame = state.frames.get("f-1").expect("frame f-1 must exist");
assert_eq!(frame.frame_id, "f-1");
assert_eq!(frame.stage_id.as_deref(), Some("s-1"));
assert_eq!(frame.command_id.as_deref(), Some("7"));
assert_eq!(frame.status, "COMPLETED");
assert_eq!(frame.claimed_event_id, Some(10));
assert_eq!(frame.terminal_event_id, Some(12));
assert_eq!(frame.row_count, 12);
assert_eq!(frame.events_emitted, 4);
assert_eq!(frame.attempts, 2);
}
// issued -> claimed -> started -> completed records every lifecycle event id.
#[test]
fn fold_populates_command_projection_through_full_lifecycle() {
let events = vec![
ev_full(100, "command.issued", |r| {
r.command_id = Some(42);
r.stage_id = Some("s-1".into());
r.frame_id = Some("f-1".into());
}),
ev_full(101, "command.claimed", |r| {
r.command_id = Some(42);
r.worker_id = Some("worker-pod-7".into());
}),
ev_full(102, "command.started", |r| {
r.command_id = Some(42);
}),
ev_full(103, "command.completed", |r| {
r.command_id = Some(42);
r.status = "success".into();
}),
];
let state = fold_replay_state(&events, "default", "default", 1, ReplayProjection::All);
let cmd = state.commands.get("42").expect("command 42 must exist");
assert_eq!(cmd.command_id, "42");
assert_eq!(cmd.stage_id.as_deref(), Some("s-1"));
assert_eq!(cmd.frame_id.as_deref(), Some("f-1"));
assert_eq!(cmd.worker_id.as_deref(), Some("worker-pod-7"));
assert_eq!(cmd.issued_event_id, Some(100));
assert_eq!(cmd.claimed_event_id, Some(101));
assert_eq!(cmd.started_event_id, Some(102));
assert_eq!(cmd.terminal_event_id, Some(103));
assert_eq!(cmd.status, "success");
}
// With an empty event status the terminal status is derived from the event type.
#[test]
fn fold_command_terminal_status_defaults_when_event_status_empty() {
let events = vec![ev_full(10, "command.failed", |r| {
r.command_id = Some(99);
r.status = "".into();
})];
let state = fold_replay_state(&events, "default", "default", 1, ReplayProjection::All);
let cmd = state.commands.get("99").unwrap();
assert_eq!(cmd.status, "FAILED");
}
// Events without stage/frame/command identity leave those maps empty.
#[test]
fn fold_skips_population_when_event_has_no_identity() {
let events = vec![
ev_full(1, "step.enter", |r| r.node_name = Some("start".into())),
ev_full(2, "playbook.completed", |_| {}),
];
let state = fold_replay_state(&events, "default", "default", 1, ReplayProjection::All);
assert!(state.stages.is_empty());
assert!(state.frames.is_empty());
assert!(state.commands.is_empty());
assert_eq!(state.execution.status, "COMPLETED");
}
// A single event carrying all three ids creates an entry in each map.
#[test]
fn fold_three_projections_populated_in_single_pass() {
let events = vec![ev_full(5, "frame.dispatched", |r| {
r.stage_id = Some("s-multi".into());
r.frame_id = Some("f-multi".into());
r.command_id = Some(7);
})];
let state = fold_replay_state(&events, "default", "default", 1, ReplayProjection::All);
assert!(state.stages.contains_key("s-multi"));
assert!(state.frames.contains_key("f-multi"));
assert!(state.commands.contains_key("7"));
}
// `meta_str` stringifies scalars; `meta_i64` only accepts integer values.
#[test]
fn meta_helpers_round_trip_scalars() {
let meta = Some(serde_json::json!({
"s": "hello",
"n_i64": 7,
"n_neg": -1,
"b": true,
}));
assert_eq!(meta_str(&meta, "s").as_deref(), Some("hello"));
assert_eq!(meta_str(&meta, "n_i64").as_deref(), Some("7"));
assert_eq!(meta_str(&meta, "b").as_deref(), Some("true"));
assert_eq!(meta_i64(&meta, "n_i64"), Some(7));
assert_eq!(meta_i64(&meta, "n_neg"), Some(-1));
assert_eq!(meta_i64(&meta, "s"), None); assert_eq!(meta_i64(&meta, "missing"), None);
}
}