Skip to main content

rouchdb_replication/
checkpoint.rs

1use md5::{Digest, Md5};
2use serde::{Deserialize, Serialize};
3
4use rouchdb_core::adapter::Adapter;
5use rouchdb_core::document::Seq;
6use rouchdb_core::error::Result;
7
8/// A checkpoint document stored as `_local/{replication_id}`.
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct CheckpointDoc {
11    pub last_seq: Seq,
12    pub session_id: String,
13    pub version: u32,
14    pub replicator: String,
15    pub history: Vec<CheckpointHistory>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct CheckpointHistory {
20    pub last_seq: Seq,
21    pub session_id: String,
22}
23
/// Manages checkpoints for a replication session.
///
/// Pairs the key of the checkpoint document (shared by every run between the
/// same source and target) with the ID of the current run.
pub struct Checkpointer {
    /// Key passed to `get_local`/`put_local` on both sides.
    replication_id: String,
    /// Identifies this run inside checkpoint documents and their history.
    session_id: String,
}
29
30impl Checkpointer {
31    /// Create a new checkpointer for a replication between source and target.
32    pub fn new(source_id: &str, target_id: &str) -> Self {
33        let replication_id = generate_replication_id(source_id, target_id);
34        let session_id = uuid::Uuid::new_v4().to_string();
35        Self {
36            replication_id,
37            session_id,
38        }
39    }
40
41    pub fn replication_id(&self) -> &str {
42        &self.replication_id
43    }
44
45    /// Read the checkpoint from both source and target, and find the last
46    /// common sequence number.
47    pub async fn read_checkpoint(&self, source: &dyn Adapter, target: &dyn Adapter) -> Result<Seq> {
48        let source_cp = self.read_from(source).await;
49        let target_cp = self.read_from(target).await;
50
51        match (source_cp, target_cp) {
52            (Ok(s), Ok(t)) => Ok(compare_checkpoints(&s, &t)),
53            _ => Ok(Seq::zero()), // No checkpoint found, start from beginning
54        }
55    }
56
57    /// Write the checkpoint to both source and target.
58    pub async fn write_checkpoint(
59        &self,
60        source: &dyn Adapter,
61        target: &dyn Adapter,
62        last_seq: Seq,
63    ) -> Result<()> {
64        let doc = self.build_checkpoint_doc(last_seq);
65        let json = serde_json::to_value(&doc)?;
66
67        // Write to both, but don't fail if one side fails
68        let source_result = source.put_local(&self.replication_id, json.clone()).await;
69        let target_result = target.put_local(&self.replication_id, json).await;
70
71        // If both fail, return the error
72        match (source_result, target_result) {
73            (Err(e), Err(_)) => Err(e),
74            _ => Ok(()),
75        }
76    }
77
78    async fn read_from(&self, adapter: &dyn Adapter) -> Result<CheckpointDoc> {
79        let json = adapter.get_local(&self.replication_id).await?;
80        let doc: CheckpointDoc = serde_json::from_value(json)?;
81        Ok(doc)
82    }
83
84    fn build_checkpoint_doc(&self, last_seq: Seq) -> CheckpointDoc {
85        CheckpointDoc {
86            last_seq: last_seq.clone(),
87            session_id: self.session_id.clone(),
88            version: 1,
89            replicator: "rouchdb".into(),
90            history: vec![CheckpointHistory {
91                last_seq,
92                session_id: self.session_id.clone(),
93            }],
94        }
95    }
96}
97
98/// Generate a deterministic replication ID from source and target identifiers.
99fn generate_replication_id(source_id: &str, target_id: &str) -> String {
100    let mut hasher = Md5::new();
101    hasher.update(source_id.as_bytes());
102    hasher.update(target_id.as_bytes());
103    let hash = format!("{:x}", hasher.finalize());
104    // Replace chars that are special in CouchDB URLs
105    hash.replace('/', ".").replace('+', "_")
106}
107
108/// Compare source and target checkpoints to find the last common sequence.
109///
110/// Returns the original `Seq` value (preserving opaque strings from CouchDB)
111/// rather than converting to numeric, so it can be passed back as `since`.
112fn compare_checkpoints(source: &CheckpointDoc, target: &CheckpointDoc) -> Seq {
113    // If sessions match, use the sequence directly
114    if source.session_id == target.session_id {
115        return if source.last_seq.as_num() <= target.last_seq.as_num() {
116            source.last_seq.clone()
117        } else {
118            target.last_seq.clone()
119        };
120    }
121
122    // Walk through histories to find a common session
123    for sh in &source.history {
124        for th in &target.history {
125            if sh.session_id == th.session_id {
126                return if sh.last_seq.as_num() <= th.last_seq.as_num() {
127                    sh.last_seq.clone()
128                } else {
129                    th.last_seq.clone()
130                };
131            }
132        }
133    }
134
135    // No common point found, start from beginning
136    Seq::zero()
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a single history entry.
    fn entry(last_seq: Seq, session_id: &str) -> CheckpointHistory {
        CheckpointHistory {
            last_seq,
            session_id: session_id.into(),
        }
    }

    /// Build a checkpoint document with the version/replicator values this
    /// module writes.
    fn checkpoint(
        last_seq: Seq,
        session_id: &str,
        history: Vec<CheckpointHistory>,
    ) -> CheckpointDoc {
        CheckpointDoc {
            last_seq,
            session_id: session_id.into(),
            version: 1,
            replicator: "rouchdb".into(),
            history,
        }
    }

    #[test]
    fn replication_id_deterministic() {
        let id1 = generate_replication_id("source_a", "target_b");
        let id2 = generate_replication_id("source_a", "target_b");
        assert_eq!(id1, id2);

        let id3 = generate_replication_id("source_a", "target_c");
        assert_ne!(id1, id3);
    }

    #[test]
    fn replication_id_is_lowercase_hex_md5() {
        // md5("") is a well-known digest.
        assert_eq!(
            generate_replication_id("", ""),
            "d41d8cd98f00b204e9800998ecf8427e"
        );
        let id = generate_replication_id("source_a", "target_b");
        assert_eq!(id.len(), 32);
        assert!(id
            .chars()
            .all(|c| c.is_ascii_digit() || ('a'..='f').contains(&c)));
    }

    #[test]
    fn replication_id_depends_on_direction() {
        assert_ne!(
            generate_replication_id("source_a", "target_b"),
            generate_replication_id("target_b", "source_a")
        );
    }

    #[test]
    fn compare_same_session() {
        let cp = checkpoint(Seq::Num(42), "sess1", vec![]);
        assert_eq!(compare_checkpoints(&cp, &cp).as_num(), 42);
    }

    #[test]
    fn compare_same_session_takes_lower_seq() {
        let ahead = checkpoint(Seq::Num(50), "sess1", vec![]);
        let behind = checkpoint(Seq::Num(40), "sess1", vec![]);
        assert_eq!(compare_checkpoints(&ahead, &behind).as_num(), 40);
        assert_eq!(compare_checkpoints(&behind, &ahead).as_num(), 40);
    }

    #[test]
    fn compare_different_session_with_history() {
        let source = checkpoint(
            Seq::Num(50),
            "sess2",
            vec![entry(Seq::Num(50), "sess2"), entry(Seq::Num(30), "sess1")],
        );
        let target = checkpoint(
            Seq::Num(40),
            "sess3",
            vec![entry(Seq::Num(40), "sess3"), entry(Seq::Num(30), "sess1")],
        );
        // Common session "sess1" at seq 30
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 30);
    }

    #[test]
    fn compare_history_takes_lower_seq() {
        let source = checkpoint(
            Seq::Num(50),
            "sess2",
            vec![entry(Seq::Num(50), "sess2"), entry(Seq::Num(30), "sess1")],
        );
        let target = checkpoint(
            Seq::Num(25),
            "sess3",
            vec![entry(Seq::Num(25), "sess3"), entry(Seq::Num(20), "sess1")],
        );
        // "sess1" is shared, but the target only recorded seq 20 for it.
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 20);
        assert_eq!(compare_checkpoints(&target, &source).as_num(), 20);
    }

    #[test]
    fn compare_no_common_session() {
        let source = checkpoint(Seq::Num(50), "a", vec![]);
        let target = checkpoint(Seq::Num(40), "b", vec![]);
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 0);
    }
}