use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::db::{DbPool, DbPoolMap};
use crate::error::AppResult;
/// Optional upper bounds on which events a replay reads.
///
/// `ReplayService::load_events` honours at most one bound: an event-id bound
/// (`as_of_event_id`, else `as_of_position`) wins over `as_of_time`. When no
/// field is set, events are read from the start of the log.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayCutoff {
/// Keep only events whose `event_id` is at most this value.
#[serde(skip_serializing_if = "Option::is_none")]
pub as_of_event_id: Option<i64>,
/// Used as the `event_id` bound when `as_of_event_id` is unset.
#[serde(skip_serializing_if = "Option::is_none")]
pub as_of_position: Option<i64>,
/// Keep only events created at or before this instant; consulted only when
/// neither id-based bound is set.
#[serde(skip_serializing_if = "Option::is_none")]
pub as_of_time: Option<DateTime<Utc>>,
}
impl ReplayCutoff {
    /// Returns `true` when none of the three cutoff fields is set.
    pub fn is_empty(&self) -> bool {
        !(self.as_of_event_id.is_some()
            || self.as_of_position.is_some()
            || self.as_of_time.is_some())
    }

    /// Counts how many of the three cutoff fields are set (0 to 3).
    pub fn set_count(&self) -> usize {
        let flags = [
            self.as_of_event_id.is_some(),
            self.as_of_position.is_some(),
            self.as_of_time.is_some(),
        ];
        flags.iter().filter(|is_set| **is_set).count()
    }
}
/// Names the projection a replay request targets.
///
/// Variants serialize in `snake_case`; `All` is the default. The fold
/// functions in this file record the chosen projection's wire name on the
/// resulting state.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayProjection {
/// Wire name `"execution"`.
Execution,
/// Wire name `"stage"`.
Stage,
/// Wire name `"frame"`.
Frame,
/// Wire name `"command"`.
Command,
/// Wire name `"business_object"`.
BusinessObject,
/// Wire name `"loop"`.
Loop,
/// Wire name `"all"`; the default variant.
#[default]
All,
}
impl ReplayProjection {
pub fn as_str(&self) -> &'static str {
match self {
Self::Execution => "execution",
Self::Stage => "stage",
Self::Frame => "frame",
Self::Command => "command",
Self::BusinessObject => "business_object",
Self::Loop => "loop",
Self::All => "all",
}
}
pub fn parse_wire(s: &str) -> Option<Self> {
match s {
"execution" => Some(Self::Execution),
"stage" => Some(Self::Stage),
"frame" => Some(Self::Frame),
"command" => Some(Self::Command),
"business_object" => Some(Self::BusinessObject),
"loop" => Some(Self::Loop),
"all" => Some(Self::All),
_ => None,
}
}
}
/// The folded result of replaying one execution's event log.
///
/// Produced by `fold_replay_state` / `fold_replay_state_with_options`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayState {
/// Tenant identifier, copied from the fold arguments.
pub tenant_id: String,
/// Organization identifier, copied from the fold arguments.
pub organization_id: String,
/// Execution identifier, copied from the fold arguments.
pub execution_id: i64,
/// Wire name of the requested `ReplayProjection`.
pub projection: String,
/// Number of events folded into this state (a base state's count is kept
/// and incremented).
pub event_count: u64,
/// `event_id` of the last event folded.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
/// `event_type` of the last event folded.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_type: Option<String>,
/// Execution-level status, last node name and payload references.
pub execution: ReplayExecutionState,
/// Stage states keyed by stage id.
#[serde(default)]
pub stages: std::collections::BTreeMap<String, ReplayStageState>,
/// Frame states keyed by frame id.
#[serde(default)]
pub frames: std::collections::BTreeMap<String, ReplayFrameState>,
/// Command states keyed by command id (rendered as a string).
#[serde(default)]
pub commands: std::collections::BTreeMap<String, ReplayCommandState>,
/// Business-object states keyed by `"<object_type>/<object_id>"`.
#[serde(default)]
pub business_objects: std::collections::BTreeMap<String, ReplayBusinessObjectState>,
/// Loop states keyed by loop id.
#[serde(default)]
pub loops: std::collections::BTreeMap<String, ReplayLoopState>,
/// Digest supplied through the fold options; when the options carry none,
/// the base state's value (if any) is kept.
#[serde(skip_serializing_if = "Option::is_none")]
pub upcaster_registry_digest: Option<String>,
/// Identity of the snapshot seed passed through the fold options, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub replay_snapshot: Option<ReplaySnapshotInfo>,
/// SHA-256 over this state, computed while this field itself is still `None`
/// and after `projection_checksums` has been filled in.
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum: Option<Checksum>,
/// Per-projection SHA-256 checksums keyed by `"execution"`, `"stage"`,
/// `"frame"`, `"command"`, `"business_object"` and `"loop"`.
#[serde(default)]
pub projection_checksums: std::collections::BTreeMap<String, Checksum>,
}
/// Scalar facts extracted from a payload reference by `payload_summary`.
///
/// Every field is optional and omitted from the serialized form when absent.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PayloadSummary {
/// The reference's `sha256` string, or its top-level `digest` as a fallback.
#[serde(skip_serializing_if = "Option::is_none")]
pub sha256: Option<String>,
/// The reference's `schema_digest` string.
#[serde(skip_serializing_if = "Option::is_none")]
pub schema_digest: Option<String>,
/// The reference's integer `row_count`.
#[serde(skip_serializing_if = "Option::is_none")]
pub row_count: Option<i64>,
/// The reference's `media_type` string.
#[serde(skip_serializing_if = "Option::is_none")]
pub media_type: Option<String>,
/// Serialized as `ref`: the reference's `ref`, else `rows_ref.ref`, else
/// the top-level `uri`.
#[serde(rename = "ref", skip_serializing_if = "Option::is_none")]
pub reference_uri: Option<String>,
}
/// A payload reference together with the event that carried it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PayloadRefEntry {
/// `event_id` of the event whose `result.reference` this is.
pub event_id: i64,
/// The raw `result.reference` JSON value.
pub reference: serde_json::Value,
/// Summary derived from `reference` by `payload_summary`.
pub summary: PayloadSummary,
}
/// Execution-level projection of the event log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayExecutionState {
/// `"UNKNOWN"` by default; the fold sets `"RUNNING"`, `"COMPLETED"`,
/// `"FAILED"` or `"CANCELLED"` from step and playbook events.
pub status: String,
/// `node_name` of the most recent step enter/exit or `command.completed`
/// event that carried one.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_node_name: Option<String>,
/// One entry per folded event that carries a non-null `result.reference`.
#[serde(default)]
pub payload_refs: Vec<PayloadRefEntry>,
}
impl Default for ReplayExecutionState {
    /// An execution nothing is known about yet: status `"UNKNOWN"`, no last
    /// node name, no payload references.
    fn default() -> Self {
        let status = String::from("UNKNOWN");
        Self {
            status,
            last_node_name: Option::None,
            payload_refs: vec![],
        }
    }
}
/// Per-stage projection maintained by `populate_stage`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayStageState {
/// Stage identifier; also the key in `ReplayState::stages`.
pub stage_id: String,
/// `"UNKNOWN"` on creation, `"OPEN"` after `stage.opened`; `stage.closed`
/// sets the event status (or `"CLOSED"` when empty); any other event with a
/// non-empty status overwrites it.
pub status: String,
/// Meta `kind` of the first event seen for the stage.
#[serde(skip_serializing_if = "Option::is_none")]
pub kind: Option<String>,
/// `node_name` (else meta `step_name`) of the first event seen for the stage.
#[serde(skip_serializing_if = "Option::is_none")]
pub step_name: Option<String>,
/// Most recent meta `parent_stage_id`.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_stage_id: Option<String>,
/// Most recent meta `loop_id`, `loop_event_id` or `__loop_epoch_id`
/// (first key present wins).
#[serde(skip_serializing_if = "Option::is_none")]
pub loop_event_id: Option<String>,
/// `event_id` of the latest `stage.opened` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub opened_event_id: Option<i64>,
/// `event_id` of the latest `stage.closed` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub closed_event_id: Option<i64>,
/// Meta `frame_count` reported by `stage.closed`, when present.
pub frame_count: i64,
/// Meta `row_count` reported by `stage.closed`, when present.
pub row_count: i64,
/// Meta `events_emitted` reported by `stage.closed`, when present.
pub events_emitted: i64,
/// Meta `failed_count` reported by `stage.closed`, when present.
pub failed_count: i64,
/// `event_id` of the last event attributed to this stage.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
}
/// Per-frame projection maintained by `populate_frame`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayFrameState {
/// Frame identifier; also the key in `ReplayState::frames`.
pub frame_id: String,
/// Most recent stage id resolved for an event of this frame.
#[serde(skip_serializing_if = "Option::is_none")]
pub stage_id: Option<String>,
/// Most recent meta `parent_frame_id`.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_frame_id: Option<String>,
/// Most recent command id resolved for an event of this frame.
#[serde(skip_serializing_if = "Option::is_none")]
pub command_id: Option<String>,
/// `event_id` of the latest `frame.dispatched` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub claimed_event_id: Option<i64>,
/// `event_id` of the latest `frame.committed` or `frame.failed` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub terminal_event_id: Option<i64>,
/// `"UNKNOWN"` on creation; then `"CLAIMED"`, `"RUNNING"`, or the event
/// status (with `"ABANDONED"` / `"COMPLETED"` / `"FAILED"` fallbacks).
pub status: String,
/// Meta `row_count` reported by `frame.committed`, when present.
pub row_count: i64,
/// Highest meta `attempt` seen on `frame.dispatched` (a missing value counts as 1).
pub attempts: i64,
/// `event_id` of the last event attributed to this frame.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
/// Meta `events_emitted` reported by the terminal event, when present.
pub events_emitted: i64,
/// `result.reference` of the latest terminal event, if it had one.
#[serde(skip_serializing_if = "Option::is_none")]
pub output_ref: Option<serde_json::Value>,
/// Summary of `output_ref`; set on every terminal event, and left at the
/// default (all fields `None`) when that event carried no reference.
#[serde(skip_serializing_if = "Option::is_none")]
pub output_ref_summary: Option<PayloadSummary>,
}
/// Per-command projection maintained by `populate_command`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayCommandState {
/// Command identifier as a string; also the key in `ReplayState::commands`.
pub command_id: String,
/// Most recent stage id resolved for an event of this command.
#[serde(skip_serializing_if = "Option::is_none")]
pub stage_id: Option<String>,
/// Most recent frame id resolved for an event of this command.
#[serde(skip_serializing_if = "Option::is_none")]
pub frame_id: Option<String>,
/// Most recent meta `parent_command_id`.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_command_id: Option<String>,
/// Most recent worker id (event column, else meta `worker_id`).
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_id: Option<String>,
/// Most recent meta `worker_locator`.
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_locator: Option<String>,
/// `"UNKNOWN"` on creation; afterwards the status of the latest `command.*`
/// event, with per-event-type fallbacks when that status is empty.
pub status: String,
/// `event_id` of the latest `command.issued` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub issued_event_id: Option<i64>,
/// `event_id` of the latest `command.claimed` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub claimed_event_id: Option<i64>,
/// `event_id` of the latest `command.started` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub started_event_id: Option<i64>,
/// `event_id` of the latest `command.completed`, `command.failed` or
/// `command.cancelled` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub terminal_event_id: Option<i64>,
/// `event_id` of the last event attributed to this command.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
}
/// Per-loop projection maintained by `populate_loop`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayLoopState {
/// Loop identifier; also the key in `ReplayState::loops`.
pub loop_id: String,
/// `node_name` of the first event seen for the loop.
#[serde(skip_serializing_if = "Option::is_none")]
pub step_name: Option<String>,
/// Meta `collection_size` (else meta `total`) of the first event seen for
/// the loop; later events do not update it.
#[serde(skip_serializing_if = "Option::is_none")]
pub total: Option<i64>,
/// Count of `command.completed` and `loop.shard.done` events.
pub done: i64,
/// Count of `command.failed` and `loop.shard.failed` events.
pub failed: i64,
/// Set once a `loop.done` or `loop.fanin.completed` event is seen.
pub completed: bool,
/// `event_id` of the last event attributed to this loop.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
}
/// Per-business-object projection maintained by `populate_business_object`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplayBusinessObjectState {
/// `"<object_type>/<object_id>"`; also the key in `ReplayState::business_objects`.
pub object_key: String,
/// Object type resolved by `extract_business_object_identity`.
pub object_type: String,
/// Object id resolved by `extract_business_object_identity`.
pub object_id: String,
/// `"UNKNOWN"` on creation; afterwards the latest value produced by
/// `business_object_status`.
pub status: String,
/// Version from the event meta when provided, otherwise `event_count`.
pub version: i64,
/// Number of events attributed to this object.
pub event_count: i64,
/// `event_id` of the first event attributed to this object.
#[serde(skip_serializing_if = "Option::is_none")]
pub first_event_id: Option<i64>,
/// `event_id` of the last event attributed to this object.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<i64>,
/// `event_id` of the latest event that set the status to `"DELETED"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_event_id: Option<i64>,
/// `event_type` of the last event attributed to this object.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_event_type: Option<String>,
/// Attribute map: replaced by meta `business_object.state`, then merged
/// with `business_object.patch` (or `business_object.attributes`).
#[serde(default)]
pub attributes: serde_json::Map<String, serde_json::Value>,
/// Every payload reference carried by this object's events, in fold order.
#[serde(default)]
pub payload_refs: Vec<PayloadRefEntry>,
/// The most recent entry of `payload_refs`.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_payload_ref: Option<PayloadRefEntry>,
}
/// Hash algorithm identifier stored alongside a checksum value.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChecksumType {
/// SHA-256; wire name `"sha256"`.
Sha256,
}
impl ChecksumType {
pub fn as_str(self) -> &'static str {
match self {
ChecksumType::Sha256 => "sha256",
}
}
}
/// A digest value tagged with the algorithm that produced it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Checksum {
/// Algorithm identifier; serialized under the key `type`.
#[serde(rename = "type")]
pub algorithm: ChecksumType,
/// Digest value; `Checksum::sha256` stores it as lowercase hex.
pub value: String,
}
impl Checksum {
pub fn sha256<T: Serialize>(value: &T) -> Self {
use sha2::{Digest, Sha256};
let payload = stable_json_bytes(value);
let digest = Sha256::digest(&payload);
Self {
algorithm: ChecksumType::Sha256,
value: hex_encode(&digest),
}
}
}
/// A stored snapshot handed to the fold through `ReplayFoldOptions`.
///
/// `fold_replay_state_with_options` copies the identity fields into
/// `ReplayState::replay_snapshot`; it does not read `state` from the seed
/// (a starting state is supplied separately via `ReplayFoldOptions::base_state`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplaySnapshotSeed {
/// Identifier of the aggregate the snapshot belongs to.
pub aggregate_id: String,
/// Type of the aggregate the snapshot belongs to.
pub aggregate_type: String,
/// Snapshot version.
pub version: i64,
/// Checksum recorded with the snapshot.
pub checksum: Checksum,
/// The snapshotted replay state.
pub state: ReplayState,
/// Free-form snapshot metadata.
#[serde(default)]
pub meta: serde_json::Map<String, serde_json::Value>,
}
/// Identity of the snapshot a replay was seeded with.
///
/// Carries the same fields as `ReplaySnapshotSeed` minus the state itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplaySnapshotInfo {
/// Identifier of the aggregate the snapshot belongs to.
pub aggregate_id: String,
/// Type of the aggregate the snapshot belongs to.
pub aggregate_type: String,
/// Snapshot version.
pub version: i64,
/// Checksum recorded with the snapshot.
pub checksum: Checksum,
/// Free-form snapshot metadata.
#[serde(default)]
pub meta: serde_json::Map<String, serde_json::Value>,
}
/// Optional inputs to `fold_replay_state_with_options`.
///
/// The default value (all `None`) folds from a blank state.
#[derive(Debug, Clone, Default)]
pub struct ReplayFoldOptions {
/// State to continue folding from; its identity fields and checksums are
/// overwritten by the fold.
pub base_state: Option<ReplayState>,
/// Snapshot whose identity is recorded on the resulting state.
pub snapshot_seed: Option<ReplaySnapshotSeed>,
/// Digest to record on the resulting state; takes precedence over the base
/// state's digest.
pub upcaster_registry_digest: Option<String>,
}
/// One row of `noetl.event` as selected by `ReplayService::load_events`.
///
/// Fields marked `#[sqlx(default)]` are optional fold inputs.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ReplayEventRow {
/// Event identifier; the fold processes events in ascending `event_id` order.
pub event_id: i64,
/// Event type string, e.g. `"stage.opened"` or `"command.completed"`.
pub event_type: String,
/// Name of the node the event belongs to, if any.
pub node_name: Option<String>,
/// Event status; the fold treats an empty string as "not provided".
pub status: String,
/// Creation time (the query casts the column to UTC).
pub created_at: DateTime<Utc>,
/// Stage the event belongs to, if recorded in its own column.
#[sqlx(default)]
pub stage_id: Option<String>,
/// Frame the event belongs to, if recorded in its own column.
#[sqlx(default)]
pub frame_id: Option<String>,
/// Command the event belongs to; rendered as a decimal string by the fold.
#[sqlx(default)]
pub command_id: Option<i64>,
/// Worker that produced the event, if recorded.
#[sqlx(default)]
pub worker_id: Option<String>,
/// Aggregate type; the fold recognises `"stage"`, `"frame"` and
/// `"business_object"`.
#[sqlx(default)]
pub aggregate_type: Option<String>,
/// Aggregate identifier, optionally prefixed with `"<aggregate_type>/"`.
#[sqlx(default)]
pub aggregate_id: Option<String>,
/// Free-form JSON metadata the fold reads ids, counters and business-object
/// details from.
#[sqlx(default)]
pub meta: Option<serde_json::Value>,
/// JSON result; the fold reads its `reference` member as the payload reference.
#[sqlx(default)]
pub result: Option<serde_json::Value>,
}
/// Loads an execution's events from the database and folds them into a
/// `ReplayState`.
#[derive(Clone)]
pub struct ReplayService {
// Pool map consulted per execution id via `DbPoolMap::pool_for`.
pools: DbPoolMap,
}
impl ReplayService {
    /// Creates a service backed by the given pool map.
    pub fn new(pools: DbPoolMap) -> Self {
        Self { pools }
    }

