Skip to main content

edgestore_repl/
anti_entropy.rs

//! Pull-only anti-entropy loop with per-peer cursor persistence.
//!
//! `AntiEntropyLoop` wakes every N seconds, probes the peer's Merkle root, and, if the
//! roots have diverged, pulls all missing segments one by one. Progress is tracked in a
//! per-peer cursor file at `{db_path}/sync/{peer_id}.cursor` (MessagePack format, D08).
//!
//! Cursor fields (D08):
//!   - `last_known_merkle_root` — peer's Merkle root as of the last successful sync
//!   - `segments_pending`       — hashes not yet applied (resume after crash)
//!   - `last_attempt_secs`      — Unix timestamp of the last probe attempt
//!   - `segments_applied_total` — running count of segments applied
12
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use edgestore::{Engine, ImportResult, RemoteStore};
18use edgestore::replication::ReplicationProtocol;
19
20use crate::http_client::HttpReplicationClient;
21
22/// Per-peer cursor: durable progress state for the anti-entropy loop (D08).
23#[derive(serde::Serialize, serde::Deserialize, Default)]
24pub struct PeerCursor {
25    /// Peer's Merkle root from the last completed sync (32 bytes stored as Vec).
26    pub last_known_merkle_root: Vec<u8>,
27    /// Segment hashes that have been identified as missing but not yet applied.
28    pub segments_pending: Vec<Vec<u8>>,
29    /// Unix timestamp (seconds) of the last probe attempt.
30    pub last_attempt_secs: u64,
31    /// Total number of segments applied to date.
32    pub segments_applied_total: u64,
33}
34
35/// Background pull-only anti-entropy loop.
36///
37/// Spawned via `AntiEntropyLoop::start()`. Probes the configured peer every `interval_secs`
38/// seconds. If the Merkle roots differ, pulls all missing segments and applies LWW merges
39/// via `Engine::import_segment`. Per-peer cursor makes progress durable across crashes.
40pub struct AntiEntropyLoop {
41    engine: Arc<Mutex<Engine>>,
42    peer_url: String,
43    peer_id: String,
44    db_path: PathBuf,
45    /// Probe interval in seconds. Default: 30.
46    pub interval_secs: u64,
47    /// Optional durable segment backend. When `Some`, each successfully applied
48    /// segment is uploaded after import (D08). Upload failure is non-fatal — the
49    /// segment is already applied locally.
50    remote_store: Option<Arc<dyn RemoteStore>>,
51}
52
53impl AntiEntropyLoop {
54    /// Create a new anti-entropy loop.
55    ///
56    /// - `engine`   — shared engine (`Arc<Mutex>`) for replication API access.
57    /// - `peer_url` — base URL of the remote peer's `HttpReplicationServer` (e.g. `"http://host:8900"`).
58    /// - `peer_id`  — unique identifier for the peer; used as the cursor file name.
59    /// - `db_path`  — database directory path; cursor file is written under `{db_path}/sync/`.
60    pub fn new(
61        engine: Arc<Mutex<Engine>>,
62        peer_url: String,
63        peer_id: String,
64        db_path: PathBuf,
65    ) -> Self {
66        AntiEntropyLoop {
67            engine,
68            peer_url,
69            peer_id,
70            db_path,
71            interval_secs: 30,
72            remote_store: None,
73        }
74    }
75
76    /// Attach a `RemoteStore` backend. After each segment is successfully applied, the
77    /// loop will call `remote_store.upload(hash, data)`. Upload failures are logged and
78    /// ignored — they do not abort the sync loop.
79    pub fn with_remote_store(mut self, store: Arc<dyn RemoteStore>) -> Self {
80        self.remote_store = Some(store);
81        self
82    }
83
84    /// Override the probe interval (default: 30 seconds).
85    ///
86    /// Useful in tests to reduce the time between anti-entropy cycles.
87    pub fn with_interval(mut self, secs: u64) -> Self {
88        self.interval_secs = secs;
89        self
90    }
91
92    /// Spawn the anti-entropy loop in a background thread.
93    ///
94    /// Returns the `JoinHandle` for the background thread. The thread runs until the
95    /// process exits.
96    pub fn start(self) -> std::thread::JoinHandle<()> {
97        std::thread::spawn(move || {
98            loop {
99                std::thread::sleep(Duration::from_secs(self.interval_secs));
100                run_once(
101                    &self.engine,
102                    &self.peer_url,
103                    &self.peer_id,
104                    &self.db_path,
105                    self.remote_store.as_deref(),
106                );
107            }
108        })
109    }
110}
111
112/// Execute one anti-entropy probe-and-pull cycle.
113fn run_once(
114    engine: &Arc<Mutex<Engine>>,
115    peer_url: &str,
116    peer_id: &str,
117    db_path: &Path,
118    remote_store: Option<&dyn RemoteStore>,
119) {
120    // Step 1: Load or create cursor.
121    let cursor_path = cursor_file_path(db_path, peer_id);
122    let mut cursor = load_cursor(&cursor_path);
123
124    // Step 2: Update attempt timestamp.
125    cursor.last_attempt_secs = now_secs();
126    if let Err(e) = flush_cursor(&cursor, &cursor_path) {
127        eprintln!("[anti_entropy] cursor flush error: {}", e);
128    }
129
130    // Step 3: Create client and probe peer Merkle root.
131    let client = HttpReplicationClient::new(peer_url);
132
133    let peer_root = match client.merkle_root() {
134        Ok(r) => r,
135        Err(e) => {
136            eprintln!("[anti_entropy] peer {} merkle_root error: {}", peer_id, e);
137            return;
138        }
139    };
140
141    // Step 4: Compare Merkle roots.
142    let in_sync = {
143        match engine.lock() {
144            Ok(eng) => match eng.compare_merkle(&peer_root) {
145                Ok(same) => same,
146                Err(e) => {
147                    eprintln!("[anti_entropy] compare_merkle error: {}", e);
148                    return;
149                }
150            },
151            Err(_) => {
152                eprintln!("[anti_entropy] engine lock poisoned");
153                return;
154            }
155        }
156    };
157
158    if in_sync {
159        // Roots match — update cursor and skip expensive manifest diff.
160        cursor.last_known_merkle_root = peer_root.to_vec();
161        if let Err(e) = flush_cursor(&cursor, &cursor_path) {
162            eprintln!("[anti_entropy] cursor flush (in-sync) error: {}", e);
163        }
164        return;
165    }
166
167    // Step 5: Fetch peer segment manifest.
168    let peer_segments = match client.list_segments() {
169        Ok(segs) => segs,
170        Err(e) => {
171            eprintln!("[anti_entropy] peer {} list_segments error: {}", peer_id, e);
172            return;
173        }
174    };
175
176    // Step 6: Compute missing segments.
177    let missing: Vec<[u8; 32]> = {
178        match engine.lock() {
179            Ok(eng) => eng.missing_segments(&peer_segments),
180            Err(_) => {
181                eprintln!("[anti_entropy] engine lock poisoned (missing_segments)");
182                return;
183            }
184        }
185    };
186
187    // Step 7: Update cursor with pending hashes.
188    cursor.segments_pending = missing.iter().map(|h| h.to_vec()).collect();
189    if let Err(e) = flush_cursor(&cursor, &cursor_path) {
190        eprintln!("[anti_entropy] cursor flush (pending) error: {}", e);
191    }
192
193    // Step 8: Pull and apply each missing segment.
194    let pending_hashes: Vec<Vec<u8>> = cursor.segments_pending.clone();
195    for hash_vec in &pending_hashes {
196        if hash_vec.len() != 32 {
197            eprintln!("[anti_entropy] skipping malformed hash (len={})", hash_vec.len());
198            continue;
199        }
200
201        let mut hash = [0u8; 32];
202        hash.copy_from_slice(hash_vec);
203
204        // Download segment from peer.
205        let data = match client.fetch_segment(&hash) {
206            Ok(d) => d,
207            Err(e) => {
208                eprintln!("[anti_entropy] fetch_segment error: {}", e);
209                continue;
210            }
211        };
212
213        // Import segment into local engine (includes BLAKE3 verification + LWW merge).
214        let result = {
215            match engine.lock() {
216                Ok(mut eng) => eng.import_segment(&data, &hash),
217                Err(_) => {
218                    eprintln!("[anti_entropy] engine lock poisoned (import_segment)");
219                    continue;
220                }
221            }
222        };
223
224        match result {
225            Ok(ImportResult::Applied { keys_written, keys_skipped }) => {
226                // Remove from pending and update total.
227                cursor.segments_pending.retain(|h| h != hash_vec);
228                cursor.segments_applied_total += 1;
229                if let Err(e) = flush_cursor(&cursor, &cursor_path) {
230                    eprintln!("[anti_entropy] cursor flush (applied) error: {}", e);
231                }
232                eprintln!(
233                    "[anti_entropy] applied segment {}: {} written, {} skipped",
234                    hex_str(&hash),
235                    keys_written,
236                    keys_skipped
237                );
238
239                // Upload to remote store if configured (D08). Non-fatal on error.
240                if let Some(rs) = remote_store {
241                    if let Err(e) = rs.upload(&hash, &data) {
242                        eprintln!(
243                            "[anti_entropy] remote_store upload warning for {}: {}",
244                            hex_str(&hash),
245                            e
246                        );
247                    }
248                }
249            }
250            Ok(ImportResult::Skipped) => {
251                // Already present — remove from pending.
252                cursor.segments_pending.retain(|h| h != hash_vec);
253                cursor.segments_applied_total += 1;
254                if let Err(e) = flush_cursor(&cursor, &cursor_path) {
255                    eprintln!("[anti_entropy] cursor flush (skipped) error: {}", e);
256                }
257            }
258            Ok(ImportResult::HashMismatch) => {
259                // Do NOT remove from pending — will retry next cycle.
260                eprintln!(
261                    "[anti_entropy] BLAKE3 mismatch for segment {} — will retry",
262                    hex_str(&hash)
263                );
264            }
265            Err(e) => {
266                eprintln!("[anti_entropy] import_segment error: {}", e);
267                // Leave in pending — will retry next cycle.
268            }
269        }
270    }
271
272    // Step 9: Update cursor with the peer root we just synced to.
273    cursor.last_known_merkle_root = peer_root.to_vec();
274    if let Err(e) = flush_cursor(&cursor, &cursor_path) {
275        eprintln!("[anti_entropy] cursor flush (final) error: {}", e);
276    }
277}
278
/// Compute the cursor file path for a given peer: `{db_path}/sync/{peer_id}.cursor`.
///
/// `peer_id` is used verbatim as the file stem; no sanitisation is performed here.
fn cursor_file_path(db_path: &Path, peer_id: &str) -> PathBuf {
    let file_name = format!("{}.cursor", peer_id);
    db_path.join("sync").join(file_name)
}
283
284/// Load cursor from disk. Returns a default cursor on parse failure or missing file (D08).
285///
286/// Corrupt cursor is treated as empty to avoid blocking sync on bad state.
287fn load_cursor(cursor_path: &Path) -> PeerCursor {
288    match std::fs::File::open(cursor_path) {
289        Ok(file) => {
290            rmp_serde::from_read(file).unwrap_or_default()
291        }
292        Err(_) => PeerCursor::default(),
293    }
294}
295
296/// Flush cursor atomically: write to `.tmp`, then rename to final path (D08, T-04-09).
297///
298/// Atomic write prevents corrupt cursor state on crash mid-write.
299fn flush_cursor(cursor: &PeerCursor, cursor_path: &Path) -> Result<(), std::io::Error> {
300    // Ensure the sync/ directory exists.
301    if let Some(parent) = cursor_path.parent() {
302        std::fs::create_dir_all(parent)?;
303    }
304
305    let tmp_path = cursor_path.with_extension("cursor.tmp");
306
307    let bytes = rmp_serde::to_vec(cursor)
308        .map_err(|e| std::io::Error::other(e.to_string()))?;
309
310    std::fs::write(&tmp_path, &bytes)?;
311    std::fs::rename(&tmp_path, cursor_path)?;
312
313    Ok(())
314}
315
/// Return the current Unix timestamp in whole seconds.
///
/// If the system clock reports a time before the Unix epoch, returns `0` instead of
/// panicking.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
323
/// Format a 32-byte hash as a 64-character lowercase hex string.
fn hex_str(hash: &[u8; 32]) -> String {
    use std::fmt::Write as _;

    // Two hex digits per byte; write into one preallocated buffer instead of
    // allocating a temporary `String` for every byte.
    let mut out = String::with_capacity(hash.len() * 2);
    for byte in hash {
        // Writing into a `String` cannot fail, so the result is safe to ignore.
        let _ = write!(out, "{:02x}", byte);
    }
    out
}
328
329