Skip to main content

rouchdb_replication/
checkpoint.rs

1use md5::{Digest, Md5};
2use serde::{Deserialize, Serialize};
3
4use rouchdb_core::adapter::Adapter;
5use rouchdb_core::document::Seq;
6use rouchdb_core::error::Result;
7
/// A checkpoint document stored as `_local/{replication_id}`.
///
/// The same kind of document is written to both the source and the target;
/// on the next run the two copies are compared to decide where replication
/// can resume.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointDoc {
    /// Last sequence recorded by the session that wrote this checkpoint.
    pub last_seq: Seq,
    /// Session that wrote this checkpoint.
    pub session_id: String,
    /// Checkpoint format version (this module writes `1`).
    pub version: u32,
    /// Name of the replicator that wrote the document (this module writes `"rouchdb"`).
    pub replicator: String,
    /// Per-session checkpoint entries; scanned for a shared session when the
    /// two sides' `session_id`s differ.
    pub history: Vec<CheckpointHistory>,
}
17
/// One entry of [`CheckpointDoc::history`]: the sequence a given replication
/// session had recorded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointHistory {
    /// Sequence recorded for `session_id`.
    pub last_seq: Seq,
    /// Session this entry belongs to.
    pub session_id: String,
}
23
/// Manages checkpoints for a replication session.
///
/// Each instance pairs a deterministic replication ID (the same for every
/// checkpointer created for the same source and target identifiers) with a
/// fresh per-instance session ID.
pub struct Checkpointer {
    /// Key of the `_local` checkpoint document, derived from the source and
    /// target identifiers.
    replication_id: String,
    /// Random UUID (v4) generated when this checkpointer is created.
    session_id: String,
}
29
30impl Checkpointer {
31    /// Create a new checkpointer for a replication between source and target.
32    pub fn new(source_id: &str, target_id: &str) -> Self {
33        let replication_id = generate_replication_id(source_id, target_id);
34        let session_id = uuid::Uuid::new_v4().to_string();
35        Self {
36            replication_id,
37            session_id,
38        }
39    }
40
41    pub fn replication_id(&self) -> &str {
42        &self.replication_id
43    }
44
45    /// Read the checkpoint from both source and target, and find the last
46    /// common sequence number.
47    pub async fn read_checkpoint(&self, source: &dyn Adapter, target: &dyn Adapter) -> Result<Seq> {
48        let source_cp = self.read_from(source).await;
49        let target_cp = self.read_from(target).await;
50
51        match (source_cp, target_cp) {
52            (Ok(s), Ok(t)) => Ok(compare_checkpoints(&s, &t)),
53            _ => Ok(Seq::zero()), // No checkpoint found, start from beginning
54        }
55    }
56
57    /// Write the checkpoint to both source and target.
58    pub async fn write_checkpoint(
59        &self,
60        source: &dyn Adapter,
61        target: &dyn Adapter,
62        last_seq: Seq,
63    ) -> Result<()> {
64        let doc = self.build_checkpoint_doc(last_seq);
65        let json = serde_json::to_value(&doc)?;
66
67        // Write to both sides — fail if either side fails to keep them in sync
68        let source_result = source.put_local(&self.replication_id, json.clone()).await;
69        let target_result = target.put_local(&self.replication_id, json).await;
70
71        match (source_result, target_result) {
72            (Ok(()), Ok(())) => Ok(()),
73            (Err(e), _) | (_, Err(e)) => Err(e),
74        }
75    }
76
77    async fn read_from(&self, adapter: &dyn Adapter) -> Result<CheckpointDoc> {
78        let json = adapter.get_local(&self.replication_id).await?;
79        let doc: CheckpointDoc = serde_json::from_value(json)?;
80        Ok(doc)
81    }
82
83    fn build_checkpoint_doc(&self, last_seq: Seq) -> CheckpointDoc {
84        CheckpointDoc {
85            last_seq: last_seq.clone(),
86            session_id: self.session_id.clone(),
87            version: 1,
88            replicator: "rouchdb".into(),
89            history: vec![CheckpointHistory {
90                last_seq,
91                session_id: self.session_id.clone(),
92            }],
93        }
94    }
95}
96
97/// Generate a deterministic replication ID from source and target identifiers.
98fn generate_replication_id(source_id: &str, target_id: &str) -> String {
99    let mut hasher = Md5::new();
100    hasher.update(source_id.as_bytes());
101    hasher.update(target_id.as_bytes());
102    let hash = format!("{:x}", hasher.finalize());
103    // Replace chars that are special in CouchDB URLs
104    hash.replace('/', ".").replace('+', "_")
105}
106
107/// Compare source and target checkpoints to find the last common sequence.
108///
109/// Returns the original `Seq` value (preserving opaque strings from CouchDB)
110/// rather than converting to numeric, so it can be passed back as `since`.
111fn compare_checkpoints(source: &CheckpointDoc, target: &CheckpointDoc) -> Seq {
112    // If sessions match, use the sequence directly
113    if source.session_id == target.session_id {
114        return if source.last_seq.as_num() <= target.last_seq.as_num() {
115            source.last_seq.clone()
116        } else {
117            target.last_seq.clone()
118        };
119    }
120
121    // Walk through histories to find a common session
122    for sh in &source.history {
123        for th in &target.history {
124            if sh.session_id == th.session_id {
125                return if sh.last_seq.as_num() <= th.last_seq.as_num() {
126                    sh.last_seq.clone()
127                } else {
128                    th.last_seq.clone()
129                };
130            }
131        }
132    }
133
134    // No common point found, start from beginning
135    Seq::zero()
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a checkpoint document with the `version`/`replicator` values rouchdb writes.
    fn checkpoint(
        last_seq: Seq,
        session_id: &str,
        history: Vec<CheckpointHistory>,
    ) -> CheckpointDoc {
        CheckpointDoc {
            last_seq,
            session_id: session_id.into(),
            version: 1,
            replicator: "rouchdb".into(),
            history,
        }
    }

    /// Build a single history entry.
    fn entry(last_seq: Seq, session_id: &str) -> CheckpointHistory {
        CheckpointHistory {
            last_seq,
            session_id: session_id.into(),
        }
    }

    #[test]
    fn replication_id_deterministic() {
        let id1 = generate_replication_id("source_a", "target_b");
        let id2 = generate_replication_id("source_a", "target_b");
        assert_eq!(id1, id2);

        let id3 = generate_replication_id("source_a", "target_c");
        assert_ne!(id1, id3);
    }

    #[test]
    fn replication_id_is_lowercase_hex_md5() {
        let id = generate_replication_id("source_a", "target_b");
        // 16-byte MD5 digest rendered as lowercase hex: URL-safe as-is.
        assert_eq!(id.len(), 32);
        assert!(id.chars().all(|c| matches!(c, '0'..='9' | 'a'..='f')));
    }

    #[test]
    fn compare_same_session() {
        let cp = checkpoint(Seq::Num(42), "sess1", vec![]);
        assert_eq!(compare_checkpoints(&cp, &cp).as_num(), 42);
    }

    #[test]
    fn compare_same_session_takes_lower_seq() {
        let source = checkpoint(Seq::Num(50), "sess1", vec![]);
        let target = checkpoint(Seq::Num(40), "sess1", vec![]);
        // Only the lower sequence is known to have been recorded on both sides.
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 40);
        assert_eq!(compare_checkpoints(&target, &source).as_num(), 40);
    }

    #[test]
    fn compare_different_session_with_history() {
        let source = checkpoint(
            Seq::Num(50),
            "sess2",
            vec![entry(Seq::Num(50), "sess2"), entry(Seq::Num(30), "sess1")],
        );
        let target = checkpoint(
            Seq::Num(40),
            "sess3",
            vec![entry(Seq::Num(40), "sess3"), entry(Seq::Num(30), "sess1")],
        );
        // Common session "sess1" at seq 30
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 30);
    }

    #[test]
    fn compare_common_history_takes_lower_seq() {
        let source = checkpoint(
            Seq::Num(50),
            "sess2",
            vec![entry(Seq::Num(50), "sess2"), entry(Seq::Num(35), "sess1")],
        );
        let target = checkpoint(
            Seq::Num(40),
            "sess3",
            vec![entry(Seq::Num(40), "sess3"), entry(Seq::Num(30), "sess1")],
        );
        // Common session "sess1" recorded 35 on source but only 30 on target.
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 30);
        assert_eq!(compare_checkpoints(&target, &source).as_num(), 30);
    }

    #[test]
    fn compare_no_common_session() {
        let source = checkpoint(Seq::Num(50), "a", vec![]);
        let target = checkpoint(Seq::Num(40), "b", vec![]);
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 0);
    }
}