use std::collections::HashMap;
use langgraph_checkpoint::checkpoint::base::BaseCheckpointSaver;
use langgraph_checkpoint::checkpoint::types::{
ChannelVersions, Checkpoint, CheckpointMetadata, CheckpointSource,
};
use langgraph_checkpoint::config::RunnableConfig;
use langgraph_checkpoint_sqlite::SqliteSaver;
use serde_json::Value as JsonValue;
/// Builds a `RunnableConfig` whose `configurable` section carries the given
/// `thread_id` and an empty `checkpoint_ns`.
///
/// # Panics
///
/// Panics if the JSON document cannot be deserialized into `RunnableConfig`.
fn config_for(thread_id: &str) -> RunnableConfig {
    let raw = serde_json::json!({
        "configurable": { "thread_id": thread_id, "checkpoint_ns": "" }
    });
    serde_json::from_value(raw).unwrap()
}
/// Builds a checkpoint from `(channel, value, version)` triples.
///
/// Starts from `Checkpoint::empty()`. For each triple, the value is stored under
/// the channel name in `channel_values` and the version, as a JSON number, in
/// `channel_versions`. The same channel-to-version pairs are also collected into
/// a separate `ChannelVersions` map, returned as the second tuple element.
fn make_checkpoint(channel_values: Vec<(&str, JsonValue, i64)>) -> (Checkpoint, ChannelVersions) {
    let mut checkpoint = Checkpoint::empty();
    let mut new_versions: ChannelVersions = HashMap::new();

    for (channel, value, version) in channel_values {
        // Build the owned key and the JSON version once, then share them
        // between the three maps.
        let channel = channel.to_string();
        let version = JsonValue::Number(version.into());

        checkpoint.channel_values.insert(channel.clone(), value);
        checkpoint
            .channel_versions
            .insert(channel.clone(), version.clone());
        new_versions.insert(channel, version);
    }

    (checkpoint, new_versions)
}
/// Walks through the basic `SqliteSaver` operations against a single thread.
///
/// In order, the program:
/// 1. creates a saver from the `sqlite::memory:` connection string and runs `setup`;
/// 2. stores two checkpoints, passing the config returned by the first `aput`
///    as the config of the second;
/// 3. reads back the latest checkpoint and lists the thread's history;
/// 4. records one pending write against the second checkpoint and reads it back;
/// 5. deletes the thread.
///
/// # Errors
///
/// Returns any error propagated from the saver calls or from deserializing the
/// id-pinned config, and an error if a checkpoint lookup yields `None`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One definition of the thread id so the configs and the final delete
    // cannot drift apart.
    const THREAD_ID: &str = "demo-thread";

    let saver = SqliteSaver::from_conn_string("sqlite::memory:").await?;
    saver.setup().await?;

    let cfg = config_for(THREAD_ID);

    // --- checkpoint #1 ---------------------------------------------------
    let (cp1, vers1) = make_checkpoint(vec![
        ("messages", serde_json::json!(["hello"]), 1),
        ("counter", serde_json::json!(1), 1),
    ]);
    let metadata = CheckpointMetadata {
        source: Some(CheckpointSource::Loop),
        step: Some(0),
        ..Default::default()
    };
    // The config handed back by `aput` is used as the config for the next put.
    let next_cfg = saver.aput(&cfg, &cp1, &metadata, &vers1).await?;
    println!("stored checkpoint #1: id={}", cp1.id);

    // --- checkpoint #2 ---------------------------------------------------
    let (cp2, vers2) = make_checkpoint(vec![
        ("messages", serde_json::json!(["hello", "world"]), 2),
        ("counter", serde_json::json!(2), 2),
    ]);
    let metadata2 = CheckpointMetadata {
        source: Some(CheckpointSource::Loop),
        step: Some(1),
        ..Default::default()
    };
    saver.aput(&next_cfg, &cp2, &metadata2, &vers2).await?;
    println!("stored checkpoint #2: id={}", cp2.id);

    // --- read back -------------------------------------------------------
    // `cfg` carries no checkpoint id; a missing result is reported as an
    // error instead of a panic, since this function already returns `Result`.
    let latest = saver
        .aget_tuple(&cfg)
        .await?
        .ok_or("no checkpoint found for the thread after storing two")?;
    println!(
        "latest checkpoint id={} step={:?} channel_values={:?}",
        latest.checkpoint.id, latest.metadata.step, latest.checkpoint.channel_values
    );

    let history = saver.alist(Some(&cfg), None, None, None).await?;
    println!("history length = {}", history.len());
    for (i, t) in history.iter().enumerate() {
        println!(" [{}] id={} step={:?}", i, t.checkpoint.id, t.metadata.step);
    }

    // --- pending writes --------------------------------------------------
    // Same thread and namespace as `cfg`, but pinned to checkpoint #2 by id.
    let cfg_with_id: RunnableConfig = serde_json::from_value(serde_json::json!({
        "configurable": {
            "thread_id": THREAD_ID,
            "checkpoint_ns": "",
            "checkpoint_id": cp2.id,
        }
    }))?;
    saver
        .aput_writes(
            &cfg_with_id,
            vec![(
                "outbox".into(),
                "task-1".into(),
                serde_json::json!({"sent": true}),
            )],
            "task-1".into(),
            "".into(),
        )
        .await?;
    let with_writes = saver
        .aget_tuple(&cfg_with_id)
        .await?
        .ok_or("checkpoint #2 not found when reading back pending writes")?;
    println!("pending writes = {:?}", with_writes.pending_writes);

    // --- cleanup ---------------------------------------------------------
    saver.adelete_thread(THREAD_ID.into()).await?;
    println!("thread deleted");
    Ok(())
}