1use md5::{Digest, Md5};
2use serde::{Deserialize, Serialize};
3
4use rouchdb_core::adapter::Adapter;
5use rouchdb_core::document::Seq;
6use rouchdb_core::error::Result;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct CheckpointDoc {
11 pub last_seq: Seq,
12 pub session_id: String,
13 pub version: u32,
14 pub replicator: String,
15 pub history: Vec<CheckpointHistory>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct CheckpointHistory {
20 pub last_seq: Seq,
21 pub session_id: String,
22}
23
24pub struct Checkpointer {
26 replication_id: String,
27 session_id: String,
28}
29
30impl Checkpointer {
31 pub fn new(source_id: &str, target_id: &str) -> Self {
33 let replication_id = generate_replication_id(source_id, target_id);
34 let session_id = uuid::Uuid::new_v4().to_string();
35 Self {
36 replication_id,
37 session_id,
38 }
39 }
40
41 pub fn replication_id(&self) -> &str {
42 &self.replication_id
43 }
44
45 pub async fn read_checkpoint(&self, source: &dyn Adapter, target: &dyn Adapter) -> Result<Seq> {
48 let source_cp = self.read_from(source).await;
49 let target_cp = self.read_from(target).await;
50
51 match (source_cp, target_cp) {
52 (Ok(s), Ok(t)) => Ok(compare_checkpoints(&s, &t)),
53 _ => Ok(Seq::zero()), }
55 }
56
57 pub async fn write_checkpoint(
59 &self,
60 source: &dyn Adapter,
61 target: &dyn Adapter,
62 last_seq: Seq,
63 ) -> Result<()> {
64 let doc = self.build_checkpoint_doc(last_seq);
65 let json = serde_json::to_value(&doc)?;
66
67 let source_result = source.put_local(&self.replication_id, json.clone()).await;
69 let target_result = target.put_local(&self.replication_id, json).await;
70
71 match (source_result, target_result) {
73 (Err(e), Err(_)) => Err(e),
74 _ => Ok(()),
75 }
76 }
77
78 async fn read_from(&self, adapter: &dyn Adapter) -> Result<CheckpointDoc> {
79 let json = adapter.get_local(&self.replication_id).await?;
80 let doc: CheckpointDoc = serde_json::from_value(json)?;
81 Ok(doc)
82 }
83
84 fn build_checkpoint_doc(&self, last_seq: Seq) -> CheckpointDoc {
85 CheckpointDoc {
86 last_seq: last_seq.clone(),
87 session_id: self.session_id.clone(),
88 version: 1,
89 replicator: "rouchdb".into(),
90 history: vec![CheckpointHistory {
91 last_seq,
92 session_id: self.session_id.clone(),
93 }],
94 }
95 }
96}
97
98fn generate_replication_id(source_id: &str, target_id: &str) -> String {
100 let mut hasher = Md5::new();
101 hasher.update(source_id.as_bytes());
102 hasher.update(target_id.as_bytes());
103 let hash = format!("{:x}", hasher.finalize());
104 hash.replace('/', ".").replace('+', "_")
106}
107
108fn compare_checkpoints(source: &CheckpointDoc, target: &CheckpointDoc) -> Seq {
113 if source.session_id == target.session_id {
115 return if source.last_seq.as_num() <= target.last_seq.as_num() {
116 source.last_seq.clone()
117 } else {
118 target.last_seq.clone()
119 };
120 }
121
122 for sh in &source.history {
124 for th in &target.history {
125 if sh.session_id == th.session_id {
126 return if sh.last_seq.as_num() <= th.last_seq.as_num() {
127 sh.last_seq.clone()
128 } else {
129 th.last_seq.clone()
130 };
131 }
132 }
133 }
134
135 Seq::zero()
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142
143 #[test]
144 fn replication_id_deterministic() {
145 let id1 = generate_replication_id("source_a", "target_b");
146 let id2 = generate_replication_id("source_a", "target_b");
147 assert_eq!(id1, id2);
148
149 let id3 = generate_replication_id("source_a", "target_c");
150 assert_ne!(id1, id3);
151 }
152
153 #[test]
154 fn compare_same_session() {
155 let cp = CheckpointDoc {
156 last_seq: Seq::Num(42),
157 session_id: "sess1".into(),
158 version: 1,
159 replicator: "rouchdb".into(),
160 history: vec![],
161 };
162 assert_eq!(compare_checkpoints(&cp, &cp).as_num(), 42);
163 }
164
165 #[test]
166 fn compare_different_session_with_history() {
167 let source = CheckpointDoc {
168 last_seq: Seq::Num(50),
169 session_id: "sess2".into(),
170 version: 1,
171 replicator: "rouchdb".into(),
172 history: vec![
173 CheckpointHistory {
174 last_seq: Seq::Num(50),
175 session_id: "sess2".into(),
176 },
177 CheckpointHistory {
178 last_seq: Seq::Num(30),
179 session_id: "sess1".into(),
180 },
181 ],
182 };
183 let target = CheckpointDoc {
184 last_seq: Seq::Num(40),
185 session_id: "sess3".into(),
186 version: 1,
187 replicator: "rouchdb".into(),
188 history: vec![
189 CheckpointHistory {
190 last_seq: Seq::Num(40),
191 session_id: "sess3".into(),
192 },
193 CheckpointHistory {
194 last_seq: Seq::Num(30),
195 session_id: "sess1".into(),
196 },
197 ],
198 };
199 assert_eq!(compare_checkpoints(&source, &target).as_num(), 30);
201 }
202
203 #[test]
204 fn compare_no_common_session() {
205 let source = CheckpointDoc {
206 last_seq: Seq::Num(50),
207 session_id: "a".into(),
208 version: 1,
209 replicator: "rouchdb".into(),
210 history: vec![],
211 };
212 let target = CheckpointDoc {
213 last_seq: Seq::Num(40),
214 session_id: "b".into(),
215 version: 1,
216 replicator: "rouchdb".into(),
217 history: vec![],
218 };
219 assert_eq!(compare_checkpoints(&source, &target).as_num(), 0);
220 }
221}