use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
use jamjet_state::backend::{StateBackend, WorkItem, WorkflowDefinition};
use jamjet_state::event::EventKind;
use jamjet_state::{Event, SqliteBackend, DEFAULT_TENANT};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn workflow_runs_to_completion() {
let db_path = std::env::temp_dir().join(format!("jjtest-{}.db", Uuid::new_v4()));
let url = format!("sqlite://{}", db_path.display());
let backend: Arc<dyn StateBackend> = Arc::new(
SqliteBackend::open(&url)
.await
.expect("open sqlite backend"),
);
let ir = serde_json::json!({
"workflow_id": "e2e-completion",
"version": "0.1.0",
"name": null,
"description": null,
"state_schema": "",
"start_node": "route",
"nodes": {
"route": {
"id": "route",
"kind": { "type": "condition", "branches": [] },
"retry_policy": null,
"node_timeout_secs": null,
"description": null,
"labels": {}
}
},
"edges": [ { "from": "route", "to": "end", "condition": null } ],
"retry_policies": {},
"timeouts": {},
"models": {},
"tools": {},
"mcp_servers": {},
"remote_agents": {},
"labels": {}
});
backend
.store_workflow(WorkflowDefinition {
workflow_id: "e2e-completion".into(),
version: "0.1.0".into(),
ir,
created_at: chrono::Utc::now(),
tenant_id: DEFAULT_TENANT.into(),
})
.await
.expect("store_workflow");
let execution_id = ExecutionId::new();
let now = chrono::Utc::now();
backend
.create_execution(WorkflowExecution {
execution_id: execution_id.clone(),
workflow_id: "e2e-completion".into(),
workflow_version: "0.1.0".into(),
status: WorkflowStatus::Running,
initial_input: serde_json::json!({}),
current_state: serde_json::json!({}),
started_at: now,
updated_at: now,
completed_at: None,
session_type: None,
})
.await
.expect("create_execution");
backend
.append_event(Event::new(
execution_id.clone(),
1,
EventKind::WorkflowStarted {
workflow_id: "e2e-completion".into(),
workflow_version: "0.1.0".into(),
initial_input: serde_json::json!({}),
},
))
.await
.expect("append WorkflowStarted");
backend
.append_event(Event::new(
execution_id.clone(),
2,
EventKind::NodeScheduled {
node_id: "route".into(),
queue_type: "general".into(),
},
))
.await
.expect("append NodeScheduled");
backend
.enqueue_work_item(WorkItem {
id: Uuid::new_v4(),
execution_id: execution_id.clone(),
node_id: "route".into(),
queue_type: "general".into(),
payload: serde_json::json!({
"workflow_id": "e2e-completion",
"workflow_version": "0.1.0",
}),
attempt: 0,
max_attempts: 3,
created_at: now,
lease_expires_at: None,
worker_id: None,
tenant_id: DEFAULT_TENANT.into(),
})
.await
.expect("enqueue_work_item");
let scheduler = jamjet_scheduler::Scheduler::new(backend.clone())
.with_poll_interval(Duration::from_millis(25));
let sched_handle = tokio::spawn(async move { scheduler.run().await });
let worker_handles = jamjet_worker::default_pool(backend.clone()).spawn();
let poll_backend = backend.clone();
let exec_id = execution_id.clone();
let wait = async move {
loop {
let exec = poll_backend
.get_execution(&exec_id)
.await
.expect("get_execution")
.expect("execution exists");
if matches!(
exec.status,
WorkflowStatus::Completed | WorkflowStatus::Failed
) {
return exec.status;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
};
let result = tokio::time::timeout(Duration::from_secs(20), wait).await;
sched_handle.abort();
for h in worker_handles {
h.abort();
}
if result.is_err() {
let events = backend.get_events(&execution_id).await.unwrap_or_default();
let kinds: Vec<String> = events.iter().map(|e| format!("{:?}", e.kind)).collect();
eprintln!("E2E TIMEOUT — {} events: {:#?}", events.len(), kinds);
if let Ok(Some(e)) = backend.get_execution(&execution_id).await {
eprintln!("E2E final stored status: {:?}", e.status);
}
}
let _ = std::fs::remove_file(&db_path);
let status = result.ok();
assert_eq!(
status,
Some(WorkflowStatus::Completed),
"workflow should run to completion (got {status:?})"
);
}