Skip to main content

rouchdb_replication/
checkpoint.rs

1use md5::{Digest, Md5};
2use serde::{Deserialize, Serialize};
3
4use rouchdb_core::adapter::Adapter;
5use rouchdb_core::document::Seq;
6use rouchdb_core::error::Result;
7
8/// A checkpoint document stored as `_local/{replication_id}`.
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct CheckpointDoc {
11    pub last_seq: Seq,
12    pub session_id: String,
13    pub version: u32,
14    pub replicator: String,
15    pub history: Vec<CheckpointHistory>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct CheckpointHistory {
20    pub last_seq: Seq,
21    pub session_id: String,
22}
23
24/// Manages checkpoints for a replication session.
25pub struct Checkpointer {
26    replication_id: String,
27    session_id: String,
28}
29
30impl Checkpointer {
31    /// Create a new checkpointer for a replication between source and target.
32    pub fn new(source_id: &str, target_id: &str) -> Self {
33        let replication_id = generate_replication_id(source_id, target_id);
34        let session_id = uuid::Uuid::new_v4().to_string();
35        Self {
36            replication_id,
37            session_id,
38        }
39    }
40
41    pub fn replication_id(&self) -> &str {
42        &self.replication_id
43    }
44
45    /// Read the checkpoint from both source and target, and find the last
46    /// common sequence number.
47    pub async fn read_checkpoint(
48        &self,
49        source: &dyn Adapter,
50        target: &dyn Adapter,
51    ) -> Result<Seq> {
52        let source_cp = self.read_from(source).await;
53        let target_cp = self.read_from(target).await;
54
55        match (source_cp, target_cp) {
56            (Ok(s), Ok(t)) => Ok(compare_checkpoints(&s, &t)),
57            _ => Ok(Seq::zero()), // No checkpoint found, start from beginning
58        }
59    }
60
61    /// Write the checkpoint to both source and target.
62    pub async fn write_checkpoint(
63        &self,
64        source: &dyn Adapter,
65        target: &dyn Adapter,
66        last_seq: Seq,
67    ) -> Result<()> {
68        let doc = self.build_checkpoint_doc(last_seq);
69        let json = serde_json::to_value(&doc)?;
70
71        // Write to both, but don't fail if one side fails
72        let source_result = source.put_local(&self.replication_id, json.clone()).await;
73        let target_result = target.put_local(&self.replication_id, json).await;
74
75        // If both fail, return the error
76        match (source_result, target_result) {
77            (Err(e), Err(_)) => Err(e),
78            _ => Ok(()),
79        }
80    }
81
82    async fn read_from(&self, adapter: &dyn Adapter) -> Result<CheckpointDoc> {
83        let json = adapter.get_local(&self.replication_id).await?;
84        let doc: CheckpointDoc = serde_json::from_value(json)?;
85        Ok(doc)
86    }
87
88    fn build_checkpoint_doc(&self, last_seq: Seq) -> CheckpointDoc {
89        CheckpointDoc {
90            last_seq: last_seq.clone(),
91            session_id: self.session_id.clone(),
92            version: 1,
93            replicator: "rouchdb".into(),
94            history: vec![CheckpointHistory {
95                last_seq,
96                session_id: self.session_id.clone(),
97            }],
98        }
99    }
100}
101
102/// Generate a deterministic replication ID from source and target identifiers.
103fn generate_replication_id(source_id: &str, target_id: &str) -> String {
104    let mut hasher = Md5::new();
105    hasher.update(source_id.as_bytes());
106    hasher.update(target_id.as_bytes());
107    let hash = format!("{:x}", hasher.finalize());
108    // Replace chars that are special in CouchDB URLs
109    hash.replace('/', ".").replace('+', "_")
110}
111
112/// Compare source and target checkpoints to find the last common sequence.
113///
114/// Returns the original `Seq` value (preserving opaque strings from CouchDB)
115/// rather than converting to numeric, so it can be passed back as `since`.
116fn compare_checkpoints(source: &CheckpointDoc, target: &CheckpointDoc) -> Seq {
117    // If sessions match, use the sequence directly
118    if source.session_id == target.session_id {
119        return if source.last_seq.as_num() <= target.last_seq.as_num() {
120            source.last_seq.clone()
121        } else {
122            target.last_seq.clone()
123        };
124    }
125
126    // Walk through histories to find a common session
127    for sh in &source.history {
128        for th in &target.history {
129            if sh.session_id == th.session_id {
130                return if sh.last_seq.as_num() <= th.last_seq.as_num() {
131                    sh.last_seq.clone()
132                } else {
133                    th.last_seq.clone()
134                };
135            }
136        }
137    }
138
139    // No common point found, start from beginning
140    Seq::zero()
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a history entry for `session` at `last_seq`.
    fn entry(last_seq: Seq, session: &str) -> CheckpointHistory {
        CheckpointHistory {
            last_seq,
            session_id: session.into(),
        }
    }

    /// Builds a version-1 `"rouchdb"` checkpoint document.
    fn checkpoint(
        last_seq: Seq,
        session: &str,
        history: Vec<CheckpointHistory>,
    ) -> CheckpointDoc {
        CheckpointDoc {
            last_seq,
            session_id: session.into(),
            version: 1,
            replicator: "rouchdb".into(),
            history,
        }
    }

    #[test]
    fn replication_id_deterministic() {
        let first = generate_replication_id("source_a", "target_b");

        // Same inputs always hash to the same ID.
        assert_eq!(first, generate_replication_id("source_a", "target_b"));
        // A different target yields a different ID.
        assert_ne!(first, generate_replication_id("source_a", "target_c"));
    }

    #[test]
    fn compare_same_session() {
        let cp = checkpoint(Seq::Num(42), "sess1", vec![]);

        assert_eq!(compare_checkpoints(&cp, &cp).as_num(), 42);
    }

    #[test]
    fn compare_different_session_with_history() {
        let source = checkpoint(
            Seq::Num(50),
            "sess2",
            vec![entry(Seq::Num(50), "sess2"), entry(Seq::Num(30), "sess1")],
        );
        let target = checkpoint(
            Seq::Num(40),
            "sess3",
            vec![entry(Seq::Num(40), "sess3"), entry(Seq::Num(30), "sess1")],
        );

        // The only shared session is "sess1", recorded at seq 30 on both sides.
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 30);
    }

    #[test]
    fn compare_no_common_session() {
        let source = checkpoint(Seq::Num(50), "a", vec![]);
        let target = checkpoint(Seq::Num(40), "b", vec![]);

        // Nothing in common: replication restarts from zero.
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 0);
    }
}