use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::LazyLock,
};
use anyhow::Context;
use serde_json::Value;
use tandem_types::TenantContext;
use tokio::io::AsyncWriteExt;
use super::durable_io::{repair_jsonl_torn_tail, sync_parent_dir, write_file_atomically};
use super::phases::phase_state_from_status;
use super::types::{StatefulRunEventRecord, StatefulRunSnapshotRecord};
/// Event type carried by the marker rows that compaction writes in place of pruned events.
const STATEFUL_RUN_EVENT_LOG_COMPACTED_TYPE: &str = "stateful_runtime.event_log_compacted";
/// Process-wide cache of append cursors, keyed by log path, tenant and run.
///
/// Every append entry point and the compaction routine in this file take this async mutex
/// before touching the event log, so it also serializes those writers within the process.
static STATEFUL_RUN_EVENT_APPEND_CURSORS: LazyLock<
tokio::sync::Mutex<HashMap<StatefulRunEventCursorKey, StatefulRunEventAppendCursor>>,
> = LazyLock::new(|| tokio::sync::Mutex::new(HashMap::new()));
/// Identity of one cached append cursor: a log file plus the tenant and run whose
/// rows the cursor tracks.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StatefulRunEventCursorKey {
    /// Event log file the cursor was seeded from.
    path: PathBuf,
    /// Organization component of the tenant.
    org_id: String,
    /// Workspace component of the tenant.
    workspace_id: String,
    /// Optional deployment component of the tenant.
    deployment_id: Option<String>,
    /// Run whose events are sequenced by the cursor.
    run_id: String,
}

impl StatefulRunEventCursorKey {
    /// Builds an owned key by copying the path, the tenant identifiers and the run id.
    fn new(path: &Path, tenant: &TenantContext, run_id: &str) -> Self {
        let org_id = tenant.org_id.clone();
        let workspace_id = tenant.workspace_id.clone();
        let deployment_id = tenant.deployment_id.clone();
        Self {
            path: PathBuf::from(path),
            org_id,
            workspace_id,
            deployment_id,
            run_id: run_id.to_owned(),
        }
    }
}
/// In-memory append state cached for one cursor key.
#[derive(Debug, Default, Clone)]
struct StatefulRunEventAppendCursor {
// Highest sequence number observed for the key; the derived default is 0.
last_seq: u64,
// Sequence number recorded per known event id; seeding also adds ids listed inside compaction markers.
event_seq_by_id: HashMap<String, u64>,
}
/// Filesystem locations used by the stateful runtime store.
#[derive(Debug, Clone)]
pub struct StatefulRuntimeStoragePaths {
    /// JSONL file holding stateful run events.
    pub run_events_path: PathBuf,
    /// Directory under which per-run snapshot directories are created.
    pub snapshots_root: PathBuf,
    /// JSON file holding stateful waits.
    pub waits_path: PathBuf,
}

impl StatefulRuntimeStoragePaths {
    /// Wraps three explicit locations without modifying them.
    pub fn new(run_events_path: PathBuf, snapshots_root: PathBuf, waits_path: PathBuf) -> Self {
        Self {
            run_events_path,
            snapshots_root,
            waits_path,
        }
    }

