1use md5::{Digest, Md5};
2use serde::{Deserialize, Serialize};
3
4use rouchdb_core::adapter::Adapter;
5use rouchdb_core::document::Seq;
6use rouchdb_core::error::Result;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct CheckpointDoc {
11 pub last_seq: Seq,
12 pub session_id: String,
13 pub version: u32,
14 pub replicator: String,
15 pub history: Vec<CheckpointHistory>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct CheckpointHistory {
20 pub last_seq: Seq,
21 pub session_id: String,
22}
23
/// Reads and writes replication checkpoints for one source/target pair.
///
/// Derives `Debug` and `Clone` so the type can be logged and embedded in
/// other derivable types; both fields are plain `String`s, so the derives
/// add no extra bounds.
#[derive(Debug, Clone)]
pub struct Checkpointer {
    /// Key under which checkpoint documents are read and written on both sides.
    replication_id: String,
    /// Identifier stamped on every checkpoint written through this instance.
    session_id: String,
}
29
30impl Checkpointer {
31 pub fn new(source_id: &str, target_id: &str) -> Self {
33 let replication_id = generate_replication_id(source_id, target_id);
34 let session_id = uuid::Uuid::new_v4().to_string();
35 Self {
36 replication_id,
37 session_id,
38 }
39 }
40
41 pub fn replication_id(&self) -> &str {
42 &self.replication_id
43 }
44
45 pub async fn read_checkpoint(
48 &self,
49 source: &dyn Adapter,
50 target: &dyn Adapter,
51 ) -> Result<Seq> {
52 let source_cp = self.read_from(source).await;
53 let target_cp = self.read_from(target).await;
54
55 match (source_cp, target_cp) {
56 (Ok(s), Ok(t)) => Ok(compare_checkpoints(&s, &t)),
57 _ => Ok(Seq::zero()), }
59 }
60
61 pub async fn write_checkpoint(
63 &self,
64 source: &dyn Adapter,
65 target: &dyn Adapter,
66 last_seq: Seq,
67 ) -> Result<()> {
68 let doc = self.build_checkpoint_doc(last_seq);
69 let json = serde_json::to_value(&doc)?;
70
71 let source_result = source.put_local(&self.replication_id, json.clone()).await;
73 let target_result = target.put_local(&self.replication_id, json).await;
74
75 match (source_result, target_result) {
77 (Err(e), Err(_)) => Err(e),
78 _ => Ok(()),
79 }
80 }
81
82 async fn read_from(&self, adapter: &dyn Adapter) -> Result<CheckpointDoc> {
83 let json = adapter.get_local(&self.replication_id).await?;
84 let doc: CheckpointDoc = serde_json::from_value(json)?;
85 Ok(doc)
86 }
87
88 fn build_checkpoint_doc(&self, last_seq: Seq) -> CheckpointDoc {
89 CheckpointDoc {
90 last_seq: last_seq.clone(),
91 session_id: self.session_id.clone(),
92 version: 1,
93 replicator: "rouchdb".into(),
94 history: vec![CheckpointHistory {
95 last_seq,
96 session_id: self.session_id.clone(),
97 }],
98 }
99 }
100}
101
102fn generate_replication_id(source_id: &str, target_id: &str) -> String {
104 let mut hasher = Md5::new();
105 hasher.update(source_id.as_bytes());
106 hasher.update(target_id.as_bytes());
107 let hash = format!("{:x}", hasher.finalize());
108 hash.replace('/', ".").replace('+', "_")
110}
111
112fn compare_checkpoints(source: &CheckpointDoc, target: &CheckpointDoc) -> Seq {
117 if source.session_id == target.session_id {
119 return if source.last_seq.as_num() <= target.last_seq.as_num() {
120 source.last_seq.clone()
121 } else {
122 target.last_seq.clone()
123 };
124 }
125
126 for sh in &source.history {
128 for th in &target.history {
129 if sh.session_id == th.session_id {
130 return if sh.last_seq.as_num() <= th.last_seq.as_num() {
131 sh.last_seq.clone()
132 } else {
133 th.last_seq.clone()
134 };
135 }
136 }
137 }
138
139 Seq::zero()
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a history entry for `session` at `last_seq`.
    fn entry(last_seq: Seq, session: &str) -> CheckpointHistory {
        CheckpointHistory {
            last_seq,
            session_id: session.into(),
        }
    }

    /// Builds a version-1 `"rouchdb"` checkpoint document.
    fn doc(last_seq: Seq, session: &str, history: Vec<CheckpointHistory>) -> CheckpointDoc {
        CheckpointDoc {
            last_seq,
            session_id: session.into(),
            version: 1,
            replicator: "rouchdb".into(),
            history,
        }
    }

    /// Same inputs give the same id; a different target gives a different id.
    #[test]
    fn replication_id_deterministic() {
        let first = generate_replication_id("source_a", "target_b");
        let repeat = generate_replication_id("source_a", "target_b");
        assert_eq!(first, repeat);

        let other_target = generate_replication_id("source_a", "target_c");
        assert_ne!(first, other_target);
    }

    /// Matching top-level sessions resolve to the shared sequence.
    #[test]
    fn compare_same_session() {
        let cp = doc(Seq::Num(42), "sess1", vec![]);
        assert_eq!(compare_checkpoints(&cp, &cp).as_num(), 42);
    }

    /// Differing top-level sessions fall back to the session shared in history.
    #[test]
    fn compare_different_session_with_history() {
        let source = doc(
            Seq::Num(50),
            "sess2",
            vec![entry(Seq::Num(50), "sess2"), entry(Seq::Num(30), "sess1")],
        );
        let target = doc(
            Seq::Num(40),
            "sess3",
            vec![entry(Seq::Num(40), "sess3"), entry(Seq::Num(30), "sess1")],
        );
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 30);
    }

    /// With nothing in common the comparison yields zero.
    #[test]
    fn compare_no_common_session() {
        let source = doc(Seq::Num(50), "a", vec![]);
        let target = doc(Seq::Num(40), "b", vec![]);
        assert_eq!(compare_checkpoints(&source, &target).as_num(), 0);
    }
}
213}