use std::io::{self, Read, Write};
use bincode::{config::standard, decode_from_slice, encode_to_vec, Decode, Encode};
use crate::infinitedb_core::snapshot::Snapshot;
use crate::infinitedb_sync::{
delta::Delta,
merkle::MerkleTree,
transport::SyncEnvelope,
};
#[derive(Debug, Encode, Decode)]
pub enum SyncMessage {
MerkleRoot { root: [u8; 32] },
MerkleTree(MerkleTree),
Delta(Delta),
Ack { applied_revision: u64 },
OperationBatch(Vec<SyncEnvelope>),
Error { message: String },
}
pub fn write_message<W: Write>(sink: &mut W, msg: &SyncMessage) -> io::Result<()> {
let payload = encode_to_vec(msg, standard())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let len = payload.len() as u64;
sink.write_all(&len.to_le_bytes())?;
sink.write_all(&payload)
}
pub fn read_message<R: Read>(src: &mut R) -> io::Result<SyncMessage> {
let mut len_buf = [0u8; 8];
src.read_exact(&mut len_buf)?;
let len = u64::from_le_bytes(len_buf) as usize;
if len > 256 * 1024 * 1024 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"SyncMessage payload exceeds 256 MiB limit",
));
}
let mut payload = vec![0u8; len];
src.read_exact(&mut payload)?;
let (msg, _) = decode_from_slice::<SyncMessage, _>(&payload, standard())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
Ok(msg)
}
pub fn encode_snapshot(snapshot: &Snapshot) -> io::Result<Vec<u8>> {
encode_to_vec(snapshot, standard())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
pub fn decode_snapshot(bytes: &[u8]) -> io::Result<Snapshot> {
let (snap, _) = decode_from_slice::<Snapshot, _>(bytes, standard())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
Ok(snap)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn roundtrip_merkle_root_message() {
let msg = SyncMessage::MerkleRoot { root: [7u8; 32] };
let mut buf = Vec::new();
write_message(&mut buf, &msg).unwrap();
let mut cursor = Cursor::new(buf);
let decoded = read_message(&mut cursor).unwrap();
assert!(matches!(decoded, SyncMessage::MerkleRoot { root } if root == [7u8; 32]));
}
#[test]
fn roundtrip_ack_message() {
let msg = SyncMessage::Ack { applied_revision: 42 };
let mut buf = Vec::new();
write_message(&mut buf, &msg).unwrap();
let mut cursor = Cursor::new(buf);
let decoded = read_message(&mut cursor).unwrap();
assert!(matches!(decoded, SyncMessage::Ack { applied_revision: 42 }));
}
#[test]
fn roundtrip_operation_batch_message() {
use crate::infinitedb_core::address::{Address, DimensionVector, RevisionId, SpaceId};
use crate::infinitedb_sync::transport::SyncOperation;
let batch = vec![SyncEnvelope {
op_id: 99,
op: SyncOperation::Write {
address: Address::new(SpaceId(1), DimensionVector::new(vec![1, 2])),
revision: RevisionId(3),
data: vec![7, 8],
},
}];
let msg = SyncMessage::OperationBatch(batch);
let mut buf = Vec::new();
write_message(&mut buf, &msg).unwrap();
let mut cursor = Cursor::new(buf);
let decoded = read_message(&mut cursor).unwrap();
assert!(matches!(decoded, SyncMessage::OperationBatch(v) if v.len() == 1 && v[0].op_id == 99));
}
}