1use std::time::Duration;
37
38use noxu_util::{NULL_VLSN, Vlsn};
39
40use crate::error::{RepError, Result};
41use crate::net::channel::Channel;
42use crate::stream::syncup::{
43 Matchpoint, SyncupView, VlsnEntry, find_matchpoint,
44};
45
/// Name string identifying the syncup service.
///
/// NOTE(review): presumably used to route an incoming connection to the
/// syncup handler — confirm against the service dispatcher.
pub const SYNCUP_SERVICE_NAME: &str = "REP_SYNCUP";

/// Timeout handed to `Channel::receive` for every message read by `recv`.
const SYNCUP_TIMEOUT: Duration = Duration::from_secs(30);
51
/// Messages exchanged between replica and feeder during the syncup handshake.
///
/// Each variant maps to one `OP_*` opcode byte on the wire; see
/// [`SyncupMsg::encode`] and [`SyncupMsg::decode`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncupMsg {
    /// Replica -> feeder: ask for the feeder's entry at `vlsn`.
    EntryRequest { vlsn: Vlsn },
    /// Feeder -> replica: the feeder's entry for the requested VLSN. The
    /// replica compares `vlsn`, `lsn` and `fingerprint` with its own entry.
    Entry { vlsn: Vlsn, lsn: u64, fingerprint: u64, is_sync: bool },
    /// Feeder -> replica: the feeder cannot serve the requested VLSN.
    EntryNotFound,
    /// Feeder -> replica: sent only as the feeder's first response, when it
    /// has no entry for the requested VLSN; carries the feeder's last sync
    /// entry as a candidate to try instead.
    AlternateMatchpoint {
        vlsn: Vlsn,
        lsn: u64,
        fingerprint: u64,
        is_sync: bool,
    },
    /// Replica -> feeder: a matchpoint was agreed; ends the feeder handshake
    /// with `start_vlsn` as its result.
    StartStream { start_vlsn: Vlsn },
    /// Replica -> feeder: no matchpoint could be found; `failed_vlsn` is the
    /// VLSN at which the search gave up.
    RestoreRequest { failed_vlsn: Vlsn },
    /// Feeder -> replica: acknowledgement of a `RestoreRequest`.
    RestoreResponse,
}
89
// Wire opcodes: the first byte of every encoded `SyncupMsg`.
/// Opcode for `SyncupMsg::EntryRequest`.
const OP_ENTRY_REQUEST: u8 = 1;
/// Opcode for `SyncupMsg::Entry`.
const OP_ENTRY: u8 = 2;
/// Opcode for `SyncupMsg::EntryNotFound`.
const OP_ENTRY_NOT_FOUND: u8 = 3;
/// Opcode for `SyncupMsg::AlternateMatchpoint`.
const OP_ALT_MATCHPOINT: u8 = 4;
/// Opcode for `SyncupMsg::StartStream`.
const OP_START_STREAM: u8 = 5;
/// Opcode for `SyncupMsg::RestoreRequest`.
const OP_RESTORE_REQUEST: u8 = 6;
/// Opcode for `SyncupMsg::RestoreResponse`.
const OP_RESTORE_RESPONSE: u8 = 7;
98
99impl SyncupMsg {
100 pub fn encode(&self) -> Vec<u8> {
102 let mut b = Vec::with_capacity(26);
103 match self {
104 SyncupMsg::EntryRequest { vlsn } => {
105 b.push(OP_ENTRY_REQUEST);
106 b.extend_from_slice(&vlsn.sequence().to_le_bytes());
107 }
108 SyncupMsg::Entry { vlsn, lsn, fingerprint, is_sync } => {
109 b.push(OP_ENTRY);
110 encode_record(&mut b, *vlsn, *lsn, *fingerprint, *is_sync);
111 }
112 SyncupMsg::EntryNotFound => b.push(OP_ENTRY_NOT_FOUND),
113 SyncupMsg::AlternateMatchpoint {
114 vlsn,
115 lsn,
116 fingerprint,
117 is_sync,
118 } => {
119 b.push(OP_ALT_MATCHPOINT);
120 encode_record(&mut b, *vlsn, *lsn, *fingerprint, *is_sync);
121 }
122 SyncupMsg::StartStream { start_vlsn } => {
123 b.push(OP_START_STREAM);
124 b.extend_from_slice(&start_vlsn.sequence().to_le_bytes());
125 }
126 SyncupMsg::RestoreRequest { failed_vlsn } => {
127 b.push(OP_RESTORE_REQUEST);
128 b.extend_from_slice(&failed_vlsn.sequence().to_le_bytes());
129 }
130 SyncupMsg::RestoreResponse => b.push(OP_RESTORE_RESPONSE),
131 }
132 b
133 }
134
135 pub fn decode(buf: &[u8]) -> Result<Self> {
137 if buf.is_empty() {
138 return Err(RepError::ProtocolError(
139 "syncup: empty message".into(),
140 ));
141 }
142 let op = buf[0];
143 let body = &buf[1..];
144 match op {
145 OP_ENTRY_REQUEST => {
146 Ok(SyncupMsg::EntryRequest { vlsn: read_vlsn(body)? })
147 }
148 OP_ENTRY => {
149 let (vlsn, lsn, fingerprint, is_sync) = decode_record(body)?;
150 Ok(SyncupMsg::Entry { vlsn, lsn, fingerprint, is_sync })
151 }
152 OP_ENTRY_NOT_FOUND => Ok(SyncupMsg::EntryNotFound),
153 OP_ALT_MATCHPOINT => {
154 let (vlsn, lsn, fingerprint, is_sync) = decode_record(body)?;
155 Ok(SyncupMsg::AlternateMatchpoint {
156 vlsn,
157 lsn,
158 fingerprint,
159 is_sync,
160 })
161 }
162 OP_START_STREAM => {
163 Ok(SyncupMsg::StartStream { start_vlsn: read_vlsn(body)? })
164 }
165 OP_RESTORE_REQUEST => {
166 Ok(SyncupMsg::RestoreRequest { failed_vlsn: read_vlsn(body)? })
167 }
168 OP_RESTORE_RESPONSE => Ok(SyncupMsg::RestoreResponse),
169 other => Err(RepError::ProtocolError(format!(
170 "syncup: unknown opcode {other}"
171 ))),
172 }
173 }
174}
175
176fn encode_record(
177 b: &mut Vec<u8>,
178 vlsn: Vlsn,
179 lsn: u64,
180 fingerprint: u64,
181 is_sync: bool,
182) {
183 b.extend_from_slice(&vlsn.sequence().to_le_bytes());
184 b.extend_from_slice(&lsn.to_le_bytes());
185 b.extend_from_slice(&fingerprint.to_le_bytes());
186 b.push(is_sync as u8);
187}
188
189fn decode_record(body: &[u8]) -> Result<(Vlsn, u64, u64, bool)> {
190 if body.len() < 8 + 8 + 8 + 1 {
191 return Err(RepError::ProtocolError(format!(
192 "syncup: short record body ({} bytes)",
193 body.len()
194 )));
195 }
196 let seq = i64::from_le_bytes(body[0..8].try_into().unwrap());
197 let lsn = u64::from_le_bytes(body[8..16].try_into().unwrap());
198 let fingerprint = u64::from_le_bytes(body[16..24].try_into().unwrap());
199 let is_sync = body[24] != 0;
200 Ok((Vlsn::new(seq), lsn, fingerprint, is_sync))
201}
202
203fn read_vlsn(body: &[u8]) -> Result<Vlsn> {
204 if body.len() < 8 {
205 return Err(RepError::ProtocolError(format!(
206 "syncup: short vlsn body ({} bytes)",
207 body.len()
208 )));
209 }
210 Ok(Vlsn::new(i64::from_le_bytes(body[0..8].try_into().unwrap())))
211}
212
/// Result of the replica side of the syncup handshake.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncupOutcome {
    /// The handshake settled on a stream start position.
    ///
    /// `matchpoint_vlsn` / `matchpoint_lsn` identify the entry both sides
    /// agree on (`NULL_VLSN` and `0` when the replica had no sync point to
    /// compare); `start_vlsn` is where streaming should begin.
    Matchpoint { matchpoint_vlsn: Vlsn, matchpoint_lsn: u64, start_vlsn: Vlsn },
    /// No matchpoint was found and a `RestoreRequest` was sent to the feeder;
    /// `failed_vlsn` is the VLSN reported in that request.
    NeedsRestore { failed_vlsn: Vlsn },
}
227
228pub fn replica_syncup_handshake(
244 channel: &dyn Channel,
245 replica: &dyn SyncupView,
246) -> Result<SyncupOutcome> {
247 let mut candidate = replica.last_sync();
249 let first = replica.first_vlsn();
250
251 if candidate.is_null() {
252 send(channel, &SyncupMsg::EntryRequest { vlsn: Vlsn::new(1) })?;
255 return match recv(channel)? {
256 SyncupMsg::Entry { vlsn, .. } => Ok(SyncupOutcome::Matchpoint {
257 matchpoint_vlsn: NULL_VLSN,
258 matchpoint_lsn: 0,
259 start_vlsn: vlsn, }),
261 _ => fall_back_to_restore(channel, Vlsn::new(1)),
262 };
263 }
264
265 let mut first_exchange = true;
267
268 loop {
269 let replica_entry = match replica.entry(candidate) {
271 Some(e) => e,
272 None => return fall_back_to_restore(channel, candidate),
273 };
274
275 send(channel, &SyncupMsg::EntryRequest { vlsn: candidate })?;
276 match recv(channel)? {
277 SyncupMsg::Entry { vlsn, lsn, fingerprint, .. } => {
278 if vlsn == candidate
279 && lsn == replica_entry.lsn
280 && fingerprint == replica_entry.fingerprint
281 {
282 return converge(channel, candidate, replica_entry.lsn);
284 }
285 }
287 SyncupMsg::AlternateMatchpoint { vlsn, .. } if first_exchange => {
288 if vlsn < first {
291 return fall_back_to_restore(channel, vlsn);
292 }
293 candidate = vlsn;
294 first_exchange = false;
295 continue; }
297 SyncupMsg::EntryNotFound => {
298 return fall_back_to_restore(channel, candidate);
299 }
300 other => {
301 return Err(RepError::ProtocolError(format!(
302 "syncup replica: unexpected response {other:?}"
303 )));
304 }
305 }
306 first_exchange = false;
307
308 match prev_sync(replica, candidate, first) {
310 Some(prev) => candidate = prev,
311 None => return fall_back_to_restore(channel, candidate),
312 }
313 }
314}
315
316fn converge(
318 channel: &dyn Channel,
319 matchpoint_vlsn: Vlsn,
320 matchpoint_lsn: u64,
321) -> Result<SyncupOutcome> {
322 let start_vlsn = matchpoint_vlsn.next();
323 send(channel, &SyncupMsg::StartStream { start_vlsn })?;
324 Ok(SyncupOutcome::Matchpoint {
325 matchpoint_vlsn,
326 matchpoint_lsn,
327 start_vlsn,
328 })
329}
330
331fn fall_back_to_restore(
334 channel: &dyn Channel,
335 failed_vlsn: Vlsn,
336) -> Result<SyncupOutcome> {
337 send(channel, &SyncupMsg::RestoreRequest { failed_vlsn })?;
338 let _ = recv(channel);
341 Ok(SyncupOutcome::NeedsRestore { failed_vlsn })
342}
343
344fn prev_sync(
345 replica: &dyn SyncupView,
346 from: Vlsn,
347 first: Vlsn,
348) -> Option<Vlsn> {
349 let mut v = from.prev();
350 while !v.is_null() && v >= first {
351 if let Some(e) = replica.entry(v)
352 && e.is_sync
353 {
354 return Some(v);
355 }
356 v = v.prev();
357 }
358 None
359}
360
361pub fn feeder_syncup_handshake(
377 channel: &dyn Channel,
378 feeder: &dyn SyncupView,
379) -> Result<Option<Vlsn>> {
380 let mut first_response = true;
381 loop {
382 let msg = recv(channel)?;
383 match msg {
384 SyncupMsg::EntryRequest { vlsn } => {
385 let response =
386 make_entry_response(feeder, vlsn, first_response);
387 first_response = false;
388 send(channel, &response)?;
389 }
390 SyncupMsg::StartStream { start_vlsn } => {
391 return Ok(Some(start_vlsn));
392 }
393 SyncupMsg::RestoreRequest { .. } => {
394 send(channel, &SyncupMsg::RestoreResponse)?;
395 return Ok(None);
396 }
397 other => {
398 return Err(RepError::ProtocolError(format!(
399 "syncup feeder: unexpected request {other:?}"
400 )));
401 }
402 }
403 }
404}
405
406fn make_entry_response(
409 feeder: &dyn SyncupView,
410 request_vlsn: Vlsn,
411 is_first_response: bool,
412) -> SyncupMsg {
413 let first = feeder.first_vlsn();
414 let last_sync = feeder.last_sync();
415
416 if !first.is_null() && request_vlsn < first {
418 return SyncupMsg::EntryNotFound;
419 }
420
421 if let Some(e) = feeder.entry(request_vlsn) {
423 return SyncupMsg::Entry {
424 vlsn: request_vlsn,
425 lsn: e.lsn,
426 fingerprint: e.fingerprint,
427 is_sync: e.is_sync,
428 };
429 }
430
431 if is_first_response
434 && !last_sync.is_null()
435 && let Some(e) = feeder.entry(last_sync)
436 {
437 return SyncupMsg::AlternateMatchpoint {
438 vlsn: last_sync,
439 lsn: e.lsn,
440 fingerprint: e.fingerprint,
441 is_sync: e.is_sync,
442 };
443 }
444
445 SyncupMsg::EntryNotFound
446}
447
/// Computes the matchpoint between two views without any network exchange.
///
/// Delegates directly to `find_matchpoint(replica, feeder)` and returns its
/// result unchanged.
pub fn local_matchpoint(
    replica: &dyn SyncupView,
    feeder: &dyn SyncupView,
) -> Matchpoint {
    find_matchpoint(replica, feeder)
}
463
464fn send(channel: &dyn Channel, msg: &SyncupMsg) -> Result<()> {
469 channel.send(&msg.encode())
470}
471
472fn recv(channel: &dyn Channel) -> Result<SyncupMsg> {
473 let frame = channel.receive(SYNCUP_TIMEOUT)?.ok_or_else(|| {
474 RepError::NetworkError("syncup: no message received".into())
475 })?;
476 SyncupMsg::decode(&frame)
477}
478
/// Convenience constructor: builds a `VlsnEntry` from its three fields.
pub fn vlsn_entry(lsn: u64, fingerprint: u64, is_sync: bool) -> VlsnEntry {
    VlsnEntry { lsn, fingerprint, is_sync }
}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::net::channel::LocalChannelPair;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// In-memory `SyncupView` for tests: fixed first / last-sync /
    /// last-txn-end VLSNs plus a map from VLSN sequence to entry.
    struct MapView {
        last_sync: Vlsn,
        last_txn_end: Vlsn,
        first: Vlsn,
        entries: HashMap<i64, VlsnEntry>,
    }
    impl MapView {
        /// Creates a view with the given boundary VLSNs and no entries.
        fn new(first: i64, last_sync: i64, last_txn_end: i64) -> Self {
            Self {
                last_sync: Vlsn::new(last_sync),
                last_txn_end: Vlsn::new(last_txn_end),
                first: Vlsn::new(first),
                entries: HashMap::new(),
            }
        }
        /// Builder-style insert of the entry stored at VLSN `v`.
        fn put(mut self, v: i64, lsn: u64, fp: u64, sync: bool) -> Self {
            self.entries
                .insert(v, VlsnEntry { lsn, fingerprint: fp, is_sync: sync });
            self
        }
    }
    impl SyncupView for MapView {
        fn last_sync(&self) -> Vlsn {
            self.last_sync
        }
        fn last_txn_end(&self) -> Vlsn {
            self.last_txn_end
        }
        fn first_vlsn(&self) -> Vlsn {
            self.first
        }
        fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
            self.entries.get(&vlsn.sequence()).copied()
        }
    }

    /// Every message variant must survive an encode/decode round trip.
    #[test]
    fn test_msg_roundtrip() {
        let msgs = vec![
            SyncupMsg::EntryRequest { vlsn: Vlsn::new(7) },
            SyncupMsg::Entry {
                vlsn: Vlsn::new(7),
                lsn: 0x1234,
                fingerprint: 0xABCD,
                is_sync: true,
            },
            SyncupMsg::EntryNotFound,
            SyncupMsg::AlternateMatchpoint {
                vlsn: Vlsn::new(5),
                lsn: 0x500,
                fingerprint: 0x55,
                is_sync: false,
            },
            SyncupMsg::StartStream { start_vlsn: Vlsn::new(8) },
            SyncupMsg::RestoreRequest { failed_vlsn: Vlsn::new(3) },
            SyncupMsg::RestoreResponse,
        ];
        for m in msgs {
            assert_eq!(SyncupMsg::decode(&m.encode()).unwrap(), m);
        }
    }

    /// Replica and feeder disagree at VLSN 6 (different fingerprints) but
    /// agree at VLSN 4; the replica must walk back past its non-sync VLSN 5
    /// and converge on 4, streaming from 5.
    #[test]
    fn test_handshake_diverged_converges() {
        let pair = LocalChannelPair::new();
        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let feeder_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);

        let replica = MapView::new(1, 6, 6)
            // Fingerprint at 6 differs from the feeder's.
            .put(6, 0x600, 0xDEAD, true)
            .put(5, 0x500, 0x55, false)
            .put(4, 0x400, 0x44, true);
        let feeder = MapView::new(1, 8, 8)
            .put(8, 0x800, 0x88, true)
            .put(6, 0x600, 0xBEEF, true)
            .put(4, 0x400, 0x44, true);

        // The feeder side runs on its own thread; both ends block on the
        // channel.
        let feeder_handle = std::thread::spawn(move || {
            feeder_syncup_handshake(feeder_ch.as_ref(), &feeder)
        });

        let outcome =
            replica_syncup_handshake(replica_ch.as_ref(), &replica).unwrap();
        assert_eq!(
            outcome,
            SyncupOutcome::Matchpoint {
                matchpoint_vlsn: Vlsn::new(4),
                matchpoint_lsn: 0x400,
                start_vlsn: Vlsn::new(5),
            }
        );
        let feeder_start = feeder_handle.join().unwrap().unwrap();
        assert_eq!(feeder_start, Some(Vlsn::new(5)));
    }

    /// The replica is ahead of the feeder (1..=10 vs 1..=8): the feeder has
    /// no entry for VLSN 10, offers its last sync (8) as an alternate
    /// matchpoint, and the handshake converges there.
    #[test]
    fn test_handshake_alternate_matchpoint() {
        let pair = LocalChannelPair::new();
        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let feeder_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);

        let mut replica = MapView::new(1, 10, 10);
        // Identical entries on both sides for the VLSNs they share.
        for v in 1..=10 {
            replica = replica.put(v, (v as u64) * 0x100, v as u64, true);
        }
        let mut feeder = MapView::new(1, 8, 8);
        for v in 1..=8 {
            feeder = feeder.put(v, (v as u64) * 0x100, v as u64, true);
        }

        let feeder_handle = std::thread::spawn(move || {
            feeder_syncup_handshake(feeder_ch.as_ref(), &feeder)
        });
        let outcome =
            replica_syncup_handshake(replica_ch.as_ref(), &replica).unwrap();
        assert_eq!(
            outcome,
            SyncupOutcome::Matchpoint {
                matchpoint_vlsn: Vlsn::new(8),
                matchpoint_lsn: 0x800,
                start_vlsn: Vlsn::new(9),
            }
        );
        assert_eq!(feeder_handle.join().unwrap().unwrap(), Some(Vlsn::new(9)));
    }

    /// Every shared VLSN has a different fingerprint, so the replica walks
    /// back to its first VLSN (4) without a match and requests a restore.
    #[test]
    fn test_handshake_no_matchpoint_restore() {
        let pair = LocalChannelPair::new();
        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let feeder_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);

        let replica = MapView::new(4, 6, 6)
            .put(6, 0x600, 0x11, true)
            .put(5, 0x500, 0x22, true)
            .put(4, 0x400, 0x33, true);
        let feeder = MapView::new(1, 8, 8)
            .put(8, 0x800, 0x88, true)
            .put(6, 0x600, 0x99, true)
            .put(5, 0x500, 0x88, true)
            .put(4, 0x400, 0x77, true);

        let feeder_handle = std::thread::spawn(move || {
            feeder_syncup_handshake(feeder_ch.as_ref(), &feeder)
        });
        let outcome =
            replica_syncup_handshake(replica_ch.as_ref(), &replica).unwrap();
        assert!(matches!(outcome, SyncupOutcome::NeedsRestore { .. }));
        // The feeder reports `None` after acknowledging the restore request.
        assert_eq!(feeder_handle.join().unwrap().unwrap(), None);
    }
}