1use crate::node::Node;
2use crate::types::{Entry, NodeId};
3use anyhow::Result;
4use futures::future::join_all;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::interval;
11use tracing::{info, warn};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct SyncMessage {
16 pub sender_id: NodeId,
18 pub sender_uuid: Vec<u8>,
20 pub sender_ack: HashMap<NodeId, u64>,
22 pub entries: Vec<Entry>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct SyncResponse {
29 pub peer_id: NodeId,
30 pub entries: Vec<Entry>,
31 pub progress: HashMap<NodeId, u64>, pub is_snapshot: bool, }
34
35pub trait ExchangeInterface: Send + Sync + 'static {
36 fn uuid(&self) -> Vec<u8> {
37 Vec::new()
38 }
39 fn query_uuid(&self, _node_id: NodeId) -> Option<Vec<u8>> {
40 None
41 }
42 fn sync_to(
43 &self,
44 node: &Node,
45 peer: NodeId,
46 msg: SyncMessage,
47 ) -> impl Future<Output = Result<SyncResponse>> + Send;
48}
49
/// Timing settings for [`SyncManager`].
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly
/// (for example in tests or when detecting a configuration change).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncConfig {
    /// Period between two background sync rounds.
    pub interval: Duration,

    /// Upper bound on how long a single sync request to one peer may take
    /// before it is reported as timed out.
    pub timeout: Duration,
}
58
59impl Default for SyncConfig {
60 fn default() -> Self {
61 Self {
62 interval: Duration::from_secs(30),
63 timeout: Duration::from_secs(10),
64 }
65 }
66}
67
68pub struct SyncManager<Net> {
70 store: Node,
71 app: Net,
72 config: SyncConfig,
73}
74
75impl<Net: ExchangeInterface + Clone> SyncManager<Net> {
76 pub fn new(store: Node, network: Net) -> Self {
77 Self::with_config(store, network, SyncConfig::default())
78 }
79
80 pub fn with_config(store: Node, network: Net, config: SyncConfig) -> Self {
81 Self {
82 store,
83 app: network,
84 config,
85 }
86 }
87
88 pub async fn bootstrap(&self) -> Result<()> {
91 let my_id = self.store.read().id;
92
93 let peers = self.store.read().get_peers();
94 let results = self.sync_to_all_peers().await;
95 let mut success_count = 0;
96 for (peer, result) in results {
97 match result {
98 Ok(_) => {
99 success_count += 1;
100 info!("Successfully bootstrapped from peer {peer}");
101 }
102 Err(err) => {
103 warn!("Failed to bootstrap from peer {peer}: {err:?}");
104 }
105 }
106 }
107
108 let mut max_seq_found = 0u64;
109
110 let store = self.store.read();
112 for peer_state in store.get_all_peer_states().values() {
113 for entry in &peer_state.log {
114 if entry.meta.node == my_id && entry.meta.seq > max_seq_found {
115 max_seq_found = entry.meta.seq;
116 }
117 }
118 }
119
120 for (_, entry) in store.iter_all_including_tombstones() {
122 if entry.meta.node == my_id && entry.meta.seq > max_seq_found {
123 max_seq_found = entry.meta.seq;
124 }
125 }
126 drop(store);
127
128 if max_seq_found > 0 {
130 let new_next_seq = max_seq_found + 1;
131 self.store.write().ensure_next_seq(new_next_seq);
132 }
133
134 if success_count == 0 && !peers.is_empty() {
135 warn!("Bootstrap: Failed to sync from any peer, proceeding anyway");
136 } else {
137 info!(
138 "Bootstrap: Successfully synced from {}/{} peers",
139 success_count,
140 peers.len()
141 );
142 let status = self.store.read().status();
143 info!("Node status after bootstrap: {:#?}", status);
144 }
145
146 Ok(())
147 }
148
149 pub async fn start_sync_tasks(self: Arc<Self>) {
151 let sync_manager = self.clone();
152 tokio::spawn(async move {
153 sync_manager.periodic_log_exchange().await;
154 });
155 }
156
157 async fn periodic_log_exchange(&self) {
159 let mut ticker = interval(self.config.interval);
160
161 loop {
162 ticker.tick().await;
163
164 let results = self.sync_to_all_peers().await;
165 for (peer, result) in results {
166 match result {
167 Ok(_) => {
168 info!("Successfully synced with peer {peer}");
169 }
170 Err(e) => {
171 warn!("Failed to sync with peer {peer}: {e:?}");
172 }
173 }
174 }
175 }
176 }
177
178 #[tracing::instrument(skip(self, msg), fields(from = msg.sender_id))]
185 pub fn handle_sync(&self, msg: SyncMessage) -> Result<SyncResponse> {
186 let peer_progress = msg.sender_ack.clone();
187 let peer_id = msg.sender_id;
188 if let Some(expected_uuid) = self.app.query_uuid(peer_id) {
189 if expected_uuid != msg.sender_uuid {
190 warn!(
191 "UUID mismatch for peer {peer_id}: expected {:?}, got {:?}",
192 hex::encode(expected_uuid),
193 hex::encode(msg.sender_uuid)
194 );
195 anyhow::bail!("UUID mismatch for peer {peer_id}. Don't reuse node IDs for peers.");
196 }
197 }
198 let mut state = self.store.write();
199 state.apply_pushed_entries(msg)?;
201 let (entries, is_snapshot) = match state.get_peer_missing_logs(&peer_progress) {
204 Some(entries) => {
205 info!(
206 "Returning {} incremental log entries to node {peer_id}",
207 entries.len(),
208 );
209 (entries, false)
210 }
211 None => {
212 let entries = state.kv_to_log_entries();
213 info!(
214 "Returning snapshot ({} entries) to node {peer_id}",
215 entries.len(),
216 );
217 (entries, true)
218 }
219 };
220
221 let my_progress = state.get_local_ack();
223
224 let my_id = state.id;
227 let peer_ack = *my_progress.get(&my_id).unwrap_or(&0);
228 let _ = state.update_peer_ack(peer_id, peer_ack);
229
230 Ok(SyncResponse {
231 peer_id: my_id,
232 progress: my_progress,
233 entries,
234 is_snapshot,
235 })
236 }
237
238 async fn sync_to_all_peers(&self) -> Vec<(NodeId, Result<()>)> {
242 let peers = self.store.read().get_peers();
243
244 if peers.is_empty() {
245 info!("No peers to bootstrap from, starting fresh");
246 return vec![];
247 }
248
249 info!("Syncing with {} peers...", peers.len());
250
251 let sync_futures: Vec<_> = peers
253 .iter()
254 .map(|&peer| async move { (peer, self.sync_to(peer).await) })
255 .collect();
256
257 join_all(sync_futures).await
258 }
259
260 #[tracing::instrument(skip(self))]
267 async fn sync_to(&self, peer: NodeId) -> Result<()> {
268 let timeout = self.config.timeout;
269 let (sender_id, sender_ack, entries) = {
271 let state = self.store.read();
272 let peer_ack_for_us = state.get_peer_state(peer).map_or(0, |p| p.peer_ack);
274 let sender_id = state.id;
275 let entries = state
277 .get_peer_logs_since(sender_id, peer_ack_for_us)
278 .unwrap_or_default();
279 let sender_ack = state.get_local_ack();
280 (sender_id, sender_ack, entries)
281 };
282
283 info!("Sending {} log entries to peer {peer}", entries.len());
284 let msg = SyncMessage {
286 sender_id,
287 sender_uuid: self.app.uuid(),
288 sender_ack,
289 entries,
290 };
291
292 let result = tokio::time::timeout(timeout, self.app.sync_to(&self.store, peer, msg))
293 .await
294 .map_err(|_| anyhow::anyhow!("sync request timed out after {:?}", timeout))?;
295
296 match result {
297 Ok(response) => self.store.write().apply_pulled_entries(response),
298 Err(e) => {
299 warn!("Log exchange with peer {peer} failed: {e}");
300 Err(e)
301 }
302 }
303 }
304}