1use md5::{Digest, Md5};
2use serde::{Deserialize, Serialize};
3
4use rouchdb_core::adapter::Adapter;
5use rouchdb_core::document::Seq;
6use rouchdb_core::error::Result;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct CheckpointDoc {
11 pub last_seq: Seq,
12 pub session_id: String,
13 pub version: u32,
14 pub replicator: String,
15 pub history: Vec<CheckpointHistory>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct CheckpointHistory {
20 pub last_seq: Seq,
21 pub session_id: String,
22}
23
/// Reads and writes replication checkpoints for one source/target pair.
///
/// `Debug` is derived so the value can be logged and embedded in other
/// `#[derive(Debug)]` types.
#[derive(Debug)]
pub struct Checkpointer {
    /// Key under which the checkpoint document is stored on each side.
    replication_id: String,
    /// Identifier of this replication session, stamped into every
    /// checkpoint this instance writes.
    session_id: String,
}
29
30impl Checkpointer {
31 pub fn new(source_id: &str, target_id: &str) -> Self {
33 let replication_id = generate_replication_id(source_id, target_id);
34 let session_id = uuid::Uuid::new_v4().to_string();
35 Self {
36 replication_id,
37 session_id,
38 }
39 }
40
41 pub fn replication_id(&self) -> &str {
42 &self.replication_id
43 }
44
45 pub async fn read_checkpoint(&self, source: &dyn Adapter, target: &dyn Adapter) -> Result<Seq> {
48 let source_cp = self.read_from(source).await;
49 let target_cp = self.read_from(target).await;
50
51 match (source_cp, target_cp) {
52 (Ok(s), Ok(t)) => Ok(compare_checkpoints(&s, &t)),
53 _ => Ok(Seq::zero()), }
55 }
56
57 pub async fn write_checkpoint(
59 &self,
60 source: &dyn Adapter,
61 target: &dyn Adapter,
62 last_seq: Seq,
63 ) -> Result<()> {
64 let doc = self.build_checkpoint_doc(last_seq);
65 let json = serde_json::to_value(&doc)?;
66
67 let source_result = source.put_local(&self.replication_id, json.clone()).await;
69 let target_result = target.put_local(&self.replication_id, json).await;
70
71 match (source_result, target_result) {
72 (Ok(()), Ok(())) => Ok(()),
73 (Err(e), _) | (_, Err(e)) => Err(e),
74 }
75 }
76
77 async fn read_from(&self, adapter: &dyn Adapter) -> Result<CheckpointDoc> {
78 let json = adapter.get_local(&self.replication_id).await?;
79 let doc: CheckpointDoc = serde_json::from_value(json)?;
80 Ok(doc)
81 }
82
83 fn build_checkpoint_doc(&self, last_seq: Seq) -> CheckpointDoc {
84 CheckpointDoc {
85 last_seq: last_seq.clone(),
86 session_id: self.session_id.clone(),
87 version: 1,
88 replicator: "rouchdb".into(),
89 history: vec![CheckpointHistory {
90 last_seq,
91 session_id: self.session_id.clone(),
92 }],
93 }
94 }
95}
96
97fn generate_replication_id(source_id: &str, target_id: &str) -> String {
99 let mut hasher = Md5::new();
100 hasher.update(source_id.as_bytes());
101 hasher.update(target_id.as_bytes());
102 let hash = format!("{:x}", hasher.finalize());
103 hash.replace('/', ".").replace('+', "_")
105}
106
107fn compare_checkpoints(source: &CheckpointDoc, target: &CheckpointDoc) -> Seq {
112 if source.session_id == target.session_id {
114 return if source.last_seq.as_num() <= target.last_seq.as_num() {
115 source.last_seq.clone()
116 } else {
117 target.last_seq.clone()
118 };
119 }
120
121 for sh in &source.history {
123 for th in &target.history {
124 if sh.session_id == th.session_id {
125 return if sh.last_seq.as_num() <= th.last_seq.as_num() {
126 sh.last_seq.clone()
127 } else {
128 th.last_seq.clone()
129 };
130 }
131 }
132 }
133
134 Seq::zero()
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a history entry for `session` at `last_seq`.
    fn entry(last_seq: Seq, session: &str) -> CheckpointHistory {
        CheckpointHistory {
            last_seq,
            session_id: session.into(),
        }
    }

    /// Builds a version-1 "rouchdb" checkpoint document.
    fn checkpoint(last_seq: Seq, session: &str, history: Vec<CheckpointHistory>) -> CheckpointDoc {
        CheckpointDoc {
            last_seq,
            session_id: session.into(),
            version: 1,
            replicator: "rouchdb".into(),
            history,
        }
    }

    #[test]
    fn replication_id_deterministic() {
        let first = generate_replication_id("source_a", "target_b");
        let again = generate_replication_id("source_a", "target_b");
        assert_eq!(first, again);

        let other_target = generate_replication_id("source_a", "target_c");
        assert_ne!(first, other_target);
    }

    #[test]
    fn compare_same_session() {
        let cp = checkpoint(Seq::Num(42), "sess1", vec![]);
        assert_eq!(compare_checkpoints(&cp, &cp).as_num(), 42);
    }

    #[test]
    fn compare_different_session_with_history() {
        let source = checkpoint(
            Seq::Num(50),
            "sess2",
            vec![entry(Seq::Num(50), "sess2"), entry(Seq::Num(30), "sess1")],
        );
        let target = checkpoint(
            Seq::Num(40),
            "sess3",
            vec![entry(Seq::Num(40), "sess3"), entry(Seq::Num(30), "sess1")],
        );
        // The only session both sides know about is "sess1", recorded at 30.
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 30);
    }

    #[test]
    fn compare_no_common_session() {
        let source = checkpoint(Seq::Num(50), "a", vec![]);
        let target = checkpoint(Seq::Num(40), "b", vec![]);
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 0);
    }
}