use std::time::Duration;
use noxu_util::{NULL_VLSN, Vlsn};
use crate::error::{RepError, Result};
use crate::net::channel::Channel;
use crate::stream::syncup::{
Matchpoint, SyncupView, VlsnEntry, find_matchpoint,
};
/// Name of the syncup service (presumably used to route connections to this
/// protocol — confirm against the service dispatcher).
pub const SYNCUP_SERVICE_NAME: &str = "REP_SYNCUP";

/// How long `recv` waits for any single syncup message before failing.
const SYNCUP_TIMEOUT: Duration = Duration::from_millis(30_000);
/// One message of the syncup protocol exchanged between a replica and its
/// feeder.
///
/// On the wire each message is a one-byte opcode followed by a fixed-size,
/// little-endian body (see [`SyncupMsg::encode`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncupMsg {
    /// Replica → feeder: asks for the feeder's entry at `vlsn`.
    EntryRequest { vlsn: Vlsn },
    /// Feeder → replica: the feeder's entry at the requested `vlsn`.
    Entry {
        vlsn: Vlsn,
        lsn: u64,
        fingerprint: u64,
        is_sync: bool,
    },
    /// Feeder → replica: the feeder cannot supply an entry for the request.
    EntryNotFound,
    /// Feeder → replica: the requested VLSN is unavailable, so the feeder
    /// offers its last sync entry as the candidate instead. Only sent in
    /// reply to the first request of a handshake.
    AlternateMatchpoint {
        vlsn: Vlsn,
        lsn: u64,
        fingerprint: u64,
        is_sync: bool,
    },
    /// Replica → feeder: a matchpoint was agreed; stream from `start_vlsn`.
    StartStream { start_vlsn: Vlsn },
    /// Replica → feeder: no matchpoint could be agreed; the search stopped
    /// at `failed_vlsn`.
    RestoreRequest { failed_vlsn: Vlsn },
    /// Feeder → replica: acknowledges a `RestoreRequest`.
    RestoreResponse,
}
// Wire opcodes: the first byte of every encoded `SyncupMsg`.
// These values are part of the wire format, so they must not be renumbered.
const OP_ENTRY_REQUEST: u8 = 0x01;
const OP_ENTRY: u8 = 0x02;
const OP_ENTRY_NOT_FOUND: u8 = 0x03;
const OP_ALT_MATCHPOINT: u8 = 0x04;
const OP_START_STREAM: u8 = 0x05;
const OP_RESTORE_REQUEST: u8 = 0x06;
const OP_RESTORE_RESPONSE: u8 = 0x07;
impl SyncupMsg {
    /// Serializes the message into a single frame.
    ///
    /// Layout: one opcode byte, then a little-endian body:
    /// - `EntryRequest`, `StartStream`, `RestoreRequest`: 8-byte VLSN sequence;
    /// - `Entry`, `AlternateMatchpoint`: VLSN sequence, LSN and fingerprint
    ///   (8 bytes each) followed by a 1-byte `is_sync` flag;
    /// - `EntryNotFound`, `RestoreResponse`: no body.
    pub fn encode(&self) -> Vec<u8> {
        // Largest frame: opcode (1) + record (25).
        let mut out = Vec::with_capacity(26);
        match *self {
            Self::EntryRequest { vlsn } => {
                out.push(OP_ENTRY_REQUEST);
                out.extend_from_slice(&vlsn.sequence().to_le_bytes());
            }
            Self::Entry { vlsn, lsn, fingerprint, is_sync } => {
                out.push(OP_ENTRY);
                encode_record(&mut out, vlsn, lsn, fingerprint, is_sync);
            }
            Self::EntryNotFound => out.push(OP_ENTRY_NOT_FOUND),
            Self::AlternateMatchpoint { vlsn, lsn, fingerprint, is_sync } => {
                out.push(OP_ALT_MATCHPOINT);
                encode_record(&mut out, vlsn, lsn, fingerprint, is_sync);
            }
            Self::StartStream { start_vlsn } => {
                out.push(OP_START_STREAM);
                out.extend_from_slice(&start_vlsn.sequence().to_le_bytes());
            }
            Self::RestoreRequest { failed_vlsn } => {
                out.push(OP_RESTORE_REQUEST);
                out.extend_from_slice(&failed_vlsn.sequence().to_le_bytes());
            }
            Self::RestoreResponse => out.push(OP_RESTORE_RESPONSE),
        }
        out
    }

