Skip to main content

drasi_lib/reactions/
checkpoint.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Checkpoint type for durable reactions.
16//!
17//! A [`ReactionCheckpoint`] records the last successfully processed sequence
18//! number and the query configuration hash at that point. Reactions persist
19//! checkpoints via the state store so they can resume from where they left off
20//! after a restart.
21
22use serde::{Deserialize, Serialize};
23
24use crate::state_store::StateStoreProvider;
25
26/// Persisted progress marker for a reaction's subscription to a single query.
27///
28/// Stored in the state store under the key `"checkpoint:{query_id}"` using
29/// `bincode` serialization for compactness.
30#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
31pub struct ReactionCheckpoint {
32    /// The last outbox sequence number that was successfully processed.
33    pub sequence: u64,
34    /// The query config hash at the time this checkpoint was written.
35    pub config_hash: u64,
36}
37
38// ============================================================================
39// Shared checkpoint I/O helpers
40// ============================================================================
41
42/// Read a single checkpoint from the state store.
43///
44/// Returns `Ok(None)` if the key does not exist.
45pub(crate) async fn read_checkpoint(
46    store: &dyn StateStoreProvider,
47    reaction_id: &str,
48    query_id: &str,
49) -> anyhow::Result<Option<ReactionCheckpoint>> {
50    let key = format!("checkpoint:{query_id}");
51    match store.get(reaction_id, &key).await? {
52        Some(bytes) => {
53            let cp: ReactionCheckpoint = bincode::deserialize(&bytes)
54                .map_err(|e| anyhow::anyhow!("Failed to deserialize checkpoint: {e}"))?;
55            Ok(Some(cp))
56        }
57        None => Ok(None),
58    }
59}
60
61/// Read checkpoints for all given query IDs in a single batch.
62pub(crate) async fn read_checkpoints_batch(
63    store: &dyn StateStoreProvider,
64    reaction_id: &str,
65    query_ids: &[String],
66) -> anyhow::Result<std::collections::HashMap<String, ReactionCheckpoint>> {
67    let keys: Vec<String> = query_ids
68        .iter()
69        .map(|q| format!("checkpoint:{q}"))
70        .collect();
71    let key_refs: Vec<&str> = keys.iter().map(|k| k.as_str()).collect();
72
73    let raw = store.get_many(reaction_id, &key_refs).await?;
74
75    let mut result = std::collections::HashMap::new();
76    for (key, bytes) in raw {
77        let qid = key.strip_prefix("checkpoint:").unwrap_or(&key).to_string();
78        let cp: ReactionCheckpoint = bincode::deserialize(&bytes).map_err(|e| {
79            anyhow::anyhow!("Failed to deserialize checkpoint for query '{qid}': {e}")
80        })?;
81        result.insert(qid, cp);
82    }
83    Ok(result)
84}
85
86/// Write a single checkpoint to the state store.
87pub(crate) async fn write_checkpoint(
88    store: &dyn StateStoreProvider,
89    reaction_id: &str,
90    query_id: &str,
91    checkpoint: &ReactionCheckpoint,
92) -> anyhow::Result<()> {
93    let key = format!("checkpoint:{query_id}");
94    let bytes = bincode::serialize(checkpoint)
95        .map_err(|e| anyhow::anyhow!("Failed to serialize checkpoint: {e}"))?;
96    store.set(reaction_id, &key, bytes).await?;
97    Ok(())
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// A checkpoint encoded with `bincode` decodes back to an equal value.
    #[test]
    fn bincode_round_trip() {
        let original = ReactionCheckpoint {
            sequence: 42,
            config_hash: 0xDEAD_BEEF,
        };
        let encoded = bincode::serialize(&original).unwrap();
        let restored = bincode::deserialize::<ReactionCheckpoint>(&encoded).unwrap();
        assert_eq!(original, restored);
    }

    /// A checkpoint rendered as JSON text parses back to an equal value.
    #[test]
    fn serde_json_round_trip() {
        let original = ReactionCheckpoint {
            sequence: 100,
            config_hash: 12345,
        };
        let text = serde_json::to_string(&original).unwrap();
        let restored = serde_json::from_str::<ReactionCheckpoint>(&text).unwrap();
        assert_eq!(original, restored);
    }
}
125}