use serde::{Deserialize, Serialize};
use crate::{object::state::{DataBytes, DataVersion}, DataFormat};
#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct Message {
pub sequence_number: u64,
pub acknowledgment_number: u64,
pub changes: Vec<Change>,
}
#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum Change {
Options(OptionsChange),
Object(ObjectChange),
Tag(TagChange),
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
pub enum Preference {
PreferYes = 1,
PreferNo = 2,
NoPreference = 3,
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct OptionsChange {
pub name: Option<String>,
pub custom: Option<String>, pub expose_capacity: Option<u32>,
pub expose_load_limit: Option<u32>,
pub diff_request: Option<Preference>,
}
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone)]
pub enum DataChange {
Data { data_bytes: DataBytes },
Diff { old_data_version: Option<DataVersion>, diff: DataBytes },
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(test, derive(Eq))]
pub struct ObjectChange {
pub descriptor_tags: Vec<String>,
pub descriptor_json: String, pub observer: Option<bool>,
pub exposer: Option<bool>,
pub data: Option<DataChange>,
pub data_format: Option<DataFormat>,
pub data_version: Option<DataVersion>,
pub data_synchronized: Option<u64>,
pub cost: Option<u32>,
}
#[cfg(test)]
impl PartialEq for ObjectChange {
fn eq(&self, other: &Self) -> bool {
self.descriptor_tags == other.descriptor_tags
&& self.descriptor_json == other.descriptor_json
&& self.observer == other.observer
&& self.exposer == other.exposer
&& self.data == other.data
&& self.data_format == other.data_format
&& self.data_version == other.data_version
&& self.cost == other.cost
&& ((self.data_synchronized.is_none() && other.data_synchronized.is_none())
|| (self.data_synchronized.is_some()
&& other.data_synchronized.is_some()
&& ((self.data_synchronized.unwrap() == 0 && other.data_synchronized.unwrap() == 0)
|| (self.data_synchronized.unwrap() != 0 && other.data_synchronized.unwrap() != 0)
&& ((self.data_synchronized.unwrap() as i64 - other.data_synchronized.unwrap() as i64).abs() < 100000))))
}
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct TagChange {
pub descriptor_json: String,
pub observer: Option<bool>,
pub exposer: Option<bool>,
pub cost: Option<u32>,
}
impl ObjectChange {
pub fn is_empty(&self) -> bool {
!(self.observer.is_some()
|| self.exposer.is_some()
|| self.data.is_some()
|| self.data_format.is_some()
|| self.data_version.is_some()
|| self.data_synchronized.is_some()
|| self.cost.is_some())
}
}
impl TagChange {
pub fn is_empty(&self) -> bool {
!(self.observer.is_some() || self.exposer.is_some())
}
}
#[derive(Debug)]
pub struct MessageSerializationError {
pub message: String,
}
impl Message {
pub fn serialize(&self) -> Result<Vec<u8>, MessageSerializationError> {
bincode::serialize(self).map_err(|error| MessageSerializationError { message: error.to_string() })
}
pub fn deserialize(buffer: &[u8]) -> Result<Message, MessageSerializationError> {
bincode::deserialize::<Message>(buffer).map_err(|error| MessageSerializationError { message: error.to_string() })
}
}
impl std::fmt::Debug for Message {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if fmt.alternate() {
fmt.write_fmt(format_args!("┌───── Message seq:{} ack:{} changes:\n", self.sequence_number, self.acknowledgment_number))?;
for change in &self.changes {
change.fmt(fmt)?;
}
fmt.write_str("└╼")?;
} else {
fmt.write_fmt(format_args!("Message<seq:{},ack:{},changes:{}>", self.sequence_number, self.acknowledgment_number, self.changes.len()))?;
}
Ok(())
}
}
impl std::fmt::Debug for Change {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if fmt.alternate() {
match &self {
Change::Options(change) => fmt.write_fmt(format_args!(
"│ Options -> name: {:?} custom: {:?} expose_capacity: {:?} expose_load_limit: {:?} diff_request: {:?}\n",
change.name, change.custom, change.expose_capacity, change.expose_load_limit, change.diff_request
))?,
Change::Object(change) => fmt.write_fmt(format_args!(
"│ Object -> {:?} {:?} observer: {:?} exposer: {:?} data_format: {:?} data_version: {:?} data: {:?} cost: {:?} last_synchronized: {:?}\n",
change.descriptor_tags,
change.descriptor_json,
change.observer,
change.exposer,
change.data_format,
change.data_version,
change.data,
change.cost,
change.data_synchronized
))?,
Change::Tag(change) => fmt.write_fmt(format_args!(
"│ Tag -> {:?} observer: {:?} exposer: {:?} cost: {:?}\n",
change.descriptor_json, change.observer, change.exposer, change.cost
))?,
};
} else {
match &self {
Change::Options(_change) => fmt.write_fmt(format_args!("Change<Options>"))?,
Change::Object(_change) => fmt.write_fmt(format_args!("Change<Object>"))?,
Change::Tag(_change) => fmt.write_fmt(format_args!("Change<Tag>"))?,
};
};
Ok(())
}
}
impl std::fmt::Debug for DataChange {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DataChange::Data { data_bytes: data } => f.write_fmt(format_args!("Data:{}B", data.len())),
DataChange::Diff { old_data_version, diff } => f.write_fmt(format_args!("Diff:{:?}:{}B", old_data_version, diff.len())),
}
}
}
impl From<Option<bool>> for Preference {
fn from(value: Option<bool>) -> Self {
match value {
Some(true) => Preference::PreferYes,
Some(false) => Preference::PreferNo,
None => Preference::NoPreference,
}
}
}
impl Preference {
pub fn or(&self, other: Preference) -> Preference {
match self {
Preference::PreferYes | Preference::PreferNo => *self,
Preference::NoPreference => other,
}
}
pub fn unwrap_or(&self, default: bool) -> bool {
match self {
Preference::PreferYes => true,
Preference::PreferNo => false,
Preference::NoPreference => default,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
#[test]
fn can_serialize_and_deserialize_message() {
let message = Message {
sequence_number: 1,
acknowledgment_number: 2,
changes: vec![Change::Object(ObjectChange {
descriptor_tags: vec!["{}".to_string()],
descriptor_json: "{}".to_string(),
observer: Some(true),
exposer: Some(true),
data: Some(DataChange::Data { data_bytes: Arc::new(b"aaaaa".to_vec()) }),
data_format: None,
data_version: None,
data_synchronized: None,
cost: None,
})],
};
let serialized = serde_json::to_string(&message).unwrap();
println!("{}", serialized);
let deserialized: Message = serde_json::from_str(&serialized).unwrap();
assert_eq!(message, deserialized);
let serialized = serde_yaml::to_string(&message).unwrap();
println!("{}", serialized);
let deserialized: Message = serde_yaml::from_str(&serialized).unwrap();
assert_eq!(message, deserialized);
let serialized = message.serialize().unwrap();
println!("{:?}", &serialized);
let deserialized = Message::deserialize(&serialized).unwrap();
assert_eq!(message, deserialized);
}
}