use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use edgestore::{Engine, ImportResult, RemoteStore};
use edgestore::replication::ReplicationProtocol;
use crate::http_client::HttpReplicationClient;
#[derive(serde::Serialize, serde::Deserialize, Default)]
pub struct PeerCursor {
pub last_known_merkle_root: Vec<u8>,
pub segments_pending: Vec<Vec<u8>>,
pub last_attempt_secs: u64,
pub segments_applied_total: u64,
}
pub struct AntiEntropyLoop {
engine: Arc<Mutex<Engine>>,
peer_url: String,
peer_id: String,
db_path: PathBuf,
pub interval_secs: u64,
remote_store: Option<Arc<dyn RemoteStore>>,
}
impl AntiEntropyLoop {
pub fn new(
engine: Arc<Mutex<Engine>>,
peer_url: String,
peer_id: String,
db_path: PathBuf,
) -> Self {
AntiEntropyLoop {
engine,
peer_url,
peer_id,
db_path,
interval_secs: 30,
remote_store: None,
}
}
pub fn with_remote_store(mut self, store: Arc<dyn RemoteStore>) -> Self {
self.remote_store = Some(store);
self
}
pub fn with_interval(mut self, secs: u64) -> Self {
self.interval_secs = secs;
self
}
pub fn start(self) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
loop {
std::thread::sleep(Duration::from_secs(self.interval_secs));
run_once(
&self.engine,
&self.peer_url,
&self.peer_id,
&self.db_path,
self.remote_store.as_deref(),
);
}
})
}
}
fn run_once(
engine: &Arc<Mutex<Engine>>,
peer_url: &str,
peer_id: &str,
db_path: &Path,
remote_store: Option<&dyn RemoteStore>,
) {
let cursor_path = cursor_file_path(db_path, peer_id);
let mut cursor = load_cursor(&cursor_path);
cursor.last_attempt_secs = now_secs();
if let Err(e) = flush_cursor(&cursor, &cursor_path) {
eprintln!("[anti_entropy] cursor flush error: {}", e);
}
let client = HttpReplicationClient::new(peer_url);
let peer_root = match client.merkle_root() {
Ok(r) => r,
Err(e) => {
eprintln!("[anti_entropy] peer {} merkle_root error: {}", peer_id, e);
return;
}
};
let in_sync = {
match engine.lock() {
Ok(eng) => match eng.compare_merkle(&peer_root) {
Ok(same) => same,
Err(e) => {
eprintln!("[anti_entropy] compare_merkle error: {}", e);
return;
}
},
Err(_) => {
eprintln!("[anti_entropy] engine lock poisoned");
return;
}
}
};
if in_sync {
cursor.last_known_merkle_root = peer_root.to_vec();
if let Err(e) = flush_cursor(&cursor, &cursor_path) {
eprintln!("[anti_entropy] cursor flush (in-sync) error: {}", e);
}
return;
}
let peer_segments = match client.list_segments() {
Ok(segs) => segs,
Err(e) => {
eprintln!("[anti_entropy] peer {} list_segments error: {}", peer_id, e);
return;
}
};
let missing: Vec<[u8; 32]> = {
match engine.lock() {
Ok(eng) => eng.missing_segments(&peer_segments),
Err(_) => {
eprintln!("[anti_entropy] engine lock poisoned (missing_segments)");
return;
}
}
};
cursor.segments_pending = missing.iter().map(|h| h.to_vec()).collect();
if let Err(e) = flush_cursor(&cursor, &cursor_path) {
eprintln!("[anti_entropy] cursor flush (pending) error: {}", e);
}
let pending_hashes: Vec<Vec<u8>> = cursor.segments_pending.clone();
for hash_vec in &pending_hashes {
if hash_vec.len() != 32 {
eprintln!("[anti_entropy] skipping malformed hash (len={})", hash_vec.len());
continue;
}
let mut hash = [0u8; 32];
hash.copy_from_slice(hash_vec);
let data = match client.fetch_segment(&hash) {
Ok(d) => d,
Err(e) => {
eprintln!("[anti_entropy] fetch_segment error: {}", e);
continue;
}
};
let result = {
match engine.lock() {
Ok(mut eng) => eng.import_segment(&data, &hash),
Err(_) => {
eprintln!("[anti_entropy] engine lock poisoned (import_segment)");
continue;
}
}
};
match result {
Ok(ImportResult::Applied { keys_written, keys_skipped }) => {
cursor.segments_pending.retain(|h| h != hash_vec);
cursor.segments_applied_total += 1;
if let Err(e) = flush_cursor(&cursor, &cursor_path) {
eprintln!("[anti_entropy] cursor flush (applied) error: {}", e);
}
eprintln!(
"[anti_entropy] applied segment {}: {} written, {} skipped",
hex_str(&hash),
keys_written,
keys_skipped
);
if let Some(rs) = remote_store {
if let Err(e) = rs.upload(&hash, &data) {
eprintln!(
"[anti_entropy] remote_store upload warning for {}: {}",
hex_str(&hash),
e
);
}
}
}
Ok(ImportResult::Skipped) => {
cursor.segments_pending.retain(|h| h != hash_vec);
cursor.segments_applied_total += 1;
if let Err(e) = flush_cursor(&cursor, &cursor_path) {
eprintln!("[anti_entropy] cursor flush (skipped) error: {}", e);
}
}
Ok(ImportResult::HashMismatch) => {
eprintln!(
"[anti_entropy] BLAKE3 mismatch for segment {} — will retry",
hex_str(&hash)
);
}
Err(e) => {
eprintln!("[anti_entropy] import_segment error: {}", e);
}
}
}
cursor.last_known_merkle_root = peer_root.to_vec();
if let Err(e) = flush_cursor(&cursor, &cursor_path) {
eprintln!("[anti_entropy] cursor flush (final) error: {}", e);
}
}
fn cursor_file_path(db_path: &Path, peer_id: &str) -> PathBuf {
db_path.join("sync").join(format!("{}.cursor", peer_id))
}
fn load_cursor(cursor_path: &Path) -> PeerCursor {
match std::fs::File::open(cursor_path) {
Ok(file) => {
rmp_serde::from_read(file).unwrap_or_default()
}
Err(_) => PeerCursor::default(),
}
}
fn flush_cursor(cursor: &PeerCursor, cursor_path: &Path) -> Result<(), std::io::Error> {
if let Some(parent) = cursor_path.parent() {
std::fs::create_dir_all(parent)?;
}
let tmp_path = cursor_path.with_extension("cursor.tmp");
let bytes = rmp_serde::to_vec(cursor)
.map_err(|e| std::io::Error::other(e.to_string()))?;
std::fs::write(&tmp_path, &bytes)?;
std::fs::rename(&tmp_path, cursor_path)?;
Ok(())
}
fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
fn hex_str(hash: &[u8; 32]) -> String {
hash.iter().map(|b| format!("{:02x}", b)).collect()
}