Skip to main content

noxu_rep/stream/
syncup_protocol.rs

//! REP-1 STEP 5 (B): the syncup wire protocol (feeder<->replica matchpoint
//! negotiation).
//!
//! Port of the syncup message exchange in `FeederReplicaSyncup.java` /
//! `ReplicaFeederSyncup.java`, using the message set from
//! `BaseProtocol.{EntryRequest, Entry, EntryNotFound, AlternateMatchpoint,
//! StartStream, RestoreRequest, RestoreResponse}`.
//!
//! ## Handshake
//!
//! The replica drives a backward search over its own log (the
//! [`crate::stream::syncup_reader::SyncupLogView`]) and asks the feeder, for
//! each candidate matchpoint VLSN, "do you hold the same record at this VLSN?"
//! (`EntryRequest`). The feeder answers with:
//!   - [`SyncupMsg::Entry`]  — "yes, here is my record at that VLSN"
//!     (JE `Entry`);
//!   - [`SyncupMsg::EntryNotFound`] — "I do not hold that VLSN" (below my
//!     range) (JE `EntryNotFound`); or
//!   - [`SyncupMsg::AlternateMatchpoint`] — "that VLSN is above my range; here
//!     is my highest sync point as a counter-offer" (JE `AlternateMatchpoint`,
//!     only on the first exchange).
//!
//! When the records match (same LSN + fingerprint), the replica sends
//! [`SyncupMsg::StartStream`] with `matchpoint+1` and the two converge.
//! Otherwise the replica scans to its previous sync point and repeats. If the
//! search walks past the replica's contiguous range, the replica sends
//! [`SyncupMsg::RestoreRequest`] and falls back to network restore
//! (`ReplicaFeederSyncup.setupLogRefresh`).
//!
//! This module is the *transport* of that negotiation. The matchpoint
//! *decision* is [`crate::stream::syncup::find_matchpoint`] /
//! [`crate::stream::syncup::verify_rollback`]; the rollback *execution* is
//! REP-1 STEP 5 (C). The driver in [`crate::stream::syncup_reader`] and the
//! wiring in `replicated_environment` glue the three together.
35
36use std::time::Duration;
37
38use noxu_util::{NULL_VLSN, Vlsn};
39
40use crate::error::{RepError, Result};
41use crate::net::channel::Channel;
42use crate::stream::syncup::{
43    Matchpoint, SyncupView, VlsnEntry, find_matchpoint,
44};
45
/// Service name registered with the dispatcher for the syncup handshake.
pub const SYNCUP_SERVICE_NAME: &str = "REP_SYNCUP";

