//! wavekv/sync.rs — peer-to-peer log synchronization for wavekv.

1use crate::node::Node;
2use crate::types::{Entry, NodeId};
3use anyhow::Result;
4use futures::future::join_all;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::interval;
11use tracing::{info, warn};
12
/// Bidirectional sync: sender includes their local_ack AND their new entries
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncMessage {
    /// The unique numeric identifier of the sender node
    pub sender_id: NodeId,
    /// Optional sender's UUID. This may be used to detect node id duplication.
    /// An empty `Vec` means "no UUID available" (the `ExchangeInterface::uuid`
    /// default), in which case receivers may skip the check.
    pub sender_uuid: Vec<u8>,
    /// How far the sender has synced each node's logs (local_ack).
    /// Doubles as the "since" cursor the responder uses to decide which
    /// entries to send back.
    pub sender_ack: HashMap<NodeId, u64>,
    /// Sender's new log entries (incremental or full dump)
    pub entries: Vec<Entry>,
}
25
/// Unified log exchange response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
    /// The responder's node id.
    pub peer_id: NodeId,
    /// Log entries the responder is returning to the requester.
    pub entries: Vec<Entry>,
    /// Responder's local_ack for each node
    pub progress: HashMap<NodeId, u64>,
    /// Indicates if this is a full KV->log conversion
    pub is_snapshot: bool,
}
34
/// Transport abstraction used by the sync manager to exchange logs with peers.
pub trait ExchangeInterface: Send + Sync + 'static {
    /// This node's UUID, included in every outgoing [`SyncMessage`] so peers
    /// can detect node-id reuse. Default: empty vec (no UUID).
    fn uuid(&self) -> Vec<u8> {
        Vec::new()
    }
    /// Expected UUID for `node_id`, if known. Returning `None` (the default)
    /// disables the UUID mismatch check for that peer in `handle_sync`.
    fn query_uuid(&self, _node_id: NodeId) -> Option<Vec<u8>> {
        None
    }
    /// Deliver `msg` to `peer` over the transport and return its response.
    fn sync_to(
        &self,
        node: &Node,
        peer: NodeId,
        msg: SyncMessage,
    ) -> impl Future<Output = Result<SyncResponse>> + Send;
}
49
/// Configuration for sync manager
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Interval between sync attempts (default: 30s)
    pub interval: Duration,
    /// Timeout for each sync request (default: 10s)
    pub timeout: Duration,
}
58
59impl Default for SyncConfig {
60    fn default() -> Self {
61        Self {
62            interval: Duration::from_secs(30),
63            timeout: Duration::from_secs(10),
64        }
65    }
66}
67
/// Simplified sync manager
pub struct SyncManager<Net> {
    /// Shared node state: peer list, per-peer logs/acks, and the KV store.
    store: Node,
    /// Transport implementation used to reach peers.
    app: Net,
    /// Timing parameters (sync interval and per-request timeout).
    config: SyncConfig,
}
74
75impl<Net: ExchangeInterface + Clone> SyncManager<Net> {
76    pub fn new(store: Node, network: Net) -> Self {
77        Self::with_config(store, network, SyncConfig::default())
78    }
79
80    pub fn with_config(store: Node, network: Net, config: SyncConfig) -> Self {
81        Self {
82            store,
83            app: network,
84            config,
85        }
86    }
87
88    /// Bootstrap: Sync from all peers and recover next_seq before starting local operations
89    /// This is critical after data loss to avoid sequence number reuse
90    pub async fn bootstrap(&self) -> Result<()> {
91        let my_id = self.store.read().id;
92
93        let peers = self.store.read().get_peers();
94        let results = self.sync_to_all_peers().await;
95        let mut success_count = 0;
96        for (peer, result) in results {
97            match result {
98                Ok(_) => {
99                    success_count += 1;
100                    info!("Successfully bootstrapped from peer {peer}");
101                }
102                Err(err) => {
103                    warn!("Failed to bootstrap from peer {peer}: {err:?}");
104                }
105            }
106        }
107
108        let mut max_seq_found = 0u64;
109
110        // Scan all received entries to find our highest seq
111        let store = self.store.read();
112        for peer_state in store.get_all_peer_states().values() {
113            for entry in &peer_state.log {
114                if entry.meta.node == my_id && entry.meta.seq > max_seq_found {
115                    max_seq_found = entry.meta.seq;
116                }
117            }
118        }
119
120        // Also check the main data store
121        for (_, entry) in store.iter_all_including_tombstones() {
122            if entry.meta.node == my_id && entry.meta.seq > max_seq_found {
123                max_seq_found = entry.meta.seq;
124            }
125        }
126        drop(store);
127
128        // Update next_seq if we found any of our own entries
129        if max_seq_found > 0 {
130            let new_next_seq = max_seq_found + 1;
131            self.store.write().ensure_next_seq(new_next_seq);
132        }
133
134        if success_count == 0 && !peers.is_empty() {
135            warn!("Bootstrap: Failed to sync from any peer, proceeding anyway");
136        } else {
137            info!(
138                "Bootstrap: Successfully synced from {}/{} peers",
139                success_count,
140                peers.len()
141            );
142            let status = self.store.read().status();
143            info!("Node status after bootstrap: {:#?}", status);
144        }
145
146        Ok(())
147    }
148
149    /// Start periodic log exchange with peers
150    pub async fn start_sync_tasks(self: Arc<Self>) {
151        let sync_manager = self.clone();
152        tokio::spawn(async move {
153            sync_manager.periodic_log_exchange().await;
154        });
155    }
156
157    /// Periodic log exchange: send our logs to peers and request their logs
158    async fn periodic_log_exchange(&self) {
159        let mut ticker = interval(self.config.interval);
160
161        loop {
162            ticker.tick().await;
163
164            let results = self.sync_to_all_peers().await;
165            for (peer, result) in results {
166                match result {
167                    Ok(_) => {
168                        info!("Successfully synced with peer {peer}");
169                    }
170                    Err(e) => {
171                        warn!("Failed to sync with peer {peer}: {e:?}");
172                    }
173                }
174            }
175        }
176    }
177
    /// Handle incoming sync message (bidirectional sync)
    ///
    /// Protocol:
    /// - Request contains only sender's own logs (entries from sender_id)
    /// - sender_ack serves dual purpose: progress report + "since" parameter
    /// - Response contains logs from ALL nodes based on sender_ack
    ///
    /// Errors if the sender's UUID does not match the one on record for that
    /// node id (node-id reuse), or if applying the pushed entries fails.
    #[tracing::instrument(skip(self, msg), fields(from = msg.sender_id))]
    pub fn handle_sync(&self, msg: SyncMessage) -> Result<SyncResponse> {
        // Cloned up front: `msg` is moved into `apply_pushed_entries` below,
        // but the ack map is still needed afterwards to build the response.
        let peer_progress = msg.sender_ack.clone();
        let peer_id = msg.sender_id;
        // UUID check is opt-in: `query_uuid` returning `None` skips it.
        if let Some(expected_uuid) = self.app.query_uuid(peer_id) {
            if expected_uuid != msg.sender_uuid {
                warn!(
                    "UUID mismatch for peer {peer_id}: expected {:?}, got {:?}",
                    hex::encode(expected_uuid),
                    hex::encode(msg.sender_uuid)
                );
                anyhow::bail!("UUID mismatch for peer {peer_id}. Don't reuse node IDs for peers.");
            }
        }
        // Write lock is held for the remainder of the function so the apply
        // and the read-back below see one consistent state.
        let mut state = self.store.write();
        // Step 1: Apply sender's entries (only contains sender_id's logs)
        state.apply_pushed_entries(msg)?;
        // Step 2: Use sender_ack to determine what to send back
        // Response can include logs from ALL nodes
        // `None` here triggers the snapshot path — presumably incremental logs
        // are unavailable for this ack state; confirm in get_peer_missing_logs.
        let (entries, is_snapshot) = match state.get_peer_missing_logs(&peer_progress) {
            Some(entries) => {
                info!(
                    "Returning {} incremental log entries to node {peer_id}",
                    entries.len(),
                );
                (entries, false)
            }
            None => {
                // Fall back to a full dump synthesized from the KV state.
                let entries = state.kv_to_log_entries();
                info!(
                    "Returning snapshot ({} entries) to node {peer_id}",
                    entries.len(),
                );
                (entries, true)
            }
        };

        // Step 3: Include our progress so sender can update their peer_ack for us
        let my_progress = state.get_local_ack();

        // Step 4: Update our peer_ack assuming the peer will accept our logs. If the peer
        //         doesn't accept our logs, it will be updated in the next sync.
        let my_id = state.id;
        let peer_ack = *my_progress.get(&my_id).unwrap_or(&0);
        // Result deliberately ignored: best-effort, self-corrects next round.
        let _ = state.update_peer_ack(peer_id, peer_ack);

        Ok(SyncResponse {
            peer_id: my_id,
            progress: my_progress,
            entries,
            is_snapshot,
        })
    }
237
238    /// Perform log exchange with all peers (bidirectional)
239    ///
240    /// Returns Vec of (peer_id, Result<()>)
241    async fn sync_to_all_peers(&self) -> Vec<(NodeId, Result<()>)> {
242        let peers = self.store.read().get_peers();
243
244        if peers.is_empty() {
245            info!("No peers to bootstrap from, starting fresh");
246            return vec![];
247        }
248
249        info!("Syncing with {} peers...", peers.len());
250
251        // Sync from all peers in parallel
252        let sync_futures: Vec<_> = peers
253            .iter()
254            .map(|&peer| async move { (peer, self.sync_to(peer).await) })
255            .collect();
256
257        join_all(sync_futures).await
258    }
259
    /// Perform log exchange with a peer (bidirectional)
    ///
    /// Protocol:
    /// - Send only OUR node's logs (entries from store.id)
    /// - Include our sender_ack (progress on all nodes)
    /// - Peer responds with logs from ALL nodes they have
    ///
    /// The request is bounded by `config.timeout`; a timeout is surfaced as an
    /// error just like any transport failure.
    #[tracing::instrument(skip(self))]
    async fn sync_to(&self, peer: NodeId) -> Result<()> {
        let timeout = self.config.timeout;
        // Prepare our local_ack to send (tells peer what we've synced).
        // The read lock is scoped to this block so it is released before the
        // (potentially slow) network call below.
        let (sender_id, sender_ack, entries) = {
            let state = self.store.read();
            // Get only OUR node's log entries that peer hasn't ack'd yet
            // (missing peer state counts as ack 0, i.e. send everything).
            let peer_ack_for_us = state.get_peer_state(peer).map_or(0, |p| p.peer_ack);
            let sender_id = state.id;
            // Collect our entries with seq > peer_ack
            let entries = state
                .get_peer_logs_since(sender_id, peer_ack_for_us)
                .unwrap_or_default();
            let sender_ack = state.get_local_ack();
            (sender_id, sender_ack, entries)
        };

        info!("Sending {} log entries to peer {peer}", entries.len());
        // Send bidirectional sync message
        let msg = SyncMessage {
            sender_id,
            sender_uuid: self.app.uuid(),
            sender_ack,
            entries,
        };

        // The `?` propagates only the timeout; transport-level errors fall
        // through to the match below so they can be logged before propagating.
        let result = tokio::time::timeout(timeout, self.app.sync_to(&self.store, peer, msg))
            .await
            .map_err(|_| anyhow::anyhow!("sync request timed out after {:?}", timeout))?;

        match result {
            // Merge the peer's response (their logs + progress) into our store.
            Ok(response) => self.store.write().apply_pulled_entries(response),
            Err(e) => {
                warn!("Log exchange with peer {peer} failed: {e}");
                Err(e)
            }
        }
    }
304}