#[cfg(feature = "openai")]
#[tokio::test]
async fn live_durable_graph_checkpoints_then_resumes_across_model_call() {
use std::sync::Arc;
use serde_json::json;
use tinyagents::harness::message::Message;
use tinyagents::harness::providers::openai::OpenAiModel;
use tinyagents::harness::runtime::AgentHarness;
use tinyagents::{
Checkpointer, Command, GraphBuilder, InMemoryCheckpointer, Interrupt, NodeContext,
NodeResult,
};
let _ = dotenvy::dotenv();
if std::env::var("OPENAI_API_KEY").is_err() {
eprintln!(
"skipping live_durable_graph_checkpoints_then_resumes_across_model_call: \
OPENAI_API_KEY is not set"
);
return;
}
#[derive(Clone, Debug, Default)]
struct QaState {
question: String,
answer: Option<String>,
model_calls: usize,
}
let mut harness: AgentHarness<()> = AgentHarness::new();
harness
.register_model(
"openai",
Arc::new(OpenAiModel::from_env().expect("OPENAI_API_KEY present")),
)
.set_default_model("openai");
let harness = Arc::new(harness);
let checkpointer = Arc::new(InMemoryCheckpointer::<QaState>::new());
let node_harness = harness.clone();
let graph = GraphBuilder::<QaState, QaState>::overwrite()
.with_graph_id("live-checkpoint-resume")
.add_node("ask", move |mut state: QaState, ctx: NodeContext| {
let harness = node_harness.clone();
async move {
match ctx.resume {
None => Ok(NodeResult::Interrupt(Interrupt::new(
"ask",
json!({ "ask": "approve calling the model?" }),
))),
Some(_) => {
let run = harness
.invoke_default(&(), vec![Message::user(state.question.clone())])
.await?;
state.answer = run.text();
state.model_calls = run.model_calls;
Ok(NodeResult::Update(state))
}
}
}
})
.set_entry("ask")
.set_finish("ask")
.compile()
.expect("durable graph compiles")
.with_checkpointer(checkpointer.clone());
let paused = graph
.run_with_thread(
"ckpt-thread",
QaState {
question: "Reply with a single short greeting.".to_string(),
answer: None,
model_calls: 0,
},
)
.await
.expect("first pass succeeds");
assert!(
paused.is_interrupted(),
"the run should pause before the model call"
);
assert_eq!(paused.interrupts.len(), 1);
assert_eq!(paused.interrupts[0].node.as_str(), "ask");
assert!(
paused.state.answer.is_none(),
"no answer should exist before resume"
);
let checkpoints = checkpointer
.list("ckpt-thread")
.await
.expect("list succeeds");
assert!(
!checkpoints.is_empty(),
"an interrupt must persist a checkpoint"
);
let resumed = graph
.resume("ckpt-thread", Command::resume(json!({ "approved": true })))
.await
.expect("resume succeeds");
assert!(!resumed.is_interrupted(), "resumed run must complete");
assert!(
resumed.state.model_calls >= 1,
"the resumed pass should make at least one model call"
);
let answer = resumed.state.answer.unwrap_or_default();
assert!(
!answer.trim().is_empty(),
"the resumed run should fold a non-empty model answer into state"
);
let after_resume = checkpointer
.list("ckpt-thread")
.await
.expect("list succeeds");
assert!(
after_resume.len() >= checkpoints.len(),
"resuming should not drop persisted checkpoints"
);
}