1use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use edgestore::{Engine, ImportResult, RemoteStore};
18use edgestore::replication::ReplicationProtocol;
19
20use crate::http_client::HttpReplicationClient;
21
22#[derive(serde::Serialize, serde::Deserialize, Default)]
24pub struct PeerCursor {
25 pub last_known_merkle_root: Vec<u8>,
27 pub segments_pending: Vec<Vec<u8>>,
29 pub last_attempt_secs: u64,
31 pub segments_applied_total: u64,
33}
34
35pub struct AntiEntropyLoop {
41 engine: Arc<Mutex<Engine>>,
42 peer_url: String,
43 peer_id: String,
44 db_path: PathBuf,
45 pub interval_secs: u64,
47 remote_store: Option<Arc<dyn RemoteStore>>,
51}
52
53impl AntiEntropyLoop {
54 pub fn new(
61 engine: Arc<Mutex<Engine>>,
62 peer_url: String,
63 peer_id: String,
64 db_path: PathBuf,
65 ) -> Self {
66 AntiEntropyLoop {
67 engine,
68 peer_url,
69 peer_id,
70 db_path,
71 interval_secs: 30,
72 remote_store: None,
73 }
74 }
75
76 pub fn with_remote_store(mut self, store: Arc<dyn RemoteStore>) -> Self {
80 self.remote_store = Some(store);
81 self
82 }
83
84 pub fn with_interval(mut self, secs: u64) -> Self {
88 self.interval_secs = secs;
89 self
90 }
91
92 pub fn start(self) -> std::thread::JoinHandle<()> {
97 std::thread::spawn(move || {
98 loop {
99 std::thread::sleep(Duration::from_secs(self.interval_secs));
100 run_once(
101 &self.engine,
102 &self.peer_url,
103 &self.peer_id,
104 &self.db_path,
105 self.remote_store.as_deref(),
106 );
107 }
108 })
109 }
110}
111
112fn run_once(
114 engine: &Arc<Mutex<Engine>>,
115 peer_url: &str,
116 peer_id: &str,
117 db_path: &Path,
118 remote_store: Option<&dyn RemoteStore>,
119) {
120 let cursor_path = cursor_file_path(db_path, peer_id);
122 let mut cursor = load_cursor(&cursor_path);
123
124 cursor.last_attempt_secs = now_secs();
126 if let Err(e) = flush_cursor(&cursor, &cursor_path) {
127 eprintln!("[anti_entropy] cursor flush error: {}", e);
128 }
129
130 let client = HttpReplicationClient::new(peer_url);
132
133 let peer_root = match client.merkle_root() {
134 Ok(r) => r,
135 Err(e) => {
136 eprintln!("[anti_entropy] peer {} merkle_root error: {}", peer_id, e);
137 return;
138 }
139 };
140
141 let in_sync = {
143 match engine.lock() {
144 Ok(eng) => match eng.compare_merkle(&peer_root) {
145 Ok(same) => same,
146 Err(e) => {
147 eprintln!("[anti_entropy] compare_merkle error: {}", e);
148 return;
149 }
150 },
151 Err(_) => {
152 eprintln!("[anti_entropy] engine lock poisoned");
153 return;
154 }
155 }
156 };
157
158 if in_sync {
159 cursor.last_known_merkle_root = peer_root.to_vec();
161 if let Err(e) = flush_cursor(&cursor, &cursor_path) {
162 eprintln!("[anti_entropy] cursor flush (in-sync) error: {}", e);
163 }
164 return;
165 }
166
167 let peer_segments = match client.list_segments() {
169 Ok(segs) => segs,
170 Err(e) => {
171 eprintln!("[anti_entropy] peer {} list_segments error: {}", peer_id, e);
172 return;
173 }
174 };
175
176 let missing: Vec<[u8; 32]> = {
178 match engine.lock() {
179 Ok(eng) => eng.missing_segments(&peer_segments),
180 Err(_) => {
181 eprintln!("[anti_entropy] engine lock poisoned (missing_segments)");
182 return;
183 }
184 }
185 };
186
187 cursor.segments_pending = missing.iter().map(|h| h.to_vec()).collect();
189 if let Err(e) = flush_cursor(&cursor, &cursor_path) {
190 eprintln!("[anti_entropy] cursor flush (pending) error: {}", e);
191 }
192
193 let pending_hashes: Vec<Vec<u8>> = cursor.segments_pending.clone();
195 for hash_vec in &pending_hashes {
196 if hash_vec.len() != 32 {
197 eprintln!("[anti_entropy] skipping malformed hash (len={})", hash_vec.len());
198 continue;
199 }
200
201 let mut hash = [0u8; 32];
202 hash.copy_from_slice(hash_vec);
203
204 let data = match client.fetch_segment(&hash) {
206 Ok(d) => d,
207 Err(e) => {
208 eprintln!("[anti_entropy] fetch_segment error: {}", e);
209 continue;
210 }
211 };
212
213 let result = {
215 match engine.lock() {
216 Ok(mut eng) => eng.import_segment(&data, &hash),
217 Err(_) => {
218 eprintln!("[anti_entropy] engine lock poisoned (import_segment)");
219 continue;
220 }
221 }
222 };
223
224 match result {
225 Ok(ImportResult::Applied { keys_written, keys_skipped }) => {
226 cursor.segments_pending.retain(|h| h != hash_vec);
228 cursor.segments_applied_total += 1;
229 if let Err(e) = flush_cursor(&cursor, &cursor_path) {
230 eprintln!("[anti_entropy] cursor flush (applied) error: {}", e);
231 }
232 eprintln!(
233 "[anti_entropy] applied segment {}: {} written, {} skipped",
234 hex_str(&hash),
235 keys_written,
236 keys_skipped
237 );
238
239 if let Some(rs) = remote_store {
241 if let Err(e) = rs.upload(&hash, &data) {
242 eprintln!(
243 "[anti_entropy] remote_store upload warning for {}: {}",
244 hex_str(&hash),
245 e
246 );
247 }
248 }
249 }
250 Ok(ImportResult::Skipped) => {
251 cursor.segments_pending.retain(|h| h != hash_vec);
253 cursor.segments_applied_total += 1;
254 if let Err(e) = flush_cursor(&cursor, &cursor_path) {
255 eprintln!("[anti_entropy] cursor flush (skipped) error: {}", e);
256 }
257 }
258 Ok(ImportResult::HashMismatch) => {
259 eprintln!(
261 "[anti_entropy] BLAKE3 mismatch for segment {} — will retry",
262 hex_str(&hash)
263 );
264 }
265 Err(e) => {
266 eprintln!("[anti_entropy] import_segment error: {}", e);
267 }
269 }
270 }
271
272 cursor.last_known_merkle_root = peer_root.to_vec();
274 if let Err(e) = flush_cursor(&cursor, &cursor_path) {
275 eprintln!("[anti_entropy] cursor flush (final) error: {}", e);
276 }
277}
278
279fn cursor_file_path(db_path: &Path, peer_id: &str) -> PathBuf {
281 db_path.join("sync").join(format!("{}.cursor", peer_id))
282}
283
284fn load_cursor(cursor_path: &Path) -> PeerCursor {
288 match std::fs::File::open(cursor_path) {
289 Ok(file) => {
290 rmp_serde::from_read(file).unwrap_or_default()
291 }
292 Err(_) => PeerCursor::default(),
293 }
294}
295
296fn flush_cursor(cursor: &PeerCursor, cursor_path: &Path) -> Result<(), std::io::Error> {
300 if let Some(parent) = cursor_path.parent() {
302 std::fs::create_dir_all(parent)?;
303 }
304
305 let tmp_path = cursor_path.with_extension("cursor.tmp");
306
307 let bytes = rmp_serde::to_vec(cursor)
308 .map_err(|e| std::io::Error::other(e.to_string()))?;
309
310 std::fs::write(&tmp_path, &bytes)?;
311 std::fs::rename(&tmp_path, cursor_path)?;
312
313 Ok(())
314}
315
316fn now_secs() -> u64 {
318 SystemTime::now()
319 .duration_since(UNIX_EPOCH)
320 .unwrap_or_default()
321 .as_secs()
322}
323
324fn hex_str(hash: &[u8; 32]) -> String {
326 hash.iter().map(|b| format!("{:02x}", b)).collect()
327}
328
329