    /// Derives the layout from the directory that contains `runtime_events_path`.
    ///
    /// When the path has no parent (for example an empty path or a filesystem root),
    /// the files are placed relative to `.` instead.
    pub fn from_runtime_events_path(runtime_events_path: &Path) -> Self {
        let runtime_root = match runtime_events_path.parent() {
            Some(parent) => parent.to_path_buf(),
            None => PathBuf::from("."),
        };
        Self::new(
            runtime_root.join("stateful_events.jsonl"),
            runtime_root.join("stateful_snapshots"),
            runtime_root.join("stateful_waits.json"),
        )
    }
}
/// Filter and window parameters accepted by `query_stateful_run_events`.
#[derive(Debug, Clone, Copy)]
pub struct StatefulRunEventQuery<'a> {
// Only rows whose run id equals this value are returned.
pub run_id: &'a str,
// Exclusive lower bound: when set, only rows with `seq` greater than it are kept.
pub after_seq: Option<u64>,
// Exclusive upper bound: when set, only rows with `seq` less than it are kept.
pub before_seq: Option<u64>,
// Maximum number of rows to return; `None` and `Some(0)` both mean "no limit".
pub limit: Option<usize>,
// When a limit applies: `true` keeps the last `limit` rows, `false` keeps the first.
pub tail: bool,
}
/// Appends `record` to the event log at `path` while holding the global cursor lock.
///
/// Cached append cursors for `path` are dropped afterwards so they are reseeded from
/// disk on next use.
///
/// # Errors
///
/// Returns the error produced by the underlying append (directory creation, tail
/// repair, open, serialization, write, flush or sync).
pub async fn append_stateful_run_event(
    path: &Path,
    record: &StatefulRunEventRecord,
) -> anyhow::Result<()> {
    let mut cursors = STATEFUL_RUN_EVENT_APPEND_CURSORS.lock().await;
    let outcome = append_stateful_run_event_unlocked(path, record).await;
    // Invalidate even when the append reports an error: the flush/sync steps run
    // after the row has been written, so a failed call may still have changed the
    // file and any cached cursor for it can no longer be trusted.
    invalidate_stateful_run_event_cursors_for_path(&mut cursors, path);
    outcome
}
/// Writes one JSON line for `record` to the end of the log at `path`.
///
/// The caller is expected to hold the cursor lock. Steps, in order: create the
/// parent directory, repair a torn trailing line, open in append mode, write the
/// serialized record followed by `\n`, flush, sync the file, close it, then sync
/// the parent directory.
///
/// # Errors
///
/// Fails with a contextual error if any of those steps fails.
async fn append_stateful_run_event_unlocked(
    path: &Path,
    record: &StatefulRunEventRecord,
) -> anyhow::Result<()> {
    if let Some(dir) = path.parent() {
        tokio::fs::create_dir_all(dir).await.with_context(|| {
            format!(
                "failed to create stateful run event directory {}",
                dir.display()
            )
        })?;
    }
    repair_jsonl_torn_tail(path, "stateful run event log").await?;
    {
        // The handle lives only inside this scope so it is closed before the
        // parent directory is synced.
        let mut log = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .await
            .with_context(|| format!("failed to open stateful run event log {}", path.display()))?;
        let mut encoded = serde_json::to_vec(record)?;
        encoded.push(b'\n');
        log.write_all(&encoded).await.with_context(|| {
            format!("failed to append stateful run event log {}", path.display())
        })?;
        log.flush()
            .await
            .with_context(|| format!("failed to flush stateful run event log {}", path.display()))?;
        log.sync_all()
            .await
            .with_context(|| format!("failed to sync stateful run event log {}", path.display()))?;
    }
    sync_parent_dir(path, "stateful run event log").await?;
    Ok(())
}
/// Appends `record` unless the log already knows its event id for the same run.
///
/// Returns `Ok(true)` when a row was written and `Ok(false)` when the event was
/// already present (either as a live row or listed inside a compaction marker).
/// The existence check and the append happen under the global cursor lock, and
/// cached cursors for `path` are dropped after any append attempt.
///
/// # Errors
///
/// Returns the error produced by the underlying append.
pub async fn append_stateful_run_event_once(
    path: &Path,
    record: &StatefulRunEventRecord,
) -> anyhow::Result<bool> {
    let mut cursors = STATEFUL_RUN_EVENT_APPEND_CURSORS.lock().await;
    if stateful_run_event_exists(path, record) {
        return Ok(false);
    }
    let outcome = append_stateful_run_event_unlocked(path, record).await;
    // Invalidate on failure as well: the row is written before the flush/sync
    // steps, so an error does not prove the file is unchanged.
    invalidate_stateful_run_event_cursors_for_path(&mut cursors, path);
    outcome.map(|()| true)
}
pub async fn append_stateful_run_event_once_with_next_seq(
path: &Path,
tenant_context: &TenantContext,
record: &StatefulRunEventRecord,
) -> anyhow::Result<(bool, u64)> {
let mut cursors = STATEFUL_RUN_EVENT_APPEND_CURSORS.lock().await;
let key = StatefulRunEventCursorKey::new(path, tenant_context, &record.run_id);
let cursor = cursors
.entry(key.clone())
.or_insert_with(|| seed_stateful_run_event_cursor(path, &key));
if let Some(seq) = cursor.event_seq_by_id.get(&record.event_id).copied() {
return Ok((false, seq));
}
let seq = cursor.last_seq.saturating_add(1).max(1);
let mut record = record.clone();
record.seq = seq;
append_stateful_run_event_unlocked(path, &record).await?;
cursor.last_seq = seq;
cursor.event_seq_by_id.insert(record.event_id.clone(), seq);
Ok((true, seq))
}
/// Reports whether the log at `path` already contains `record`'s event id for the
/// same run and tenant.
///
/// A row counts as a match when its run id and its tenant (organization, workspace
/// and deployment) equal the record's and either its own event id matches or, for a
/// compaction marker, the id is listed among the marker's compacted event ids.
/// The tenant comparison mirrors the cursor-key matching used elsewhere in this
/// file, so rows belonging to another tenant cannot suppress this tenant's write.
fn stateful_run_event_exists(path: &Path, record: &StatefulRunEventRecord) -> bool {
    let tenant = &record.scope.tenant_context;
    load_stateful_run_events(path).iter().any(|existing| {
        let existing_tenant = &existing.scope.tenant_context;
        existing.run_id == record.run_id
            && existing_tenant.org_id == tenant.org_id
            && existing_tenant.workspace_id == tenant.workspace_id
            && existing_tenant.deployment_id == tenant.deployment_id
            && (existing.event_id == record.event_id
                || stateful_run_event_compacted_event_ids(existing)
                    .iter()
                    .any(|(event_id, _)| event_id == &record.event_id))
    })
}
/// Extracts the `(event_id, seq)` pairs recorded in a compaction marker.
///
/// Returns an empty list for rows that are not compaction markers or whose payload
/// has no `compacted_event_ids` array. Entries are skipped when `event_id` is
/// missing, not a string or blank after trimming, or when `seq` is missing or not
/// an unsigned integer. Returned ids are trimmed.
pub fn stateful_run_event_compacted_event_ids(row: &StatefulRunEventRecord) -> Vec<(String, u64)> {
    if row.event_type != STATEFUL_RUN_EVENT_LOG_COMPACTED_TYPE {
        return Vec::new();
    }
    let Some(entries) = row
        .payload
        .get("compacted_event_ids")
        .and_then(Value::as_array)
    else {
        return Vec::new();
    };
    let mut pairs = Vec::new();
    for entry in entries {
        let Some(event_id) = entry
            .get("event_id")
            .and_then(Value::as_str)
            .map(str::trim)
        else {
            continue;
        };
        if event_id.is_empty() {
            continue;
        }
        let Some(seq) = entry.get("seq").and_then(Value::as_u64) else {
            continue;
        };
        pairs.push((event_id.to_string(), seq));
    }
    pairs
}
fn seed_stateful_run_event_cursor(
path: &Path,
key: &StatefulRunEventCursorKey,
) -> StatefulRunEventAppendCursor {
let mut cursor = StatefulRunEventAppendCursor::default();
for row in load_stateful_run_events(path)
.into_iter()
.filter(|row| stateful_run_event_matches_cursor_key(row, key))
{
cursor.last_seq = cursor.last_seq.max(row.seq);
cursor
.event_seq_by_id
.entry(row.event_id.clone())
.or_insert(row.seq);
for (event_id, seq) in stateful_run_event_compacted_event_ids(&row) {
cursor.last_seq = cursor.last_seq.max(seq);
cursor.event_seq_by_id.entry(event_id).or_insert(seq);
}
}
cursor
}
/// Returns `true` when `row` belongs to the run and tenant identified by `key`.
///
/// The key's path is not part of the comparison.
fn stateful_run_event_matches_cursor_key(
    row: &StatefulRunEventRecord,
    key: &StatefulRunEventCursorKey,
) -> bool {
    if row.run_id != key.run_id {
        return false;
    }
    let tenant = &row.scope.tenant_context;
    tenant.org_id == key.org_id
        && tenant.workspace_id == key.workspace_id
        && tenant.deployment_id == key.deployment_id
}
/// Drops every cached cursor that was seeded from the log at `path`, leaving
/// cursors for other files untouched.
fn invalidate_stateful_run_event_cursors_for_path(
    cursors: &mut HashMap<StatefulRunEventCursorKey, StatefulRunEventAppendCursor>,
    path: &Path,
) {
    cursors.retain(|key, _cursor| {
        let same_log = key.path.as_path() == path;
        !same_log
    });
}
/// Reads every parseable event row from the JSONL log at `path`, ordered by `seq`.
///
/// An unreadable file yields an empty list. Lines that fail to deserialize are
/// skipped with a warning that carries the 1-based line number. The sort is stable,
/// so rows with equal `seq` keep their file order.
pub fn load_stateful_run_events(path: &Path) -> Vec<StatefulRunEventRecord> {
    let content = match std::fs::read_to_string(path) {
        Ok(content) => content,
        Err(_) => return Vec::new(),
    };
    let mut rows = Vec::new();
    for (index, line) in content.lines().enumerate() {
        match serde_json::from_str::<StatefulRunEventRecord>(line) {
            Ok(record) => rows.push(record),
            Err(error) => {
                tracing::warn!(
                    line = index + 1,
                    error = %error,
                    "skipping invalid stateful run event row"
                );
            }
        }
    }
    rows.sort_by_key(|row| row.seq);
    rows
}
/// Prunes rows older than the retention window and replaces them with compaction markers.
///
/// A row is kept when `occurred_at_ms >= now_ms - retention_ms` (saturating at 0).
/// Older rows are grouped by run and tenant; each group is replaced by one marker
/// row that records the highest pruned sequence/timestamp and the pruned event ids.
///
/// Returns the number of pruned rows. Returns `Ok(0)` without rewriting the file
/// when `retention_ms` is 0, when `path` does not exist, or when no row is old
/// enough to prune.
///
/// # Errors
///
/// Fails if repairing the log tail or writing the rewritten log fails.
pub async fn compact_stateful_run_event_log(
path: &Path,
retention_ms: u64,
now_ms: u64,
) -> anyhow::Result<usize> {
if retention_ms == 0 || !path.exists() {
return Ok(0);
}
// Hold the cursor lock for the whole rewrite; the append entry points take the same lock.
let mut cursors = STATEFUL_RUN_EVENT_APPEND_CURSORS.lock().await;
repair_jsonl_torn_tail(path, "stateful run event log").await?;
let cutoff_ms = now_ms.saturating_sub(retention_ms);
let mut retained = Vec::new();
let mut compacted =
HashMap::<StatefulRunEventCompactionKey, StatefulRunEventCompactionSummary>::new();
let mut pruned = 0_usize;
// Split rows into those inside the retention window and per-key summaries of the rest.
for row in load_stateful_run_events(path) {
if row.occurred_at_ms >= cutoff_ms {
retained.push(row);
continue;
}
pruned += 1;
let key = StatefulRunEventCompactionKey::from_event(&row);
compacted
.entry(key)
.and_modify(|summary| summary.observe(&row))
.or_insert_with(|| StatefulRunEventCompactionSummary::from_event(&row));
}
if pruned == 0 {
return Ok(0);
}
// Fold still-retained markers for a key that is being compacted into that key's
// summary, so the ids they carry move into the new marker.
for row in &retained {
if row.event_type == STATEFUL_RUN_EVENT_LOG_COMPACTED_TYPE {
let key = StatefulRunEventCompactionKey::from_event(row);
if let Some(summary) = compacted.get_mut(&key) {
summary.observe(row);
}
}
}
// Drop the markers that were just merged; markers for keys without newly pruned rows stay.
retained.retain(|row| {
row.event_type != STATEFUL_RUN_EVENT_LOG_COMPACTED_TYPE
|| !compacted.contains_key(&StatefulRunEventCompactionKey::from_event(row))
});
retained.extend(
compacted
.values()
.map(|summary| summary.compaction_marker(now_ms)),
);
// Rewrite in a deterministic order: run id, then sequence, then event id.
retained.sort_by(|left, right| {
left.run_id
.cmp(&right.run_id)
.then_with(|| left.seq.cmp(&right.seq))
.then_with(|| left.event_id.cmp(&right.event_id))
});
write_stateful_run_event_rows(path, &retained).await?;
// Cached cursors were seeded from the pre-compaction file; force a reseed.
invalidate_stateful_run_event_cursors_for_path(&mut cursors, path);
Ok(pruned)
}
/// Grouping key used during compaction: one run within one tenant.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StatefulRunEventCompactionKey {
    /// Run the grouped events belong to.
    run_id: String,
    /// Organization component of the tenant.
    org_id: String,
    /// Workspace component of the tenant.
    workspace_id: String,
    /// Optional deployment component of the tenant.
    deployment_id: Option<String>,
}

