use crate::node::Node;
use crate::types::{Entry, NodeId};
use anyhow::Result;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{info, warn};
/// Sync request pushed from one node to a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncMessage {
/// Node id of the sending node.
pub sender_id: NodeId,
/// Sender's instance UUID; the receiver compares it against the UUID it
/// expects for `sender_id` to detect a reused node id (see `handle_sync`).
pub sender_uuid: Vec<u8>,
/// Sender's per-node progress map (node id -> seq). Presumably the highest
/// sequence number the sender has applied per node — confirm against
/// `Node::get_local_ack`.
pub sender_ack: HashMap<NodeId, u64>,
/// Log entries the sender is pushing to the receiver.
pub entries: Vec<Entry>,
}
/// Reply to a [`SyncMessage`]: entries for the requester plus the
/// responder's own progress map.
///
/// Note: the original declaration crammed two fields and the closing brace
/// onto one line; reformatted to rustfmt conventions (no semantic change).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
    /// Node id of the responding node.
    pub peer_id: NodeId,
    /// Entries for the requester: an incremental log tail, or a full
    /// snapshot of the responder's state when `is_snapshot` is true.
    pub entries: Vec<Entry>,
    /// Responder's per-node progress map (node id -> seq).
    pub progress: HashMap<NodeId, u64>,
    /// True when `entries` is a full snapshot rather than incremental logs.
    pub is_snapshot: bool,
}
/// Network transport used by [`SyncManager`] to exchange sync messages
/// with peers. Implementors supply the actual wire protocol.
pub trait ExchangeInterface: Send + Sync + 'static {
/// This node's instance UUID, sent in every [`SyncMessage`].
/// Default: empty vector.
fn uuid(&self) -> Vec<u8> {
Vec::new()
}
/// The UUID this node expects for `_node_id`, if known. Returning `None`
/// (the default) disables the UUID mismatch check on inbound syncs.
fn query_uuid(&self, _node_id: NodeId) -> Option<Vec<u8>> {
None
}
/// Sends `msg` to `peer` and resolves with the peer's [`SyncResponse`].
fn sync_to(
&self,
node: &Node,
peer: NodeId,
msg: SyncMessage,
) -> impl Future<Output = Result<SyncResponse>> + Send;
}
/// Timing knobs for the sync manager.
#[derive(Debug, Clone)]
pub struct SyncConfig {
/// How often the periodic log exchange runs (default: 30 s).
pub interval: Duration,
/// Timeout for a single sync request to one peer (default: 10 s).
pub timeout: Duration,
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(30),
timeout: Duration::from_secs(10),
}
}
}
/// Drives log exchange between the local [`Node`] store and its peers over
/// a [`ExchangeInterface`] transport.
pub struct SyncManager<Net> {
/// Local node state; accessed via its `read()`/`write()` lock methods.
store: Node,
/// Application-provided network transport.
app: Net,
/// Sync interval and per-request timeout.
config: SyncConfig,
}
impl<Net: ExchangeInterface + Clone> SyncManager<Net> {
    /// Creates a manager with the default [`SyncConfig`].
    pub fn new(store: Node, network: Net) -> Self {
        Self::with_config(store, network, SyncConfig::default())
    }

    /// Creates a manager with an explicit configuration.
    pub fn with_config(store: Node, network: Net, config: SyncConfig) -> Self {
        Self {
            store,
            app: network,
            config,
        }
    }

    /// One-shot startup synchronization.
    ///
    /// Pulls state from every known peer, then scans both the per-peer logs
    /// and the full KV state (including tombstones) for entries authored by
    /// this node, and bumps the local sequence counter past the highest one
    /// found — so a restarted node never reuses a sequence number.
    ///
    /// Failing to reach any peer is logged but not fatal: the node proceeds
    /// with whatever state it already has.
    pub async fn bootstrap(&self) -> Result<()> {
        let my_id = self.store.read().id;
        let peers = self.store.read().get_peers();
        let results = self.sync_to_all_peers().await;
        let mut success_count = 0;
        for (peer, result) in results {
            match result {
                Ok(_) => {
                    success_count += 1;
                    info!("Successfully bootstrapped from peer {peer}");
                }
                Err(err) => {
                    warn!("Failed to bootstrap from peer {peer}: {err:?}");
                }
            }
        }
        // Highest sequence number attributed to us anywhere in the freshly
        // pulled state: check both the per-peer logs and the KV entries
        // (tombstones included, so deleted entries still count).
        let mut max_seq_found = 0u64;
        let store = self.store.read();
        for peer_state in store.get_all_peer_states().values() {
            for entry in &peer_state.log {
                if entry.meta.node == my_id && entry.meta.seq > max_seq_found {
                    max_seq_found = entry.meta.seq;
                }
            }
        }
        for (_, entry) in store.iter_all_including_tombstones() {
            if entry.meta.node == my_id && entry.meta.seq > max_seq_found {
                max_seq_found = entry.meta.seq;
            }
        }
        // Release the read lock before taking the write lock below.
        drop(store);
        if max_seq_found > 0 {
            let new_next_seq = max_seq_found + 1;
            self.store.write().ensure_next_seq(new_next_seq);
        }
        if success_count == 0 && !peers.is_empty() {
            warn!("Bootstrap: Failed to sync from any peer, proceeding anyway");
        } else {
            info!(
                "Bootstrap: Successfully synced from {}/{} peers",
                success_count,
                peers.len()
            );
            let status = self.store.read().status();
            info!("Node status after bootstrap: {:#?}", status);
        }
        Ok(())
    }

