wavekv 1.0.0

An embeddable, eventually consistent, distributed key-value store with a peer-to-peer architecture.
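
//! Peer-to-peer log exchange ("sync") between nodes.
//!
//! A minimal startup sketch, assuming `node` is an initialized [`Node`] and
//! `net` is your [`ExchangeInterface`] implementation (both names are
//! illustrative):
//!
//! ```ignore
//! let manager = Arc::new(SyncManager::new(node, net));
//! manager.bootstrap().await?;                // recover next_seq before local writes
//! manager.clone().start_sync_tasks().await;  // spawn the periodic exchange loop
//! ```
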
use crate::node::Node;
use crate::types::{Entry, NodeId};
use anyhow::Result;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{info, warn};

/// Bidirectional sync request: the sender includes its `local_ack` AND its new entries
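///
/// A hand-assembled request sketch (values are illustrative; real requests
/// are built by the sync manager itself):
///
/// ```ignore
/// let msg = SyncMessage {
///     sender_id: 1,
///     sender_uuid: vec![],  // from ExchangeInterface::uuid(); empty by default
///     sender_ack: HashMap::from([(1, 42), (2, 17)]), // node 1 synced to seq 42, node 2 to 17
///     entries: vec![],      // nothing new since the peer's last ack
/// };
/// ```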
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncMessage {
    /// The unique numeric identifier of the sender node
    pub sender_id: NodeId,
    /// The sender's UUID (optional); used to detect reuse of a node ID by a different node
    pub sender_uuid: Vec<u8>,
    /// How far the sender has synced each node's logs (local_ack)
    pub sender_ack: HashMap<NodeId, u64>,
    /// Sender's new log entries (incremental or full dump)
    pub entries: Vec<Entry>,
}

/// Unified log exchange response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
    pub peer_id: NodeId,
    pub entries: Vec<Entry>,
    pub progress: HashMap<NodeId, u64>, // Responder's local_ack for each node
    pub is_snapshot: bool,              // Indicates if this is a full KV->log conversion
}

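/// Transport abstraction: delivers a [`SyncMessage`] to a peer and returns
/// the peer's [`SyncResponse`].
///
/// A minimal in-process loopback sketch (the `Loopback` type and its wiring
/// are illustrative, not part of this crate):
///
/// ```ignore
/// #[derive(Clone)]
/// struct Loopback {
///     remote: Arc<SyncManager<Loopback>>,
/// }
///
/// impl ExchangeInterface for Loopback {
///     fn sync_to(
///         &self,
///         _node: &Node,
///         _peer: NodeId,
///         msg: SyncMessage,
///     ) -> impl Future<Output = Result<SyncResponse>> + Send {
///         // Hand the message straight to the remote manager's handler;
///         // a real transport would serialize `msg` over the wire.
///         let remote = self.remote.clone();
///         async move { remote.handle_sync(msg) }
///     }
/// }
/// ```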
pub trait ExchangeInterface: Send + Sync + 'static {
    /// The local node's UUID; the default is empty
    fn uuid(&self) -> Vec<u8> {
        Vec::new()
    }
    /// The expected UUID for a peer; returning `None` (the default) skips UUID verification
    fn query_uuid(&self, _node_id: NodeId) -> Option<Vec<u8>> {
        None
    }
    /// Deliver `msg` to `peer` and return the peer's response
    fn sync_to(
        &self,
        node: &Node,
        peer: NodeId,
        msg: SyncMessage,
    ) -> impl Future<Output = Result<SyncResponse>> + Send;
}

/// Configuration for the sync manager
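///
/// A tighter cadence for tests, as a sketch (values are illustrative):
///
/// ```ignore
/// let config = SyncConfig {
///     interval: Duration::from_secs(5),
///     timeout: Duration::from_secs(2),
/// };
/// let manager = SyncManager::with_config(store, network, config);
/// ```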
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Interval between sync attempts
    pub interval: Duration,
    /// Timeout for each sync request
    pub timeout: Duration,
}

impl Default for SyncConfig {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(30),
            timeout: Duration::from_secs(10),
        }
    }
}

/// Sync manager: drives bootstrap and periodic log exchange with peers
pub struct SyncManager<Net> {
    store: Node,
    app: Net,
    config: SyncConfig,
}

impl<Net: ExchangeInterface + Clone> SyncManager<Net> {
    pub fn new(store: Node, network: Net) -> Self {
        Self::with_config(store, network, SyncConfig::default())
    }

    pub fn with_config(store: Node, network: Net, config: SyncConfig) -> Self {
        Self {
            store,
            app: network,
            config,
        }
    }

    /// Bootstrap: sync from all peers and recover `next_seq` before starting local operations.
    /// This is critical after data loss: if our own entries are recovered up to, say, seq 42,
    /// `next_seq` is bumped to 43 so that sequence numbers are never reused.
    pub async fn bootstrap(&self) -> Result<()> {
        let my_id = self.store.read().id;

        let peers = self.store.read().get_peers();
        let results = self.sync_to_all_peers().await;
        let mut success_count = 0;
        for (peer, result) in results {
            match result {
                Ok(_) => {
                    success_count += 1;
                    info!("Successfully bootstrapped from peer {peer}");
                }
                Err(err) => {
                    warn!("Failed to bootstrap from peer {peer}: {err:?}");
                }
            }
        }

        let mut max_seq_found = 0u64;

        // Scan all received entries to find our highest seq
        let store = self.store.read();
        for peer_state in store.get_all_peer_states().values() {
            for entry in &peer_state.log {
                if entry.meta.node == my_id && entry.meta.seq > max_seq_found {
                    max_seq_found = entry.meta.seq;
                }
            }
        }

        // Also check the main data store
        for (_, entry) in store.iter_all_including_tombstones() {
            if entry.meta.node == my_id && entry.meta.seq > max_seq_found {
                max_seq_found = entry.meta.seq;
            }
        }
        drop(store);

        // Update next_seq if we found any of our own entries
        if max_seq_found > 0 {
            let new_next_seq = max_seq_found + 1;
            self.store.write().ensure_next_seq(new_next_seq);
        }

        if success_count == 0 && !peers.is_empty() {
            warn!("Bootstrap: Failed to sync from any peer, proceeding anyway");
        } else {
            info!(
                "Bootstrap: Successfully synced from {}/{} peers",
                success_count,
                peers.len()
            );
            let status = self.store.read().status();
            info!("Node status after bootstrap: {:#?}", status);
        }

        Ok(())
    }

    /// Spawn the background task that periodically exchanges logs with peers
    pub async fn start_sync_tasks(self: Arc<Self>) {
        tokio::spawn(async move {
            self.periodic_log_exchange().await;
        });
    }

    /// Periodic log exchange: send our logs to peers and request their logs
    async fn periodic_log_exchange(&self) {
        let mut ticker = interval(self.config.interval);

        loop {
            ticker.tick().await;

            let results = self.sync_to_all_peers().await;
            for (peer, result) in results {
                match result {
                    Ok(_) => {
                        info!("Successfully synced with peer {peer}");
                    }
                    Err(e) => {
                        warn!("Failed to sync with peer {peer}: {e:?}");
                    }
                }
            }
        }
    }

    /// Handle incoming sync message (bidirectional sync)
    ///
    /// Protocol:
    /// - Request contains only sender's own logs (entries from sender_id)
    /// - sender_ack serves dual purpose: progress report + "since" parameter
    /// - Response contains logs from ALL nodes based on sender_ack
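    ///
    /// A server-side sketch, assuming `msg` was deserialized from an incoming
    /// request and `manager` is this node's `SyncManager`:
    ///
    /// ```ignore
    /// let response = manager.handle_sync(msg)?;
    /// // serialize `response` and send it back to the requesting peer
    /// ```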
    #[tracing::instrument(skip(self, msg), fields(from = msg.sender_id))]
    pub fn handle_sync(&self, msg: SyncMessage) -> Result<SyncResponse> {
        let peer_progress = msg.sender_ack.clone();
        let peer_id = msg.sender_id;
        if let Some(expected_uuid) = self.app.query_uuid(peer_id) {
            if expected_uuid != msg.sender_uuid {
                warn!(
                    "UUID mismatch for peer {peer_id}: expected {:?}, got {:?}",
                    hex::encode(expected_uuid),
                    hex::encode(msg.sender_uuid)
                );
                anyhow::bail!("UUID mismatch for peer {peer_id}. Don't reuse node IDs for peers.");
            }
        }
        let mut state = self.store.write();
        // Step 1: Apply sender's entries (only contains sender_id's logs)
        state.apply_pushed_entries(msg)?;
        // Step 2: Use sender_ack to determine what to send back
        // Response can include logs from ALL nodes
        let (entries, is_snapshot) = match state.get_peer_missing_logs(&peer_progress) {
            Some(entries) => {
                info!(
                    "Returning {} incremental log entries to node {peer_id}",
                    entries.len(),
                );
                (entries, false)
            }
            None => {
                let entries = state.kv_to_log_entries();
                info!(
                    "Returning snapshot ({} entries) to node {peer_id}",
                    entries.len(),
                );
                (entries, true)
            }
        };

        // Step 3: Include our progress so sender can update their peer_ack for us
        let my_progress = state.get_local_ack();

        // Step 4: Update our peer_ack assuming the peer will accept our logs. If the peer
        //         doesn't accept our logs, it will be updated in the next sync.
        let my_id = state.id;
        let peer_ack = *my_progress.get(&my_id).unwrap_or(&0);
        let _ = state.update_peer_ack(peer_id, peer_ack);

        Ok(SyncResponse {
            peer_id: my_id,
            progress: my_progress,
            entries,
            is_snapshot,
        })
    }

    /// Perform log exchange with all peers (bidirectional)
    ///
    /// Returns Vec of (peer_id, Result<()>)
    async fn sync_to_all_peers(&self) -> Vec<(NodeId, Result<()>)> {
        let peers = self.store.read().get_peers();

        if peers.is_empty() {
            info!("No peers to bootstrap from, starting fresh");
            return vec![];
        }

        info!("Syncing with {} peers...", peers.len());

        // Sync with all peers in parallel
        let sync_futures: Vec<_> = peers
            .iter()
            .map(|&peer| async move { (peer, self.sync_to(peer).await) })
            .collect();

        join_all(sync_futures).await
    }

    /// Perform log exchange with a peer (bidirectional)
    ///
    /// Protocol:
    /// - Send only OUR node's logs (entries from store.id)
    /// - Include our sender_ack (progress on all nodes)
    /// - Peer responds with logs from ALL nodes they have
    #[tracing::instrument(skip(self))]
    async fn sync_to(&self, peer: NodeId) -> Result<()> {
        let timeout = self.config.timeout;
        // Prepare our local_ack to send (tells peer what we've synced)
        let (sender_id, sender_ack, entries) = {
            let state = self.store.read();
            // Get only OUR node's log entries that peer hasn't ack'd yet
            let peer_ack_for_us = state.get_peer_state(peer).map_or(0, |p| p.peer_ack);
            let sender_id = state.id;
            // Collect our entries with seq > peer_ack
            let entries = state
                .get_peer_logs_since(sender_id, peer_ack_for_us)
                .unwrap_or_default();
            let sender_ack = state.get_local_ack();
            (sender_id, sender_ack, entries)
        };

        info!("Sending {} log entries to peer {peer}", entries.len());
        // Send bidirectional sync message
        let msg = SyncMessage {
            sender_id,
            sender_uuid: self.app.uuid(),
            sender_ack,
            entries,
        };

        let result = tokio::time::timeout(timeout, self.app.sync_to(&self.store, peer, msg))
            .await
            .map_err(|_| anyhow::anyhow!("sync request timed out after {:?}", timeout))?;

        match result {
            Ok(response) => self.store.write().apply_pulled_entries(response),
            Err(e) => {
                warn!("Log exchange with peer {peer} failed: {e}");
                Err(e)
            }
        }
    }
}