impl StatefulRunEventCompactionKey {
    /// Derives the key from a row's run id and the tenant recorded in its scope.
    fn from_event(row: &StatefulRunEventRecord) -> Self {
        let tenant = &row.scope.tenant_context;
        Self {
            run_id: row.run_id.clone(),
            org_id: tenant.org_id.clone(),
            workspace_id: tenant.workspace_id.clone(),
            deployment_id: tenant.deployment_id.clone(),
        }
    }
}
/// Running aggregate of the rows pruned for one compaction key.
#[derive(Debug, Clone)]
struct StatefulRunEventCompactionSummary {
    /// Run/tenant group this summary describes.
    key: StatefulRunEventCompactionKey,
    /// Highest sequence among the observed rows.
    compacted_through_seq: u64,
    /// Highest `occurred_at_ms` among the observed rows.
    compacted_through_ms: u64,
    /// Number of rows observed.
    pruned_events: usize,
    /// `(event_id, seq)` of every observed row, plus the ids carried by observed markers.
    pruned_event_ids: Vec<(String, u64)>,
    /// Scope copied from the first observed row and reused for the marker.
    scope: super::types::StatefulRuntimeScope,
}

impl StatefulRunEventCompactionSummary {
    /// Starts a summary from its first row.
    fn from_event(row: &StatefulRunEventRecord) -> Self {
        // Begin with an empty tally anchored at this row, then count the row through
        // the same path used for every later one.
        let mut summary = Self {
            key: StatefulRunEventCompactionKey::from_event(row),
            compacted_through_seq: row.seq,
            compacted_through_ms: row.occurred_at_ms,
            pruned_events: 0,
            pruned_event_ids: Vec::new(),
            scope: row.scope.clone(),
        };
        summary.observe(row);
        summary
    }

    /// Folds one more row into the summary.
    fn observe(&mut self, row: &StatefulRunEventRecord) {
        if row.seq > self.compacted_through_seq {
            self.compacted_through_seq = row.seq;
        }
        if row.occurred_at_ms > self.compacted_through_ms {
            self.compacted_through_ms = row.occurred_at_ms;
        }
        self.pruned_events += 1;
        // Ids carried by the row (when it is a marker) come first, then the row's own id.
        let mut ids = stateful_run_event_compacted_event_ids(row);
        ids.push((row.event_id.clone(), row.seq));
        self.pruned_event_ids.append(&mut ids);
    }