    /// Spawns the background periodic log-exchange task.
    ///
    /// Consumes the `Arc` so the spawned task keeps the manager alive for as
    /// long as it runs.
    pub async fn start_sync_tasks(self: Arc<Self>) {
        // `self` is already an owned Arc; move it straight into the task
        // instead of cloning it first (the original cloned redundantly).
        tokio::spawn(async move {
            self.periodic_log_exchange().await;
        });
    }

    /// Loops forever, syncing with every peer once per configured interval.
    /// Note: `tokio::time::interval` fires its first tick immediately, so
    /// the first exchange happens right after startup.
    async fn periodic_log_exchange(&self) {
        let mut ticker = interval(self.config.interval);
        loop {
            ticker.tick().await;
            let results = self.sync_to_all_peers().await;
            for (peer, result) in results {
                match result {
                    Ok(_) => {
                        info!("Successfully synced with peer {peer}");
                    }
                    Err(e) => {
                        warn!("Failed to sync with peer {peer}: {e:?}");
                    }
                }
            }
        }
    }

    /// Handles an inbound sync request from a peer (the "receive" side).
    ///
    /// Rejects the message when the application layer knows a UUID for the
    /// sender and it doesn't match (guards against a reused node id). Then
    /// applies the pushed entries and replies with either the incremental
    /// entries the peer is missing, or a full KV snapshot when
    /// `get_peer_missing_logs` returns `None` (the incremental path can't
    /// serve the peer's recorded progress).
    #[tracing::instrument(skip(self, msg), fields(from = msg.sender_id))]
    pub fn handle_sync(&self, msg: SyncMessage) -> Result<SyncResponse> {
        // Copy what we still need before `msg` is moved into the store.
        let peer_progress = msg.sender_ack.clone();
        let peer_id = msg.sender_id;
        if let Some(expected_uuid) = self.app.query_uuid(peer_id) {
            if expected_uuid != msg.sender_uuid {
                warn!(
                    "UUID mismatch for peer {peer_id}: expected {:?}, got {:?}",
                    hex::encode(expected_uuid),
                    hex::encode(msg.sender_uuid)
                );
                anyhow::bail!("UUID mismatch for peer {peer_id}. Don't reuse node IDs for peers.");
            }
        }
        let mut state = self.store.write();
        state.apply_pushed_entries(msg)?;
        let (entries, is_snapshot) = match state.get_peer_missing_logs(&peer_progress) {
            Some(entries) => {
                info!(
                    "Returning {} incremental log entries to node {peer_id}",
                    entries.len(),
                );
                (entries, false)
            }
            None => {
                // Snapshot fallback: serialize the whole KV state as log
                // entries for the peer.
                let entries = state.kv_to_log_entries();
                info!(
                    "Returning snapshot ({} entries) to node {peer_id}",
                    entries.len(),
                );
                (entries, true)
            }
        };
        let my_progress = state.get_local_ack();
        let my_id = state.id;
        // Record the peer as caught up to our own ack position — assumes the
        // response above delivers everything up to that seq (TODO confirm
        // against Node::update_peer_ack semantics). Result deliberately
        // ignored: ack bookkeeping failure shouldn't fail the sync.
        let peer_ack = *my_progress.get(&my_id).unwrap_or(&0);
        let _ = state.update_peer_ack(peer_id, peer_ack);
        Ok(SyncResponse {
            peer_id: my_id,
            progress: my_progress,
            entries,
            is_snapshot,
        })
    }

    /// Syncs with every known peer concurrently and returns each peer's
    /// outcome. Returns an empty vec when no peers are configured.
    async fn sync_to_all_peers(&self) -> Vec<(NodeId, Result<()>)> {
        let peers = self.store.read().get_peers();
        if peers.is_empty() {
            // Message fixed: this runs from both bootstrap and the periodic
            // loop, so don't claim we're "bootstrapping".
            info!("No peers to sync with, starting fresh");
            return vec![];
        }
        info!("Syncing with {} peers...", peers.len());
        let sync_futures: Vec<_> = peers
            .iter()
            .map(|&peer| async move { (peer, self.sync_to(peer).await) })
            .collect();
        join_all(sync_futures).await
    }

    /// Pushes our outstanding entries to `peer` and applies whatever the
    /// peer returns, bounded by the configured request timeout.
    #[tracing::instrument(skip(self))]
    async fn sync_to(&self, peer: NodeId) -> Result<()> {
        let timeout = self.config.timeout;
        // Snapshot everything needed for the request under a short-lived
        // read lock so the lock is not held across the network await below.
        let (sender_id, sender_ack, entries) = {
            let state = self.store.read();
            let peer_ack_for_us = state.get_peer_state(peer).map_or(0, |p| p.peer_ack);
            let sender_id = state.id;
            let entries = state
                .get_peer_logs_since(sender_id, peer_ack_for_us)
                .unwrap_or_default();
            let sender_ack = state.get_local_ack();
            (sender_id, sender_ack, entries)
        };
        info!("Sending {} log entries to peer {peer}", entries.len());
        let msg = SyncMessage {
            sender_id,
            sender_uuid: self.app.uuid(),
            sender_ack,
            entries,
        };
        // Outer `?` propagates only the timeout; transport errors fall
        // through to the match so they can be logged with peer context.
        let result = tokio::time::timeout(timeout, self.app.sync_to(&self.store, peer, msg))
            .await
            .map_err(|_| anyhow::anyhow!("sync request timed out after {:?}", timeout))?;
        match result {
            Ok(response) => self.store.write().apply_pulled_entries(response),
            Err(e) => {
                warn!("Log exchange with peer {peer} failed: {e}");
                Err(e)
            }
        }
    }
}