use std::collections::HashMap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::Value;
use crate::checkpoint::Checkpointer;
use crate::error::GraphError;
use crate::state::{Checkpoint, State};
pub trait Diff: Sized {
type Delta: Clone + Serialize + DeserializeOwned + Send + Sync;
fn diff(old: &Self, new: &Self) -> Self::Delta;
fn apply(base: &Self, delta: &Self::Delta) -> Self;
}
#[derive(Debug, Clone)]
pub struct DeltaConfig {
pub full_snapshot_interval: u32,
}
impl Default for DeltaConfig {
fn default() -> Self {
Self { full_snapshot_interval: 10 }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CheckpointType {
Full,
Delta {
base_checkpoint_id: String,
},
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VecDelta {
pub full_replacement: bool,
pub start_index: usize,
pub items: Vec<Value>,
}
impl Diff for Vec<Value> {
type Delta = VecDelta;
fn diff(old: &Self, new: &Self) -> Self::Delta {
if new.len() >= old.len() && old.iter().zip(new.iter()).all(|(a, b)| a == b) {
VecDelta {
full_replacement: false,
start_index: old.len(),
items: new[old.len()..].to_vec(),
}
} else {
VecDelta { full_replacement: true, start_index: 0, items: new.clone() }
}
}
fn apply(base: &Self, delta: &Self::Delta) -> Self {
if delta.full_replacement {
delta.items.clone()
} else {
let mut result = base[..delta.start_index].to_vec();
result.extend_from_slice(&delta.items);
result
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MapDelta {
pub added: HashMap<String, Value>,
pub removed: Vec<String>,
pub modified: HashMap<String, Value>,
}
impl Diff for HashMap<String, Value> {
type Delta = MapDelta;
fn diff(old: &Self, new: &Self) -> Self::Delta {
let mut added = HashMap::new();
let mut removed = Vec::new();
let mut modified = HashMap::new();
for (key, old_value) in old {
match new.get(key) {
None => removed.push(key.clone()),
Some(new_value) if new_value != old_value => {
modified.insert(key.clone(), new_value.clone());
}
_ => {} }
}
for (key, new_value) in new {
if !old.contains_key(key) {
added.insert(key.clone(), new_value.clone());
}
}
MapDelta { added, removed, modified }
}
fn apply(base: &Self, delta: &Self::Delta) -> Self {
let mut result = base.clone();
for key in &delta.removed {
result.remove(key);
}
for (key, value) in &delta.added {
result.insert(key.clone(), value.clone());
}
for (key, value) in &delta.modified {
result.insert(key.clone(), value.clone());
}
result
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum StringOp {
Equal(String),
Insert(String),
Delete(usize),
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StringDelta {
pub ops: Vec<StringOp>,
}
#[cfg(feature = "delta-checkpoint")]
impl Diff for String {
type Delta = StringDelta;
fn diff(old: &Self, new: &Self) -> Self::Delta {
use similar::{ChangeTag, TextDiff};
let text_diff = TextDiff::from_chars(old.as_str(), new.as_str());
let mut ops = Vec::new();
for change in text_diff.iter_all_changes() {
let value = change.value();
match change.tag() {
ChangeTag::Equal => {
if let Some(StringOp::Equal(s)) = ops.last_mut() {
s.push_str(value);
} else {
ops.push(StringOp::Equal(value.to_string()));
}
}
ChangeTag::Insert => {
if let Some(StringOp::Insert(s)) = ops.last_mut() {
s.push_str(value);
} else {
ops.push(StringOp::Insert(value.to_string()));
}
}
ChangeTag::Delete => {
if let Some(StringOp::Delete(count)) = ops.last_mut() {
*count += value.chars().count();
} else {
ops.push(StringOp::Delete(value.chars().count()));
}
}
}
}
StringDelta { ops }
}
fn apply(base: &Self, delta: &Self::Delta) -> Self {
let mut result = String::new();
let mut chars = base.chars();
for op in &delta.ops {
match op {
StringOp::Equal(s) => {
for _ in 0..s.chars().count() {
if let Some(c) = chars.next() {
result.push(c);
}
}
}
StringOp::Delete(count) => {
for _ in 0..*count {
chars.next();
}
}
StringOp::Insert(s) => {
result.push_str(s);
}
}
}
result
}
}
const META_CHECKPOINT_TYPE: &str = "__delta_ckpt_type";
const META_DELTA_PAYLOAD: &str = "__delta_ckpt_payload";
const META_BASE_CHECKPOINT_ID: &str = "__delta_ckpt_base_id";
const TYPE_FULL: &str = "full";
const TYPE_DELTA: &str = "delta";
pub struct DeltaCheckpointer<C: Checkpointer> {
inner: C,
config: DeltaConfig,
}
impl<C: Checkpointer> DeltaCheckpointer<C> {
pub fn new(inner: C, config: DeltaConfig) -> Self {
Self { inner, config }
}
pub fn inner(&self) -> &C {
&self.inner
}
pub fn config(&self) -> &DeltaConfig {
&self.config
}
fn is_full_snapshot_step(&self, step: usize) -> bool {
step == 0 || (step as u32) % self.config.full_snapshot_interval == 0
}
fn reconstruct_state(
checkpoints: &[Checkpoint],
target_step: usize,
) -> crate::error::Result<State> {
let full_snapshot_idx = checkpoints
.iter()
.enumerate()
.rev()
.find(|(_, cp)| {
cp.step <= target_step
&& cp.metadata.get(META_CHECKPOINT_TYPE).and_then(|v| v.as_str())
== Some(TYPE_FULL)
})
.map(|(idx, _)| idx);
let full_idx = full_snapshot_idx.ok_or_else(|| {
GraphError::CheckpointError(
"No full snapshot found for state reconstruction".to_string(),
)
})?;
let base_checkpoint = &checkpoints[full_idx];
let mut state = base_checkpoint.state.clone();
for cp in &checkpoints[full_idx + 1..] {
if cp.step > target_step {
break;
}
let cp_type =
cp.metadata.get(META_CHECKPOINT_TYPE).and_then(|v| v.as_str()).unwrap_or(TYPE_FULL);
if cp_type == TYPE_DELTA {
let delta_json = cp.metadata.get(META_DELTA_PAYLOAD).ok_or_else(|| {
GraphError::CheckpointError(format!(
"Delta checkpoint at step {} missing payload",
cp.step
))
})?;
let delta: MapDelta = serde_json::from_value(delta_json.clone()).map_err(|e| {
GraphError::CheckpointError(format!(
"Failed to deserialize delta at step {}: {e}",
cp.step
))
})?;
state = <HashMap<String, Value> as Diff>::apply(&state, &delta);
} else {
state = cp.state.clone();
}
}
Ok(state)
}
}
#[async_trait]
impl<C: Checkpointer> Checkpointer for DeltaCheckpointer<C> {
async fn save(&self, checkpoint: &Checkpoint) -> crate::error::Result<String> {
if self.is_full_snapshot_step(checkpoint.step) {
let mut full_checkpoint = checkpoint.clone();
full_checkpoint
.metadata
.insert(META_CHECKPOINT_TYPE.to_string(), Value::String(TYPE_FULL.to_string()));
self.inner.save(&full_checkpoint).await
} else {
let all_checkpoints = self.inner.list(&checkpoint.thread_id).await?;
let previous = all_checkpoints
.iter()
.filter(|cp| cp.step < checkpoint.step)
.max_by_key(|cp| cp.step);
match previous {
Some(prev_cp) => {
let prev_state = Self::reconstruct_state(&all_checkpoints, prev_cp.step)?;
let delta =
<HashMap<String, Value> as Diff>::diff(&prev_state, &checkpoint.state);
let mut delta_checkpoint = checkpoint.clone();
delta_checkpoint.state = HashMap::new(); delta_checkpoint.metadata.insert(
META_CHECKPOINT_TYPE.to_string(),
Value::String(TYPE_DELTA.to_string()),
);
delta_checkpoint.metadata.insert(
META_BASE_CHECKPOINT_ID.to_string(),
Value::String(prev_cp.checkpoint_id.clone()),
);
delta_checkpoint.metadata.insert(
META_DELTA_PAYLOAD.to_string(),
serde_json::to_value(&delta).map_err(|e| {
GraphError::CheckpointError(format!("Failed to serialize delta: {e}"))
})?,
);
self.inner.save(&delta_checkpoint).await
}
None => {
let mut full_checkpoint = checkpoint.clone();
full_checkpoint.metadata.insert(
META_CHECKPOINT_TYPE.to_string(),
Value::String(TYPE_FULL.to_string()),
);
self.inner.save(&full_checkpoint).await
}
}
}
}
async fn load(&self, thread_id: &str) -> crate::error::Result<Option<Checkpoint>> {
let all_checkpoints = self.inner.list(thread_id).await?;
if all_checkpoints.is_empty() {
return Ok(None);
}
let latest = all_checkpoints.last().unwrap();
let target_step = latest.step;
let full_state = Self::reconstruct_state(&all_checkpoints, target_step)?;
let mut result = latest.clone();
result.state = full_state;
Ok(Some(result))
}
async fn load_by_id(&self, checkpoint_id: &str) -> crate::error::Result<Option<Checkpoint>> {
let checkpoint = self.inner.load_by_id(checkpoint_id).await?;
match checkpoint {
Some(cp) => {
let all_checkpoints = self.inner.list(&cp.thread_id).await?;
let full_state = Self::reconstruct_state(&all_checkpoints, cp.step)?;
let mut result = cp;
result.state = full_state;
Ok(Some(result))
}
None => Ok(None),
}
}
async fn list(&self, thread_id: &str) -> crate::error::Result<Vec<Checkpoint>> {
self.inner.list(thread_id).await
}
async fn delete(&self, thread_id: &str) -> crate::error::Result<()> {
self.inner.delete(thread_id).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn delta_config_default_interval_is_10() {
let config = DeltaConfig::default();
assert_eq!(config.full_snapshot_interval, 10);
}
#[test]
fn delta_config_custom_interval() {
let config = DeltaConfig { full_snapshot_interval: 25 };
assert_eq!(config.full_snapshot_interval, 25);
}
#[test]
fn checkpoint_type_full_variant() {
let ct = CheckpointType::Full;
assert!(matches!(ct, CheckpointType::Full));
}
#[test]
fn checkpoint_type_delta_variant() {
let ct = CheckpointType::Delta { base_checkpoint_id: "abc-123".to_string() };
assert!(matches!(ct, CheckpointType::Delta { .. }));
if let CheckpointType::Delta { base_checkpoint_id } = ct {
assert_eq!(base_checkpoint_id, "abc-123");
}
}
#[test]
fn vec_value_diff_append() {
use serde_json::json;
let old = vec![json!(1), json!(2), json!(3)];
let new = vec![json!(1), json!(2), json!(3), json!(4), json!(5)];
let delta = <Vec<Value> as Diff>::diff(&old, &new);
assert!(!delta.full_replacement);
assert_eq!(delta.start_index, 3);
assert_eq!(delta.items, vec![json!(4), json!(5)]);
assert_eq!(<Vec<Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn vec_value_diff_identical() {
use serde_json::json;
let old = vec![json!("a"), json!("b")];
let new = vec![json!("a"), json!("b")];
let delta = <Vec<Value> as Diff>::diff(&old, &new);
assert!(!delta.full_replacement);
assert_eq!(delta.start_index, 2);
assert!(delta.items.is_empty());
assert_eq!(<Vec<Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn vec_value_diff_shorter_new() {
use serde_json::json;
let old = vec![json!(1), json!(2), json!(3)];
let new = vec![json!(1)];
let delta = <Vec<Value> as Diff>::diff(&old, &new);
assert!(delta.full_replacement);
assert_eq!(delta.items, vec![json!(1)]);
assert_eq!(<Vec<Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn vec_value_diff_modified_element() {
use serde_json::json;
let old = vec![json!(1), json!(2), json!(3)];
let new = vec![json!(1), json!(99), json!(3)];
let delta = <Vec<Value> as Diff>::diff(&old, &new);
assert!(delta.full_replacement);
assert_eq!(<Vec<Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn vec_value_diff_empty_to_items() {
use serde_json::json;
let old: Vec<Value> = vec![];
let new = vec![json!(1), json!(2)];
let delta = <Vec<Value> as Diff>::diff(&old, &new);
assert!(!delta.full_replacement);
assert_eq!(delta.start_index, 0);
assert_eq!(delta.items, vec![json!(1), json!(2)]);
assert_eq!(<Vec<Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn vec_value_diff_items_to_empty() {
use serde_json::json;
let old = vec![json!(1), json!(2)];
let new: Vec<Value> = vec![];
let delta = <Vec<Value> as Diff>::diff(&old, &new);
assert!(delta.full_replacement);
assert!(delta.items.is_empty());
assert_eq!(<Vec<Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn map_value_diff_added_keys() {
use serde_json::json;
let old: HashMap<String, Value> = [("a".to_string(), json!(1))].into_iter().collect();
let new: HashMap<String, Value> =
[("a".to_string(), json!(1)), ("b".to_string(), json!(2)), ("c".to_string(), json!(3))]
.into_iter()
.collect();
let delta = <HashMap<String, Value> as Diff>::diff(&old, &new);
assert!(delta.removed.is_empty());
assert!(delta.modified.is_empty());
assert_eq!(delta.added.len(), 2);
assert_eq!(delta.added.get("b"), Some(&json!(2)));
assert_eq!(delta.added.get("c"), Some(&json!(3)));
assert_eq!(<HashMap<String, Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn map_value_diff_removed_keys() {
use serde_json::json;
let old: HashMap<String, Value> =
[("a".to_string(), json!(1)), ("b".to_string(), json!(2)), ("c".to_string(), json!(3))]
.into_iter()
.collect();
let new: HashMap<String, Value> = [("a".to_string(), json!(1))].into_iter().collect();
let delta = <HashMap<String, Value> as Diff>::diff(&old, &new);
assert!(delta.added.is_empty());
assert!(delta.modified.is_empty());
assert_eq!(delta.removed.len(), 2);
assert!(delta.removed.contains(&"b".to_string()));
assert!(delta.removed.contains(&"c".to_string()));
assert_eq!(<HashMap<String, Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn map_value_diff_modified_keys() {
use serde_json::json;
let old: HashMap<String, Value> =
[("a".to_string(), json!(1)), ("b".to_string(), json!(2))].into_iter().collect();
let new: HashMap<String, Value> =
[("a".to_string(), json!(1)), ("b".to_string(), json!(99))].into_iter().collect();
let delta = <HashMap<String, Value> as Diff>::diff(&old, &new);
assert!(delta.added.is_empty());
assert!(delta.removed.is_empty());
assert_eq!(delta.modified.len(), 1);
assert_eq!(delta.modified.get("b"), Some(&json!(99)));
assert_eq!(<HashMap<String, Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn map_value_diff_mixed_changes() {
use serde_json::json;
let old: HashMap<String, Value> =
[("a".to_string(), json!(1)), ("b".to_string(), json!(2)), ("c".to_string(), json!(3))]
.into_iter()
.collect();
let new: HashMap<String, Value> = [
("a".to_string(), json!(1)),
("b".to_string(), json!(99)),
("d".to_string(), json!(4)),
]
.into_iter()
.collect();
let delta = <HashMap<String, Value> as Diff>::diff(&old, &new);
assert_eq!(delta.added.len(), 1);
assert_eq!(delta.added.get("d"), Some(&json!(4)));
assert_eq!(delta.removed, vec!["c".to_string()]);
assert_eq!(delta.modified.len(), 1);
assert_eq!(delta.modified.get("b"), Some(&json!(99)));
assert_eq!(<HashMap<String, Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn map_value_diff_identical() {
use serde_json::json;
let old: HashMap<String, Value> =
[("a".to_string(), json!(1)), ("b".to_string(), json!(2))].into_iter().collect();
let new = old.clone();
let delta = <HashMap<String, Value> as Diff>::diff(&old, &new);
assert!(delta.added.is_empty());
assert!(delta.removed.is_empty());
assert!(delta.modified.is_empty());
assert_eq!(<HashMap<String, Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn map_value_diff_empty_to_populated() {
use serde_json::json;
let old: HashMap<String, Value> = HashMap::new();
let new: HashMap<String, Value> =
[("a".to_string(), json!(1)), ("b".to_string(), json!(2))].into_iter().collect();
let delta = <HashMap<String, Value> as Diff>::diff(&old, &new);
assert_eq!(delta.added.len(), 2);
assert!(delta.removed.is_empty());
assert!(delta.modified.is_empty());
assert_eq!(<HashMap<String, Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn map_value_diff_populated_to_empty() {
use serde_json::json;
let old: HashMap<String, Value> =
[("a".to_string(), json!(1)), ("b".to_string(), json!(2))].into_iter().collect();
let new: HashMap<String, Value> = HashMap::new();
let delta = <HashMap<String, Value> as Diff>::diff(&old, &new);
assert!(delta.added.is_empty());
assert_eq!(delta.removed.len(), 2);
assert!(delta.modified.is_empty());
assert_eq!(<HashMap<String, Value> as Diff>::apply(&old, &delta), new);
}
#[test]
fn map_value_diff_nested_values() {
use serde_json::json;
let old: HashMap<String, Value> = [
("config".to_string(), json!({"host": "localhost", "port": 8080})),
("name".to_string(), json!("app")),
]
.into_iter()
.collect();
let new: HashMap<String, Value> = [
("config".to_string(), json!({"host": "prod.example.com", "port": 443})),
("name".to_string(), json!("app")),
("version".to_string(), json!("1.0.0")),
]
.into_iter()
.collect();
let delta = <HashMap<String, Value> as Diff>::diff(&old, &new);
assert_eq!(delta.added.get("version"), Some(&json!("1.0.0")));
assert!(delta.removed.is_empty());
assert_eq!(
delta.modified.get("config"),
Some(&json!({"host": "prod.example.com", "port": 443}))
);
assert_eq!(<HashMap<String, Value> as Diff>::apply(&old, &delta), new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_identical() {
let old = "hello world".to_string();
let new = "hello world".to_string();
let delta = <String as Diff>::diff(&old, &new);
assert_eq!(delta.ops.len(), 1);
assert_eq!(delta.ops[0], StringOp::Equal("hello world".to_string()));
assert_eq!(<String as Diff>::apply(&old, &delta), new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_completely_different() {
let old = "abc".to_string();
let new = "xyz".to_string();
let delta = <String as Diff>::diff(&old, &new);
let reconstructed = <String as Diff>::apply(&old, &delta);
assert_eq!(reconstructed, new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_suffix_change() {
let old = "hello world".to_string();
let new = "hello rust".to_string();
let delta = <String as Diff>::diff(&old, &new);
let reconstructed = <String as Diff>::apply(&old, &delta);
assert_eq!(reconstructed, new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_prefix_change() {
let old = "hello world".to_string();
let new = "goodbye world".to_string();
let delta = <String as Diff>::diff(&old, &new);
let reconstructed = <String as Diff>::apply(&old, &delta);
assert_eq!(reconstructed, new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_middle_insertion() {
let old = "abcdef".to_string();
let new = "abcXYZdef".to_string();
let delta = <String as Diff>::diff(&old, &new);
let reconstructed = <String as Diff>::apply(&old, &delta);
assert_eq!(reconstructed, new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_middle_deletion() {
let old = "abcXYZdef".to_string();
let new = "abcdef".to_string();
let delta = <String as Diff>::diff(&old, &new);
let reconstructed = <String as Diff>::apply(&old, &delta);
assert_eq!(reconstructed, new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_empty_to_content() {
let old = String::new();
let new = "hello".to_string();
let delta = <String as Diff>::diff(&old, &new);
assert_eq!(delta.ops.len(), 1);
assert_eq!(delta.ops[0], StringOp::Insert("hello".to_string()));
assert_eq!(<String as Diff>::apply(&old, &delta), new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_content_to_empty() {
let old = "hello".to_string();
let new = String::new();
let delta = <String as Diff>::diff(&old, &new);
assert_eq!(delta.ops.len(), 1);
assert_eq!(delta.ops[0], StringOp::Delete(5));
assert_eq!(<String as Diff>::apply(&old, &delta), new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_both_empty() {
let old = String::new();
let new = String::new();
let delta = <String as Diff>::diff(&old, &new);
assert!(delta.ops.is_empty());
assert_eq!(<String as Diff>::apply(&old, &delta), new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_unicode() {
let old = "héllo wörld 🌍".to_string();
let new = "héllo rüst 🦀".to_string();
let delta = <String as Diff>::diff(&old, &new);
let reconstructed = <String as Diff>::apply(&old, &delta);
assert_eq!(reconstructed, new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_diff_multiline() {
let old = "line1\nline2\nline3".to_string();
let new = "line1\nmodified\nline3\nline4".to_string();
let delta = <String as Diff>::diff(&old, &new);
let reconstructed = <String as Diff>::apply(&old, &delta);
assert_eq!(reconstructed, new);
}
#[cfg(feature = "delta-checkpoint")]
#[test]
fn string_delta_serialization_round_trip() {
let old = "hello world".to_string();
let new = "hello rust".to_string();
let delta = <String as Diff>::diff(&old, &new);
let json = serde_json::to_string(&delta).unwrap();
let deserialized: StringDelta = serde_json::from_str(&json).unwrap();
assert_eq!(<String as Diff>::apply(&old, &deserialized), new);
}
#[test]
fn diff_trait_round_trip() {
#[derive(Clone, Debug, PartialEq)]
struct TestState(Vec<i32>);
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct TestDelta {
added: Vec<i32>,
removed_count: usize,
}
impl Diff for TestState {
type Delta = TestDelta;
fn diff(old: &Self, new: &Self) -> Self::Delta {
if new.0.len() >= old.0.len() {
TestDelta { added: new.0[old.0.len()..].to_vec(), removed_count: 0 }
} else {
TestDelta { added: vec![], removed_count: old.0.len() - new.0.len() }
}
}
fn apply(base: &Self, delta: &Self::Delta) -> Self {
let mut result = base.0.clone();
if delta.removed_count > 0 {
result.truncate(result.len() - delta.removed_count);
}
result.extend_from_slice(&delta.added);
TestState(result)
}
}
let s1 = TestState(vec![1, 2, 3]);
let s2 = TestState(vec![1, 2, 3, 4, 5]);
let delta = TestState::diff(&s1, &s2);
let reconstructed = TestState::apply(&s1, &delta);
assert_eq!(reconstructed, s2);
}
use crate::checkpoint::MemoryCheckpointer;
fn make_state(pairs: &[(&str, i64)]) -> State {
pairs.iter().map(|(k, v)| (k.to_string(), serde_json::json!(v))).collect()
}
#[tokio::test]
async fn delta_checkpointer_first_save_is_full() {
let inner = MemoryCheckpointer::new();
let dc = DeltaCheckpointer::new(inner, DeltaConfig { full_snapshot_interval: 5 });
let state = make_state(&[("x", 1)]);
let cp = Checkpoint::new("t1", state.clone(), 0, vec![]);
dc.save(&cp).await.unwrap();
let loaded = dc.load("t1").await.unwrap().unwrap();
assert_eq!(loaded.state, state);
assert_eq!(
loaded.metadata.get(META_CHECKPOINT_TYPE).and_then(|v| v.as_str()),
Some(TYPE_FULL)
);
}
#[tokio::test]
async fn delta_checkpointer_stores_delta_between_full_snapshots() {
let inner = MemoryCheckpointer::new();
let dc = DeltaCheckpointer::new(inner, DeltaConfig { full_snapshot_interval: 5 });
let state0 = make_state(&[("x", 1), ("y", 2)]);
let cp0 = Checkpoint::new("t1", state0, 0, vec![]);
dc.save(&cp0).await.unwrap();
let state1 = make_state(&[("x", 1), ("y", 3), ("z", 4)]);
let cp1 = Checkpoint::new("t1", state1.clone(), 1, vec![]);
dc.save(&cp1).await.unwrap();
let loaded = dc.load("t1").await.unwrap().unwrap();
assert_eq!(loaded.state, state1);
assert_eq!(loaded.step, 1);
}
#[tokio::test]
async fn delta_checkpointer_full_snapshot_at_interval() {
let inner = MemoryCheckpointer::new();
let dc = DeltaCheckpointer::new(inner, DeltaConfig { full_snapshot_interval: 3 });
for step in 0..=3 {
let state = make_state(&[("counter", step as i64)]);
let cp = Checkpoint::new("t1", state, step, vec![]);
dc.save(&cp).await.unwrap();
}
let all = dc.list("t1").await.unwrap();
let step3 = all.iter().find(|cp| cp.step == 3).unwrap();
assert_eq!(
step3.metadata.get(META_CHECKPOINT_TYPE).and_then(|v| v.as_str()),
Some(TYPE_FULL)
);
let step1 = all.iter().find(|cp| cp.step == 1).unwrap();
assert_eq!(
step1.metadata.get(META_CHECKPOINT_TYPE).and_then(|v| v.as_str()),
Some(TYPE_DELTA)
);
}
#[tokio::test]
async fn delta_checkpointer_load_by_id_reconstructs_state() {
let inner = MemoryCheckpointer::new();
let dc = DeltaCheckpointer::new(inner, DeltaConfig { full_snapshot_interval: 10 });
let state0 = make_state(&[("a", 1)]);
let cp0 = Checkpoint::new("t1", state0, 0, vec![]);
dc.save(&cp0).await.unwrap();
let state1 = make_state(&[("a", 1), ("b", 2)]);
let cp1 = Checkpoint::new("t1", state1.clone(), 1, vec![]);
let id1 = dc.save(&cp1).await.unwrap();
let state2 = make_state(&[("a", 1), ("b", 2), ("c", 3)]);
let cp2 = Checkpoint::new("t1", state2.clone(), 2, vec![]);
let id2 = dc.save(&cp2).await.unwrap();
let loaded1 = dc.load_by_id(&id1).await.unwrap().unwrap();
assert_eq!(loaded1.state, state1);
let loaded2 = dc.load_by_id(&id2).await.unwrap().unwrap();
assert_eq!(loaded2.state, state2);
}
#[tokio::test]
async fn delta_checkpointer_multiple_deltas_between_snapshots() {
let inner = MemoryCheckpointer::new();
let dc = DeltaCheckpointer::new(inner, DeltaConfig { full_snapshot_interval: 5 });
let states: Vec<State> =
(0..5).map(|i| make_state(&[("step", i as i64), ("data", i * 10)])).collect();
for (step, state) in states.iter().enumerate() {
let cp = Checkpoint::new("t1", state.clone(), step, vec![]);
dc.save(&cp).await.unwrap();
}
let loaded = dc.load("t1").await.unwrap().unwrap();
assert_eq!(loaded.state, states[4]);
assert_eq!(loaded.step, 4);
}
#[tokio::test]
async fn delta_checkpointer_delete_delegates() {
let inner = MemoryCheckpointer::new();
let dc = DeltaCheckpointer::new(inner, DeltaConfig::default());
let state = make_state(&[("x", 1)]);
let cp = Checkpoint::new("t1", state, 0, vec![]);
dc.save(&cp).await.unwrap();
dc.delete("t1").await.unwrap();
let loaded = dc.load("t1").await.unwrap();
assert!(loaded.is_none());
}
#[tokio::test]
async fn delta_checkpointer_handles_key_removal() {
let inner = MemoryCheckpointer::new();
let dc = DeltaCheckpointer::new(inner, DeltaConfig { full_snapshot_interval: 10 });
let state0 = make_state(&[("a", 1), ("b", 2), ("c", 3)]);
let cp0 = Checkpoint::new("t1", state0, 0, vec![]);
dc.save(&cp0).await.unwrap();
let state1 = make_state(&[("a", 1), ("c", 99)]);
let cp1 = Checkpoint::new("t1", state1.clone(), 1, vec![]);
dc.save(&cp1).await.unwrap();
let loaded = dc.load("t1").await.unwrap().unwrap();
assert_eq!(loaded.state, state1);
assert!(!loaded.state.contains_key("b"));
}
#[tokio::test]
async fn delta_checkpointer_reconstruction_across_full_snapshot_boundary() {
let inner = MemoryCheckpointer::new();
let dc = DeltaCheckpointer::new(inner, DeltaConfig { full_snapshot_interval: 3 });
let states: Vec<State> = (0..6).map(|i| make_state(&[("val", i * 100)])).collect();
for (step, state) in states.iter().enumerate() {
let cp = Checkpoint::new("t1", state.clone(), step, vec![]);
dc.save(&cp).await.unwrap();
}
let loaded = dc.load("t1").await.unwrap().unwrap();
assert_eq!(loaded.state, states[5]);
}
}