use anyhow::{Context, Result};
use crate::cli::GraphCommands;
/// Entry point for the graph subcommands: routes each `GraphCommands` variant to
/// the handler that implements it and returns that handler's result.
///
/// # Errors
///
/// Propagates whatever error the selected handler reports.
pub async fn run(command: GraphCommands) -> Result<()> {
    match command {
        GraphCommands::Steps { thread_id: thread, db: db_path } => {
            run_steps(&thread, db_path.as_deref()).await
        }
        GraphCommands::Replay { thread_id: thread, from: first, to: last, db: db_path } => {
            run_replay(&thread, first, last, db_path.as_deref()).await
        }
        GraphCommands::Fork { thread_id: thread, at: step, new_thread: target, db: db_path } => {
            run_fork(&thread, step, &target, db_path.as_deref()).await
        }
        GraphCommands::Resume { thread_id: thread, from: first, db: db_path } => {
            run_resume(&thread, first, db_path.as_deref()).await
        }
    }
}
/// Prints a table of every checkpoint recorded for `thread_id`.
///
/// Each row shows the step number, checkpoint id, timestamp (`-` when missing) and
/// the pending nodes (`(none)` when the list is empty), followed by a total count.
/// When the thread has no checkpoints a single notice is printed instead.
///
/// # Errors
///
/// Fails if the checkpointer cannot be loaded or the steps cannot be listed.
async fn run_steps(thread_id: &str, db: Option<&str>) -> Result<()> {
    let handle = standalone_handle(thread_id, load_checkpointer(db)?);
    let rows = handle.steps().await.context("failed to list steps")?;

    if rows.is_empty() {
        println!("No checkpoints found for thread '{thread_id}'.");
        return Ok(());
    }

    println!("Thread: {thread_id}");
    println!("{:<6} {:<40} {:<26} Pending Nodes", "Step", "Checkpoint ID", "Timestamp");
    println!("{}", "-".repeat(100));

    for row in &rows {
        let when = row.timestamp.as_deref().unwrap_or("-");
        let waiting = match row.pending_nodes.as_slice() {
            [] => "(none)".to_string(),
            nodes => nodes.join(", "),
        };
        println!("{:<6} {:<40} {:<26} {}", row.step, row.checkpoint_id, when, waiting);
    }

    println!("\nTotal: {} checkpoint(s)", rows.len());
    Ok(())
}
async fn run_replay(
thread_id: &str,
from: usize,
to: Option<usize>,
db: Option<&str>,
) -> Result<()> {
let checkpointer = load_checkpointer(db)?;
let handle = standalone_handle(thread_id, checkpointer.clone());
let transitions = handle.replay(from, to).await.context("failed to replay")?;
if transitions.is_empty() {
println!("No state transitions found in the specified range.");
return Ok(());
}
let range_desc = match to {
Some(end) => format!("{from}..={end}"),
None => format!("{from}..end"),
};
println!("Replaying thread '{thread_id}' (steps {range_desc}):\n");
for (step, state) in &transitions {
println!("── Step {step} ──");
let json = serde_json::to_string_pretty(state).unwrap_or_else(|_| format!("{state:?}"));
println!("{json}\n");
}
println!("Replay complete: {} state(s) shown.", transitions.len());
Ok(())
}
/// Copies the checkpoint of `thread_id` at step `at` into the thread `new_thread`
/// and prints a confirmation line.
///
/// # Errors
///
/// Fails if the checkpointer cannot be loaded or the fork is rejected.
async fn run_fork(thread_id: &str, at: usize, new_thread: &str, db: Option<&str>) -> Result<()> {
    let source = standalone_handle(thread_id, load_checkpointer(db)?);

    source
        .fork_at(at, new_thread)
        .await
        .context("failed to fork thread")?;

    println!("Forked thread '{thread_id}' at step {at} → new thread '{new_thread}'.");
    Ok(())
}
/// Explains that resuming is unavailable from the CLI and prints the equivalent
/// programmatic snippet, filled in with `thread_id` and `from`.
///
/// The checkpointer is still loaded first, so a `db` argument that cannot be
/// opened is reported as an error before anything is printed.
///
/// # Errors
///
/// Fails only if the checkpointer cannot be loaded.
async fn run_resume(thread_id: &str, from: usize, db: Option<&str>) -> Result<()> {
    // Held until the end of the function; never used beyond the load itself.
    let _store = load_checkpointer(db)?;

    // Fixed preamble; an empty entry yields a blank line.
    let preamble = [
        "Resume is not yet supported from the CLI.",
        "",
        "Resuming execution requires a compiled graph, which cannot be",
        "constructed from CLI arguments alone.",
        "",
        "To resume programmatically, use the TimeTravelHandle API:",
        "",
    ];
    for line in preamble {
        println!("{line}");
    }

    println!(" let handle = graph.time_travel(\"{thread_id}\");");
    println!(" let config = ExecutionConfig::new(\"{thread_id}\");");
    println!(" let state = handle.resume_from({from}, config).await?;");
    println!();
    println!("See: https://docs.rs/adk-graph/latest/adk_graph/time_travel/");
    Ok(())
}
/// Builds the checkpoint store used by the subcommand handlers.
///
/// * `db == None` — returns a fresh in-memory `MemoryCheckpointer`.
/// * `db == Some(path)` — opens a `SqliteCheckpointer` at `path` when the crate is
///   built with the `graph-sqlite` feature; without that feature an error
///   explaining how to enable it is returned.
///
/// # Errors
///
/// Fails if the SQLite database cannot be opened, or if a path is given while the
/// `graph-sqlite` feature is disabled.
///
/// # Panics
///
/// With the `graph-sqlite` feature and a path, this must run on a multi-threaded
/// Tokio runtime: `block_in_place` panics on a current-thread runtime and
/// `Handle::current` panics outside any runtime.
fn load_checkpointer(
    db: Option<&str>,
) -> Result<std::sync::Arc<dyn adk_graph::checkpoint::Checkpointer>> {
    match db {
        Some(_path) => {
            #[cfg(feature = "graph-sqlite")]
            {
                // The handlers calling this are `async fn`s, so this code runs inside
                // an async execution context, where a bare `Handle::block_on` panics.
                // `block_in_place` first hands the worker thread over to blocking
                // work, which makes the nested `block_on` legal while keeping this
                // function synchronous.
                let checkpointer = tokio::task::block_in_place(|| {
                    tokio::runtime::Handle::current().block_on(async {
                        adk_graph::checkpoint::SqliteCheckpointer::new(_path)
                            .await
                            .context("failed to open SQLite checkpoint database")
                    })
                })?;
                Ok(std::sync::Arc::new(checkpointer))
            }
            #[cfg(not(feature = "graph-sqlite"))]
            {
                anyhow::bail!(
                    "SQLite checkpoint support requires the `graph-sqlite` feature.\n\
                     Rebuild with: cargo install adk-cli --features graph-sqlite\n\
                     \n\
                     Without --db, an in-memory checkpointer is used (useful for testing)."
                );
            }
        }
        None => {
            Ok(std::sync::Arc::new(adk_graph::checkpoint::MemoryCheckpointer::new()))
        }
    }
}
/// Pairs an owned copy of `thread_id` with `checkpointer` in a
/// `StandaloneTimeTravelHandle`.
fn standalone_handle(
    thread_id: &str,
    checkpointer: std::sync::Arc<dyn adk_graph::checkpoint::Checkpointer>,
) -> StandaloneTimeTravelHandle {
    let thread_id = thread_id.to_owned();
    StandaloneTimeTravelHandle {
        thread_id,
        checkpointer,
    }
}
/// Bundles a thread id with the checkpointer that serves its `steps`, `replay`
/// and `fork_at` operations.
struct StandaloneTimeTravelHandle {
    /// Thread whose checkpoints are listed, replayed or forked.
    thread_id: String,
    /// Store the checkpoints are read from and forked checkpoints are saved to.
    checkpointer: std::sync::Arc<dyn adk_graph::checkpoint::Checkpointer>,
}
/// Summary of one stored checkpoint, as printed in a row of the `run_steps` table.
struct CliStepInfo {
    /// Step number copied from the checkpoint.
    step: usize,
    /// Identifier copied from the checkpoint.
    checkpoint_id: String,
    /// Checkpoint creation time formatted as RFC 3339; `run_steps` prints `-`
    /// when this is `None`.
    timestamp: Option<String>,
    /// The checkpoint's `pending_nodes` list; `run_steps` prints `(none)` when empty.
    pending_nodes: Vec<String>,
}
impl StandaloneTimeTravelHandle {
async fn steps(&self) -> adk_graph::error::Result<Vec<CliStepInfo>> {
let checkpoints = self.checkpointer.list(&self.thread_id).await?;
let mut steps: Vec<CliStepInfo> = checkpoints
.into_iter()
.map(|cp| CliStepInfo {
step: cp.step,
checkpoint_id: cp.checkpoint_id,
timestamp: Some(cp.created_at.to_rfc3339()),
pending_nodes: cp.pending_nodes,
})
.collect();
steps.sort_by_key(|s| s.step);
Ok(steps)
}
async fn replay(
&self,
from_step: usize,
to_step: Option<usize>,
) -> adk_graph::error::Result<Vec<(usize, adk_graph::state::State)>> {
let mut checkpoints = self.checkpointer.list(&self.thread_id).await?;
checkpoints.sort_by_key(|cp| cp.step);
let results: Vec<(usize, adk_graph::state::State)> = checkpoints
.into_iter()
.filter(|cp| cp.step >= from_step && to_step.is_none_or(|end| cp.step <= end))
.map(|cp| (cp.step, cp.state))
.collect();
if results.is_empty() || results[0].0 != from_step {
return Err(adk_graph::error::GraphError::CheckpointError(format!(
"no checkpoint found at step {from_step} for thread '{}'",
self.thread_id
)));
}
Ok(results)
}
async fn fork_at(&self, step: usize, new_thread_id: &str) -> adk_graph::error::Result<()> {
let checkpoints = self.checkpointer.list(&self.thread_id).await?;
let checkpoint = checkpoints.into_iter().find(|cp| cp.step == step).ok_or_else(|| {
adk_graph::error::GraphError::CheckpointError(format!(
"no checkpoint found at step {step} for thread '{}'",
self.thread_id
))
})?;
let forked = adk_graph::state::Checkpoint::new(
new_thread_id,
checkpoint.state,
checkpoint.step,
checkpoint.pending_nodes,
);
self.checkpointer.save(&forked).await?;
Ok(())
}
}