use chrono::Utc;
use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
use jamjet_state::{
backend::{StateBackend, WorkflowDefinition},
event::{Event, EventKind},
materializer::{apply_events, materialize},
snapshot::Snapshot,
SqliteBackend,
};
use serde_json::json;
use std::path::PathBuf;
/// Builds a unique database file path inside the system temp directory.
///
/// The file name has the form `jamjet_test_<token>.db`, where `<token>` is a
/// freshly generated v4 UUID with its hyphens removed. Nothing is created on
/// disk by this function; it only computes the path.
fn temp_db_path() -> PathBuf {
    // Strip the hyphens from the UUID's textual form.
    let token: String = uuid::Uuid::new_v4()
        .to_string()
        .chars()
        .filter(|ch| *ch != '-')
        .collect();
    std::env::temp_dir().join(format!("jamjet_test_{token}.db"))
}
/// Opens a [`SqliteBackend`] for the database file at `path`.
///
/// The path is rendered into a `sqlite://` URL and passed to
/// `SqliteBackend::open`. The parameter is a borrowed `Path` rather than
/// `&PathBuf`, so any path-like borrow is accepted; existing callers that pass
/// `&PathBuf` keep working through deref coercion.
///
/// # Panics
///
/// Panics with `failed to open test db` if the backend cannot be opened.
async fn open_db(path: &std::path::Path) -> SqliteBackend {
    let url = format!("sqlite://{}", path.display());
    SqliteBackend::open(&url)
        .await
        .expect("failed to open test db")
}
/// Creates a `Running` execution fixture for the `wf-crash-test` workflow.
///
/// `id` is cloned into the record. The initial input and the current state are
/// both `{"x": 1}`, and the start/update timestamps share one `Utc::now()`
/// reading. `completed_at` and `session_type` are left unset.
fn sample_execution(id: &ExecutionId) -> WorkflowExecution {
    let created = Utc::now();
    // Input and current state start out identical.
    let seed = json!({ "x": 1 });
    WorkflowExecution {
        execution_id: id.clone(),
        workflow_id: "wf-crash-test".into(),
        workflow_version: "1.0.0".into(),
        status: WorkflowStatus::Running,
        initial_input: seed.clone(),
        current_state: seed,
        started_at: created,
        updated_at: created,
        completed_at: None,
        session_type: None,
    }
}
/// Drives one execution through Running -> Paused -> Running -> Completed and
/// checks the status read back from the backend after the start, the pause and
/// the completion.
///
/// The database file is removed on a best-effort basis as the last step; that
/// cleanup is not reached if an earlier `unwrap` or assertion panics.
#[tokio::test]
async fn test_state_machine_transitions() {
    let db_file = temp_db_path();
    let backend = open_db(&db_file).await;
    let exec_id = ExecutionId::new();
    backend
        .create_execution(sample_execution(&exec_id))
        .await
        .unwrap();

    // Start.
    backend
        .update_execution_status(&exec_id, WorkflowStatus::Running)
        .await
        .unwrap();
    let started = backend.get_execution(&exec_id).await.unwrap().unwrap();
    assert_eq!(started.status, WorkflowStatus::Running);

    // Pause.
    backend
        .update_execution_status(&exec_id, WorkflowStatus::Paused)
        .await
        .unwrap();
    let paused = backend.get_execution(&exec_id).await.unwrap().unwrap();
    assert_eq!(paused.status, WorkflowStatus::Paused);

    // Resume, then finish; only the final state is read back.
    backend
        .update_execution_status(&exec_id, WorkflowStatus::Running)
        .await
        .unwrap();
    backend
        .update_execution_status(&exec_id, WorkflowStatus::Completed)
        .await
        .unwrap();
    let finished = backend.get_execution(&exec_id).await.unwrap().unwrap();
    assert_eq!(finished.status, WorkflowStatus::Completed);
    assert!(finished.completed_at.is_some());

    // Best-effort cleanup; a failure to delete is ignored.
    std::fs::remove_file(&db_file).ok();
}
/// Writes an execution plus five events through one backend handle, lets that
/// handle go out of scope, then reopens the same file and checks that both the
/// raw event log and the materialized view reflect everything that was written.
///
/// The database file is removed on a best-effort basis as the last step; that
/// cleanup is not reached if an earlier `unwrap` or assertion panics.
#[tokio::test]
async fn test_crash_recovery_from_event_log() {
    let db_file = temp_db_path();
    let exec_id = ExecutionId::new();
    let first_node = "step1";
    let second_node = "step2";

    // Phase 1: persist the execution and its history, then drop the handle.
    {
        let writer = open_db(&db_file).await;
        writer
            .create_execution(sample_execution(&exec_id))
            .await
            .unwrap();

        let history = vec![
            Event::new(
                exec_id.clone(),
                1,
                EventKind::WorkflowStarted {
                    workflow_id: "wf-crash-test".into(),
                    workflow_version: "1.0.0".into(),
                    initial_input: json!({ "x": 1 }),
                },
            ),
            Event::new(
                exec_id.clone(),
                2,
                EventKind::NodeScheduled {
                    node_id: first_node.into(),
                    queue_type: "tool".into(),
                },
            ),
            Event::new(
                exec_id.clone(),
                3,
                EventKind::NodeStarted {
                    node_id: first_node.into(),
                    worker_id: "worker-1".into(),
                    attempt: 0,
                },
            ),
            Event::new(
                exec_id.clone(),
                4,
                EventKind::NodeCompleted {
                    node_id: first_node.into(),
                    output: json!("hello"),
                    state_patch: json!({ "greeting": "hello" }),
                    duration_ms: 42,
                    gen_ai_system: None,
                    gen_ai_model: None,
                    input_tokens: None,
                    output_tokens: None,
                    finish_reason: None,
                    cost_usd: None,
                    provenance: None,
                },
            ),
            // The second node is scheduled but never started or completed.
            Event::new(
                exec_id.clone(),
                5,
                EventKind::NodeScheduled {
                    node_id: second_node.into(),
                    queue_type: "tool".into(),
                },
            ),
        ];
        for entry in history {
            writer.append_event(entry).await.unwrap();
        }
    }

    // Phase 2: reopen the same file and verify what can be read back.
    {
        let reader = open_db(&db_file).await;

        let restored = reader.get_execution(&exec_id).await.unwrap().unwrap();
        assert_eq!(restored.workflow_id, "wf-crash-test");

        let log = reader.get_events(&exec_id).await.unwrap();
        assert_eq!(log.len(), 5);
        assert_eq!(reader.latest_sequence(&exec_id).await.unwrap(), 5);

        let view = materialize(&reader, &exec_id).await.unwrap();
        assert_eq!(view.current_state["greeting"], "hello");
        assert!(view.completed_nodes.contains_key("step1"));
        assert!(view.active_nodes.contains("step2"));
        assert_eq!(view.status, WorkflowStatus::Running);
        assert_eq!(view.last_sequence, 5);
    }

    // Best-effort cleanup; a failure to delete is ignored.
    std::fs::remove_file(&db_file).ok();
}
/// Persists five `NodeCompleted` events, a snapshot taken at sequence 5, and
/// two further events (a sixth `NodeCompleted` and a `WorkflowCompleted`), then
/// reopens the file and checks the materialized view.
///
/// The database file is removed on a best-effort basis as the last step; that
/// cleanup is not reached if an earlier `unwrap` or assertion panics.
#[tokio::test]
async fn test_recovery_with_snapshot() {
    let db_file = temp_db_path();
    let exec_id = ExecutionId::new();

    // Builds the `NodeCompleted` event for node `n<seq>` at sequence `seq`,
    // with output `seq` and state patch `{ "n<seq>": seq }`.
    let node_done = |seq: i64| {
        let node = format!("n{seq}");
        Event::new(
            exec_id.clone(),
            seq,
            EventKind::NodeCompleted {
                node_id: node.clone(),
                output: json!(seq),
                state_patch: json!({ node: seq }),
                duration_ms: 1,
                gen_ai_system: None,
                gen_ai_model: None,
                input_tokens: None,
                output_tokens: None,
                finish_reason: None,
                cost_usd: None,
                provenance: None,
            },
        )
    };

    // Phase 1: write events 1..=5, the snapshot, then events 6 and 7.
    {
        let writer = open_db(&db_file).await;
        writer
            .create_execution(sample_execution(&exec_id))
            .await
            .unwrap();

        for seq in 1..=5i64 {
            writer.append_event(node_done(seq)).await.unwrap();
        }

        let checkpoint = Snapshot::new(
            exec_id.clone(),
            5,
            json!({ "n1": 1, "n2": 2, "n3": 3, "n4": 4, "n5": 5 }),
        );
        writer.write_snapshot(checkpoint).await.unwrap();

        // Events recorded after the snapshot was taken.
        writer.append_event(node_done(6)).await.unwrap();
        let completion = Event::new(
            exec_id.clone(),
            7,
            EventKind::WorkflowCompleted {
                final_state: json!({ "done": true }),
            },
        );
        writer.append_event(completion).await.unwrap();
    }

    // Phase 2: reopen and check that the post-snapshot events are reflected.
    {
        let reader = open_db(&db_file).await;
        let view = materialize(&reader, &exec_id).await.unwrap();
        assert_eq!(view.current_state["done"], true);
        assert_eq!(view.status, WorkflowStatus::Completed);
        assert!(view.completed_nodes.contains_key("n6"));
        assert_eq!(view.last_sequence, 7);
    }

    // Best-effort cleanup; a failure to delete is ignored.
    std::fs::remove_file(&db_file).ok();
}
/// Appends events with sequences 3, 1, 2 (deliberately out of order) and checks
/// that `get_events` returns them sorted by sequence and that
/// `get_events_since(&id, 1)` returns only the events after sequence 1.
///
/// The full sequence lists are compared rather than individual indices, so a
/// missing event fails with a readable diff instead of an index-out-of-bounds
/// panic, and an unexpected extra event is caught rather than silently ignored.
///
/// The database file is removed on a best-effort basis as the last step; that
/// cleanup is not reached if an earlier `unwrap` or assertion panics.
#[tokio::test]
async fn test_event_sequencing() {
    let path = temp_db_path();
    let db = open_db(&path).await;
    let id = ExecutionId::new();
    db.create_execution(sample_execution(&id)).await.unwrap();

    // Insertion order intentionally differs from sequence order.
    for seq in [3i64, 1, 2] {
        db.append_event(Event::new(
            id.clone(),
            seq,
            EventKind::NodeScheduled {
                node_id: format!("n{seq}"),
                queue_type: "tool".into(),
            },
        ))
        .await
        .unwrap();
    }

    let events = db.get_events(&id).await.unwrap();
    let all_sequences: Vec<_> = events.iter().map(|event| event.sequence).collect();
    assert_eq!(all_sequences, [1, 2, 3]);

    let since = db.get_events_since(&id, 1).await.unwrap();
    let tail_sequences: Vec<_> = since.iter().map(|event| event.sequence).collect();
    assert_eq!(tail_sequences, [2, 3]);

    // Best-effort cleanup; a failure to delete is ignored.
    std::fs::remove_file(&path).ok();
}