1use crate::delta::DeltaBatch;
30use crate::serde_ga3;
31use crate::VectorClock;
32use cliffy_core::GA3;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::time::{Duration, Instant};
36use uuid::Uuid;
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct SyncMessage {
41 pub id: u64,
43 pub sender: Uuid,
45 pub payload: SyncPayload,
47 pub clock: VectorClock,
49 pub timestamp: u64,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum SyncPayload {
56 Hello(PeerInfo),
58
59 ClockRequest,
61
62 ClockResponse(VectorClock),
64
65 DeltaRequest {
67 since_clock: VectorClock,
69 },
70
71 DeltaResponse {
73 deltas: DeltaBatch,
75 has_more: bool,
77 },
78
79 FullState {
81 #[serde(with = "serde_ga3")]
83 state: GA3,
84 clock: VectorClock,
86 },
87
88 Heartbeat,
90
91 Ack {
93 message_id: u64,
95 applied_clock: VectorClock,
97 },
98
99 Goodbye,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct PeerInfo {
106 pub node_id: Uuid,
108 pub name: Option<String>,
110 pub capabilities: PeerCapabilities,
112 pub protocol_version: u32,
114}
115
116#[derive(Debug, Clone, Default, Serialize, Deserialize)]
118pub struct PeerCapabilities {
119 pub compressed_deltas: bool,
121 pub batch_operations: bool,
123 pub max_batch_size: usize,
125 pub full_state_sync: bool,
127}
128
129impl PeerCapabilities {
130 pub fn default_capabilities() -> Self {
132 Self {
133 compressed_deltas: true,
134 batch_operations: true,
135 max_batch_size: 100,
136 full_state_sync: true,
137 }
138 }
139}
140
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
143pub enum PeerConnectionState {
144 Discovered,
146 Syncing,
148 Synced,
150 Disconnected,
152 Gone,
154}
155
156#[derive(Debug, Clone)]
158pub struct PeerState {
159 pub info: PeerInfo,
161 pub last_clock: VectorClock,
163 pub connection_state: PeerConnectionState,
165 pub last_seen: Option<Instant>,
167 pub pending_acks: HashMap<u64, Instant>,
169 pub rtt_estimate: Option<Duration>,
171}
172
173impl PeerState {
174 pub fn new(info: PeerInfo, clock: VectorClock) -> Self {
176 Self {
177 info,
178 last_clock: clock,
179 connection_state: PeerConnectionState::Discovered,
180 last_seen: None,
181 pending_acks: HashMap::new(),
182 rtt_estimate: None,
183 }
184 }
185
186 pub fn touch(&mut self) {
188 self.last_seen = Some(Instant::now());
189 }
190
191 pub fn is_stale(&self, timeout: Duration) -> bool {
193 match self.last_seen {
194 Some(last) => last.elapsed() > timeout,
195 None => true,
196 }
197 }
198
199 pub fn expect_ack(&mut self, message_id: u64) {
201 self.pending_acks.insert(message_id, Instant::now());
202 }
203
204 pub fn receive_ack(&mut self, message_id: u64) {
206 if let Some(sent_time) = self.pending_acks.remove(&message_id) {
207 let rtt = sent_time.elapsed();
208 self.rtt_estimate = Some(match self.rtt_estimate {
209 Some(prev) => Duration::from_millis(
210 (prev.as_millis() as f64 * 0.8 + rtt.as_millis() as f64 * 0.2) as u64,
211 ),
212 None => rtt,
213 });
214 }
215 }
216}
217
218#[derive(Debug)]
220pub struct SyncState {
221 pub node_id: Uuid,
223 pub clock: VectorClock,
225 pub peers: HashMap<Uuid, PeerState>,
227 next_message_id: u64,
229 pub config: SyncConfig,
231}
232
233#[derive(Debug, Clone)]
235pub struct SyncConfig {
236 pub heartbeat_interval: Duration,
238 pub peer_timeout: Duration,
240 pub max_batch_size: usize,
242 pub prefer_compressed: bool,
244 pub protocol_version: u32,
246}
247
248impl Default for SyncConfig {
249 fn default() -> Self {
250 Self {
251 heartbeat_interval: Duration::from_secs(5),
252 peer_timeout: Duration::from_secs(30),
253 max_batch_size: 100,
254 prefer_compressed: true,
255 protocol_version: 1,
256 }
257 }
258}
259
260impl SyncState {
261 pub fn new(node_id: Uuid) -> Self {
263 Self {
264 node_id,
265 clock: VectorClock::new(),
266 peers: HashMap::new(),
267 next_message_id: 0,
268 config: SyncConfig::default(),
269 }
270 }
271
272 pub fn with_config(node_id: Uuid, config: SyncConfig) -> Self {
274 Self {
275 node_id,
276 clock: VectorClock::new(),
277 peers: HashMap::new(),
278 next_message_id: 0,
279 config,
280 }
281 }
282
283 pub fn register_peer(&mut self, peer_id: Uuid, clock: VectorClock) {
285 let info = PeerInfo {
286 node_id: peer_id,
287 name: None,
288 capabilities: PeerCapabilities::default_capabilities(),
289 protocol_version: self.config.protocol_version,
290 };
291 self.peers.insert(peer_id, PeerState::new(info, clock));
292 }
293
294 pub fn register_peer_with_info(&mut self, info: PeerInfo, clock: VectorClock) {
296 let peer_id = info.node_id;
297 self.peers.insert(peer_id, PeerState::new(info, clock));
298 }
299
300 pub fn remove_peer(&mut self, peer_id: &Uuid) {
302 self.peers.remove(peer_id);
303 }
304
305 pub fn get_peer(&self, peer_id: &Uuid) -> Option<&PeerState> {
307 self.peers.get(peer_id)
308 }
309
310 pub fn get_peer_mut(&mut self, peer_id: &Uuid) -> Option<&mut PeerState> {
312 self.peers.get_mut(peer_id)
313 }
314
315 pub fn tick(&mut self) -> u64 {
317 self.clock.tick(self.node_id);
318 let id = self.next_message_id;
319 self.next_message_id += 1;
320 id
321 }
322
323 pub fn create_hello(&mut self, name: Option<String>) -> SyncMessage {
325 let id = self.tick();
326 SyncMessage {
327 id,
328 sender: self.node_id,
329 payload: SyncPayload::Hello(PeerInfo {
330 node_id: self.node_id,
331 name,
332 capabilities: PeerCapabilities::default_capabilities(),
333 protocol_version: self.config.protocol_version,
334 }),
335 clock: self.clock.clone(),
336 timestamp: current_timestamp_ms(),
337 }
338 }
339
340 pub fn create_delta_request(&mut self, since_clock: VectorClock) -> SyncMessage {
342 let id = self.tick();
343 SyncMessage {
344 id,
345 sender: self.node_id,
346 payload: SyncPayload::DeltaRequest { since_clock },
347 clock: self.clock.clone(),
348 timestamp: current_timestamp_ms(),
349 }
350 }
351
352 pub fn create_delta_response(&mut self, deltas: DeltaBatch, has_more: bool) -> SyncMessage {
354 let id = self.tick();
355 SyncMessage {
356 id,
357 sender: self.node_id,
358 payload: SyncPayload::DeltaResponse { deltas, has_more },
359 clock: self.clock.clone(),
360 timestamp: current_timestamp_ms(),
361 }
362 }
363
364 pub fn create_full_state(&mut self, state: GA3) -> SyncMessage {
366 let id = self.tick();
367 SyncMessage {
368 id,
369 sender: self.node_id,
370 payload: SyncPayload::FullState {
371 state,
372 clock: self.clock.clone(),
373 },
374 clock: self.clock.clone(),
375 timestamp: current_timestamp_ms(),
376 }
377 }
378
379 pub fn create_heartbeat(&mut self) -> SyncMessage {
381 let id = self.tick();
382 SyncMessage {
383 id,
384 sender: self.node_id,
385 payload: SyncPayload::Heartbeat,
386 clock: self.clock.clone(),
387 timestamp: current_timestamp_ms(),
388 }
389 }
390
391 pub fn create_ack(&mut self, message_id: u64) -> SyncMessage {
393 let id = self.tick();
394 SyncMessage {
395 id,
396 sender: self.node_id,
397 payload: SyncPayload::Ack {
398 message_id,
399 applied_clock: self.clock.clone(),
400 },
401 clock: self.clock.clone(),
402 timestamp: current_timestamp_ms(),
403 }
404 }
405
406 pub fn create_goodbye(&mut self) -> SyncMessage {
408 let id = self.tick();
409 SyncMessage {
410 id,
411 sender: self.node_id,
412 payload: SyncPayload::Goodbye,
413 clock: self.clock.clone(),
414 timestamp: current_timestamp_ms(),
415 }
416 }
417
418 pub fn handle_message(&mut self, message: &SyncMessage) -> Option<SyncMessage> {
420 if let Some(peer) = self.peers.get_mut(&message.sender) {
422 peer.touch();
423 peer.last_clock = message.clock.clone();
424 }
425
426 self.clock.update(&message.clock);
428
429 match &message.payload {
430 SyncPayload::Hello(info) => {
431 self.register_peer_with_info(info.clone(), message.clock.clone());
432 Some(self.create_hello(None))
433 }
434 SyncPayload::ClockRequest => Some(SyncMessage {
435 id: self.tick(),
436 sender: self.node_id,
437 payload: SyncPayload::ClockResponse(self.clock.clone()),
438 clock: self.clock.clone(),
439 timestamp: current_timestamp_ms(),
440 }),
441 SyncPayload::Heartbeat => {
442 None
444 }
445 SyncPayload::Ack {
446 message_id,
447 applied_clock: _,
448 } => {
449 if let Some(peer) = self.peers.get_mut(&message.sender) {
450 peer.receive_ack(*message_id);
451 }
452 None
453 }
454 SyncPayload::Goodbye => {
455 if let Some(peer) = self.peers.get_mut(&message.sender) {
456 peer.connection_state = PeerConnectionState::Gone;
457 }
458 None
459 }
460 _ => None,
462 }
463 }
464
465 pub fn stale_peers(&self) -> Vec<Uuid> {
467 self.peers
468 .iter()
469 .filter(|(_, state)| state.is_stale(self.config.peer_timeout))
470 .map(|(id, _)| *id)
471 .collect()
472 }
473
474 pub fn peers_needing_heartbeat(&self) -> Vec<Uuid> {
476 self.peers
477 .iter()
478 .filter(|(_, state)| {
479 matches!(
480 state.connection_state,
481 PeerConnectionState::Synced | PeerConnectionState::Syncing
482 ) && state
483 .last_seen
484 .map(|t| t.elapsed() > self.config.heartbeat_interval / 2)
485 .unwrap_or(true)
486 })
487 .map(|(id, _)| *id)
488 .collect()
489 }
490}
491
492fn current_timestamp_ms() -> u64 {
494 std::time::SystemTime::now()
495 .duration_since(std::time::UNIX_EPOCH)
496 .map(|d| d.as_millis() as u64)
497 .unwrap_or(0)
498}
499
500#[cfg(test)]
501mod tests {
502 use super::*;
503
504 #[test]
505 fn test_sync_state_creation() {
506 let node_id = Uuid::new_v4();
507 let state = SyncState::new(node_id);
508
509 assert_eq!(state.node_id, node_id);
510 assert!(state.peers.is_empty());
511 }
512
513 #[test]
514 fn test_peer_registration() {
515 let node_id = Uuid::new_v4();
516 let mut state = SyncState::new(node_id);
517
518 let peer_id = Uuid::new_v4();
519 state.register_peer(peer_id, VectorClock::new());
520
521 assert!(state.get_peer(&peer_id).is_some());
522 assert_eq!(state.peers.len(), 1);
523 }
524
525 #[test]
526 fn test_create_hello_message() {
527 let node_id = Uuid::new_v4();
528 let mut state = SyncState::new(node_id);
529
530 let msg = state.create_hello(Some("Test Node".to_string()));
531
532 assert_eq!(msg.sender, node_id);
533 assert!(matches!(msg.payload, SyncPayload::Hello(_)));
534
535 if let SyncPayload::Hello(info) = msg.payload {
536 assert_eq!(info.name, Some("Test Node".to_string()));
537 }
538 }
539
540 #[test]
541 fn test_handle_hello_message() {
542 let node1_id = Uuid::new_v4();
543 let node2_id = Uuid::new_v4();
544
545 let mut state1 = SyncState::new(node1_id);
546 let mut state2 = SyncState::new(node2_id);
547
548 let hello = state2.create_hello(Some("Node 2".to_string()));
550 let response = state1.handle_message(&hello);
551
552 assert!(state1.get_peer(&node2_id).is_some());
554
555 assert!(response.is_some());
557 if let Some(msg) = response {
558 assert!(matches!(msg.payload, SyncPayload::Hello(_)));
559 }
560 }
561
562 #[test]
563 fn test_clock_updates_on_message() {
564 let node1_id = Uuid::new_v4();
565 let node2_id = Uuid::new_v4();
566
567 let mut state1 = SyncState::new(node1_id);
568 let mut state2 = SyncState::new(node2_id);
569
570 state2.tick();
572 state2.tick();
573 state2.tick();
574
575 let msg = state2.create_heartbeat();
576 state1.handle_message(&msg);
577
578 assert!(!state1.clock.happens_before(&state2.clock));
580 }
581
582 #[test]
583 fn test_peer_capabilities() {
584 let caps = PeerCapabilities::default_capabilities();
585
586 assert!(caps.compressed_deltas);
587 assert!(caps.batch_operations);
588 assert_eq!(caps.max_batch_size, 100);
589 }
590
591 #[test]
592 fn test_goodbye_handling() {
593 let node1_id = Uuid::new_v4();
594 let node2_id = Uuid::new_v4();
595
596 let mut state1 = SyncState::new(node1_id);
597 let mut state2 = SyncState::new(node2_id);
598
599 state1.register_peer(node2_id, VectorClock::new());
601
602 let goodbye = state2.create_goodbye();
604 state1.handle_message(&goodbye);
605
606 let peer = state1.get_peer(&node2_id).unwrap();
608 assert_eq!(peer.connection_state, PeerConnectionState::Gone);
609 }
610
611 #[test]
612 fn test_delta_request_response() {
613 let node_id = Uuid::new_v4();
614 let mut state = SyncState::new(node_id);
615
616 let delta_req = state.create_delta_request(VectorClock::new());
617
618 assert!(matches!(
619 delta_req.payload,
620 SyncPayload::DeltaRequest { .. }
621 ));
622
623 let batch = DeltaBatch::new();
624 let delta_resp = state.create_delta_response(batch, false);
625
626 assert!(matches!(
627 delta_resp.payload,
628 SyncPayload::DeltaResponse {
629 has_more: false,
630 ..
631 }
632 ));
633 }
634
635 #[test]
636 fn test_ack_rtt_tracking() {
637 let node_id = Uuid::new_v4();
638 let mut state = SyncState::new(node_id);
639
640 let peer_id = Uuid::new_v4();
641 state.register_peer(peer_id, VectorClock::new());
642
643 let peer = state.get_peer_mut(&peer_id).unwrap();
645 peer.expect_ack(42);
646
647 std::thread::sleep(Duration::from_millis(10));
649 peer.receive_ack(42);
650
651 assert!(peer.rtt_estimate.is_some());
652 assert!(peer.pending_acks.is_empty());
653 }
654}