    /// Creates a service from a single pool by wrapping it with
    /// `DbPoolMap::from_single_pool`.
    pub fn new_legacy(db: DbPool) -> Self {
        Self::new(DbPoolMap::from_single_pool(db))
    }

    /// Returns the pool that the pool map selects for `execution_id`.
    #[inline]
    fn pool_for(&self, execution_id: i64) -> &DbPool {
        self.pools.pool_for(execution_id)
    }

    /// Loads the events of `execution_id` (see [`ReplayService::load_events`])
    /// and folds them into a [`ReplayState`] with `fold_replay_state`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the event query.
    pub async fn replay_state(
        &self,
        tenant_id: &str,
        organization_id: &str,
        execution_id: i64,
        cutoff: ReplayCutoff,
        projection: ReplayProjection,
        limit: i64,
    ) -> AppResult<ReplayState> {
        let events = self.load_events(execution_id, &cutoff, limit).await?;
        Ok(fold_replay_state(
            &events,
            tenant_id,
            organization_id,
            execution_id,
            projection,
        ))
    }

    /// Reads the events of `execution_id` in ascending `event_id` order.
    ///
    /// * `cutoff` – at most one bound is applied: an event-id bound
    ///   (`as_of_event_id`, else `as_of_position`) takes precedence over
    ///   `as_of_time`; with no bound the log is read from its start.
    /// * `limit` – maximum number of rows, clamped to `1..=100_000`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the database query.
    pub async fn load_events(
        &self,
        execution_id: i64,
        cutoff: &ReplayCutoff,
        limit: i64,
    ) -> AppResult<Vec<ReplayEventRow>> {
        // One shared SELECT prefix for all three query shapes, so the column
        // list cannot drift between them. `concat!` keeps each statement a
        // `&'static str`.
        //
        // * `noetl.event.created_at` is `TIMESTAMP` (no tz); it is coerced to
        //   `TIMESTAMPTZ` so sqlx decodes it into `DateTime<Utc>` directly.
        // * `stage_id` .. `meta` are optional fold inputs; the fold treats
        //   `None` as "this event does not participate in that projection".
        // * `result` is the jsonb the payload resolver reads
        //   `result.reference` from.
        macro_rules! replay_events_sql {
            ($tail:literal) => {
                concat!(
                    "SELECT event_id, event_type, node_name, status, ",
                    "created_at AT TIME ZONE 'UTC' AS created_at, ",
                    "stage_id, frame_id, command_id, worker_id, ",
                    "aggregate_type, aggregate_id, meta, result ",
                    "FROM noetl.event ",
                    "WHERE execution_id = $1 ",
                    $tail
                )
            };
        }

        let limit = limit.clamp(1, 100_000);
        let pool = self.pool_for(execution_id);

        let rows = if let Some(event_id) = cutoff.as_of_event_id.or(cutoff.as_of_position) {
            sqlx::query_as::<_, ReplayEventRow>(replay_events_sql!(
                "AND event_id <= $2 ORDER BY event_id ASC LIMIT $3"
            ))
            .bind(execution_id)
            .bind(event_id)
            .bind(limit)
            .fetch_all(pool)
            .await?
        } else if let Some(as_of) = cutoff.as_of_time {
            // The column is a tz-less timestamp holding UTC wall-clock time.
            // Convert the bound to a UTC timestamp before comparing, so the
            // result does not depend on the session `TimeZone` setting (a
            // direct timestamp/timestamptz comparison would).
            sqlx::query_as::<_, ReplayEventRow>(replay_events_sql!(
                "AND created_at <= ($2::timestamptz AT TIME ZONE 'UTC') \
                 ORDER BY event_id ASC LIMIT $3"
            ))
            .bind(execution_id)
            .bind(as_of)
            .bind(limit)
            .fetch_all(pool)
            .await?
        } else {
            sqlx::query_as::<_, ReplayEventRow>(replay_events_sql!(
                "ORDER BY event_id ASC LIMIT $2"
            ))
            .bind(execution_id)
            .bind(limit)
            .fetch_all(pool)
            .await?
        };
        Ok(rows)
    }
}
/// Folds `events` into a fresh [`ReplayState`].
///
/// Shorthand for [`fold_replay_state_with_options`] with default options
/// (no base state, no snapshot seed, no upcaster digest).
pub fn fold_replay_state(
    events: &[ReplayEventRow],
    tenant_id: &str,
    organization_id: &str,
    execution_id: i64,
    projection: ReplayProjection,
) -> ReplayState {
    let default_options = ReplayFoldOptions::default();
    fold_replay_state_with_options(
        events,
        tenant_id,
        organization_id,
        execution_id,
        projection,
        default_options,
    )
}
/// Folds `events` into a [`ReplayState`], optionally continuing from a base
/// state.
///
/// * Identity fields (`tenant_id`, `organization_id`, `execution_id`,
///   `projection`) always come from the arguments, and any checksums carried
///   by the base state are discarded and recomputed.
/// * `options.upcaster_registry_digest` overrides the base state's digest when
///   present; `options.snapshot_seed` is recorded as `replay_snapshot`.
/// * Events are processed in ascending `event_id` order regardless of their
///   order in `events`. Every event feeds all projections.
pub fn fold_replay_state_with_options(
    events: &[ReplayEventRow],
    tenant_id: &str,
    organization_id: &str,
    execution_id: i64,
    projection: ReplayProjection,
    options: ReplayFoldOptions,
) -> ReplayState {
    // Continue from the caller's base state, or start from a blank one. The
    // identity fields and checksums are (re)assigned right below either way.
    let mut state = options.base_state.unwrap_or_else(|| ReplayState {
        tenant_id: String::new(),
        organization_id: String::new(),
        execution_id,
        projection: String::new(),
        event_count: 0,
        last_event_id: None,
        last_event_type: None,
        execution: ReplayExecutionState::default(),
        stages: std::collections::BTreeMap::new(),
        frames: std::collections::BTreeMap::new(),
        commands: std::collections::BTreeMap::new(),
        business_objects: std::collections::BTreeMap::new(),
        loops: std::collections::BTreeMap::new(),
        upcaster_registry_digest: None,
        replay_snapshot: None,
        checksum: None,
        projection_checksums: std::collections::BTreeMap::new(),
    });
    state.tenant_id = tenant_id.to_owned();
    state.organization_id = organization_id.to_owned();
    state.execution_id = execution_id;
    state.projection = projection.as_str().to_owned();
    state.checksum = None;
    state.projection_checksums.clear();

    if let Some(digest) = options.upcaster_registry_digest {
        state.upcaster_registry_digest = Some(digest);
    }
    if let Some(seed) = options.snapshot_seed {
        state.replay_snapshot = Some(ReplaySnapshotInfo {
            aggregate_id: seed.aggregate_id,
            aggregate_type: seed.aggregate_type,
            version: seed.version,
            checksum: seed.checksum,
            meta: seed.meta,
        });
    }

    // Stable sort: events sharing an id keep their input order.
    let mut ordered: Vec<&ReplayEventRow> = events.iter().collect();
    ordered.sort_by(|left, right| left.event_id.cmp(&right.event_id));

    for event in ordered {
        state.event_count += 1;
        state.last_event_id = Some(event.event_id);
        state.last_event_type = Some(event.event_type.clone());

        let kind = event.event_type.as_str();
        let playbook_outcome = match kind {
            "playbook.completed" | "playbook_completed" => Some("COMPLETED"),
            "playbook.failed" | "playbook_failed" => Some("FAILED"),
            "playbook.cancelled" | "playbook_cancelled" => Some("CANCELLED"),
            _ => None,
        };
        if let Some(outcome) = playbook_outcome {
            state.execution.status = outcome.to_string();
        } else {
            let entering = matches!(kind, "step.enter" | "step_enter" | "step_started");
            let leaving = matches!(kind, "step.exit" | "step_completed" | "command.completed");
            // Only the first step entry moves the execution out of UNKNOWN.
            if entering && state.execution.status == "UNKNOWN" {
                state.execution.status = "RUNNING".to_string();
            }
            if entering || leaving {
                if let Some(name) = &event.node_name {
                    state.execution.last_node_name = Some(name.clone());
                }
            }
        }

        populate_stage(event, &mut state.stages);
        populate_frame(event, &mut state.frames);
        populate_command(event, &mut state.commands);
        populate_loop(event, &mut state.loops);
        populate_business_object(event, &mut state.business_objects);

        if let Some(reference) = extract_payload_ref(event) {
            let entry = build_payload_entry(event.event_id, reference);
            state.execution.payload_refs.push(entry);
        }
    }

    compute_checksums(&mut state);
    state
}
/// Resolves the stage id an event belongs to.
///
/// Sources, in priority order: the `stage_id` column; the aggregate id (with
/// an optional `"stage/"` prefix removed) when the aggregate type is
/// `"stage"`; the meta key `stage_id`.
pub fn extract_stage_id(event: &ReplayEventRow) -> Option<String> {
    event
        .stage_id
        .clone()
        .or_else(|| {
            if event.aggregate_type.as_deref() != Some("stage") {
                return None;
            }
            event
                .aggregate_id
                .as_deref()
                .map(|id| id.strip_prefix("stage/").unwrap_or(id).to_string())
        })
        .or_else(|| meta_str(&event.meta, "stage_id"))
}
/// Resolves the frame id an event belongs to.
///
/// Sources, in priority order: the `frame_id` column; the aggregate id (with
/// an optional `"frame/"` prefix removed) when the aggregate type is
/// `"frame"`; the meta key `frame_id`.
pub fn extract_frame_id(event: &ReplayEventRow) -> Option<String> {
    event
        .frame_id
        .clone()
        .or_else(|| {
            if event.aggregate_type.as_deref() != Some("frame") {
                return None;
            }
            event
                .aggregate_id
                .as_deref()
                .map(|id| id.strip_prefix("frame/").unwrap_or(id).to_string())
        })
        .or_else(|| meta_str(&event.meta, "frame_id"))
}
pub fn extract_command_id(event: &ReplayEventRow) -> Option<String> {
if let Some(c) = event.command_id {
return Some(c.to_string());
}
if let Some(m) = &event.meta {
if let Some(v) = m.get("command_id") {
return Some(value_to_string(v));
}
}
None
}
fn meta_str(meta: &Option<serde_json::Value>, key: &str) -> Option<String> {
meta.as_ref().and_then(|m| m.get(key)).map(value_to_string)
}
/// Looks up `key` in `meta` and returns it as an `i64`.
///
/// Returns `None` when `meta` is absent, lacks the key, or the value is not
/// an integer representable as `i64`.
fn meta_i64(meta: &Option<serde_json::Value>, key: &str) -> Option<i64> {
    meta.as_ref()?.get(key)?.as_i64()
}
fn value_to_string(v: &serde_json::Value) -> String {
match v {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Null => "null".to_string(),
other => other.to_string(),
}
}
/// Applies one event to the stage projection.
///
/// Events with no resolvable stage id are ignored. A stage is created on
/// first sight (status `"UNKNOWN"`, `kind` and `step_name` captured once) and
/// then updated by every later event that resolves to the same id.
fn populate_stage(
    event: &ReplayEventRow,
    stages: &mut std::collections::BTreeMap<String, ReplayStageState>,
) {
    let stage_id = if let Some(id) = extract_stage_id(event) {
        id
    } else {
        return;
    };
    let meta = &event.meta;

    let stage = stages
        .entry(stage_id)
        .or_insert_with_key(|id| ReplayStageState {
            stage_id: id.clone(),
            status: "UNKNOWN".to_string(),
            kind: meta_str(meta, "kind"),
            step_name: match &event.node_name {
                Some(name) => Some(name.clone()),
                None => meta_str(meta, "step_name"),
            },
            ..Default::default()
        });

    stage.last_event_id = Some(event.event_id);
    if let Some(parent) = meta_str(meta, "parent_stage_id") {
        stage.parent_stage_id = Some(parent);
    }
    // First key present wins.
    let loop_ref = ["loop_id", "loop_event_id", "__loop_epoch_id"]
        .iter()
        .find_map(|key| meta_str(meta, key));
    if let Some(loop_id) = loop_ref {
        stage.loop_event_id = Some(loop_id);
    }

    let has_status = !event.status.is_empty();
    match event.event_type.as_str() {
        "stage.opened" => {
            stage.status = "OPEN".to_string();
            stage.opened_event_id = Some(event.event_id);
        }
        "stage.closed" => {
            stage.status = if has_status {
                event.status.clone()
            } else {
                "CLOSED".to_string()
            };
            stage.closed_event_id = Some(event.event_id);
            // Counters are only overwritten when the closing event reports them.
            if let Some(frames) = meta_i64(meta, "frame_count") {
                stage.frame_count = frames;
            }
            if let Some(rows) = meta_i64(meta, "row_count") {
                stage.row_count = rows;
            }
            if let Some(emitted) = meta_i64(meta, "events_emitted") {
                stage.events_emitted = emitted;
            }
            if let Some(failures) = meta_i64(meta, "failed_count") {
                stage.failed_count = failures;
            }
        }
        _ => {
            if has_status {
                stage.status = event.status.clone();
            }
        }
    }
}
/// Applies one event to the frame projection.
///
/// Events with no resolvable frame id are ignored. A frame is created on
/// first sight with status `"UNKNOWN"`; stage, parent and command links are
/// refreshed whenever the event provides them, and `frame.*` lifecycle events
/// drive status, counters and the output reference.
fn populate_frame(
    event: &ReplayEventRow,
    frames: &mut std::collections::BTreeMap<String, ReplayFrameState>,
) {
    let frame_id = if let Some(id) = extract_frame_id(event) {
        id
    } else {
        return;
    };

    let frame = frames
        .entry(frame_id)
        .or_insert_with_key(|id| ReplayFrameState {
            frame_id: id.clone(),
            status: "UNKNOWN".to_string(),
            ..Default::default()
        });

    frame.last_event_id = Some(event.event_id);
    if let Some(stage) = extract_stage_id(event) {
        frame.stage_id = Some(stage);
    }
    if let Some(parent) = meta_str(&event.meta, "parent_frame_id") {
        frame.parent_frame_id = Some(parent);
    }
    if let Some(command) = extract_command_id(event) {
        frame.command_id = Some(command);
    }

    // The event's own status wins; `fallback` is used only when it is empty.
    let status_or = |fallback: &str| -> String {
        if event.status.is_empty() {
            fallback.to_string()
        } else {
            event.status.clone()
        }
    };

    match event.event_type.as_str() {
        "frame.dispatched" => {
            frame.status = "CLAIMED".to_string();
            frame.claimed_event_id = Some(event.event_id);
            let attempt = meta_i64(&event.meta, "attempt").unwrap_or(1);
            if attempt > frame.attempts {
                frame.attempts = attempt;
            }
        }
        "frame.started" => frame.status = "RUNNING".to_string(),
        "frame.abandoned" => frame.status = status_or("ABANDONED"),
        "frame.committed" | "frame.failed" => {
            let committed = event.event_type == "frame.committed";
            frame.status = status_or(if committed { "COMPLETED" } else { "FAILED" });
            // Only a commit reports a row count.
            if committed {
                if let Some(rows) = meta_i64(&event.meta, "row_count") {
                    frame.row_count = rows;
                }
            }
            if let Some(emitted) = meta_i64(&event.meta, "events_emitted") {
                frame.events_emitted = emitted;
            }
            frame.terminal_event_id = Some(event.event_id);

            // A summary is always recorded; without a reference it is the
            // summary of JSON null (all fields `None`).
            let reference = extract_payload_ref(event);
            let summary = match &reference {
                Some(value) => payload_summary(value),
                None => payload_summary(&serde_json::Value::Null),
            };
            frame.output_ref_summary = Some(summary);
            frame.output_ref = reference;
        }
        _ => {
            if !event.status.is_empty() {
                frame.status = event.status.clone();
            }
        }
    }
}
/// Applies one event to the command projection.
///
/// Events with no resolvable command id are ignored. A command is created on
/// first sight with status `"UNKNOWN"`; stage, frame, parent and worker links
/// are refreshed whenever the event provides them, and `command.*` events
/// drive the status and the per-phase event ids.
fn populate_command(
    event: &ReplayEventRow,
    commands: &mut std::collections::BTreeMap<String, ReplayCommandState>,
) {
    let command_id = if let Some(id) = extract_command_id(event) {
        id
    } else {
        return;
    };

    let command = commands
        .entry(command_id)
        .or_insert_with_key(|id| ReplayCommandState {
            command_id: id.clone(),
            status: "UNKNOWN".to_string(),
            ..Default::default()
        });

    command.last_event_id = Some(event.event_id);
    if let Some(stage) = extract_stage_id(event) {
        command.stage_id = Some(stage);
    }
    if let Some(frame) = extract_frame_id(event) {
        command.frame_id = Some(frame);
    }
    if let Some(parent) = meta_str(&event.meta, "parent_command_id") {
        command.parent_command_id = Some(parent);
    }
    // The dedicated column wins over the meta entry.
    let worker = match &event.worker_id {
        Some(id) => Some(id.clone()),
        None => meta_str(&event.meta, "worker_id"),
    };
    if let Some(worker) = worker {
        command.worker_id = Some(worker);
    }
    if let Some(locator) = meta_str(&event.meta, "worker_locator") {
        command.worker_locator = Some(locator);
    }

    let kind = event.event_type.as_str();
    // Status to fall back to when a lifecycle event carries an empty status.
    let lifecycle_fallback = match kind {
        "command.issued" => Some("PENDING"),
        "command.claimed" => Some("CLAIMED"),
        "command.started" => Some("RUNNING"),
        "command.completed" => Some("COMPLETED"),
        "command.failed" => Some("FAILED"),
        "command.cancelled" => Some("CANCELLED"),
        _ => None,
    };

    match lifecycle_fallback {
        Some(fallback) => {
            command.status = if event.status.is_empty() {
                fallback.to_string()
            } else {
                event.status.clone()
            };
            let phase_slot = match kind {
                "command.issued" => &mut command.issued_event_id,
                "command.claimed" => &mut command.claimed_event_id,
                "command.started" => &mut command.started_event_id,
                _ => &mut command.terminal_event_id,
            };
            *phase_slot = Some(event.event_id);
        }
        None => {
            // Any other `command.*` event may still carry a status update.
            if kind.starts_with("command.") && !event.status.is_empty() {
                command.status = event.status.clone();
            }
        }
    }
}
/// Resolves the loop id an event belongs to.
///
/// Checks the meta keys `loop_id`, `loop_event_id` and `__loop_epoch_id` in
/// that order and returns the first one present.
pub fn extract_loop_id(event: &ReplayEventRow) -> Option<String> {
    ["loop_id", "loop_event_id", "__loop_epoch_id"]
        .iter()
        .find_map(|key| meta_str(&event.meta, key))
}
/// Resolves the business object an event refers to.
///
/// Returns `(object_key, object_type, object_id)` where `object_key` is
/// `"<object_type>/<object_id>"`, or `None` when either part is unresolved.
///
/// Type and id are looked up independently, in priority order:
/// 1. the nested meta object `business_object` (`object_type` / `type`,
///    `object_id` / `id`);
/// 2. the flat meta keys `business_object_type` / `object_type` and
///    `business_object_id` / `object_id`;
/// 3. the aggregate id, when the aggregate type is `"business_object"`.
///
/// NOTE(review): in step 3, an aggregate id with fewer than two non-empty
/// segments after the optional `"business_object/"` prefix falls back to the
/// *unstripped* aggregate id as object id — confirm this is intended.
pub fn extract_business_object_identity(
    event: &ReplayEventRow,
) -> Option<(String, String, String)> {
    let nested = event
        .meta
        .as_ref()
        .and_then(|meta| meta.get("business_object"))
        .and_then(|value| value.as_object());

    let mut object_type = nested
        .and_then(|obj| obj.get("object_type").or_else(|| obj.get("type")))
        .map(value_to_string)
        .or_else(|| meta_str(&event.meta, "business_object_type"))
        .or_else(|| meta_str(&event.meta, "object_type"));
    let mut object_id = nested
        .and_then(|obj| obj.get("object_id").or_else(|| obj.get("id")))
        .map(value_to_string)
        .or_else(|| meta_str(&event.meta, "business_object_id"))
        .or_else(|| meta_str(&event.meta, "object_id"));

    if event.aggregate_type.as_deref() == Some("business_object") {
        if let Some(aggregate_id) = event.aggregate_id.as_deref() {
            let tail = aggregate_id
                .strip_prefix("business_object/")
                .unwrap_or(aggregate_id);
            let segments: Vec<&str> = tail
                .split('/')
                .filter(|segment| !segment.is_empty())
                .collect();

            // "<type>/<id...>" when there are at least two segments; otherwise
            // a generic type with the whole aggregate id.
            let (fallback_type, fallback_id) = match segments.split_first() {
                Some((head, rest)) if !rest.is_empty() => ((*head).to_string(), rest.join("/")),
                _ => ("business_object".to_string(), aggregate_id.to_string()),
            };
            object_type = object_type.or(Some(fallback_type));
            object_id = object_id.or(Some(fallback_id));
        }
    }

    object_type
        .zip(object_id)
        .map(|(kind, id)| (format!("{}/{}", kind, id), kind, id))
}
/// Derives a business-object status from an event.
///
/// A non-empty `status` is returned verbatim. Otherwise the status is inferred
/// from the (case-insensitive) event-type suffix: `.deleted` / `.removed` give
/// `"DELETED"`; `.created` / `.updated` / `.upserted` give `"ACTIVE"`; anything
/// else gives `None`.
fn business_object_status(event_type: &str, status: &str) -> Option<String> {
    if !status.is_empty() {
        return Some(status.to_string());
    }

    let lowered = event_type.to_ascii_lowercase();
    let ends_with_any = |suffixes: &[&str]| suffixes.iter().any(|suffix| lowered.ends_with(*suffix));

    let inferred = if ends_with_any(&[".deleted", ".removed"]) {
        Some("DELETED")
    } else if ends_with_any(&[".created", ".updated", ".upserted"]) {
        Some("ACTIVE")
    } else {
        None
    };
    inferred.map(String::from)
}
/// Applies one event to the loop projection.
///
/// Events with no resolvable loop id are ignored. A loop entry is created on
/// first sight, capturing `step_name` and `total` from that first event only;
/// later events advance the done/failed counters or mark completion.
fn populate_loop(
    event: &ReplayEventRow,
    loops: &mut std::collections::BTreeMap<String, ReplayLoopState>,
) {
    let loop_id = if let Some(id) = extract_loop_id(event) {
        id
    } else {
        return;
    };

    let tracked = loops
        .entry(loop_id)
        .or_insert_with_key(|id| ReplayLoopState {
            loop_id: id.clone(),
            step_name: event.node_name.clone(),
            // `collection_size` takes precedence over `total`.
            total: ["collection_size", "total"]
                .iter()
                .find_map(|key| meta_i64(&event.meta, key)),
            ..Default::default()
        });
    tracked.last_event_id = Some(event.event_id);

    let kind = event.event_type.as_str();
    if matches!(kind, "command.completed" | "loop.shard.done") {
        tracked.done += 1;
    } else if matches!(kind, "command.failed" | "loop.shard.failed") {
        tracked.failed += 1;
    } else if matches!(kind, "loop.done" | "loop.fanin.completed") {
        tracked.completed = true;
    }
}
/// Applies one event to the business-object projection.
///
/// Events that do not resolve to a business object are ignored. Per event
/// the object gets, in this order: bookkeeping (last event, event count), a
/// version, a status, an attribute update (`state` replaces, then `patch` or
/// `attributes` merges) and, when present, a payload reference.
fn populate_business_object(
    event: &ReplayEventRow,
    business_objects: &mut std::collections::BTreeMap<String, ReplayBusinessObjectState>,
) {
    let (object_key, object_type, object_id) =
        if let Some(identity) = extract_business_object_identity(event) {
            identity
        } else {
            return;
        };

    let object = business_objects
        .entry(object_key)
        .or_insert_with_key(move |key| ReplayBusinessObjectState {
            object_key: key.clone(),
            object_type,
            object_id,
            status: "UNKNOWN".to_string(),
            first_event_id: Some(event.event_id),
            ..Default::default()
        });

    object.last_event_id = Some(event.event_id);
    object.last_event_type = Some(event.event_type.clone());
    object.event_count += 1;

    let nested = event
        .meta
        .as_ref()
        .and_then(|meta| meta.get("business_object"))
        .and_then(|value| value.as_object());

    // An explicit version wins; otherwise the running event count stands in.
    let explicit_version = nested
        .and_then(|obj| obj.get("version"))
        .and_then(|value| value.as_i64())
        .or_else(|| meta_i64(&event.meta, "business_object_version"));
    object.version = match explicit_version {
        Some(version) => version,
        None => object.event_count,
    };

    if let Some(status) = business_object_status(&event.event_type, &event.status) {
        if status == "DELETED" {
            object.deleted_event_id = Some(event.event_id);
        }
        object.status = status;
    }

    // A full `state` object replaces the attributes wholesale...
    let full_state = nested
        .and_then(|obj| obj.get("state"))
        .and_then(|value| value.as_object());
    if let Some(replacement) = full_state {
        object.attributes = replacement.clone();
    }
    // ...then `patch` (or `attributes` when there is no `patch` key) is merged
    // on top, key by key.
    let patch = nested
        .and_then(|obj| obj.get("patch").or_else(|| obj.get("attributes")))
        .and_then(|value| value.as_object());
    if let Some(changes) = patch {
        object.attributes.extend(
            changes
                .iter()
                .map(|(key, value)| (key.clone(), value.clone())),
        );
    }

    if let Some(reference) = extract_payload_ref(event) {
        let latest = build_payload_entry(event.event_id, reference);
        object.payload_refs.push(latest.clone());
        object.last_payload_ref = Some(latest);
    }
}
/// Returns a copy of the event's `result.reference` value.
///
/// Yields `None` when the event has no result, the result is not a JSON
/// object, the `reference` member is missing, or it is JSON `null`.
pub fn extract_payload_ref(event: &ReplayEventRow) -> Option<serde_json::Value> {
    event
        .result
        .as_ref()
        .and_then(|result| result.as_object())
        .and_then(|fields| fields.get("reference"))
        .filter(|reference| !reference.is_null())
        .cloned()
}
/// Extracts the scalar summary fields from a payload reference.
///
/// A non-object reference yields the default (empty) summary. For
/// `sha256`, `schema_digest`, `row_count` and `media_type`, the value is looked
/// up on the reference itself, then in `rows_ref.meta`, then in
/// `rows_ref.ipc`; a value of the wrong JSON type is skipped. `sha256`
/// additionally falls back to the top-level `digest`. The reference URI is the
/// top-level `ref`, else `rows_ref.ref`, else the top-level `uri`.
pub fn payload_summary(reference: &serde_json::Value) -> PayloadSummary {
    let top = match reference.as_object() {
        Some(fields) => fields,
        None => return PayloadSummary::default(),
    };
    let rows_ref = top.get("rows_ref").and_then(|v| v.as_object());
    let rows_meta = rows_ref
        .and_then(|rows| rows.get("meta"))
        .and_then(|v| v.as_object());
    let rows_ipc = rows_ref
        .and_then(|rows| rows.get("ipc"))
        .and_then(|v| v.as_object());

    // Lookup order for the shared summary fields.
    let search_order = [Some(top), rows_meta, rows_ipc];
    let first_str = |key: &str| -> Option<String> {
        search_order.iter().flatten().find_map(|fields| {
            fields
                .get(key)
                .and_then(|v| v.as_str())
                .map(String::from)
        })
    };
    let first_i64 = |key: &str| -> Option<i64> {
        search_order
            .iter()
            .flatten()
            .find_map(|fields| fields.get(key).and_then(|v| v.as_i64()))
    };
    // String member of the reference object itself.
    let top_str = |key: &str| -> Option<String> {
        top.get(key).and_then(|v| v.as_str()).map(String::from)
    };

    PayloadSummary {
        sha256: first_str("sha256").or_else(|| top_str("digest")),
        schema_digest: first_str("schema_digest"),
        row_count: first_i64("row_count"),
        media_type: first_str("media_type"),
        reference_uri: top_str("ref")
            .or_else(|| {
                rows_ref
                    .and_then(|rows| rows.get("ref"))
                    .and_then(|v| v.as_str())
                    .map(String::from)
            })
            .or_else(|| top_str("uri")),
    }
}
/// Pairs a payload reference with its summary and the id of the event that
/// carried it.
fn build_payload_entry(event_id: i64, reference: serde_json::Value) -> PayloadRefEntry {
    PayloadRefEntry {
        event_id,
        // Summarise first; the reference is moved into the entry afterwards.
        summary: payload_summary(&reference),
        reference,
    }
}
/// Serializes `value` to JSON bytes with object keys sorted at every level,
/// so equal values always produce identical bytes.
///
/// # Panics
///
/// Panics if `value` cannot be represented as a JSON value, or if the JSON
/// value cannot be written out.
pub fn stable_json_bytes<T: Serialize>(value: &T) -> Vec<u8> {
    let canonical = serde_json::to_value(value)
        .map(|tree| sort_value_keys(&tree))
        .expect("Serialize → Value is infallible for typed state");
    serde_json::to_vec(&canonical).expect("Value → Vec<u8> is infallible")
}
fn sort_value_keys(value: &serde_json::Value) -> serde_json::Value {
match value {
serde_json::Value::Object(map) => {
let mut sorted = std::collections::BTreeMap::new();
for (k, v) in map {
sorted.insert(k.clone(), sort_value_keys(v));
}
let mut out = serde_json::Map::new();
for (k, v) in sorted {
out.insert(k, v);
}
serde_json::Value::Object(out)
}
serde_json::Value::Array(items) => {
serde_json::Value::Array(items.iter().map(sort_value_keys).collect())
}
other => other.clone(),
}
}
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // Writing into a `String` cannot fail, so the result is ignored.
        let _ = write!(encoded, "{:02x}", byte);
    }
    encoded
}
/// Fills in `state.projection_checksums` and `state.checksum`.
///
/// One SHA-256 checksum is computed per projection (`execution`, `stage`,
/// `frame`, `command`, `business_object`, `loop`) and stored under that key.
/// The top-level checksum is then computed over the whole state, which at
/// that point already contains the per-projection checksums. Debug builds
/// assert that `state.checksum` is still `None` before it is computed.
fn compute_checksums(state: &mut ReplayState) {
    let per_projection = std::collections::BTreeMap::from([
        ("execution".to_string(), Checksum::sha256(&state.execution)),
        ("stage".to_string(), Checksum::sha256(&state.stages)),
        ("frame".to_string(), Checksum::sha256(&state.frames)),
        ("command".to_string(), Checksum::sha256(&state.commands)),
        (
            "business_object".to_string(),
            Checksum::sha256(&state.business_objects),
        ),
        ("loop".to_string(), Checksum::sha256(&state.loops)),
    ]);
    state.projection_checksums = per_projection;

    debug_assert!(state.checksum.is_none());
    let overall = Checksum::sha256(&*state);
    state.checksum = Some(overall);
}
#[cfg(test)]
mod tests {
use super::*;
/// Builds a minimal `ReplayEventRow` fixture.
///
/// Only the id, event type, node name and status come from the arguments;
/// `created_at` is the current UTC time and every other optional column is
/// left as `None`.
fn ev(event_id: i64, event_type: &str, node_name: Option<&str>, status: &str) -> ReplayEventRow {
    ReplayEventRow {
        event_id,
        event_type: String::from(event_type),
        node_name: node_name.map(str::to_owned),
        status: String::from(status),
        created_at: Utc::now(),
        stage_id: None,
        frame_id: None,
        command_id: None,
        worker_id: None,
        aggregate_type: None,
        aggregate_id: None,
        meta: None,
        result: None,
    }
}
/// Builds a fixture row via `ev` (no node name, empty status) and lets
/// `builder` customise any of its fields before it is returned.
fn ev_full(
    event_id: i64,
    event_type: &str,
    builder: impl FnOnce(&mut ReplayEventRow),
) -> ReplayEventRow {
    let mut fixture = ev(event_id, event_type, None, "");
    builder(&mut fixture);
    fixture
}
/// Folding an empty log yields zero counters, no "last event" markers, an
/// `UNKNOWN` execution status and empty stage/frame/command projections.
#[test]
fn fold_empty_event_log_returns_unknown_status() {
    let folded = fold_replay_state(&[], "default", "default", 1, ReplayProjection::All);

    assert_eq!(folded.event_count, 0);
    assert_eq!(folded.last_event_id, None);
    assert_eq!(folded.last_event_type, None);

    let execution = &folded.execution;
    assert_eq!(execution.status, "UNKNOWN");
    assert!(execution.last_node_name.is_none());

    assert!(folded.stages.is_empty());
    assert!(folded.frames.is_empty());
    assert!(folded.commands.is_empty());
}
/// A start event followed by `step.enter` leaves the execution `RUNNING` and
/// records the entered node as the last node name.
#[test]
fn fold_step_enter_flips_status_to_running_and_tracks_node_name() {
    let log = [
        ev(1, "playbook_started", None, "RUNNING"),
        ev(2, "step.enter", Some("start"), "ENTERED"),
    ];
    let folded = fold_replay_state(&log, "default", "default", 42, ReplayProjection::All);

    assert_eq!(folded.event_count, 2);
    assert_eq!(folded.last_event_id, Some(2));
    assert_eq!(folded.last_event_type.as_deref(), Some("step.enter"));
    assert_eq!(folded.execution.status, "RUNNING");
    assert_eq!(folded.execution.last_node_name.as_deref(), Some("start"));
}
/// `playbook.completed` sets the execution status to `COMPLETED` while the
/// last node name from the earlier step events is retained.
#[test]
fn fold_playbook_completed_short_circuits_status() {
    let log = [
        ev(1, "step.enter", Some("start"), "ENTERED"),
        ev(2, "command.completed", Some("start"), "success"),
        ev(3, "playbook.completed", None, "COMPLETED"),
    ];
    let folded = fold_replay_state(&log, "default", "default", 42, ReplayProjection::All);

    assert_eq!(folded.execution.status, "COMPLETED");
    assert_eq!(folded.event_count, 3);
    assert_eq!(folded.last_event_id, Some(3));
    assert_eq!(folded.execution.last_node_name.as_deref(), Some("start"));
}
/// `playbook.failed` sets the execution status to `FAILED`.
#[test]
fn fold_playbook_failed_short_circuits_status() {
    let log = [
        ev(1, "step.enter", Some("start"), "ENTERED"),
        ev(2, "playbook.failed", None, "FAILED"),
    ];
    let folded = fold_replay_state(&log, "default", "default", 42, ReplayProjection::All);

    assert_eq!(folded.execution.status, "FAILED");
}
/// The underscore spellings `step_started` and `playbook_completed` are
/// folded like their dotted counterparts: node name tracked, status
/// `COMPLETED`.
#[test]
fn fold_underscore_aliases_recognised() {
    let log = [
        ev(1, "step_started", Some("alpha"), "ENTERED"),
        ev(2, "playbook_completed", None, "COMPLETED"),
    ];
    let folded = fold_replay_state(&log, "default", "default", 42, ReplayProjection::All);

    assert_eq!(folded.execution.status, "COMPLETED");
    assert_eq!(folded.execution.last_node_name.as_deref(), Some("alpha"));
}
/// Events handed over in descending id order still fold to a state whose last
/// event is the one with the highest id.
#[test]
fn fold_is_order_deterministic_when_input_unsorted() {
    let reversed_log = [
        ev(3, "playbook.completed", None, "COMPLETED"),
        ev(2, "command.completed", Some("start"), "success"),
        ev(1, "step.enter", Some("start"), "ENTERED"),
    ];
    let folded = fold_replay_state(
        &reversed_log,
        "default",
        "default",
        42,
        ReplayProjection::All,
    );

    assert_eq!(folded.execution.status, "COMPLETED");
    assert_eq!(folded.last_event_id, Some(3));
    assert_eq!(
        folded.last_event_type.as_deref(),
        Some("playbook.completed")
    );
}
/// `ReplayProjection::parse_wire` maps canonical wire names to their variants
/// and rejects an unknown name.
#[test]
fn projection_from_str_accepts_canonical_names() {
    let canonical = [
        ("execution", ReplayProjection::Execution),
        ("business_object", ReplayProjection::BusinessObject),
        ("loop", ReplayProjection::Loop),
        ("all", ReplayProjection::All),
    ];
    for (wire, expected) in canonical {
        assert_eq!(ReplayProjection::parse_wire(wire), Some(expected));
    }

    assert!(ReplayProjection::parse_wire("garbage").is_none());
}
/// `ReplayCutoff::is_empty` and `set_count` agree with the number of
/// populated fields for zero, one and three set bounds.
#[test]
fn cutoff_set_count_and_is_empty() {
    let unset = ReplayCutoff::default();
    assert!(unset.is_empty());
    assert_eq!(unset.set_count(), 0);

    let event_only = ReplayCutoff {
        as_of_event_id: Some(100),
        ..ReplayCutoff::default()
    };
    assert!(!event_only.is_empty());
    assert_eq!(event_only.set_count(), 1);

    let fully_set = ReplayCutoff {
        as_of_event_id: Some(100),
        as_of_position: Some(200),
        as_of_time: Some(Utc::now()),
    };
    assert_eq!(fully_set.set_count(), 3);
}
/// `extract_stage_id` resolution order: the `stage_id` column wins, then the
/// `stage/...` aggregate id, then `meta.stage_id`; with none of them the
/// result is `None`.
#[test]
fn extract_stage_id_prefers_column_then_aggregate_then_meta() {
    let all_sources = ev_full(1, "noop", |row| {
        row.stage_id = Some("s-from-column".into());
        row.aggregate_type = Some("stage".into());
        row.aggregate_id = Some("stage/s-aggregate".into());
        row.meta = Some(serde_json::json!({"stage_id": "s-from-meta"}));
    });
    assert_eq!(
        extract_stage_id(&all_sources).as_deref(),
        Some("s-from-column")
    );

    let aggregate_only = ev_full(2, "stage.opened", |row| {
        row.aggregate_type = Some("stage".into());
        row.aggregate_id = Some("stage/s-aggregate".into());
    });
    assert_eq!(
        extract_stage_id(&aggregate_only).as_deref(),
        Some("s-aggregate")
    );

    let meta_only = ev_full(3, "noop", |row| {
        row.meta = Some(serde_json::json!({"stage_id": "s-from-meta"}));
    });
    assert_eq!(extract_stage_id(&meta_only).as_deref(), Some("s-from-meta"));

    let no_source = ev(4, "noop", None, "");
    assert!(extract_stage_id(&no_source).is_none());
}
/// `extract_frame_id` strips the `frame/` prefix from a frame aggregate id.
#[test]
fn extract_frame_id_mirrors_stage_id_resolution() {
    let dispatched = ev_full(1, "frame.dispatched", |row| {
        row.aggregate_type = Some("frame".into());
        row.aggregate_id = Some("frame/f-1".into());
    });

    assert_eq!(extract_frame_id(&dispatched).as_deref(), Some("f-1"));
}
/// `extract_command_id` stringifies the numeric column, accepts a string or
/// a number under `meta.command_id`, and returns `None` when neither exists.
#[test]
fn extract_command_id_uses_top_level_bigint_or_meta() {
    let from_column = ev_full(1, "command.issued", |row| {
        row.command_id = Some(42);
    });
    assert_eq!(extract_command_id(&from_column).as_deref(), Some("42"));

    let from_meta_string = ev_full(2, "command.issued", |row| {
        row.meta = Some(serde_json::json!({"command_id": "legacy-cmd"}));
    });
    assert_eq!(
        extract_command_id(&from_meta_string).as_deref(),
        Some("legacy-cmd")
    );

    let from_meta_number = ev_full(3, "command.issued", |row| {
        row.meta = Some(serde_json::json!({"command_id": 99}));
    });
    assert_eq!(extract_command_id(&from_meta_number).as_deref(), Some("99"));

    let without_id = ev(4, "noop", None, "");
    assert!(extract_command_id(&without_id).is_none());
}
/// A `stage.opened` / `stage.closed` pair produces one stage entry carrying
/// the opening/closing event ids, the closing status, the counters from the
/// closing event's meta and the kind/step name from the opening event.
#[test]
fn fold_populates_stage_projection_through_lifecycle() {
    let opened = ev_full(1, "stage.opened", |row| {
        row.stage_id = Some("s1".into());
        row.node_name = Some("normalize".into());
        row.meta = Some(serde_json::json!({"kind": "task"}));
    });
    let closed = ev_full(2, "stage.closed", |row| {
        row.stage_id = Some("s1".into());
        row.status = "COMPLETED".into();
        row.meta = Some(serde_json::json!({
            "frame_count": 3,
            "row_count": 42,
            "events_emitted": 8,
            "failed_count": 0,
        }));
    });

    let folded = fold_replay_state(
        &[opened, closed],
        "default",
        "default",
        1,
        ReplayProjection::All,
    );
    let projected = folded.stages.get("s1").expect("stage s1 must exist");

    assert_eq!(projected.stage_id, "s1");
    assert_eq!(projected.status, "COMPLETED");
    assert_eq!(projected.opened_event_id, Some(1));
    assert_eq!(projected.closed_event_id, Some(2));
    assert_eq!(projected.frame_count, 3);
    assert_eq!(projected.row_count, 42);
    assert_eq!(projected.events_emitted, 8);
    assert_eq!(projected.last_event_id, Some(2));
    assert_eq!(projected.kind.as_deref(), Some("task"));
    assert_eq!(projected.step_name.as_deref(), Some("normalize"));
}
/// Dispatched → started → committed events for one frame produce a frame
/// entry with its stage/command links, the terminal status and event id, the
/// counters from the commit meta and the attempt number from dispatch.
#[test]
fn fold_populates_frame_projection_with_terminal_status() {
    let dispatched = ev_full(10, "frame.dispatched", |row| {
        row.frame_id = Some("f-1".into());
        row.stage_id = Some("s-1".into());
        row.command_id = Some(7);
        row.meta = Some(serde_json::json!({"attempt": 2}));
    });
    let started = ev_full(11, "frame.started", |row| {
        row.frame_id = Some("f-1".into());
        row.stage_id = Some("s-1".into());
    });
    let committed = ev_full(12, "frame.committed", |row| {
        row.frame_id = Some("f-1".into());
        row.stage_id = Some("s-1".into());
        row.status = "COMPLETED".into();
        row.meta = Some(serde_json::json!({
            "row_count": 12,
            "events_emitted": 4,
        }));
    });

    let folded = fold_replay_state(
        &[dispatched, started, committed],
        "default",
        "default",
        1,
        ReplayProjection::All,
    );
    let projected = folded.frames.get("f-1").expect("frame f-1 must exist");

    assert_eq!(projected.frame_id, "f-1");
    assert_eq!(projected.stage_id.as_deref(), Some("s-1"));
    assert_eq!(projected.command_id.as_deref(), Some("7"));
    assert_eq!(projected.status, "COMPLETED");
    assert_eq!(projected.claimed_event_id, Some(10));
    assert_eq!(projected.terminal_event_id, Some(12));
    assert_eq!(projected.row_count, 12);
    assert_eq!(projected.events_emitted, 4);
    assert_eq!(projected.attempts, 2);
}
/// Issued → claimed → started → completed events for one command produce a
/// command entry recording each lifecycle event id, the stage/frame links,
/// the claiming worker and the terminal status.
#[test]
fn fold_populates_command_projection_through_full_lifecycle() {
    let issued = ev_full(100, "command.issued", |row| {
        row.command_id = Some(42);
        row.stage_id = Some("s-1".into());
        row.frame_id = Some("f-1".into());
    });
    let claimed = ev_full(101, "command.claimed", |row| {
        row.command_id = Some(42);
        row.worker_id = Some("worker-pod-7".into());
    });
    let started = ev_full(102, "command.started", |row| {
        row.command_id = Some(42);
    });
    let completed = ev_full(103, "command.completed", |row| {
        row.command_id = Some(42);
        row.status = "success".into();
    });

    let folded = fold_replay_state(
        &[issued, claimed, started, completed],
        "default",
        "default",
        1,
        ReplayProjection::All,
    );
    let projected = folded.commands.get("42").expect("command 42 must exist");

    assert_eq!(projected.command_id, "42");
    assert_eq!(projected.stage_id.as_deref(), Some("s-1"));
    assert_eq!(projected.frame_id.as_deref(), Some("f-1"));
    assert_eq!(projected.worker_id.as_deref(), Some("worker-pod-7"));
    assert_eq!(projected.issued_event_id, Some(100));
    assert_eq!(projected.claimed_event_id, Some(101));
    assert_eq!(projected.started_event_id, Some(102));
    assert_eq!(projected.terminal_event_id, Some(103));
    assert_eq!(projected.status, "success");
}
/// A `command.failed` event with an empty status string yields a command
/// whose status is `FAILED`.
#[test]
fn fold_command_terminal_status_defaults_when_event_status_empty() {
    let failed_without_status = ev_full(10, "command.failed", |row| {
        row.command_id = Some(99);
        row.status = String::new();
    });

    let folded = fold_replay_state(
        &[failed_without_status],
        "default",
        "default",
        1,
        ReplayProjection::All,
    );
    let projected = folded.commands.get("99").unwrap();

    assert_eq!(projected.status, "FAILED");
}
/// Events without any stage/frame/command identity leave those projections
/// empty while the execution status is still updated.
#[test]
fn fold_skips_population_when_event_has_no_identity() {
    let entered = ev_full(1, "step.enter", |row| row.node_name = Some("start".into()));
    let finished = ev_full(2, "playbook.completed", |_| {});

    let folded = fold_replay_state(
        &[entered, finished],
        "default",
        "default",
        1,
        ReplayProjection::All,
    );

    assert!(folded.stages.is_empty());
    assert!(folded.frames.is_empty());
    assert!(folded.commands.is_empty());
    assert_eq!(folded.execution.status, "COMPLETED");
}
/// A single event carrying stage, frame and command ids creates an entry in
/// each of the three projections.
#[test]
fn fold_three_projections_populated_in_single_pass() {
    let multi_identity = ev_full(5, "frame.dispatched", |row| {
        row.stage_id = Some("s-multi".into());
        row.frame_id = Some("f-multi".into());
        row.command_id = Some(7);
    });

    let folded = fold_replay_state(
        &[multi_identity],
        "default",
        "default",
        1,
        ReplayProjection::All,
    );

    assert!(folded.stages.contains_key("s-multi"));
    assert!(folded.frames.contains_key("f-multi"));
    assert!(folded.commands.contains_key("7"));
}
/// `meta_str` stringifies string, integer and boolean meta values;
/// `meta_i64` returns integers and yields `None` for a string value or a
/// missing key.
#[test]
fn meta_helpers_round_trip_scalars() {
    let meta = Some(serde_json::json!({
        "s": "hello",
        "n_i64": 7,
        "n_neg": -1,
        "b": true,
    }));

    for (key, expected) in [("s", "hello"), ("n_i64", "7"), ("b", "true")] {
        assert_eq!(meta_str(&meta, key).as_deref(), Some(expected));
    }

    assert_eq!(meta_i64(&meta, "n_i64"), Some(7));
    assert_eq!(meta_i64(&meta, "n_neg"), Some(-1));
    assert_eq!(meta_i64(&meta, "s"), None);
    assert_eq!(meta_i64(&meta, "missing"), None);
}
/// When `loop_id` and both alias keys are present in meta, `extract_loop_id`
/// returns the `loop_id` value.
#[test]
fn extract_loop_id_prefers_meta_loop_id_over_aliases() {
    let with_all_keys = ev_full(1, "loop.shard.done", |row| {
        row.meta = Some(serde_json::json!({
            "loop_id": "primary",
            "loop_event_id": "alias-one",
            "__loop_epoch_id": "alias-two",
        }));
    });

    assert_eq!(extract_loop_id(&with_all_keys).as_deref(), Some("primary"));
}
/// `extract_loop_id` accepts `loop_event_id` and `__loop_epoch_id` as
/// aliases and returns `None` when meta holds none of the known keys.
#[test]
fn extract_loop_id_falls_back_through_meta_aliases() {
    let via_loop_event_id = ev_full(2, "command.completed", |row| {
        row.meta = Some(serde_json::json!({"loop_event_id": "fallback"}));
    });
    assert_eq!(
        extract_loop_id(&via_loop_event_id).as_deref(),
        Some("fallback")
    );

    let via_epoch_id = ev_full(3, "command.completed", |row| {
        row.meta = Some(serde_json::json!({"__loop_epoch_id": "epoch-7"}));
    });
    assert_eq!(extract_loop_id(&via_epoch_id).as_deref(), Some("epoch-7"));

    let unrelated_meta = ev_full(4, "command.completed", |row| {
        row.meta = Some(serde_json::json!({"unrelated": 1}));
    });
    assert_eq!(extract_loop_id(&unrelated_meta), None);
}
/// Four events tagged with the same `loop_id` (a completed command, a failed
/// command, `loop.shard.done`, `loop.done`) fold into one loop entry with
/// `total` 3, `done` 2, `failed` 1, `completed` set and the last event id
/// recorded.
#[test]
fn fold_populates_loop_with_counters_and_completion() {
    let iteration_ok = ev_full(10, "command.completed", |row| {
        row.node_name = Some("iterate".to_string());
        row.status = "success".to_string();
        row.meta = Some(serde_json::json!({
            "loop_id": "iter-1",
            "collection_size": 3,
        }));
    });
    let iteration_failed = ev_full(11, "command.failed", |row| {
        row.node_name = Some("iterate".to_string());
        row.status = "failed".to_string();
        row.meta = Some(serde_json::json!({"loop_id": "iter-1"}));
    });
    let shard_done = ev_full(12, "loop.shard.done", |row| {
        row.node_name = Some("iterate".to_string());
        row.meta = Some(serde_json::json!({"loop_id": "iter-1"}));
    });
    let loop_done = ev_full(13, "loop.done", |row| {
        row.node_name = Some("iterate".to_string());
        row.meta = Some(serde_json::json!({"loop_id": "iter-1"}));
    });

    let log = [iteration_ok, iteration_failed, shard_done, loop_done];
    let folded = fold_replay_state(&log, "t", "o", 42, ReplayProjection::All);

    assert_eq!(folded.loops.len(), 1);
    let projected = folded.loops.get("iter-1").unwrap();
    assert_eq!(projected.loop_id, "iter-1");
    assert_eq!(projected.step_name.as_deref(), Some("iterate"));
    assert_eq!(projected.total, Some(3));
    assert_eq!(projected.done, 2);
    assert_eq!(projected.failed, 1);
    assert!(projected.completed);
    assert_eq!(projected.last_event_id, Some(13));
}
/// With no `collection_size` in meta, the loop's `total` is taken from
/// `meta.total`.
#[test]
fn fold_loop_total_falls_back_to_meta_total() {
    let fanout_step = ev_full(20, "command.completed", |row| {
        row.node_name = Some("fanout".to_string());
        row.status = "success".to_string();
        row.meta = Some(serde_json::json!({
            "loop_id": "fan-7",
            "total": 5,
        }));
    });

    let folded = fold_replay_state(&[fanout_step], "t", "o", 42, ReplayProjection::All);
    let projected = folded.loops.get("fan-7").unwrap();

    assert_eq!(projected.total, Some(5));
}
/// A `loop.fanin.completed` event marks its loop as completed.
#[test]
fn fold_loop_fanin_completed_marks_completed_true() {
    let fanin = ev_full(30, "loop.fanin.completed", |row| {
        row.node_name = Some("reduce".to_string());
        row.meta = Some(serde_json::json!({"loop_id": "fanin-1"}));
    });

    let folded = fold_replay_state(&[fanin], "t", "o", 42, ReplayProjection::All);
    let projected = folded.loops.get("fanin-1").unwrap();

    assert!(projected.completed);
}
/// `meta.business_object.{object_type, object_id}` yields the key
/// `<type>/<id>` together with the separate type and id.
#[test]
fn extract_business_object_identity_prefers_meta_dot_keys() {
    let created = ev_full(40, "customer.created", |row| {
        row.meta = Some(serde_json::json!({
            "business_object": {
                "object_type": "customer",
                "object_id": "c-100",
            }
        }));
    });

    let (object_key, object_type, object_id) =
        extract_business_object_identity(&created).unwrap();

    assert_eq!(object_key, "customer/c-100");
    assert_eq!(object_type, "customer");
    assert_eq!(object_id, "c-100");
}
/// The short `type` / `id` keys inside `meta.business_object` are accepted
/// as the object identity.
#[test]
fn extract_business_object_identity_accepts_short_type_id_aliases() {
    let updated = ev_full(41, "order.updated", |row| {
        row.meta = Some(serde_json::json!({
            "business_object": {"type": "order", "id": "o-7"}
        }));
    });

    let (object_key, object_type, object_id) =
        extract_business_object_identity(&updated).unwrap();

    assert_eq!(object_key, "order/o-7");
    assert_eq!(object_type, "order");
    assert_eq!(object_id, "o-7");
}
/// Without meta, a `business_object` aggregate id supplies the identity,
/// both with and without the leading `business_object/` segment.
#[test]
fn extract_business_object_identity_falls_back_to_aggregate_id() {
    let prefixed = ev_full(42, "asset.created", |row| {
        row.aggregate_type = Some("business_object".to_string());
        row.aggregate_id = Some("business_object/asset/a-9".to_string());
    });
    let (object_key, object_type, object_id) =
        extract_business_object_identity(&prefixed).unwrap();
    assert_eq!(object_key, "asset/a-9");
    assert_eq!(object_type, "asset");
    assert_eq!(object_id, "a-9");

    let unprefixed = ev_full(43, "asset.created", |row| {
        row.aggregate_type = Some("business_object".to_string());
        row.aggregate_id = Some("asset/a-10".to_string());
    });
    let (object_key, object_type, object_id) =
        extract_business_object_identity(&unprefixed).unwrap();
    assert_eq!(object_key, "asset/a-10");
    assert_eq!(object_type, "asset");
    assert_eq!(object_id, "a-10");
}
/// An event with neither business-object meta nor aggregate columns has no
/// business-object identity.
#[test]
fn extract_business_object_identity_returns_none_when_no_signal() {
    let plain = ev_full(50, "playbook.completed", |_| {});

    let identity = extract_business_object_identity(&plain);

    assert!(identity.is_none());
}
/// A non-empty explicit status is returned as-is even when the event type
/// suffix (`.deleted`) would derive a different one.
#[test]
fn business_object_status_explicit_status_wins() {
    let resolved = business_object_status("customer.deleted", "ARCHIVED");

    assert_eq!(resolved.as_deref(), Some("ARCHIVED"));
}
/// With an empty explicit status, `.created` / `.updated` / `.upserted`
/// derive `ACTIVE`, `.deleted` / `.removed` derive `DELETED`, and an
/// unrecognised suffix derives nothing.
#[test]
fn business_object_status_suffix_derives_active_or_deleted() {
    let derived = [
        ("customer.created", "ACTIVE"),
        ("customer.updated", "ACTIVE"),
        ("customer.upserted", "ACTIVE"),
        ("customer.deleted", "DELETED"),
        ("customer.removed", "DELETED"),
    ];
    for (event_type, expected) in derived {
        assert_eq!(
            business_object_status(event_type, "").as_deref(),
            Some(expected),
        );
    }

    assert_eq!(business_object_status("customer.changed", ""), None);
}
/// Created → updated → deleted events for one customer fold into a single
/// business object: status `DELETED`, three events counted, first/last/
/// deleted event ids recorded, version 3, and attributes merged from the
/// initial `state` plus the later `patch`.
#[test]
fn fold_populates_business_object_through_lifecycle() {
    let created = ev_full(60, "customer.created", |row| {
        row.meta = Some(serde_json::json!({
            "business_object": {
                "object_type": "customer",
                "object_id": "c-1",
                "state": {"name": "Alice", "tier": "gold"},
            }
        }));
    });
    let updated = ev_full(61, "customer.updated", |row| {
        row.meta = Some(serde_json::json!({
            "business_object": {
                "object_type": "customer",
                "object_id": "c-1",
                "patch": {"tier": "platinum"},
                "version": 7,
            }
        }));
    });
    let deleted = ev_full(62, "customer.deleted", |row| {
        row.meta = Some(serde_json::json!({
            "business_object": {"object_type": "customer", "object_id": "c-1"}
        }));
    });

    let log = [created, updated, deleted];
    let folded = fold_replay_state(&log, "t", "o", 42, ReplayProjection::All);

    assert_eq!(folded.business_objects.len(), 1);
    let customer = folded.business_objects.get("customer/c-1").unwrap();
    assert_eq!(customer.object_key, "customer/c-1");
    assert_eq!(customer.object_type, "customer");
    assert_eq!(customer.object_id, "c-1");
    assert_eq!(customer.status, "DELETED");
    assert_eq!(customer.event_count, 3);
    assert_eq!(customer.first_event_id, Some(60));
    assert_eq!(customer.last_event_id, Some(62));
    assert_eq!(customer.deleted_event_id, Some(62));
    assert_eq!(
        customer.last_event_type.as_deref(),
        Some("customer.deleted")
    );
    assert_eq!(customer.version, 3);

    let name = customer.attributes.get("name").and_then(|v| v.as_str());
    assert_eq!(name, Some("Alice"));
    let tier = customer.attributes.get("tier").and_then(|v| v.as_str());
    assert_eq!(tier, Some("platinum"));
}
/// A top-level `meta.business_object_version` value becomes the folded
/// object's version.
#[test]
fn fold_business_object_version_from_meta_business_object_version() {
    let created = ev_full(70, "order.created", |row| {
        row.meta = Some(serde_json::json!({
            "business_object": {"object_type": "order", "object_id": "o-1"},
            "business_object_version": 42,
        }));
    });

    let folded = fold_replay_state(&[created], "t", "o", 99, ReplayProjection::All);
    let order = folded.business_objects.get("order/o-1").unwrap();

    assert_eq!(order.version, 42);
}
/// An ordinary command event with no loop or business-object markers leaves
/// both of those projections empty.
#[test]
fn fold_skips_loop_and_business_object_when_no_signal() {
    let plain_command = ev_full(80, "command.completed", |row| {
        row.node_name = Some("plain_step".to_string());
        row.status = "success".to_string();
    });

    let folded = fold_replay_state(&[plain_command], "t", "o", 42, ReplayProjection::All);

    assert!(folded.loops.is_empty());
    assert!(folded.business_objects.is_empty());
}
/// `ChecksumType::Sha256` serializes to the JSON string `"sha256"` and
/// `as_str` returns the same spelling.
#[test]
fn checksum_type_serializes_as_lowercase_snake_case() {
    let wire = serde_json::to_value(ChecksumType::Sha256).unwrap();

    assert_eq!(wire, serde_json::json!("sha256"));
    assert_eq!(ChecksumType::Sha256.as_str(), "sha256");
}
/// A serialized `Checksum` exposes a `type` of `"sha256"` and a `value` of
/// 64 lowercase hexadecimal characters.
#[test]
fn checksum_serializes_as_typed_pair() {
    let checksum = Checksum::sha256(&serde_json::json!({"k": "v"}));
    let wire = serde_json::to_value(&checksum).unwrap();

    assert_eq!(wire["type"], serde_json::json!("sha256"));

    let digest = wire["value"].as_str().unwrap();
    assert!(digest.len() == 64);
    assert!(digest
        .chars()
        .all(|ch| ch.is_ascii_hexdigit() && !ch.is_ascii_uppercase()));
}
/// `Checksum::sha256` tags its result as SHA-256, produces a 64-character
/// value, and returns the same value when called twice on equal input.
///
/// NOTE(review): despite the name, no Python-produced reference digest is
/// compared here; only shape and repeatability are checked.
#[test]
fn checksum_sha256_matches_python_for_simple_value() {
    let first = Checksum::sha256(&serde_json::json!({"k": "v"}));
    assert_eq!(first.algorithm, ChecksumType::Sha256);
    assert_eq!(first.value.len(), 64);

    let second = Checksum::sha256(&serde_json::json!({"k": "v"}));
    assert_eq!(first.value, second.value);
}
/// `stable_json_bytes` emits compact JSON with keys sorted at every nesting
/// level.
#[test]
fn stable_json_sorts_keys_recursively() {
    let unsorted = serde_json::json!({
        "b": {"y": 2, "x": 1},
        "a": 1,
    });

    let encoded = stable_json_bytes(&unsorted);
    let text = std::str::from_utf8(&encoded).unwrap();

    assert_eq!(text, r#"{"a":1,"b":{"x":1,"y":2}}"#);
}
/// Even an empty fold carries a top-level SHA-256 checksum plus exactly six
/// per-projection checksums, one under each projection key.
#[test]
fn fold_populates_checksum_and_projection_checksums() {
    let folded = fold_replay_state(&[], "default", "default", 1, ReplayProjection::All);

    let overall = folded
        .checksum
        .as_ref()
        .expect("top-level checksum populated");
    assert_eq!(overall.algorithm, ChecksumType::Sha256);
    assert_eq!(overall.value.len(), 64);

    assert_eq!(folded.projection_checksums.len(), 6);
    let expected_keys = [
        "execution",
        "stage",
        "frame",
        "command",
        "business_object",
        "loop",
    ];
    for key in expected_keys {
        let entry = match folded.projection_checksums.get(key) {
            Some(entry) => entry,
            None => panic!("missing checksum for projection `{key}`"),
        };
        assert_eq!(entry.algorithm, ChecksumType::Sha256);
        assert_eq!(entry.value.len(), 64);
    }
}
/// Folding one event produces a different top-level checksum than folding
/// none.
#[test]
fn fold_checksum_changes_when_state_changes() {
    let without_events = fold_replay_state(&[], "default", "default", 1, ReplayProjection::All);

    let single_event = [ev(1, "playbook_started", None, "RUNNING")];
    let with_event = fold_replay_state(
        &single_event,
        "default",
        "default",
        1,
        ReplayProjection::All,
    );

    let before = &without_events.checksum.as_ref().unwrap().value;
    let after = &with_event.checksum.as_ref().unwrap().value;
    assert_ne!(before, after);
}
/// Folding the same event slice twice gives identical top-level and
/// per-projection checksum values.
#[test]
fn fold_checksum_deterministic_across_runs() {
    let log = [
        ev(1, "playbook_started", None, "RUNNING"),
        ev(2, "step.enter", Some("start"), "ENTERED"),
        ev(3, "playbook.completed", None, "COMPLETED"),
    ];

    let first = fold_replay_state(&log, "t", "o", 42, ReplayProjection::All);
    let second = fold_replay_state(&log, "t", "o", 42, ReplayProjection::All);

    assert_eq!(
        first.checksum.as_ref().unwrap().value,
        second.checksum.as_ref().unwrap().value,
    );
    for (key, from_first) in &first.projection_checksums {
        let from_second = second.projection_checksums.get(key).unwrap();
        assert_eq!(from_first.value, from_second.value);
    }
}
/// Adding a loop event changes the `loop` projection checksum and the
/// top-level checksum, but leaves the `stage` projection checksum untouched.
#[test]
fn fold_projection_checksums_isolated_per_projection() {
    let baseline = fold_replay_state(&[], "t", "o", 42, ReplayProjection::All);

    let shard_done = ev_full(10, "loop.shard.done", |row| {
        row.node_name = Some("iterate".to_string());
        row.meta = Some(serde_json::json!({"loop_id": "L1"}));
    });
    let looped = fold_replay_state(&[shard_done], "t", "o", 42, ReplayProjection::All);

    let baseline_loop = baseline.projection_checksums.get("loop").unwrap();
    let looped_loop = looped.projection_checksums.get("loop").unwrap();
    assert_ne!(baseline_loop.value, looped_loop.value);

    let baseline_stage = baseline.projection_checksums.get("stage").unwrap();
    let looped_stage = looped.projection_checksums.get("stage").unwrap();
    assert_eq!(baseline_stage.value, looped_stage.value);

    assert_ne!(
        baseline.checksum.as_ref().unwrap().value,
        looped.checksum.as_ref().unwrap().value,
    );
}
/// The serialized state exposes both `checksum` and `projection_checksums`,
/// and the top-level checksum value is 64 characters long.
///
/// NOTE(review): the assertions only check presence and length; they do not
/// recompute the digest to show it excludes the `checksum` field itself.
#[test]
fn fold_top_level_checksum_does_not_depend_on_itself() {
    let folded = fold_replay_state(&[], "default", "default", 1, ReplayProjection::All);
    let wire = serde_json::to_value(&folded).unwrap();

    assert!(wire.get("checksum").is_some());
    assert!(wire.get("projection_checksums").is_some());

    let digest = wire["checksum"]["value"].as_str().unwrap();
    assert!(digest.len() == 64);
}
/// A plain fold leaves `replay_snapshot` and `upcaster_registry_digest`
/// unset, and neither key appears in the serialized output.
#[test]
fn fold_default_options_omit_snapshot_and_digest() {
    let folded = fold_replay_state(&[], "default", "default", 1, ReplayProjection::All);
    let wire = serde_json::to_value(&folded).unwrap();

    for omitted_key in ["replay_snapshot", "upcaster_registry_digest"] {
        assert!(wire.get(omitted_key).is_none());
    }
    assert!(folded.replay_snapshot.is_none());
    assert!(folded.upcaster_registry_digest.is_none());
}
/// An `upcaster_registry_digest` passed through the fold options shows up on
/// the resulting state and in its serialized form.
#[test]
fn fold_with_options_propagates_upcaster_digest() {
    let options = ReplayFoldOptions {
        upcaster_registry_digest: Some("abc123".to_string()),
        ..Default::default()
    };

    let folded =
        fold_replay_state_with_options(&[], "t", "o", 42, ReplayProjection::All, options);

    assert_eq!(folded.upcaster_registry_digest.as_deref(), Some("abc123"));
    let wire = serde_json::to_value(&folded).unwrap();
    assert_eq!(wire["upcaster_registry_digest"].as_str(), Some("abc123"));
}
/// Supplying a snapshot seed makes the fold expose `replay_snapshot` with the
/// seed's aggregate id/type, version, checksum algorithm and meta entries.
#[test]
fn fold_with_snapshot_seed_surfaces_info_metadata() {
    let previous = fold_replay_state(&[], "t", "o", 42, ReplayProjection::All);
    let previous_checksum = previous.checksum.clone().unwrap();
    let seed_meta = serde_json::Map::from_iter([(
        "author".to_string(),
        serde_json::json!("snapshot-bot"),
    )]);
    let seed = ReplaySnapshotSeed {
        aggregate_id: "exec/42".to_string(),
        aggregate_type: "execution".to_string(),
        version: 100,
        checksum: previous_checksum,
        state: previous,
        meta: seed_meta,
    };
    let options = ReplayFoldOptions {
        snapshot_seed: Some(seed),
        ..Default::default()
    };

    let folded =
        fold_replay_state_with_options(&[], "t", "o", 42, ReplayProjection::All, options);

    let info = folded
        .replay_snapshot
        .as_ref()
        .expect("replay_snapshot populated when seed provided");
    assert_eq!(info.aggregate_id, "exec/42");
    assert_eq!(info.aggregate_type, "execution");
    assert_eq!(info.version, 100);
    assert_eq!(info.checksum.algorithm, ChecksumType::Sha256);
    let author = info.meta.get("author").and_then(|v| v.as_str());
    assert_eq!(author, Some("snapshot-bot"));
}
/// Folding further events on top of a `base_state` continues the event
/// counter and last-event markers from the base instead of restarting.
#[test]
fn fold_with_base_state_continues_counters_from_seed() {
    let first_batch = [
        ev(1, "playbook_started", None, "RUNNING"),
        ev(2, "step.enter", Some("start"), "ENTERED"),
    ];
    let base = fold_replay_state(&first_batch, "t", "o", 42, ReplayProjection::All);
    assert_eq!(base.event_count, 2);
    assert_eq!(base.last_event_id, Some(2));

    let second_batch = [
        ev(3, "step.exit", Some("start"), "EXITED"),
        ev(4, "playbook.completed", None, "COMPLETED"),
    ];
    let options = ReplayFoldOptions {
        base_state: Some(base),
        ..Default::default()
    };
    let resumed = fold_replay_state_with_options(
        &second_batch,
        "t",
        "o",
        42,
        ReplayProjection::All,
        options,
    );

    assert_eq!(resumed.event_count, 4, "counters continue from base");
    assert_eq!(resumed.last_event_id, Some(4));
    assert_eq!(
        resumed.last_event_type.as_deref(),
        Some("playbook.completed")
    );
    assert_eq!(resumed.execution.status, "COMPLETED");
}
/// Folding a new event on top of a base state yields a checksum different
/// from the base's, along with a full set of six projection checksums.
#[test]
fn fold_with_base_state_strips_prior_checksum() {
    let base = fold_replay_state(&[], "t", "o", 42, ReplayProjection::All);
    let checksum_before = base.checksum.clone().unwrap().value;

    let completion = [ev(1, "playbook.completed", None, "COMPLETED")];
    let options = ReplayFoldOptions {
        base_state: Some(base),
        ..Default::default()
    };
    let resumed = fold_replay_state_with_options(
        &completion,
        "t",
        "o",
        42,
        ReplayProjection::All,
        options,
    );

    assert_ne!(resumed.checksum.as_ref().unwrap().value, checksum_before);
    assert_eq!(resumed.projection_checksums.len(), 6);
}
/// The tenant, organization and execution id passed to the fold replace the
/// ones stored in the base state.
#[test]
fn fold_with_base_state_overrides_tenant_org_execution_id() {
    let stale_base = fold_replay_state(&[], "old-tenant", "old-org", 99, ReplayProjection::All);
    let options = ReplayFoldOptions {
        base_state: Some(stale_base),
        ..Default::default()
    };

    let resumed = fold_replay_state_with_options(
        &[],
        "new-tenant",
        "new-org",
        42,
        ReplayProjection::All,
        options,
    );

    assert_eq!(resumed.tenant_id, "new-tenant");
    assert_eq!(resumed.organization_id, "new-org");
    assert_eq!(resumed.execution_id, 42);
}
/// When both the base state and the options carry an upcaster digest, the
/// one from the options is kept.
#[test]
fn fold_with_seed_caller_digest_wins_over_base_state_digest() {
    let mut base = fold_replay_state(&[], "t", "o", 42, ReplayProjection::All);
    base.upcaster_registry_digest = Some("v1-digest".to_string());
    let options = ReplayFoldOptions {
        base_state: Some(base),
        upcaster_registry_digest: Some("v2-digest".to_string()),
        ..Default::default()
    };

    let resumed =
        fold_replay_state_with_options(&[], "t", "o", 42, ReplayProjection::All, options);

    assert_eq!(
        resumed.upcaster_registry_digest.as_deref(),
        Some("v2-digest")
    );
}
/// When the options carry no upcaster digest, the digest already stored on
/// the base state survives the fold.
#[test]
fn fold_with_seed_preserves_base_digest_when_caller_supplies_none() {
    let mut base = fold_replay_state(&[], "t", "o", 42, ReplayProjection::All);
    base.upcaster_registry_digest = Some("v1-digest".to_string());
    let options = ReplayFoldOptions {
        base_state: Some(base),
        ..Default::default()
    };

    let resumed =
        fold_replay_state_with_options(&[], "t", "o", 42, ReplayProjection::All, options);

    assert_eq!(
        resumed.upcaster_registry_digest.as_deref(),
        Some("v1-digest")
    );
}
/// An event whose `result` is `None` has no payload reference.
#[test]
fn extract_payload_ref_returns_none_when_no_result() {
    let without_result = ev(1, "playbook.completed", None, "COMPLETED");

    let extracted = extract_payload_ref(&without_result);

    assert!(extracted.is_none());
}
/// A `result` object lacking a `reference` key has no payload reference.
#[test]
fn extract_payload_ref_returns_none_when_no_reference_key() {
    let status_only = ev_full(2, "playbook.completed", |row| {
        row.result = Some(serde_json::json!({"status": "ok"}));
    });

    let extracted = extract_payload_ref(&status_only);

    assert!(extracted.is_none());
}
/// A `reference` key that is JSON `null` is treated as no payload reference.
#[test]
fn extract_payload_ref_returns_none_when_reference_is_null() {
    let null_reference = ev_full(3, "step.exit", |row| {
        row.result = Some(serde_json::json!({"status": "ok", "reference": null}));
    });

    let extracted = extract_payload_ref(&null_reference);

    assert!(extracted.is_none());
}
/// A non-null `result.reference` object is returned with its fields intact.
#[test]
fn extract_payload_ref_returns_reference_object() {
    let with_reference = ev_full(4, "step.exit", |row| {
        row.result = Some(serde_json::json!({
            "status": "ok",
            "reference": {"sha256": "abc", "row_count": 7}
        }));
    });

    let extracted = extract_payload_ref(&with_reference).expect("reference present");

    assert_eq!(extracted["sha256"], serde_json::json!("abc"));
    assert_eq!(extracted["row_count"], serde_json::json!(7));
}
/// Fields present directly on the reference object are copied into the
/// summary, with `ref` becoming `reference_uri`.
#[test]
fn payload_summary_extracts_direct_fields() {
    let reference = serde_json::json!({
        "sha256": "abc",
        "schema_digest": "sd1",
        "row_count": 10,
        "media_type": "application/json",
        "ref": "gs://bucket/key",
    });

    let summary = payload_summary(&reference);

    assert_eq!(summary.sha256.as_deref(), Some("abc"));
    assert_eq!(summary.schema_digest.as_deref(), Some("sd1"));
    assert_eq!(summary.row_count, Some(10));
    assert_eq!(summary.media_type.as_deref(), Some("application/json"));
    assert_eq!(summary.reference_uri.as_deref(), Some("gs://bucket/key"));
}
/// When the top level has none of the fields, the summary is read from
/// `rows_ref.meta`, and the URI from `rows_ref.ref`.
#[test]
fn payload_summary_falls_back_to_rows_ref_meta() {
    let reference = serde_json::json!({
        "rows_ref": {
            "meta": {
                "sha256": "from-meta",
                "row_count": 99,
                "schema_digest": "sd-meta",
                "media_type": "x/parquet",
            },
            "ref": "s3://b/k",
        }
    });

    let summary = payload_summary(&reference);

    assert_eq!(summary.sha256.as_deref(), Some("from-meta"));
    assert_eq!(summary.schema_digest.as_deref(), Some("sd-meta"));
    assert_eq!(summary.row_count, Some(99));
    assert_eq!(summary.media_type.as_deref(), Some("x/parquet"));
    assert_eq!(summary.reference_uri.as_deref(), Some("s3://b/k"));
}
/// A top-level `digest` value is used for `sha256` when no `sha256` key is
/// present.
#[test]
fn payload_summary_falls_back_to_digest_for_sha256() {
    let reference = serde_json::json!({"digest": "alt-digest"});

    let summary = payload_summary(&reference);

    assert_eq!(summary.sha256.as_deref(), Some("alt-digest"));
}
/// A top-level `uri` value is used for `reference_uri` when no `ref` key is
/// present.
#[test]
fn payload_summary_falls_back_to_uri_for_ref() {
    let reference = serde_json::json!({"uri": "gs://b/k"});

    let summary = payload_summary(&reference);

    assert_eq!(summary.reference_uri.as_deref(), Some("gs://b/k"));
}
/// A reference that is not a JSON object produces a summary with every field
/// unset.
#[test]
fn payload_summary_returns_all_none_for_non_object() {
    let not_an_object = serde_json::json!("not-an-object");

    let summary = payload_summary(&not_an_object);

    assert!(summary.sha256.is_none());
    assert!(summary.schema_digest.is_none());
    assert!(summary.row_count.is_none());
    assert!(summary.media_type.is_none());
    assert!(summary.reference_uri.is_none());
}
/// Only events carrying a `result.reference` contribute to
/// `execution.payload_refs`, and the entries appear in event order.
#[test]
fn fold_populates_execution_payload_refs_in_order() {
    let first_exit = ev_full(10, "step.exit", |row| {
        row.node_name = Some("a".to_string());
        row.result = Some(serde_json::json!({
            "status": "ok",
            "reference": {"sha256": "h1", "row_count": 1}
        }));
    });
    let no_reference = ev(11, "playbook_started", None, "RUNNING");
    let second_exit = ev_full(12, "step.exit", |row| {
        row.node_name = Some("b".to_string());
        row.result = Some(serde_json::json!({
            "status": "ok",
            "reference": {"sha256": "h2", "row_count": 2}
        }));
    });

    let log = [first_exit, no_reference, second_exit];
    let folded = fold_replay_state(&log, "t", "o", 42, ReplayProjection::All);
    let refs = &folded.execution.payload_refs;

    assert_eq!(refs.len(), 2);
    assert_eq!(refs[0].event_id, 10);
    assert_eq!(refs[0].summary.sha256.as_deref(), Some("h1"));
    assert_eq!(refs[1].event_id, 12);
    assert_eq!(refs[1].summary.row_count, Some(2));
}
/// A `frame.committed` event with a `result.reference` sets the frame's
/// `output_ref` and a summary carrying the reference's hash, row count and
/// media type.
#[test]
fn fold_populates_frame_output_ref_on_committed() {
    let committed = ev_full(20, "frame.committed", |row| {
        row.frame_id = Some("frame-1".to_string());
        row.status = "COMPLETED".to_string();
        row.meta = Some(serde_json::json!({"row_count": 100}));
        row.result = Some(serde_json::json!({
            "status": "ok",
            "reference": {
                "sha256": "frame-hash",
                "row_count": 100,
                "media_type": "x/parquet",
            }
        }));
    });

    let folded = fold_replay_state(&[committed], "t", "o", 42, ReplayProjection::All);
    let projected = folded.frames.get("frame-1").expect("frame populated");

    assert_eq!(projected.status, "COMPLETED");
    assert!(projected.output_ref.is_some());
    let output_summary = projected.output_ref_summary.as_ref().expect("summary set");
    assert_eq!(output_summary.sha256.as_deref(), Some("frame-hash"));
    assert_eq!(output_summary.row_count, Some(100));
    assert_eq!(output_summary.media_type.as_deref(), Some("x/parquet"));
}
/// A `frame.failed` event with a `result.reference` also sets the frame's
/// `output_ref` and its summary hash.
#[test]
fn fold_populates_frame_output_ref_on_failed() {
    let failed = ev_full(21, "frame.failed", |row| {
        row.frame_id = Some("frame-x".to_string());
        row.status = "FAILED".to_string();
        row.result = Some(serde_json::json!({
            "status": "failed",
            "reference": {"sha256": "err-hash"}
        }));
    });

    let folded = fold_replay_state(&[failed], "t", "o", 42, ReplayProjection::All);
    let projected = folded.frames.get("frame-x").expect("frame populated");

    assert_eq!(projected.status, "FAILED");
    assert!(projected.output_ref.is_some());
    let output_summary = projected.output_ref_summary.as_ref().unwrap();
    assert_eq!(output_summary.sha256.as_deref(), Some("err-hash"));
}
/// A `frame.committed` event whose setup closure assigns no `result` must
/// leave `output_ref` empty while `output_ref_summary` is still present,
/// with `sha256` and `row_count` unset.
#[test]
fn fold_frame_committed_without_reference_keeps_summary_default() {
    let committed = ev_full(22, "frame.committed", |event| {
        event.frame_id = Some(String::from("frame-y"));
        event.status = String::from("COMPLETED");
    });

    let folded = fold_replay_state(&[committed], "t", "o", 42, ReplayProjection::All);
    let frame = folded.frames.get("frame-y").expect("frame populated");

    assert!(frame.output_ref.is_none());

    let digest = frame
        .output_ref_summary
        .as_ref()
        .expect("summary set even when ref is None");
    assert!(digest.sha256.is_none());
    assert!(digest.row_count.is_none());
}
/// Two events tagged in `meta.business_object` with the same
/// `customer` / `c-1` identity must both be recorded under the
/// `customer/c-1` key, and `last_payload_ref` must point at the later one.
#[test]
fn fold_populates_business_object_payload_refs() {
    let created = ev_full(30, "customer.created", |event| {
        event.meta = Some(serde_json::json!({
            "business_object": {"object_type": "customer", "object_id": "c-1"}
        }));
        event.result = Some(serde_json::json!({
            "status": "ok",
            "reference": {"sha256": "v1-hash"}
        }));
    });
    let updated = ev_full(31, "customer.updated", |event| {
        event.meta = Some(serde_json::json!({
            "business_object": {"object_type": "customer", "object_id": "c-1"}
        }));
        event.result = Some(serde_json::json!({
            "status": "ok",
            "reference": {"sha256": "v2-hash"}
        }));
    });

    let events = [created, updated];
    let folded = fold_replay_state(&events, "t", "o", 42, ReplayProjection::All);

    let customer = folded
        .business_objects
        .get("customer/c-1")
        .expect("BO present");

    // Both events are expected in the history, oldest first.
    let history = &customer.payload_refs;
    assert_eq!(history.len(), 2);
    let oldest = &history[0];
    assert_eq!(oldest.event_id, 30);
    assert_eq!(oldest.summary.sha256.as_deref(), Some("v1-hash"));

    // The "last" pointer must track the most recent event.
    let newest = customer
        .last_payload_ref
        .as_ref()
        .expect("last_payload_ref set");
    assert_eq!(newest.event_id, 31);
    assert_eq!(newest.summary.sha256.as_deref(), Some("v2-hash"));
}
/// Folds an empty event slice, serializes the resulting state to JSON, and
/// checks that `execution.payload_refs` comes out as an empty array.
///
/// NOTE(review): the test name says payload fields are *omitted*, but the
/// only assertion here requires `payload_refs` to be present as `[]`.
/// Confirm whether additional absence checks were intended.
#[test]
fn fold_empty_log_omits_payload_fields_from_json() {
    let folded = fold_replay_state(&[], "default", "default", 1, ReplayProjection::All);
    let json = serde_json::to_value(&folded).unwrap();

    let expected = serde_json::json!([]);
    assert_eq!(json["execution"]["payload_refs"], expected);
}
}