    /// Parses a frame produced by [`SyncupMsg::encode`].
    ///
    /// Bytes beyond the body a given opcode needs are ignored.
    ///
    /// # Errors
    ///
    /// Returns `RepError::ProtocolError` if the frame is empty, carries an
    /// unknown opcode, or its body is too short for the opcode.
    pub fn decode(buf: &[u8]) -> Result<Self> {
        let Some((&op, body)) = buf.split_first() else {
            return Err(RepError::ProtocolError(
                "syncup: empty message".into(),
            ));
        };
        let msg = match op {
            OP_ENTRY_REQUEST => Self::EntryRequest { vlsn: read_vlsn(body)? },
            OP_ENTRY => {
                let (vlsn, lsn, fingerprint, is_sync) = decode_record(body)?;
                Self::Entry { vlsn, lsn, fingerprint, is_sync }
            }
            OP_ENTRY_NOT_FOUND => Self::EntryNotFound,
            OP_ALT_MATCHPOINT => {
                let (vlsn, lsn, fingerprint, is_sync) = decode_record(body)?;
                Self::AlternateMatchpoint { vlsn, lsn, fingerprint, is_sync }
            }
            OP_START_STREAM => Self::StartStream { start_vlsn: read_vlsn(body)? },
            OP_RESTORE_REQUEST => {
                Self::RestoreRequest { failed_vlsn: read_vlsn(body)? }
            }
            OP_RESTORE_RESPONSE => Self::RestoreResponse,
            other => {
                return Err(RepError::ProtocolError(format!(
                    "syncup: unknown opcode {other}"
                )));
            }
        };
        Ok(msg)
    }
}
/// Appends the 25-byte record shared by `Entry` and `AlternateMatchpoint`.
///
/// The record holds the VLSN sequence, LSN and fingerprint (8 bytes each,
/// little-endian), followed by the `is_sync` flag as a single `0`/`1` byte.
fn encode_record(
    out: &mut Vec<u8>,
    vlsn: Vlsn,
    lsn: u64,
    fingerprint: u64,
    is_sync: bool,
) {
    out.extend_from_slice(&vlsn.sequence().to_le_bytes());
    for word in [lsn, fingerprint] {
        out.extend_from_slice(&word.to_le_bytes());
    }
    out.push(u8::from(is_sync));
}
/// Parses the 25-byte record written by `encode_record`.
///
/// Returns `(vlsn, lsn, fingerprint, is_sync)`. Any bytes after the record
/// are ignored.
///
/// # Errors
///
/// Returns `RepError::ProtocolError` if `body` is shorter than a record.
fn decode_record(body: &[u8]) -> Result<(Vlsn, u64, u64, bool)> {
    // vlsn (8) + lsn (8) + fingerprint (8) + is_sync flag (1)
    const RECORD_LEN: usize = 8 + 8 + 8 + 1;

    let Some(record) = body.get(..RECORD_LEN) else {
        return Err(RepError::ProtocolError(format!(
            "syncup: short record body ({} bytes)",
            body.len()
        )));
    };

    // Extracts the 8-byte little-endian word starting at `at`.
    let word = |at: usize| -> [u8; 8] {
        record[at..at + 8]
            .try_into()
            .expect("record slice is RECORD_LEN bytes long")
    };

    let vlsn = Vlsn::new(i64::from_le_bytes(word(0)));
    let lsn = u64::from_le_bytes(word(8));
    let fingerprint = u64::from_le_bytes(word(16));
    let is_sync = record[24] != 0;
    Ok((vlsn, lsn, fingerprint, is_sync))
}
/// Reads an 8-byte little-endian VLSN sequence from the front of `body`.
///
/// # Errors
///
/// Returns `RepError::ProtocolError` if `body` holds fewer than 8 bytes.
fn read_vlsn(body: &[u8]) -> Result<Vlsn> {
    match body.first_chunk::<8>() {
        Some(raw) => Ok(Vlsn::new(i64::from_le_bytes(*raw))),
        None => Err(RepError::ProtocolError(format!(
            "syncup: short vlsn body ({} bytes)",
            body.len()
        ))),
    }
}
/// Result of the replica side of the syncup handshake.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncupOutcome {
    /// The replica and feeder agreed on where streaming resumes.
    Matchpoint {
        /// VLSN of the agreed entry; null when the replica had no sync point.
        matchpoint_vlsn: Vlsn,
        /// Replica-side LSN of the agreed entry; 0 when `matchpoint_vlsn` is null.
        matchpoint_lsn: u64,
        /// VLSN the replica told the feeder to start streaming from.
        start_vlsn: Vlsn,
    },
    /// No matchpoint could be agreed, so the replica must be restored.
    NeedsRestore {
        /// Candidate VLSN at which the matchpoint search gave up.
        failed_vlsn: Vlsn,
    },
}
/// Runs the replica side of the syncup handshake over `channel`.
///
/// The replica proposes candidate matchpoints, starting from its last sync
/// point and walking back through earlier sync points (never below its first
/// VLSN). For each candidate it asks the feeder for its entry at that VLSN.
///
/// - The first candidate whose LSN and fingerprint agree on both sides
///   becomes the matchpoint. The replica then sends `StartStream` for the
///   following VLSN.
/// - If no candidate can be agreed on, the replica sends `RestoreRequest`
///   and returns [`SyncupOutcome::NeedsRestore`].
///
/// A replica with no sync point (null `last_sync`) requests VLSN 1. If the
/// feeder has that entry, streaming starts there with a null matchpoint.
///
/// # Errors
///
/// Returns an error if sending or receiving fails, or if the feeder replies
/// with a message that is not valid at that point of the exchange.
pub fn replica_syncup_handshake(
    channel: &dyn Channel,
    replica: &dyn SyncupView,
) -> Result<SyncupOutcome> {
    let mut candidate = replica.last_sync();
    let first = replica.first_vlsn();

    if candidate.is_null() {
        send(channel, &SyncupMsg::EntryRequest { vlsn: Vlsn::new(1) })?;
        return match recv(channel)? {
            SyncupMsg::Entry { vlsn, .. } => {
                // The feeder keeps serving requests until it receives
                // StartStream or RestoreRequest. It must be told where to
                // begin; otherwise its side of the handshake only ends by
                // timing out.
                send(channel, &SyncupMsg::StartStream { start_vlsn: vlsn })?;
                Ok(SyncupOutcome::Matchpoint {
                    matchpoint_vlsn: NULL_VLSN,
                    matchpoint_lsn: 0,
                    start_vlsn: vlsn,
                })
            }
            _ => fall_back_to_restore(channel, Vlsn::new(1)),
        };
    }

    let mut first_exchange = true;
    loop {
        // A candidate the replica cannot look up locally cannot be compared.
        let Some(replica_entry) = replica.entry(candidate) else {
            return fall_back_to_restore(channel, candidate);
        };
        send(channel, &SyncupMsg::EntryRequest { vlsn: candidate })?;
        match recv(channel)? {
            SyncupMsg::Entry { vlsn, lsn, fingerprint, .. } => {
                if vlsn == candidate
                    && lsn == replica_entry.lsn
                    && fingerprint == replica_entry.fingerprint
                {
                    return converge(channel, candidate, replica_entry.lsn);
                }
                // Mismatch: fall through and try an earlier sync point.
            }
            // The feeder only offers an alternate on its first reply. The
            // alternate replaces the candidate and is then verified with a
            // normal EntryRequest like any other candidate.
            SyncupMsg::AlternateMatchpoint { vlsn, .. } if first_exchange => {
                if vlsn < first {
                    return fall_back_to_restore(channel, vlsn);
                }
                candidate = vlsn;
                first_exchange = false;
                continue;
            }
            SyncupMsg::EntryNotFound => {
                return fall_back_to_restore(channel, candidate);
            }
            other => {
                return Err(RepError::ProtocolError(format!(
                    "syncup replica: unexpected response {other:?}"
                )));
            }
        }
        first_exchange = false;
        match prev_sync(replica, candidate, first) {
            Some(prev) => candidate = prev,
            None => return fall_back_to_restore(channel, candidate),
        }
    }
}
/// Completes a successful handshake at `matchpoint_vlsn`.
///
/// Tells the feeder to stream from the VLSN right after the matchpoint and
/// reports that start point back to the caller.
///
/// # Errors
///
/// Propagates any failure to send the `StartStream` message.
fn converge(
    channel: &dyn Channel,
    matchpoint_vlsn: Vlsn,
    matchpoint_lsn: u64,
) -> Result<SyncupOutcome> {
    let start_vlsn = matchpoint_vlsn.next();
    send(channel, &SyncupMsg::StartStream { start_vlsn }).map(|()| {
        SyncupOutcome::Matchpoint {
            matchpoint_vlsn,
            matchpoint_lsn,
            start_vlsn,
        }
    })
}
/// Abandons the matchpoint search and asks the feeder for a restore.
///
/// Sends `RestoreRequest { failed_vlsn }`, then waits for one reply from the
/// feeder. The reply is only an acknowledgement: its content, and whether
/// receiving it succeeded at all, do not affect the result.
///
/// # Errors
///
/// Only a failure to send the request is reported.
fn fall_back_to_restore(
    channel: &dyn Channel,
    failed_vlsn: Vlsn,
) -> Result<SyncupOutcome> {
    let request = SyncupMsg::RestoreRequest { failed_vlsn };
    send(channel, &request)?;
    // Best-effort wait for the feeder's RestoreResponse. The replica needs a
    // restore either way, so a missing or unexpected reply is not an error.
    let _ack = recv(channel);
    Ok(SyncupOutcome::NeedsRestore { failed_vlsn })
}
/// Finds the closest sync entry strictly before `from` in the replica's log.
///
/// The search walks backwards one VLSN at a time and stops at the first
/// entry flagged `is_sync`. Returns `None` if the walk reaches a null VLSN or
/// drops below `first` without finding one.
fn prev_sync(
    replica: &dyn SyncupView,
    from: Vlsn,
    first: Vlsn,
) -> Option<Vlsn> {
    let mut cursor = from.prev();
    loop {
        if cursor.is_null() || cursor < first {
            return None;
        }
        if replica.entry(cursor).is_some_and(|entry| entry.is_sync) {
            return Some(cursor);
        }
        cursor = cursor.prev();
    }
}
/// Runs the feeder side of the syncup handshake over `channel`.
///
/// The feeder answers each `EntryRequest` from `feeder`'s view. Only the
/// first answer may be an alternate matchpoint. The feeder keeps answering
/// until the replica decides how the handshake ends.
///
/// Returns `Ok(Some(start_vlsn))` when the replica sends `StartStream`.
/// Returns `Ok(None)` after the feeder has acknowledged a `RestoreRequest`.
///
/// # Errors
///
/// Returns an error if a send or receive fails (each receive waits up to
/// `SYNCUP_TIMEOUT`), or if the replica sends a message a feeder should
/// never receive.
pub fn feeder_syncup_handshake(
    channel: &dyn Channel,
    feeder: &dyn SyncupView,
) -> Result<Option<Vlsn>> {
    let mut answered_any = false;
    loop {
        match recv(channel)? {
            SyncupMsg::EntryRequest { vlsn } => {
                let reply = make_entry_response(feeder, vlsn, !answered_any);
                answered_any = true;
                send(channel, &reply)?;
            }
            SyncupMsg::StartStream { start_vlsn } => return Ok(Some(start_vlsn)),
            SyncupMsg::RestoreRequest { .. } => {
                send(channel, &SyncupMsg::RestoreResponse)?;
                return Ok(None);
            }
            other => {
                return Err(RepError::ProtocolError(format!(
                    "syncup feeder: unexpected request {other:?}"
                )));
            }
        }
    }
}
/// Builds the feeder's reply to an `EntryRequest` for `request_vlsn`.
///
/// - A VLSN before the feeder's first VLSN yields `EntryNotFound`.
/// - A VLSN the feeder has yields `Entry` with the feeder's LSN, fingerprint
///   and sync flag.
/// - Otherwise, on the first reply of a handshake, the feeder offers its last
///   sync entry as an `AlternateMatchpoint`, if it has one.
/// - In all remaining cases the reply is `EntryNotFound`.
fn make_entry_response(
    feeder: &dyn SyncupView,
    request_vlsn: Vlsn,
    is_first_response: bool,
) -> SyncupMsg {
    let first = feeder.first_vlsn();
    let last_sync = feeder.last_sync();

    // The request lies before the feeder's first VLSN, so it cannot be served.
    let below_range = !first.is_null() && request_vlsn < first;
    if below_range {
        return SyncupMsg::EntryNotFound;
    }

    if let Some(found) = feeder.entry(request_vlsn) {
        return SyncupMsg::Entry {
            vlsn: request_vlsn,
            lsn: found.lsn,
            fingerprint: found.fingerprint,
            is_sync: found.is_sync,
        };
    }

    let offer_alternate = is_first_response && !last_sync.is_null();
    let alternate = if offer_alternate {
        feeder.entry(last_sync)
    } else {
        None
    };
    match alternate {
        Some(alt) => SyncupMsg::AlternateMatchpoint {
            vlsn: last_sync,
            lsn: alt.lsn,
            fingerprint: alt.fingerprint,
            is_sync: alt.is_sync,
        },
        None => SyncupMsg::EntryNotFound,
    }
}
pub fn local_matchpoint(
replica: &dyn SyncupView,
feeder: &dyn SyncupView,
) -> Matchpoint {
find_matchpoint(replica, feeder)
}
/// Encodes `msg` and sends it as one frame on `channel`.
fn send(channel: &dyn Channel, msg: &SyncupMsg) -> Result<()> {
    let frame = msg.encode();
    channel.send(&frame)
}
/// Waits up to `SYNCUP_TIMEOUT` for one frame and decodes it.
///
/// # Errors
///
/// - Channel errors are propagated.
/// - `RepError::NetworkError` is returned if the channel yields no frame
///   (presumably a timeout — confirm against `Channel::receive`).
/// - Decoding errors from [`SyncupMsg::decode`] are propagated.
fn recv(channel: &dyn Channel) -> Result<SyncupMsg> {
    let Some(frame) = channel.receive(SYNCUP_TIMEOUT)? else {
        return Err(RepError::NetworkError(
            "syncup: no message received".into(),
        ));
    };
    SyncupMsg::decode(&frame)
}
/// Convenience constructor for a [`VlsnEntry`] from its three fields.
pub fn vlsn_entry(lsn: u64, fingerprint: u64, is_sync: bool) -> VlsnEntry {
    VlsnEntry {
        fingerprint,
        is_sync,
        lsn,
    }
}
// Unit tests for the syncup message codec and the replica/feeder handshake.
// Each handshake test runs the feeder on a separate thread and the replica
// on the test thread, connected by an in-process channel pair.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::net::channel::LocalChannelPair;
    use std::collections::HashMap;
    use std::sync::Arc;

    // In-memory SyncupView backed by a map from VLSN sequence to entry.
    struct MapView {
        last_sync: Vlsn,
        last_txn_end: Vlsn,
        first: Vlsn,
        entries: HashMap<i64, VlsnEntry>,
    }

    impl MapView {
        // Creates a view with the given range markers (raw sequences) and no
        // entries.
        fn new(first: i64, last_sync: i64, last_txn_end: i64) -> Self {
            Self {
                last_sync: Vlsn::new(last_sync),
                last_txn_end: Vlsn::new(last_txn_end),
                first: Vlsn::new(first),
                entries: HashMap::new(),
            }
        }

        // Builder-style insert of the entry at sequence `v`.
        fn put(mut self, v: i64, lsn: u64, fp: u64, sync: bool) -> Self {
            self.entries
                .insert(v, VlsnEntry { lsn, fingerprint: fp, is_sync: sync });
            self
        }
    }

    impl SyncupView for MapView {
        fn last_sync(&self) -> Vlsn {
            self.last_sync
        }
        fn last_txn_end(&self) -> Vlsn {
            self.last_txn_end
        }
        fn first_vlsn(&self) -> Vlsn {
            self.first
        }
        fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
            self.entries.get(&vlsn.sequence()).copied()
        }
    }

    // Every message variant survives an encode/decode round trip unchanged.
    #[test]
    fn test_msg_roundtrip() {
        let msgs = vec![
            SyncupMsg::EntryRequest { vlsn: Vlsn::new(7) },
            SyncupMsg::Entry {
                vlsn: Vlsn::new(7),
                lsn: 0x1234,
                fingerprint: 0xABCD,
                is_sync: true,
            },
            SyncupMsg::EntryNotFound,
            SyncupMsg::AlternateMatchpoint {
                vlsn: Vlsn::new(5),
                lsn: 0x500,
                fingerprint: 0x55,
                is_sync: false,
            },
            SyncupMsg::StartStream { start_vlsn: Vlsn::new(8) },
            SyncupMsg::RestoreRequest { failed_vlsn: Vlsn::new(3) },
            SyncupMsg::RestoreResponse,
        ];
        for m in msgs {
            assert_eq!(SyncupMsg::decode(&m.encode()).unwrap(), m);
        }
    }

    // The logs diverge at VLSN 6 (different fingerprints). The replica skips
    // non-sync VLSN 5 and matches at sync VLSN 4, so streaming resumes at 5
    // on both sides.
    #[test]
    fn test_handshake_diverged_converges() {
        let pair = LocalChannelPair::new();
        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let feeder_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);
        let replica = MapView::new(1, 6, 6)
            .put(6, 0x600, 0xDEAD, true)
            .put(5, 0x500, 0x55, false)
            .put(4, 0x400, 0x44, true);
        let feeder = MapView::new(1, 8, 8)
            .put(8, 0x800, 0x88, true)
            .put(6, 0x600, 0xBEEF, true)
            .put(4, 0x400, 0x44, true);
        let feeder_handle = std::thread::spawn(move || {
            feeder_syncup_handshake(feeder_ch.as_ref(), &feeder)
        });
        let outcome =
            replica_syncup_handshake(replica_ch.as_ref(), &replica).unwrap();
        assert_eq!(
            outcome,
            SyncupOutcome::Matchpoint {
                matchpoint_vlsn: Vlsn::new(4),
                matchpoint_lsn: 0x400,
                start_vlsn: Vlsn::new(5),
            }
        );
        let feeder_start = feeder_handle.join().unwrap().unwrap();
        assert_eq!(feeder_start, Some(Vlsn::new(5)));
    }

    // The replica (last sync 10) is ahead of the feeder (last sync 8). The
    // feeder lacks VLSN 10, so it offers 8 as an alternate matchpoint, which
    // then matches.
    #[test]
    fn test_handshake_alternate_matchpoint() {
        let pair = LocalChannelPair::new();
        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let feeder_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);
        let mut replica = MapView::new(1, 10, 10);
        for v in 1..=10 {
            replica = replica.put(v, (v as u64) * 0x100, v as u64, true);
        }
        let mut feeder = MapView::new(1, 8, 8);
        for v in 1..=8 {
            feeder = feeder.put(v, (v as u64) * 0x100, v as u64, true);
        }
        let feeder_handle = std::thread::spawn(move || {
            feeder_syncup_handshake(feeder_ch.as_ref(), &feeder)
        });
        let outcome =
            replica_syncup_handshake(replica_ch.as_ref(), &replica).unwrap();
        assert_eq!(
            outcome,
            SyncupOutcome::Matchpoint {
                matchpoint_vlsn: Vlsn::new(8),
                matchpoint_lsn: 0x800,
                start_vlsn: Vlsn::new(9),
            }
        );
        assert_eq!(feeder_handle.join().unwrap().unwrap(), Some(Vlsn::new(9)));
    }

    // Every replica sync point (4..=6) has a different fingerprint on the
    // feeder, and the search may not go below the replica's first VLSN (4).
    // The replica therefore requests a restore, and the feeder acknowledges
    // it with None.
    #[test]
    fn test_handshake_no_matchpoint_restore() {
        let pair = LocalChannelPair::new();
        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let feeder_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);
        let replica = MapView::new(4, 6, 6)
            .put(6, 0x600, 0x11, true)
            .put(5, 0x500, 0x22, true)
            .put(4, 0x400, 0x33, true);
        let feeder = MapView::new(1, 8, 8)
            .put(8, 0x800, 0x88, true)
            .put(6, 0x600, 0x99, true)
            .put(5, 0x500, 0x88, true)
            .put(4, 0x400, 0x77, true);
        let feeder_handle = std::thread::spawn(move || {
            feeder_syncup_handshake(feeder_ch.as_ref(), &feeder)
        });
        let outcome =
            replica_syncup_handshake(replica_ch.as_ref(), &replica).unwrap();
        assert!(matches!(outcome, SyncupOutcome::NeedsRestore { .. }));
        assert_eq!(feeder_handle.join().unwrap().unwrap(), None);
    }
}