    /// Builds the marker row that stands in for the summarized rows.
    ///
    /// The marker's event id is derived from a digest over the tenant, the run and
    /// the highest compacted sequence; its `seq` is that highest sequence and its
    /// timestamp is `now_ms`.
    fn compaction_marker(&self, now_ms: u64) -> StatefulRunEventRecord {
        let through_seq_text = self.compacted_through_seq.to_string();
        let digest = crate::sha256_hex(&[
            &self.key.org_id,
            &self.key.workspace_id,
            self.key.deployment_id.as_deref().unwrap_or(""),
            &self.key.run_id,
            through_seq_text.as_str(),
        ]);
        let mut compacted_event_ids = Vec::with_capacity(self.pruned_event_ids.len());
        for (event_id, seq) in &self.pruned_event_ids {
            compacted_event_ids.push(serde_json::json!({ "event_id": event_id, "seq": seq }));
        }
        let payload = serde_json::json!({
            "compacted_through_seq": self.compacted_through_seq,
            "compacted_through_ms": self.compacted_through_ms,
            "pruned_events": self.pruned_events,
            "compacted_event_ids": compacted_event_ids,
            "source": "stateful_event_log_compaction"
        });
        StatefulRunEventRecord {
            schema_version: 1,
            event_id: format!("stateful-compaction-{digest}"),
            run_id: self.key.run_id.clone(),
            seq: self.compacted_through_seq,
            event_type: STATEFUL_RUN_EVENT_LOG_COMPACTED_TYPE.to_string(),
            occurred_at_ms: now_ms,
            scope: self.scope.clone(),
            actor: None,
            phase_id: None,
            phase_transition: None,
            wait_kind: None,
            causation_id: None,
            correlation_id: None,
            payload,
        }
    }
}
/// Replaces the log at `path` with `rows`, one JSON document per line.
///
/// # Errors
///
/// Fails if a row cannot be serialized or the atomic write fails.
async fn write_stateful_run_event_rows(
    path: &Path,
    rows: &[StatefulRunEventRecord],
) -> anyhow::Result<()> {
    let mut content = Vec::new();
    for row in rows {
        let mut encoded = serde_json::to_vec(row)?;
        encoded.push(b'\n');
        content.append(&mut encoded);
    }
    write_file_atomically(path, &content, "stateful run event log").await
}
/// Returns the events of one run that `tenant` may see, ordered by `seq`.
///
/// Rows are filtered by run id, by the exclusive `after_seq` / `before_seq` bounds
/// and by tenant visibility. A positive `limit` then trims the result to the last
/// `limit` rows when `tail` is set, or to the first `limit` rows otherwise.
pub fn query_stateful_run_events(
    path: &Path,
    tenant: &TenantContext,
    query: StatefulRunEventQuery<'_>,
) -> Vec<StatefulRunEventRecord> {
    let mut rows: Vec<_> = load_stateful_run_events(path)
        .into_iter()
        .filter(|row| {
            row.run_id == query.run_id
                && query.after_seq.map_or(true, |after_seq| row.seq > after_seq)
                && query
                    .before_seq
                    .map_or(true, |before_seq| row.seq < before_seq)
                && row.visible_to_tenant(tenant)
        })
        .collect();
    match query.limit {
        Some(limit) if limit > 0 && rows.len() > limit => {
            if query.tail {
                let excess = rows.len() - limit;
                rows.drain(..excess);
            } else {
                rows.truncate(limit);
            }
        }
        _ => {}
    }
    rows
}
/// Returns the sequence number that follows the highest one visible to the tenant
/// for `run_id`, or 1 when no visible row exists.
pub fn next_stateful_run_event_seq(
    path: &Path,
    tenant_context: &TenantContext,
    run_id: &str,
) -> u64 {
    let whole_run = StatefulRunEventQuery {
        run_id,
        after_seq: None,
        before_seq: None,
        limit: None,
        tail: false,
    };
    // The query result is ordered by sequence, so the last row carries the maximum.
    match query_stateful_run_events(path, tenant_context, whole_run).last() {
        Some(event) => event.seq.saturating_add(1),
        None => 1,
    }
}
pub fn stateful_run_event_seq_by_id(
path: &Path,
tenant_context: &TenantContext,
run_id: &str,
event_id: &str,
) -> Option<u64> {
query_stateful_run_events(
path,
tenant_context,
StatefulRunEventQuery {
run_id,
after_seq: None,
before_seq: None,
limit: None,
tail: false,
},
)
.into_iter()
.find(|event| event.event_id == event_id)
.map(|event| event.seq)
}
/// Persists `snapshot` as pretty-printed JSON at
/// `<root>/<sanitized run id>/<sanitized snapshot id>.json` and returns that path.
///
/// # Errors
///
/// Fails if the run directory cannot be created, the snapshot cannot be
/// serialized, or the atomic write fails.
pub async fn write_stateful_run_snapshot(
    root: &Path,
    snapshot: &StatefulRunSnapshotRecord,
) -> anyhow::Result<PathBuf> {
    let file_name = format!("{}.json", safe_path_segment(&snapshot.snapshot_id));
    let run_dir = root.join(safe_path_segment(&snapshot.run_id));
    tokio::fs::create_dir_all(&run_dir).await.with_context(|| {
        format!(
            "failed to create stateful snapshot directory {}",
            run_dir.display()
        )
    })?;
    let encoded = serde_json::to_vec_pretty(snapshot)?;
    let target = run_dir.join(file_name);
    write_file_atomically(&target, &encoded, "stateful snapshot").await?;
    Ok(target)
}
/// Lists the snapshots of `run_id` that `tenant` may see, ordered by `seq`.
///
/// Only `*.json` files in the run's sanitized directory are considered. Files that
/// cannot be read or parsed are skipped with a warning, and snapshots whose stored
/// run id differs from `run_id` are ignored. A positive `limit` keeps only the last
/// `limit` snapshots. A missing or unreadable directory yields an empty list.
pub fn list_stateful_run_snapshots(
    root: &Path,
    tenant: &TenantContext,
    run_id: &str,
    limit: Option<usize>,
) -> Vec<StatefulRunSnapshotRecord> {
    let dir = root.join(safe_path_segment(run_id));
    let Ok(entries) = std::fs::read_dir(dir) else {
        return Vec::new();
    };
    let mut snapshots = Vec::new();
    for entry in entries {
        let Ok(entry) = entry else {
            continue;
        };
        let path = entry.path();
        if path.extension().and_then(|value| value.to_str()) != Some("json") {
            continue;
        }
        match read_stateful_run_snapshot(&path) {
            Ok(snapshot) => {
                if snapshot.run_id == run_id && snapshot.visible_to_tenant(tenant) {
                    snapshots.push(snapshot);
                }
            }
            Err(error) => {
                tracing::warn!(
                    path = %path.display(),
                    error = %error,
                    "skipping invalid stateful run snapshot"
                );
            }
        }
    }
    snapshots.sort_by_key(|snapshot| snapshot.seq);
    match limit {
        Some(limit) if limit > 0 && snapshots.len() > limit => {
            let excess = snapshots.len() - limit;
            snapshots.drain(..excess);
        }
        _ => {}
    }
    snapshots
}
/// Reads and parses the snapshot file at `path`.
///
/// The JSON is inspected first to learn which of `phase`, `phase_history` and
/// `allowed_next_phases` were present in the file; fields that were absent are
/// then filled in by `hydrate_legacy_snapshot_phase_fields`.
///
/// # Errors
///
/// Fails if the file cannot be read, is not valid JSON, or does not deserialize
/// into a snapshot record.
pub fn read_stateful_run_snapshot(path: &Path) -> anyhow::Result<StatefulRunSnapshotRecord> {
    let raw = std::fs::read_to_string(path)
        .with_context(|| format!("failed to read stateful snapshot {}", path.display()))?;
    let value: Value = serde_json::from_str(&raw)
        .with_context(|| format!("failed to parse stateful snapshot {}", path.display()))?;
    // Record field presence before the value is consumed by deserialization.
    let has_field = |field: &str| value.get(field).is_some();
    let has_phase = has_field("phase");
    let has_phase_history = has_field("phase_history");
    let has_allowed_next_phases = has_field("allowed_next_phases");
    let mut snapshot: StatefulRunSnapshotRecord = serde_json::from_value(value)
        .with_context(|| format!("failed to parse stateful snapshot {}", path.display()))?;
    hydrate_legacy_snapshot_phase_fields(
        &mut snapshot,
        has_phase,
        has_phase_history,
        has_allowed_next_phases,
    );
    Ok(snapshot)
}
/// Fills in phase fields that were missing from the stored snapshot JSON.
///
/// * If `phase` was present, only `allowed_next_phases` is derived (from the stored
///   phase) when it was missing; `phase_history` is left as parsed.
/// * If `phase` was absent, a phase state is derived from the snapshot's status,
///   creation time and phase id; `phase` is always taken from it, while
///   `phase_history` and `allowed_next_phases` are taken only if they were missing.
fn hydrate_legacy_snapshot_phase_fields(
    snapshot: &mut StatefulRunSnapshotRecord,
    has_phase: bool,
    has_phase_history: bool,
    has_allowed_next_phases: bool,
) {
    if has_phase {
        if !has_allowed_next_phases {
            snapshot.allowed_next_phases = snapshot.phase.allowed_next_phases().to_vec();
        }
        return;
    }
    let derived = phase_state_from_status(
        &snapshot.run_id,
        &snapshot.status,
        snapshot.created_at_ms,
        snapshot.phase_id.as_deref(),
    );
    snapshot.phase = derived.phase;
    if !has_phase_history {
        snapshot.phase_history = derived.phase_history;
    }
    if !has_allowed_next_phases {
        snapshot.allowed_next_phases = derived.allowed_next_phases;
    }
}
/// Fetches one snapshot by run id and snapshot id, if `tenant` may see it.
///
/// Returns `Ok(None)` when the file does not exist, when the stored run id or
/// snapshot id differs from the requested one, or when the snapshot is not visible
/// to `tenant`.
///
/// # Errors
///
/// Fails if the file exists but cannot be read or parsed.
pub fn read_stateful_run_snapshot_for_run(
    root: &Path,
    tenant: &TenantContext,
    run_id: &str,
    snapshot_id: &str,
) -> anyhow::Result<Option<StatefulRunSnapshotRecord>> {
    let path = stateful_run_snapshot_path(root, run_id, snapshot_id);
    if !path.exists() {
        return Ok(None);
    }
    let snapshot = read_stateful_run_snapshot(&path)?;
    // Sanitized file names can collide, so the stored ids are checked as well.
    let is_requested = snapshot.run_id == run_id && snapshot.snapshot_id == snapshot_id;
    if is_requested && snapshot.visible_to_tenant(tenant) {
        Ok(Some(snapshot))
    } else {
        Ok(None)
    }
}
/// Computes `<root>/<sanitized run id>/<sanitized snapshot id>.json`.
pub fn stateful_run_snapshot_path(root: &Path, run_id: &str, snapshot_id: &str) -> PathBuf {
    let mut path = root.join(safe_path_segment(run_id));
    path.push(format!("{}.json", safe_path_segment(snapshot_id)));
    path
}
/// Turns an arbitrary identifier into a single safe path component.
///
/// ASCII letters, digits, `-`, `_` and `.` are kept; every other character is
/// replaced by `_`. A result that would be empty, `.` or `..` becomes `_`, so the
/// segment can never be empty or refer to the current or parent directory.
fn safe_path_segment(value: &str) -> String {
    let mut segment = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.';
        segment.push(if keep { ch } else { '_' });
    }
    if matches!(segment.as_str(), "" | "." | "..") {
        "_".to_string()
    } else {
        segment
    }
}
// Tests for the event log and snapshot storage helpers. Each test works on its own
// uniquely named file or directory under the system temp dir and removes it at the end.
#[cfg(test)]
mod tests {
use serde_json::json;
use tandem_types::{PrincipalKind, PrincipalRef, TenantContext};
use uuid::Uuid;
use super::*;
use crate::stateful_runtime::types::{
StatefulRunEventRecord, StatefulRunSnapshotRecord, StatefulRuntimeScope,
StatefulWorkflowRunStatus,
};
use crate::stateful_runtime::StatefulWorkflowPhase;
// Builds a tenant context for the given org/workspace with no deployment and user "user-a".
fn tenant(org: &str, workspace: &str) -> TenantContext {
TenantContext::explicit_user_workspace(org, workspace, None, "user-a")
}
// Builds a test event with id "event-{seq}" and `occurred_at_ms` of `seq * 100`.
fn event(seq: u64, run_id: &str, tenant_context: TenantContext) -> StatefulRunEventRecord {
StatefulRunEventRecord {
schema_version: 1,
event_id: format!("event-{seq}"),
run_id: run_id.to_string(),
seq,
event_type: "stateful_runtime.test".to_string(),
occurred_at_ms: seq * 100,
scope: StatefulRuntimeScope::from_tenant_context(tenant_context),
actor: Some(PrincipalRef::new(PrincipalKind::HumanUser, "user-a")),
phase_id: None,
phase_transition: None,
wait_kind: None,
causation_id: None,
correlation_id: None,
payload: json!({ "seq": seq }),
}
}
// Querying returns only rows of the requested run, visible to the tenant, after the given seq.
#[tokio::test]
async fn query_filters_stateful_events_by_tenant_run_and_sequence() {
let path =
std::env::temp_dir().join(format!("stateful-runtime-events-{}.jsonl", Uuid::new_v4()));
let tenant_a = tenant("org-a", "workspace-a");
let tenant_b = tenant("org-b", "workspace-b");
for record in [
event(1, "run-a", tenant_a.clone()),
event(2, "run-b", tenant_a.clone()),
event(3, "run-a", tenant_b),
event(4, "run-a", tenant_a.clone()),
] {
append_stateful_run_event(&path, &record)
.await
.expect("append");
}
let rows = query_stateful_run_events(
&path,
&tenant_a,
StatefulRunEventQuery {
run_id: "run-a",
after_seq: Some(1),
before_seq: None,
limit: None,
tail: false,
},
);
assert_eq!(rows.iter().map(|row| row.seq).collect::<Vec<_>>(), vec![4]);
let _ = tokio::fs::remove_file(path).await;
}
// `before_seq` is exclusive and `tail` keeps the last `limit` rows.
#[tokio::test]
async fn query_supports_before_sequence_and_tail_window() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-window-{}.jsonl",
Uuid::new_v4()
));
let tenant_a = tenant("org-a", "workspace-a");
for seq in 1..=5 {
append_stateful_run_event(&path, &event(seq, "run-a", tenant_a.clone()))
.await
.expect("append");
}
let rows = query_stateful_run_events(
&path,
&tenant_a,
StatefulRunEventQuery {
run_id: "run-a",
after_seq: None,
before_seq: Some(5),
limit: Some(2),
tail: true,
},
);
assert_eq!(
rows.iter().map(|row| row.seq).collect::<Vec<_>>(),
vec![3, 4]
);
let _ = tokio::fs::remove_file(path).await;
}
// The next-seq helper follows the highest stored seq; the by-id helper finds live rows only by id.
#[tokio::test]
async fn event_sequence_helpers_read_latest_and_existing_event_id() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-seq-{}.jsonl",
Uuid::new_v4()
));
let tenant_a = tenant("org-a", "workspace-a");
append_stateful_run_event(&path, &event(3, "run-a", tenant_a.clone()))
.await
.expect("append first");
append_stateful_run_event(&path, &event(7, "run-a", tenant_a.clone()))
.await
.expect("append second");
assert_eq!(next_stateful_run_event_seq(&path, &tenant_a, "run-a"), 8);
assert_eq!(
stateful_run_event_seq_by_id(&path, &tenant_a, "run-a", "event-3"),
Some(3)
);
assert_eq!(
stateful_run_event_seq_by_id(&path, &tenant_a, "run-a", "missing"),
None
);
let _ = tokio::fs::remove_file(path).await;
}
// A partially written trailing line is removed before the next row is appended.
#[tokio::test]
async fn append_repairs_torn_event_log_tail_before_writing() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-torn-tail-{}.jsonl",
Uuid::new_v4()
));
let tenant_a = tenant("org-a", "workspace-a");
let first = event(1, "run-a", tenant_a.clone());
let first_line = serde_json::to_string(&first).expect("serialize first event");
std::fs::write(&path, format!("{first_line}\n{{\"partial\":")).expect("write torn log");
append_stateful_run_event(&path, &event(2, "run-a", tenant_a))
.await
.expect("append after repair");
let content = std::fs::read_to_string(&path).expect("read repaired log");
assert!(content.ends_with('\n'));
assert!(!content.contains("partial"));
let rows = load_stateful_run_events(&path);
assert_eq!(
rows.iter().map(|row| row.seq).collect::<Vec<_>>(),
vec![1, 2]
);
let _ = tokio::fs::remove_file(path).await;
}
// A complete final row that merely lacks its newline is kept, not discarded.
#[tokio::test]
async fn append_preserves_complete_tail_event_without_newline() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-complete-tail-{}.jsonl",
Uuid::new_v4()
));
let tenant_a = tenant("org-a", "workspace-a");
let first_line =
serde_json::to_string(&event(1, "run-a", tenant_a.clone())).expect("serialize first");
let second_line =
serde_json::to_string(&event(2, "run-a", tenant_a.clone())).expect("serialize second");
std::fs::write(&path, format!("{first_line}\n{second_line}"))
.expect("write missing newline log");
append_stateful_run_event(&path, &event(3, "run-a", tenant_a))
.await
.expect("append after newline repair");
let content = std::fs::read_to_string(&path).expect("read repaired log");
assert!(content.ends_with('\n'));
assert!(content.contains("\"event_id\":\"event-2\""));
let rows = load_stateful_run_events(&path);
assert_eq!(
rows.iter().map(|row| row.seq).collect::<Vec<_>>(),
vec![1, 2, 3]
);
let _ = tokio::fs::remove_file(path).await;
}
// Appending the same record twice through the "once" entry point writes a single row.
#[tokio::test]
async fn append_once_uses_event_id_as_idempotency_key() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-once-{}.jsonl",
Uuid::new_v4()
));
let tenant = tenant("org-a", "workspace-a");
let record = event(1, "run-a", tenant);
assert!(append_stateful_run_event_once(&path, &record)
.await
.expect("first append"));
assert!(!append_stateful_run_event_once(&path, &record)
.await
.expect("duplicate append"));
let rows = load_stateful_run_events(&path);
assert_eq!(rows.len(), 1);
let _ = tokio::fs::remove_file(path).await;
}
// Two concurrent "once" appends of the same record result in exactly one write.
#[tokio::test]
async fn append_once_serializes_concurrent_duplicate_writes() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-once-concurrent-{}.jsonl",
Uuid::new_v4()
));
let tenant = tenant("org-a", "workspace-a");
let record = event(1, "run-a", tenant);
let (first, second) = tokio::join!(
append_stateful_run_event_once(&path, &record),
append_stateful_run_event_once(&path, &record)
);
let appended = [first.expect("first append"), second.expect("second append")]
.into_iter()
.filter(|value| *value)
.count();
assert_eq!(appended, 1);
let rows = load_stateful_run_events(&path);
assert_eq!(rows.len(), 1);
let _ = tokio::fs::remove_file(path).await;
}
// Sequences are assigned 1, 2, ... and a repeated event id reports its original sequence.
#[tokio::test]
async fn append_once_with_next_seq_assigns_monotonic_sequence_under_lock() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-next-seq-{}.jsonl",
Uuid::new_v4()
));
let tenant = tenant("org-a", "workspace-a");
let first = event(0, "run-a", tenant.clone());
let mut second = event(0, "run-a", tenant.clone());
second.event_id = "event-second".to_string();
let (first_appended, first_seq) =
append_stateful_run_event_once_with_next_seq(&path, &tenant, &first)
.await
.expect("first append");
let (second_appended, second_seq) =
append_stateful_run_event_once_with_next_seq(&path, &tenant, &second)
.await
.expect("second append");
let (duplicate_appended, duplicate_seq) =
append_stateful_run_event_once_with_next_seq(&path, &tenant, &first)
.await
.expect("duplicate append");
assert!(first_appended);
assert!(second_appended);
assert!(!duplicate_appended);
assert_eq!(first_seq, 1);
assert_eq!(second_seq, 2);
assert_eq!(duplicate_seq, 1);
let rows = load_stateful_run_events(&path);
assert_eq!(
rows.iter().map(|row| row.seq).collect::<Vec<_>>(),
vec![1, 2]
);
let _ = tokio::fs::remove_file(path).await;
}
// Each tenant gets its own sequence for the same run id in the same file.
#[tokio::test]
async fn append_once_with_next_seq_uses_cursor_and_preserves_tenant_boundaries() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-next-seq-cursor-{}.jsonl",
Uuid::new_v4()
));
let tenant_a = tenant("org-a", "workspace-a");
let tenant_b = tenant("org-b", "workspace-b");
let mut first = event(0, "run-a", tenant_a.clone());
first.event_id = "event-a".to_string();
let mut second = event(0, "run-a", tenant_a.clone());
second.event_id = "event-b".to_string();
let mut foreign = event(0, "run-a", tenant_b.clone());
foreign.event_id = "event-foreign".to_string();
let (_, first_seq) = append_stateful_run_event_once_with_next_seq(&path, &tenant_a, &first)
.await
.expect("append first");
let (_, second_seq) =
append_stateful_run_event_once_with_next_seq(&path, &tenant_a, &second)
.await
.expect("append second");
let (_, foreign_seq) =
append_stateful_run_event_once_with_next_seq(&path, &tenant_b, &foreign)
.await
.expect("append foreign");
assert_eq!((first_seq, second_seq, foreign_seq), (1, 2, 1));
let _ = tokio::fs::remove_file(path).await;
}
// Compaction replaces old rows with a marker that keeps their ids and sequences, so
// duplicates are still detected and new sequences continue after the compacted range;
// a second compaction merges the earlier marker into the new one.
#[tokio::test]
async fn compaction_prunes_old_events_and_preserves_next_sequence_marker() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-compact-{}.jsonl",
Uuid::new_v4()
));
let tenant_a = tenant("org-a", "workspace-a");
for seq in 1..=3 {
let mut record = event(seq, "run-a", tenant_a.clone());
record.occurred_at_ms = seq * 100;
append_stateful_run_event(&path, &record)
.await
.expect("append old event");
}
let mut retained = event(4, "run-a", tenant_a.clone());
retained.occurred_at_ms = 900;
append_stateful_run_event(&path, &retained)
.await
.expect("append retained event");
let pruned = compact_stateful_run_event_log(&path, 500, 1_000)
.await
.expect("compact");
assert_eq!(pruned, 3);
let rows = query_stateful_run_events(
&path,
&tenant_a,
StatefulRunEventQuery {
run_id: "run-a",
after_seq: None,
before_seq: None,
limit: None,
tail: false,
},
);
assert_eq!(
rows.iter().map(|row| row.seq).collect::<Vec<_>>(),
vec![3, 4]
);
assert_eq!(rows[0].event_type, STATEFUL_RUN_EVENT_LOG_COMPACTED_TYPE);
let compacted_ids = stateful_run_event_compacted_event_ids(&rows[0]);
assert_eq!(
compacted_ids,
vec![
("event-1".to_string(), 1),
("event-2".to_string(), 2),
("event-3".to_string(), 3),
]
);
let mut duplicate = event(0, "run-a", tenant_a.clone());
duplicate.event_id = "event-2".to_string();
let (duplicate_appended, duplicate_seq) =
append_stateful_run_event_once_with_next_seq(&path, &tenant_a, &duplicate)
.await
.expect("append duplicate");
assert!(!duplicate_appended);
assert_eq!(duplicate_seq, 2);
let mut next = event(0, "run-a", tenant_a.clone());
next.event_id = "event-next".to_string();
let (_, next_seq) = append_stateful_run_event_once_with_next_seq(&path, &tenant_a, &next)
.await
.expect("append next");
assert_eq!(next_seq, 5);
let second_pruned = compact_stateful_run_event_log(&path, 500, 2_000)
.await
.expect("compact again");
assert_eq!(second_pruned, 3);
let rows = query_stateful_run_events(
&path,
&tenant_a,
StatefulRunEventQuery {
run_id: "run-a",
after_seq: None,
before_seq: None,
limit: None,
tail: false,
},
);
assert_eq!(rows.len(), 1);
let compacted_ids = stateful_run_event_compacted_event_ids(&rows[0]);
assert!(compacted_ids
.iter()
.any(|(event_id, seq)| event_id == "event-1" && *seq == 1));
assert!(compacted_ids
.iter()
.any(|(event_id, seq)| event_id == "event-next" && *seq == 5));
let mut old_duplicate = event(0, "run-a", tenant_a.clone());
old_duplicate.event_id = "event-1".to_string();
let (old_duplicate_appended, old_duplicate_seq) =
append_stateful_run_event_once_with_next_seq(&path, &tenant_a, &old_duplicate)
.await
.expect("append old duplicate");
assert!(!old_duplicate_appended);
assert_eq!(old_duplicate_seq, 1);
let _ = tokio::fs::remove_file(path).await;
}
// Rows of different tenants sharing a run id are compacted into separate markers.
#[tokio::test]
async fn compaction_keeps_tenant_boundaries_independent() {
let path = std::env::temp_dir().join(format!(
"stateful-runtime-events-compact-tenant-{}.jsonl",
Uuid::new_v4()
));
let tenant_a = tenant("org-a", "workspace-a");
let tenant_b = tenant("org-b", "workspace-b");
for (seq, tenant_context) in [(1, tenant_a.clone()), (2, tenant_b.clone())] {
let mut record = event(seq, "shared-run", tenant_context);
record.occurred_at_ms = 100;
append_stateful_run_event(&path, &record)
.await
.expect("append old event");
}
let pruned = compact_stateful_run_event_log(&path, 500, 1_000)
.await
.expect("compact");
assert_eq!(pruned, 2);
assert_eq!(
next_stateful_run_event_seq(&path, &tenant_a, "shared-run"),
2
);
assert_eq!(
next_stateful_run_event_seq(&path, &tenant_b, "shared-run"),
3
);
let _ = tokio::fs::remove_file(path).await;
}
// Ids containing path separators are sanitized so the snapshot stays under the root,
// while the stored record keeps the original ids.
#[tokio::test]
async fn snapshot_paths_are_scoped_under_sanitized_run_directory() {
let root =
std::env::temp_dir().join(format!("stateful-runtime-snapshots-{}", Uuid::new_v4()));
let status = StatefulWorkflowRunStatus::Running;
let phase_state = phase_state_from_status("run/../a", &status, 700, Some("phase-a"));
let snapshot = StatefulRunSnapshotRecord {
schema_version: 1,
snapshot_id: "snapshot/../a".to_string(),
run_id: "run/../a".to_string(),
seq: 7,
created_at_ms: 700,
scope: StatefulRuntimeScope::from_tenant_context(tenant("org-a", "workspace-a")),
status,
phase: phase_state.phase,
phase_history: phase_state.phase_history,
allowed_next_phases: phase_state.allowed_next_phases,
phase_id: Some("phase-a".to_string()),
source_record_kind: None,
checkpoint: Some(json!({ "phase": "phase-a" })),
payload_digest: Some("sha256:test".to_string()),
workflow_definition_version: None,
workflow_definition_snapshot_hash: None,
metadata: None,
};
let path = write_stateful_run_snapshot(&root, &snapshot)
.await
.expect("write snapshot");
assert!(path.starts_with(&root));
assert_eq!(
path.file_name().and_then(|name| name.to_str()),
Some("snapshot_.._a.json")
);
let loaded = read_stateful_run_snapshot(&path).expect("read snapshot");
assert_eq!(loaded.snapshot_id, snapshot.snapshot_id);
assert_eq!(loaded.run_id, snapshot.run_id);
let _ = tokio::fs::remove_dir_all(root).await;
}
// A snapshot file without phase fields gets them derived from its status and phase id.
#[tokio::test]
async fn read_snapshot_derives_phase_fields_for_legacy_v1_rows() {
let path =
std::env::temp_dir().join(format!("stateful-runtime-legacy-{}.json", Uuid::new_v4()));
let legacy = json!({
"schema_version": 1,
"snapshot_id": "snapshot-1",
"run_id": "run-1",
"seq": 42,
"created_at_ms": 4200,
"scope": StatefulRuntimeScope::from_tenant_context(tenant("org-a", "workspace-a")),
"status": "running",
"phase_id": "node-a",
"checkpoint": { "step": "legacy" }
});
std::fs::write(
&path,
serde_json::to_vec_pretty(&legacy).expect("serialize legacy"),
)
.expect("write legacy snapshot");
let loaded = read_stateful_run_snapshot(&path).expect("read legacy snapshot");
assert_eq!(loaded.phase, StatefulWorkflowPhase::RunningPhase);
assert_eq!(
loaded.allowed_next_phases,
StatefulWorkflowPhase::RunningPhase
.allowed_next_phases()
.to_vec()
);
assert_eq!(loaded.phase_history.len(), 1);
assert_eq!(
loaded.phase_history[0].reason.as_deref(),
Some("observed_status:running")
);
assert_eq!(loaded.phase_history[0].phase_id.as_deref(), Some("node-a"));
let _ = tokio::fs::remove_file(path).await;
}
// Empty, "." and ".." ids are rewritten to "_" for both the directory and the file name.
#[tokio::test]
async fn snapshot_paths_rewrite_dot_only_and_empty_segments() {
let root =
std::env::temp_dir().join(format!("stateful-runtime-snapshots-dot-{}", Uuid::new_v4()));
for (run_id, snapshot_id) in [("..", "."), ("", "")] {
let status = StatefulWorkflowRunStatus::Running;
let phase_state = phase_state_from_status(run_id, &status, 100, None);
let snapshot = StatefulRunSnapshotRecord {
schema_version: 1,
snapshot_id: snapshot_id.to_string(),
run_id: run_id.to_string(),
seq: 1,
created_at_ms: 100,
scope: StatefulRuntimeScope::from_tenant_context(tenant("org-a", "workspace-a")),
status,
phase: phase_state.phase,
phase_history: phase_state.phase_history,
allowed_next_phases: phase_state.allowed_next_phases,
phase_id: None,
source_record_kind: None,
checkpoint: None,
payload_digest: None,
workflow_definition_version: None,
workflow_definition_snapshot_hash: None,
metadata: None,
};
let path = write_stateful_run_snapshot(&root, &snapshot)
.await
.expect("write snapshot");
assert!(path.starts_with(&root));
assert_eq!(
path.parent()
.and_then(|path| path.file_name())
.and_then(|name| name.to_str()),
Some("_")
);
assert_eq!(
path.file_name().and_then(|name| name.to_str()),
Some("_.json")
);
}
let _ = tokio::fs::remove_dir_all(root).await;
}
// Listing and single fetch both hide snapshots of other tenants; a limit keeps the latest.
#[tokio::test]
async fn snapshot_listing_and_fetch_are_tenant_filtered() {
let root = std::env::temp_dir().join(format!(
"stateful-runtime-snapshots-filtered-{}",
Uuid::new_v4()
));
let tenant_a = tenant("org-a", "workspace-a");
let tenant_b = tenant("org-b", "workspace-b");
for (seq, tenant_context) in [
(1, tenant_a.clone()),
(2, tenant_b.clone()),
(3, tenant_a.clone()),
] {
let status = StatefulWorkflowRunStatus::Running;
let phase_state = phase_state_from_status("run-a", &status, seq * 100, None);
let snapshot = StatefulRunSnapshotRecord {
schema_version: 1,
snapshot_id: format!("snapshot-{seq}"),
run_id: "run-a".to_string(),
seq,
created_at_ms: seq * 100,
scope: StatefulRuntimeScope::from_tenant_context(tenant_context),
status,
phase: phase_state.phase,
phase_history: phase_state.phase_history,
allowed_next_phases: phase_state.allowed_next_phases,
phase_id: None,
source_record_kind: None,
checkpoint: None,
payload_digest: None,
workflow_definition_version: None,
workflow_definition_snapshot_hash: None,
metadata: None,
};
write_stateful_run_snapshot(&root, &snapshot)
.await
.expect("write snapshot");
}
let snapshots = list_stateful_run_snapshots(&root, &tenant_a, "run-a", None);
assert_eq!(
snapshots
.iter()
.map(|snapshot| snapshot.seq)
.collect::<Vec<_>>(),
vec![1, 3]
);
let visible = read_stateful_run_snapshot_for_run(&root, &tenant_a, "run-a", "snapshot-3")
.expect("read visible snapshot");
assert_eq!(visible.map(|snapshot| snapshot.seq), Some(3));
let hidden = read_stateful_run_snapshot_for_run(&root, &tenant_a, "run-a", "snapshot-2")
.expect("read hidden snapshot");
assert!(hidden.is_none());
let latest = list_stateful_run_snapshots(&root, &tenant_a, "run-a", Some(1));
assert_eq!(
latest
.iter()
.map(|snapshot| snapshot.seq)
.collect::<Vec<_>>(),
vec![3]
);
let _ = tokio::fs::remove_dir_all(root).await;
}
}