use std::io::BufRead;
use anyhow::{Context, bail};
use crate::delta::Delta;
use crate::digest::Digest;
use crate::serialize::{Deserializable, Serializable};
const MAGIC_NUMBER: u16 = 45_139;
#[derive(Debug, Eq, PartialEq)]
pub enum ChitchatMessage {
Syn { cluster_id: String, digest: Digest },
SynAck { digest: Digest, delta: Delta },
Ack { delta: Delta },
BadCluster,
#[cfg(test)]
PanicForTest,
}
#[derive(Clone, Copy, Eq, PartialEq)]
#[repr(u8)]
enum ProtocolVersion {
V0 = 0,
}
impl ProtocolVersion {
pub fn from_code(code: u8) -> Option<Self> {
match code {
0 => Some(Self::V0),
_ => None,
}
}
pub fn to_code(self) -> u8 {
self as u8
}
}
#[derive(Clone, Copy)]
#[repr(u8)]
enum MessageType {
Syn = 0,
SynAck = 1u8,
Ack = 2u8,
BadCluster = 3u8,
#[cfg(test)]
PanicForTest = 255u8,
}
impl MessageType {
pub fn from_code(code: u8) -> Option<Self> {
match code {
0 => Some(Self::Syn),
1 => Some(Self::SynAck),
2 => Some(Self::Ack),
3 => Some(Self::BadCluster),
#[cfg(test)]
255 => Some(Self::PanicForTest),
_ => None,
}
}
pub fn to_code(self) -> u8 {
self as u8
}
}
impl Serializable for ChitchatMessage {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.extend(MAGIC_NUMBER.to_le_bytes());
ProtocolVersion::V0.to_code().serialize(buf);
match self {
ChitchatMessage::Syn { cluster_id, digest } => {
buf.push(MessageType::Syn.to_code());
digest.serialize(buf);
cluster_id.serialize(buf);
}
ChitchatMessage::SynAck { digest, delta } => {
buf.push(MessageType::SynAck.to_code());
digest.serialize(buf);
delta.serialize(buf);
}
ChitchatMessage::Ack { delta } => {
buf.push(MessageType::Ack.to_code());
delta.serialize(buf);
}
ChitchatMessage::BadCluster => {
buf.push(MessageType::BadCluster.to_code());
}
#[cfg(test)]
ChitchatMessage::PanicForTest => {
buf.push(MessageType::PanicForTest.to_code());
}
}
}
fn serialized_len(&self) -> usize {
2 + 1
+ match self {
ChitchatMessage::Syn { cluster_id, digest } => {
1 + cluster_id.serialized_len() + digest.serialized_len()
}
ChitchatMessage::SynAck { digest, delta } => {
1 + digest.serialized_len() + delta.serialized_len()
}
ChitchatMessage::Ack { delta } => 1 + delta.serialized_len(),
ChitchatMessage::BadCluster => 1,
#[cfg(test)]
ChitchatMessage::PanicForTest => 1,
}
}
}
impl Deserializable for ChitchatMessage {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
if buf.len() < 3 {
bail!("buffer too small to store the magic number and the protocol version");
}
let magic_number = u16::from_le_bytes(buf[0..2].try_into().unwrap());
if magic_number != MAGIC_NUMBER {
bail!("invalid chitchat magic number");
}
let protocol_version =
ProtocolVersion::from_code(buf[2]).context("invalid protocol version")?;
if protocol_version != ProtocolVersion::V0 {
bail!(
"unsupported protocol version `{}`",
protocol_version.to_code()
)
}
buf.consume(3);
let message_type = buf
.first()
.copied()
.and_then(MessageType::from_code)
.context("invalid message type")?;
buf.consume(1);
match message_type {
MessageType::Syn => {
let digest = Digest::deserialize(buf)?;
let cluster_id = String::deserialize(buf)?;
Ok(Self::Syn { cluster_id, digest })
}
MessageType::SynAck => {
let digest = Digest::deserialize(buf)?;
let delta = Delta::deserialize(buf)?;
Ok(Self::SynAck { digest, delta })
}
MessageType::Ack => {
let delta = Delta::deserialize(buf)?;
Ok(Self::Ack { delta })
}
MessageType::BadCluster => Ok(Self::BadCluster),
#[cfg(test)]
MessageType::PanicForTest => Ok(Self::PanicForTest),
}
}
}
#[cfg(test)]
mod tests {
use crate::serialize::test_serdeser_aux;
use crate::{ChitchatId, ChitchatMessage, Delta, Digest, Heartbeat};
#[test]
fn test_syn() {
{
let syn = ChitchatMessage::Syn {
cluster_id: "cluster-a".to_string(),
digest: Digest::default(),
};
test_serdeser_aux(&syn, 17);
}
{
let mut digest = Digest::default();
let node = ChitchatId::for_local_test(10_001);
digest.add_node(node, Heartbeat(0), 0, 0);
let syn = ChitchatMessage::Syn {
cluster_id: "cluster-a".to_string(),
digest,
};
test_serdeser_aux(&syn, 68);
}
}
#[test]
fn test_syn_ack() {
{
let syn_ack = ChitchatMessage::SynAck {
digest: Digest::default(),
delta: Delta::default(),
};
test_serdeser_aux(&syn_ack, 7);
}
{
let mut digest = Digest::default();
let node = ChitchatId::for_local_test(10_001);
digest.add_node(node, Heartbeat(0), 0, 0);
let mut delta = Delta::default();
let node = ChitchatId::for_local_test(10_001);
delta.add_node(node.clone(), 0u64, 0u64);
delta.add_kv(&node, "key", "value", 1, true);
delta.set_serialized_len(70);
let syn_ack = ChitchatMessage::SynAck { digest, delta };
test_serdeser_aux(&syn_ack, 2 + 1 + 1 + 53 + 70);
}
}
#[test]
fn test_ack() {
{
let delta = Delta::default();
let ack = ChitchatMessage::Ack { delta };
test_serdeser_aux(&ack, 5);
}
{
let mut delta = Delta::default();
let node = ChitchatId::for_local_test(10_001);
delta.add_node(node.clone(), 0u64, 0u64);
delta.add_kv(&node, "key", "value", 1, true);
delta.set_serialized_len(70);
let ack = ChitchatMessage::Ack { delta };
test_serdeser_aux(&ack, 2 + 1 + 1 + 70);
}
}
#[test]
fn test_bad_cluster() {
test_serdeser_aux(&ChitchatMessage::BadCluster, 4);
}
}