/// Maximum time `recv` waits for a single incoming syncup frame before the
/// receive is reported as "no message received".
const SYNCUP_TIMEOUT: Duration = Duration::from_secs(30);
51
52// ---------------------------------------------------------------------------
53// Wire messages
54// ---------------------------------------------------------------------------
55
56/// One syncup-protocol message. Wire form: a 1-byte opcode followed by a
57/// fixed body (little-endian). Mirrors the `BaseProtocol` message classes used
58/// during syncup.
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum SyncupMsg {
61    /// Replica → feeder: "do you have VLSN X (at LSN Y on my side)?"
62    /// (JE `BaseProtocol.EntryRequest`). The LSN/fingerprint are the replica's
63    /// own values so the feeder need not be asked to compute equality — the
64    /// replica compares the returned [`SyncupMsg::Entry`].
65    EntryRequest { vlsn: Vlsn },
66    /// Feeder → replica: "here is my record at that VLSN" (JE `Entry`).
67    Entry { vlsn: Vlsn, lsn: u64, fingerprint: u64, is_sync: bool },
68    /// Feeder → replica: "I do not hold that VLSN" (below my range)
69    /// (JE `EntryNotFound`).
70    EntryNotFound,
71    /// Feeder → replica: "that VLSN is above my range; here is my highest sync
72    /// point as a counter-offer" (JE `AlternateMatchpoint`).
73    AlternateMatchpoint {
74        vlsn: Vlsn,
75        lsn: u64,
76        fingerprint: u64,
77        is_sync: bool,
78    },
79    /// Replica → feeder: "matchpoint agreed; start streaming from this VLSN"
80    /// (JE `StartStream`).
81    StartStream { start_vlsn: Vlsn },
82    /// Replica → feeder: "no matchpoint; I need a network restore"
83    /// (JE `RestoreRequest`).
84    RestoreRequest { failed_vlsn: Vlsn },
85    /// Feeder → replica: acknowledgement of a restore request
86    /// (JE `RestoreResponse`).
87    RestoreResponse,
88}
89
// Opcodes: the first byte of every encoded `SyncupMsg`, one value per
// variant. `SyncupMsg::decode` rejects any other value as a protocol error.
const OP_ENTRY_REQUEST: u8 = 1;
const OP_ENTRY: u8 = 2;
const OP_ENTRY_NOT_FOUND: u8 = 3;
const OP_ALT_MATCHPOINT: u8 = 4;
const OP_START_STREAM: u8 = 5;
const OP_RESTORE_REQUEST: u8 = 6;
const OP_RESTORE_RESPONSE: u8 = 7;
98
99impl SyncupMsg {
100    /// Serialize to wire bytes.
101    pub fn encode(&self) -> Vec<u8> {
102        let mut b = Vec::with_capacity(26);
103        match self {
104            SyncupMsg::EntryRequest { vlsn } => {
105                b.push(OP_ENTRY_REQUEST);
106                b.extend_from_slice(&vlsn.sequence().to_le_bytes());
107            }
108            SyncupMsg::Entry { vlsn, lsn, fingerprint, is_sync } => {
109                b.push(OP_ENTRY);
110                encode_record(&mut b, *vlsn, *lsn, *fingerprint, *is_sync);
111            }
112            SyncupMsg::EntryNotFound => b.push(OP_ENTRY_NOT_FOUND),
113            SyncupMsg::AlternateMatchpoint {
114                vlsn,
115                lsn,
116                fingerprint,
117                is_sync,
118            } => {
119                b.push(OP_ALT_MATCHPOINT);
120                encode_record(&mut b, *vlsn, *lsn, *fingerprint, *is_sync);
121            }
122            SyncupMsg::StartStream { start_vlsn } => {
123                b.push(OP_START_STREAM);
124                b.extend_from_slice(&start_vlsn.sequence().to_le_bytes());
125            }
126            SyncupMsg::RestoreRequest { failed_vlsn } => {
127                b.push(OP_RESTORE_REQUEST);
128                b.extend_from_slice(&failed_vlsn.sequence().to_le_bytes());
129            }
130            SyncupMsg::RestoreResponse => b.push(OP_RESTORE_RESPONSE),
131        }
132        b
133    }
134
135    /// Deserialize from wire bytes.
136    pub fn decode(buf: &[u8]) -> Result<Self> {
137        if buf.is_empty() {
138            return Err(RepError::ProtocolError(
139                "syncup: empty message".into(),
140            ));
141        }
142        let op = buf[0];
143        let body = &buf[1..];
144        match op {
145            OP_ENTRY_REQUEST => {
146                Ok(SyncupMsg::EntryRequest { vlsn: read_vlsn(body)? })
147            }
148            OP_ENTRY => {
149                let (vlsn, lsn, fingerprint, is_sync) = decode_record(body)?;
150                Ok(SyncupMsg::Entry { vlsn, lsn, fingerprint, is_sync })
151            }
152            OP_ENTRY_NOT_FOUND => Ok(SyncupMsg::EntryNotFound),
153            OP_ALT_MATCHPOINT => {
154                let (vlsn, lsn, fingerprint, is_sync) = decode_record(body)?;
155                Ok(SyncupMsg::AlternateMatchpoint {
156                    vlsn,
157                    lsn,
158                    fingerprint,
159                    is_sync,
160                })
161            }
162            OP_START_STREAM => {
163                Ok(SyncupMsg::StartStream { start_vlsn: read_vlsn(body)? })
164            }
165            OP_RESTORE_REQUEST => {
166                Ok(SyncupMsg::RestoreRequest { failed_vlsn: read_vlsn(body)? })
167            }
168            OP_RESTORE_RESPONSE => Ok(SyncupMsg::RestoreResponse),
169            other => Err(RepError::ProtocolError(format!(
170                "syncup: unknown opcode {other}"
171            ))),
172        }
173    }
174}
175
176fn encode_record(
177    b: &mut Vec<u8>,
178    vlsn: Vlsn,
179    lsn: u64,
180    fingerprint: u64,
181    is_sync: bool,
182) {
183    b.extend_from_slice(&vlsn.sequence().to_le_bytes());
184    b.extend_from_slice(&lsn.to_le_bytes());
185    b.extend_from_slice(&fingerprint.to_le_bytes());
186    b.push(is_sync as u8);
187}
188
189fn decode_record(body: &[u8]) -> Result<(Vlsn, u64, u64, bool)> {
190    if body.len() < 8 + 8 + 8 + 1 {
191        return Err(RepError::ProtocolError(format!(
192            "syncup: short record body ({} bytes)",
193            body.len()
194        )));
195    }
196    let seq = i64::from_le_bytes(body[0..8].try_into().unwrap());
197    let lsn = u64::from_le_bytes(body[8..16].try_into().unwrap());
198    let fingerprint = u64::from_le_bytes(body[16..24].try_into().unwrap());
199    let is_sync = body[24] != 0;
200    Ok((Vlsn::new(seq), lsn, fingerprint, is_sync))
201}
202
203fn read_vlsn(body: &[u8]) -> Result<Vlsn> {
204    if body.len() < 8 {
205        return Err(RepError::ProtocolError(format!(
206            "syncup: short vlsn body ({} bytes)",
207            body.len()
208        )));
209    }
210    Ok(Vlsn::new(i64::from_le_bytes(body[0..8].try_into().unwrap())))
211}
212
213// ---------------------------------------------------------------------------
214// Outcome of the handshake
215// ---------------------------------------------------------------------------
216
217/// Result of the replica's side of the syncup handshake.
218#[derive(Debug, Clone, PartialEq, Eq)]
219pub enum SyncupOutcome {
220    /// A matchpoint was agreed at `matchpoint_vlsn` (LSN `matchpoint_lsn`);
221    /// the replica must roll back its tail to it (if any) and resume
222    /// streaming from `start_vlsn`.
223    Matchpoint { matchpoint_vlsn: Vlsn, matchpoint_lsn: u64, start_vlsn: Vlsn },
224    /// No common matchpoint; the replica must do a network restore.
225    NeedsRestore { failed_vlsn: Vlsn },
226}
227
228// ---------------------------------------------------------------------------
229// Replica side of the handshake
230// ---------------------------------------------------------------------------
231
232/// Run the replica's side of the syncup handshake over `channel`, driven by
233/// the replica's local log view.
234///
235/// Port of `ReplicaFeederSyncup.findMatchpoint`: propose `lastSync`, walk back
236/// over the replica's sync points on mismatch, and converge on the highest
237/// common matchpoint. The feeder may counter with an `AlternateMatchpoint`
238/// (when the replica's first proposal is above the feeder's range), which is
239/// honoured if it falls inside the replica's contiguous range.
240///
241/// Returns [`SyncupOutcome::Matchpoint`] with `start_vlsn = matchpoint+1` on
242/// success, or [`SyncupOutcome::NeedsRestore`] if no matchpoint exists.
243pub fn replica_syncup_handshake(
244    channel: &dyn Channel,
245    replica: &dyn SyncupView,
246) -> Result<SyncupOutcome> {
247    // First candidate is the replica's lastSync (JE range.getLastSync).
248    let mut candidate = replica.last_sync();
249    let first = replica.first_vlsn();
250
251    if candidate.is_null() {
252        // No sync-able entries: ask the feeder for VLSN 1 (JE FIRST_VLSN).
253        // If the feeder lacks it, network restore.
254        send(channel, &SyncupMsg::EntryRequest { vlsn: Vlsn::new(1) })?;
255        return match recv(channel)? {
256            SyncupMsg::Entry { vlsn, .. } => Ok(SyncupOutcome::Matchpoint {
257                matchpoint_vlsn: NULL_VLSN,
258                matchpoint_lsn: 0,
259                start_vlsn: vlsn, // start at VLSN 1
260            }),
261            _ => fall_back_to_restore(channel, Vlsn::new(1)),
262        };
263    }
264
265    // First exchange accepts an AlternateMatchpoint counter-offer.
266    let mut first_exchange = true;
267
268    loop {
269        // Look at our own record at the candidate.
270        let replica_entry = match replica.entry(candidate) {
271            Some(e) => e,
272            None => return fall_back_to_restore(channel, candidate),
273        };
274
275        send(channel, &SyncupMsg::EntryRequest { vlsn: candidate })?;
276        match recv(channel)? {
277            SyncupMsg::Entry { vlsn, lsn, fingerprint, .. } => {
278                if vlsn == candidate
279                    && lsn == replica_entry.lsn
280                    && fingerprint == replica_entry.fingerprint
281                {
282                    // Matchpoint found.
283                    return converge(channel, candidate, replica_entry.lsn);
284                }
285                // Feeder holds this VLSN but the record differs: scan back.
286            }
287            SyncupMsg::AlternateMatchpoint { vlsn, .. } if first_exchange => {
288                // Feeder counter-offers a lower matchpoint. Honour it only if
289                // it is inside our contiguous range (JE getFeederRecord).
290                if vlsn < first {
291                    return fall_back_to_restore(channel, vlsn);
292                }
293                candidate = vlsn;
294                first_exchange = false;
295                continue; // re-request at the counter-offer
296            }
297            SyncupMsg::EntryNotFound => {
298                return fall_back_to_restore(channel, candidate);
299            }
300            other => {
301                return Err(RepError::ProtocolError(format!(
302                    "syncup replica: unexpected response {other:?}"
303                )));
304            }
305        }
306        first_exchange = false;
307
308        // No match at this candidate: scan to our previous sync point.
309        match prev_sync(replica, candidate, first) {
310            Some(prev) => candidate = prev,
311            None => return fall_back_to_restore(channel, candidate),
312        }
313    }
314}
315
316/// Send StartStream and return a Matchpoint outcome.
317fn converge(
318    channel: &dyn Channel,
319    matchpoint_vlsn: Vlsn,
320    matchpoint_lsn: u64,
321) -> Result<SyncupOutcome> {
322    let start_vlsn = matchpoint_vlsn.next();
323    send(channel, &SyncupMsg::StartStream { start_vlsn })?;
324    Ok(SyncupOutcome::Matchpoint {
325        matchpoint_vlsn,
326        matchpoint_lsn,
327        start_vlsn,
328    })
329}
330
331/// Send RestoreRequest and return a NeedsRestore outcome (consuming the
332/// feeder's RestoreResponse if it sends one).
333fn fall_back_to_restore(
334    channel: &dyn Channel,
335    failed_vlsn: Vlsn,
336) -> Result<SyncupOutcome> {
337    send(channel, &SyncupMsg::RestoreRequest { failed_vlsn })?;
338    // Best-effort: the feeder replies RestoreResponse; ignore receive errors
339    // (the channel may already be closing).
340    let _ = recv(channel);
341    Ok(SyncupOutcome::NeedsRestore { failed_vlsn })
342}
343
344fn prev_sync(
345    replica: &dyn SyncupView,
346    from: Vlsn,
347    first: Vlsn,
348) -> Option<Vlsn> {
349    let mut v = from.prev();
350    while !v.is_null() && v >= first {
351        if let Some(e) = replica.entry(v)
352            && e.is_sync
353        {
354            return Some(v);
355        }
356        v = v.prev();
357    }
358    None
359}
360
361// ---------------------------------------------------------------------------
362// Feeder side of the handshake
363// ---------------------------------------------------------------------------
364
365/// Run the feeder's side of the syncup handshake over `channel`, answering the
366/// replica's `EntryRequest`s from the feeder's local log view.
367///
368/// Port of `FeederReplicaSyncup.execute` /`makeResponseToEntryRequest`:
369///   - VLSN in range → [`SyncupMsg::Entry`];
370///   - VLSN below range → [`SyncupMsg::EntryNotFound`];
371///   - VLSN above range (first request only) → [`SyncupMsg::AlternateMatchpoint`]
372///     with the feeder's `lastSync`.
373///
374/// Loops until the replica sends `StartStream` (returns the agreed start VLSN)
375/// or `RestoreRequest` (returns `None`, network restore).
376pub fn feeder_syncup_handshake(
377    channel: &dyn Channel,
378    feeder: &dyn SyncupView,
379) -> Result<Option<Vlsn>> {
380    let mut first_response = true;
381    loop {
382        let msg = recv(channel)?;
383        match msg {
384            SyncupMsg::EntryRequest { vlsn } => {
385                let response =
386                    make_entry_response(feeder, vlsn, first_response);
387                first_response = false;
388                send(channel, &response)?;
389            }
390            SyncupMsg::StartStream { start_vlsn } => {
391                return Ok(Some(start_vlsn));
392            }
393            SyncupMsg::RestoreRequest { .. } => {
394                send(channel, &SyncupMsg::RestoreResponse)?;
395                return Ok(None);
396            }
397            other => {
398                return Err(RepError::ProtocolError(format!(
399                    "syncup feeder: unexpected request {other:?}"
400                )));
401            }
402        }
403    }
404}
405
406/// Build the feeder's response to an `EntryRequest`. JE
407/// `FeederReplicaSyncup.makeResponseToEntryRequest` (DEFAULT mode).
408fn make_entry_response(
409    feeder: &dyn SyncupView,
410    request_vlsn: Vlsn,
411    is_first_response: bool,
412) -> SyncupMsg {
413    let first = feeder.first_vlsn();
414    let last_sync = feeder.last_sync();
415
416    // Below the feeder's range → EntryNotFound.
417    if !first.is_null() && request_vlsn < first {
418        return SyncupMsg::EntryNotFound;
419    }
420
421    // In range and held → Entry.
422    if let Some(e) = feeder.entry(request_vlsn) {
423        return SyncupMsg::Entry {
424            vlsn: request_vlsn,
425            lsn: e.lsn,
426            fingerprint: e.fingerprint,
427            is_sync: e.is_sync,
428        };
429    }
430
431    // Above the feeder's range (not held, not below first): on the first
432    // response, counter-offer the feeder's lastSync (JE AlternateMatchpoint).
433    if is_first_response
434        && !last_sync.is_null()
435        && let Some(e) = feeder.entry(last_sync)
436    {
437        return SyncupMsg::AlternateMatchpoint {
438            vlsn: last_sync,
439            lsn: e.lsn,
440            fingerprint: e.fingerprint,
441            is_sync: e.is_sync,
442        };
443    }
444
445    SyncupMsg::EntryNotFound
446}
447
448// ---------------------------------------------------------------------------
449// Convenience: build a feeder view that answers from find_matchpoint logic
450// ---------------------------------------------------------------------------
451
/// Compute the matchpoint between a replica view and a LOCAL feeder view
/// without any channel traffic, by delegating directly to
/// [`find_matchpoint`] — the in-process fast path used when both nodes share
/// an address space (the test harness).
///
/// NOTE(review): intended to be equivalent in result to running
/// [`replica_syncup_handshake`] + [`feeder_syncup_handshake`] over a real
/// channel; that equivalence depends on `find_matchpoint`, which is defined
/// outside this file — confirm there.
pub fn local_matchpoint(
    replica: &dyn SyncupView,
    feeder: &dyn SyncupView,
) -> Matchpoint {
    find_matchpoint(replica, feeder)
}
463
464// ---------------------------------------------------------------------------
465// Framed send/recv over the rep Channel
466// ---------------------------------------------------------------------------
467
/// Encode `msg` and send it as one frame on `channel`, returning whatever
/// the channel's `send` reports.
fn send(channel: &dyn Channel, msg: &SyncupMsg) -> Result<()> {
    channel.send(&msg.encode())
}
471
472fn recv(channel: &dyn Channel) -> Result<SyncupMsg> {
473    let frame = channel.receive(SYNCUP_TIMEOUT)?.ok_or_else(|| {
474        RepError::NetworkError("syncup: no message received".into())
475    })?;
476    SyncupMsg::decode(&frame)
477}
478
/// Build a [`VlsnEntry`] from its three fields (helper for callers
/// constructing feeder views). The arguments are stored as given, with no
/// validation.
pub fn vlsn_entry(lsn: u64, fingerprint: u64, is_sync: bool) -> VlsnEntry {
    VlsnEntry { lsn, fingerprint, is_sync }
}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::net::channel::LocalChannelPair;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// In-memory [`SyncupView`] backed by a map from VLSN sequence to entry.
    struct MapView {
        last_sync: Vlsn,
        last_txn_end: Vlsn,
        first: Vlsn,
        entries: HashMap<i64, VlsnEntry>,
    }

    impl MapView {
        /// Empty view with the given range markers.
        fn new(first: i64, last_sync: i64, last_txn_end: i64) -> Self {
            Self {
                last_sync: Vlsn::new(last_sync),
                last_txn_end: Vlsn::new(last_txn_end),
                first: Vlsn::new(first),
                entries: HashMap::new(),
            }
        }

        /// Builder-style insert of one record at VLSN `v`.
        fn put(mut self, v: i64, lsn: u64, fp: u64, sync: bool) -> Self {
            let entry = VlsnEntry { lsn, fingerprint: fp, is_sync: sync };
            self.entries.insert(v, entry);
            self
        }

        /// View holding sync records at every VLSN in `1..=last`, with
        /// `lsn = v * 0x100` and `fingerprint = v`.
        fn dense(last: i64) -> Self {
            (1..=last).fold(MapView::new(1, last, last), |view, v| {
                view.put(v, (v as u64) * 0x100, v as u64, true)
            })
        }
    }

    impl SyncupView for MapView {
        fn last_sync(&self) -> Vlsn {
            self.last_sync
        }
        fn last_txn_end(&self) -> Vlsn {
            self.last_txn_end
        }
        fn first_vlsn(&self) -> Vlsn {
            self.first
        }
        fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
            self.entries.get(&vlsn.sequence()).copied()
        }
    }

    /// Run both halves of the handshake over a `LocalChannelPair`: the feeder
    /// on a spawned thread, the replica on the calling thread. Returns the
    /// replica's outcome and the feeder's agreed start VLSN.
    fn run_handshake(
        replica: MapView,
        feeder: MapView,
    ) -> (SyncupOutcome, Option<Vlsn>) {
        let pair = LocalChannelPair::new();
        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let feeder_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);

        let feeder_thread = std::thread::spawn(move || {
            feeder_syncup_handshake(feeder_ch.as_ref(), &feeder)
        });

        let outcome =
            replica_syncup_handshake(replica_ch.as_ref(), &replica).unwrap();
        let feeder_start = feeder_thread.join().unwrap().unwrap();
        (outcome, feeder_start)
    }

    #[test]
    fn test_msg_roundtrip() {
        let samples = [
            SyncupMsg::EntryRequest { vlsn: Vlsn::new(7) },
            SyncupMsg::Entry {
                vlsn: Vlsn::new(7),
                lsn: 0x1234,
                fingerprint: 0xABCD,
                is_sync: true,
            },
            SyncupMsg::EntryNotFound,
            SyncupMsg::AlternateMatchpoint {
                vlsn: Vlsn::new(5),
                lsn: 0x500,
                fingerprint: 0x55,
                is_sync: false,
            },
            SyncupMsg::StartStream { start_vlsn: Vlsn::new(8) },
            SyncupMsg::RestoreRequest { failed_vlsn: Vlsn::new(3) },
            SyncupMsg::RestoreResponse,
        ];
        for original in samples {
            let decoded = SyncupMsg::decode(&original.encode()).unwrap();
            assert_eq!(decoded, original);
        }
    }

    /// Full handshake over a LocalChannel: a diverged replica (VLSN 6/7 differ
    /// from the feeder) converges on the highest common matchpoint (VLSN 4).
    #[test]
    fn test_handshake_diverged_converges() {
        // Replica: sync points at 6 (divergent) and 4 (common); 5 non-sync.
        let replica = MapView::new(1, 6, 6)
            .put(6, 0x600, 0xDEAD, true)
            .put(5, 0x500, 0x55, false)
            .put(4, 0x400, 0x44, true);
        // Feeder: holds 8,6(diff),4(same).
        let feeder = MapView::new(1, 8, 8)
            .put(8, 0x800, 0x88, true)
            .put(6, 0x600, 0xBEEF, true)
            .put(4, 0x400, 0x44, true);

        let (outcome, feeder_start) = run_handshake(replica, feeder);

        let expected = SyncupOutcome::Matchpoint {
            matchpoint_vlsn: Vlsn::new(4),
            matchpoint_lsn: 0x400,
            start_vlsn: Vlsn::new(5),
        };
        assert_eq!(outcome, expected);
        assert_eq!(feeder_start, Some(Vlsn::new(5)));
    }

    /// Replica's first proposal is above the feeder's range; feeder counters
    /// with an AlternateMatchpoint (its lastSync=8) which the replica adopts.
    #[test]
    fn test_handshake_alternate_matchpoint() {
        // Replica has 1..=10, lastSync=10. Feeder only has 1..=8, lastSync=8.
        let replica = MapView::dense(10);
        let feeder = MapView::dense(8);

        let (outcome, feeder_start) = run_handshake(replica, feeder);

        // VLSN 8 record matches on both sides → matchpoint 8, start 9.
        let expected = SyncupOutcome::Matchpoint {
            matchpoint_vlsn: Vlsn::new(8),
            matchpoint_lsn: 0x800,
            start_vlsn: Vlsn::new(9),
        };
        assert_eq!(outcome, expected);
        assert_eq!(feeder_start, Some(Vlsn::new(9)));
    }

    /// No common matchpoint → replica requests network restore.
    #[test]
    fn test_handshake_no_matchpoint_restore() {
        // Replica's records all differ from the feeder's.
        let replica = MapView::new(4, 6, 6)
            .put(6, 0x600, 0x11, true)
            .put(5, 0x500, 0x22, true)
            .put(4, 0x400, 0x33, true);
        let feeder = MapView::new(1, 8, 8)
            .put(8, 0x800, 0x88, true)
            .put(6, 0x600, 0x99, true)
            .put(5, 0x500, 0x88, true)
            .put(4, 0x400, 0x77, true);

        let (outcome, feeder_start) = run_handshake(replica, feeder);

        assert!(matches!(outcome, SyncupOutcome::NeedsRestore { .. }));
        assert_eq!(feeder_start, None);
    }
}
651}