drasi_lib/reactions/
checkpoint.rs1use serde::{Deserialize, Serialize};
23
24use crate::state_store::StateStoreProvider;
25
26#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
31pub struct ReactionCheckpoint {
32 pub sequence: u64,
34 pub config_hash: u64,
36}
37
38pub(crate) async fn read_checkpoint(
46 store: &dyn StateStoreProvider,
47 reaction_id: &str,
48 query_id: &str,
49) -> anyhow::Result<Option<ReactionCheckpoint>> {
50 let key = format!("checkpoint:{query_id}");
51 match store.get(reaction_id, &key).await? {
52 Some(bytes) => {
53 let cp: ReactionCheckpoint = bincode::deserialize(&bytes)
54 .map_err(|e| anyhow::anyhow!("Failed to deserialize checkpoint: {e}"))?;
55 Ok(Some(cp))
56 }
57 None => Ok(None),
58 }
59}
60
61pub(crate) async fn read_checkpoints_batch(
63 store: &dyn StateStoreProvider,
64 reaction_id: &str,
65 query_ids: &[String],
66) -> anyhow::Result<std::collections::HashMap<String, ReactionCheckpoint>> {
67 let keys: Vec<String> = query_ids
68 .iter()
69 .map(|q| format!("checkpoint:{q}"))
70 .collect();
71 let key_refs: Vec<&str> = keys.iter().map(|k| k.as_str()).collect();
72
73 let raw = store.get_many(reaction_id, &key_refs).await?;
74
75 let mut result = std::collections::HashMap::new();
76 for (key, bytes) in raw {
77 let qid = key.strip_prefix("checkpoint:").unwrap_or(&key).to_string();
78 let cp: ReactionCheckpoint = bincode::deserialize(&bytes).map_err(|e| {
79 anyhow::anyhow!("Failed to deserialize checkpoint for query '{qid}': {e}")
80 })?;
81 result.insert(qid, cp);
82 }
83 Ok(result)
84}
85
86pub(crate) async fn write_checkpoint(
88 store: &dyn StateStoreProvider,
89 reaction_id: &str,
90 query_id: &str,
91 checkpoint: &ReactionCheckpoint,
92) -> anyhow::Result<()> {
93 let key = format!("checkpoint:{query_id}");
94 let bytes = bincode::serialize(checkpoint)
95 .map_err(|e| anyhow::anyhow!("Failed to serialize checkpoint: {e}"))?;
96 store.set(reaction_id, &key, bytes).await?;
97 Ok(())
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// A checkpoint encoded with bincode decodes back to an equal value.
    #[test]
    fn bincode_round_trip() {
        let original = ReactionCheckpoint {
            sequence: 42,
            config_hash: 0xDEAD_BEEF,
        };
        let encoded = bincode::serialize(&original).unwrap();
        let restored = bincode::deserialize::<ReactionCheckpoint>(&encoded).unwrap();
        assert_eq!(original, restored);
    }

    /// A checkpoint encoded as JSON decodes back to an equal value.
    #[test]
    fn serde_json_round_trip() {
        let original = ReactionCheckpoint {
            sequence: 100,
            config_hash: 12345,
        };
        let text = serde_json::to_string(&original).unwrap();
        let restored = serde_json::from_str::<ReactionCheckpoint>(&text).unwrap();
        assert_eq!(original, restored);
    }
}