hakuban 0.7.2

Data-object sharing library
Documentation
/*! Communication primitives; only for tests and custom transports
*/

use serde::{Deserialize, Serialize};

use crate::{object::state::{DataBytes, DataVersion}, DataFormat};

/// A single message: two counters plus a batch of [`Change`]s.
///
/// Converted to and from bytes by [`Message::serialize`] and [`Message::deserialize`].
/// `Debug` is implemented by hand further down in this file; `Eq`/`PartialEq` are only
/// derived in test builds.
#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct Message {
	/// Sequence number carried by this message.
	/// NOTE(review): presumably assigned by the sender, one per message — confirm against the transport code.
	pub sequence_number: u64,
	/// Acknowledgment number carried by this message.
	/// NOTE(review): presumably refers to a sequence number received from the peer — confirm.
	pub acknowledgment_number: u64,
	/// The changes transported by this message; may be empty.
	pub changes: Vec<Change>,
}


// This used to be a plain enum with struct-like variant fields,
// but serde doesn't seem to handle missing fields well in such enum values,
// and nested structs are less painful to create and mutate than such pseudo-structs.
/// One entry of [`Message::changes`]; each variant wraps a dedicated payload struct.
///
/// `Debug` is implemented by hand further down in this file; `Eq`/`PartialEq` are only
/// derived in test builds.
#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum Change {
	/// Carries an [`OptionsChange`].
	Options(OptionsChange),
	/// Carries an [`ObjectChange`].
	Object(ObjectChange),
	/// Carries a [`TagChange`].
	Tag(TagChange),
}

/// Three-state preference: yes, no, or no preference at all.
///
/// Can be built from an `Option<bool>` (`Some(true)`, `Some(false)` and `None` map to the
/// three variants in declaration order). The variants have explicit discriminants 1, 2 and 3.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
pub enum Preference {
	/// Explicit "yes".
	PreferYes = 1,
	/// Explicit "no".
	PreferNo = 2,
	/// Neither; `or` and `unwrap_or` fall back to their argument for this variant.
	NoPreference = 3,
}

/// Payload of [`Change::Options`]. Every field is optional.
///
/// NOTE(review): a `None` field presumably means "not changed by this message" — confirm
/// against the code that applies these changes.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct OptionsChange {
	/// Optional name. NOTE(review): presumably the name of the sending node — confirm.
	pub name: Option<String>,
	/// Optional custom payload, stored as a JSON string.
	pub custom: Option<String>, // JSON
	/// Optional expose capacity. NOTE(review): exact semantics are defined outside this file.
	pub expose_capacity: Option<u32>,
	/// Optional expose load limit. NOTE(review): exact semantics are defined outside this file.
	pub expose_load_limit: Option<u32>,
	/// Optional [`Preference`] regarding diffs.
	/// NOTE(review): presumably whether the sender wants to receive `DataChange::Diff` — confirm.
	pub diff_request: Option<Preference>,
}


/// Data carried by an [`ObjectChange`]: either a complete payload or a diff.
///
/// `Debug` is implemented by hand further down in this file and prints only the payload
/// length (plus the old version for diffs), not the bytes themselves.
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone)]
pub enum DataChange {
	/// Complete data bytes.
	Data { data_bytes: DataBytes },
	/// Diff bytes together with an optional old data version.
	/// NOTE(review): `old_data_version` is presumably the version the diff applies to — confirm.
	Diff { old_data_version: Option<DataVersion>, diff: DataBytes },
}

/// Payload of [`Change::Object`]: two descriptor fields identifying the object, followed by
/// optional fields describing what changed.
///
/// `PartialEq` is hand-written below and only exists in test builds; it compares
/// `data_synchronized` approximately rather than exactly.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(test, derive(Eq))]
pub struct ObjectChange {
	/// Tags of the object descriptor, one string per tag.
	/// NOTE(review): each string is presumably a JSON-encoded tag descriptor — confirm.
	pub descriptor_tags: Vec<String>,
	/// JSON part of the object descriptor.
	pub descriptor_json: String, // JSON/ObjectDescriptor
	/// Optional observer flag.
	pub observer: Option<bool>,
	/// Optional exposer flag.
	pub exposer: Option<bool>,
	/// Optional object data, either complete or as a diff (see [`DataChange`]).
	pub data: Option<DataChange>,
	/// Optional format of the data.
	pub data_format: Option<DataFormat>,
	/// Optional version of the data.
	pub data_version: Option<DataVersion>,
	/// Optional synchronization marker.
	/// NOTE(review): looks like a timestamp, with 0 treated as a special value by the test-only
	/// `PartialEq` — confirm unit and meaning against the producer.
	pub data_synchronized: Option<u64>,
	/// Optional cost. NOTE(review): exact semantics are defined outside this file.
	pub cost: Option<u32>,
}

#[cfg(test)]
impl PartialEq for ObjectChange {
	/// Test-only equality.
	///
	/// All fields are compared exactly, except `data_synchronized`, which is considered equal when:
	/// * both sides are `None`, or
	/// * both sides are `Some(0)`, or
	/// * both sides are non-zero and differ by less than `SYNCHRONIZED_TOLERANCE`.
	///
	/// The difference is computed on the unsigned values. Casting to `i64` first would wrap for
	/// values above `i64::MAX` (making e.g. `u64::MAX` and `1` look close) and could overflow
	/// on subtraction or `abs()`.
	fn eq(&self, other: &Self) -> bool {
		// Largest distance (exclusive) between two non-zero `data_synchronized` values that still counts as equal.
		const SYNCHRONIZED_TOLERANCE: u64 = 100000;

		let synchronized_matches = match (self.data_synchronized, other.data_synchronized) {
			(None, None) | (Some(0), Some(0)) => true,
			// max - min cannot underflow, so no signed arithmetic is needed.
			(Some(left), Some(right)) if left != 0 && right != 0 => left.max(right) - left.min(right) < SYNCHRONIZED_TOLERANCE,
			// One side missing, or exactly one side zero.
			_ => false,
		};

		self.descriptor_tags == other.descriptor_tags
			&& self.descriptor_json == other.descriptor_json
			&& self.observer == other.observer
			&& self.exposer == other.exposer
			&& self.data == other.data
			&& self.data_format == other.data_format
			&& self.data_version == other.data_version
			&& self.cost == other.cost
			&& synchronized_matches
	}
}


/// Payload of [`Change::Tag`]: a tag descriptor followed by optional fields describing what changed.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct TagChange {
	/// The tag descriptor. NOTE(review): presumably JSON-encoded, as the name suggests — confirm.
	pub descriptor_json: String,
	/// Optional observer flag.
	pub observer: Option<bool>,
	/// Optional exposer flag.
	pub exposer: Option<bool>,
	/// Optional cost. NOTE(review): exact semantics are defined outside this file.
	pub cost: Option<u32>,
}

impl ObjectChange {
	/// Returns `true` when none of the optional fields is set.
	///
	/// The descriptor fields (`descriptor_tags`, `descriptor_json`) are not taken into account.
	pub fn is_empty(&self) -> bool {
		self.observer.is_none()
			&& self.exposer.is_none()
			&& self.data.is_none()
			&& self.data_format.is_none()
			&& self.data_version.is_none()
			&& self.data_synchronized.is_none()
			&& self.cost.is_none()
	}
}

impl TagChange {
	/// Returns `true` when none of the optional fields (`observer`, `exposer`, `cost`) is set.
	///
	/// `descriptor_json` is not taken into account.
	///
	/// `cost` is included so that this agrees with `ObjectChange::is_empty`; without it a
	/// change carrying only a new cost would be reported as empty.
	pub fn is_empty(&self) -> bool {
		self.observer.is_none() && self.exposer.is_none() && self.cost.is_none()
	}
}

/// Error returned when a message cannot be serialized or deserialized.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be propagated with `?`
/// into `Box<dyn std::error::Error>` and printed with `{}`.
#[derive(Debug)]
pub struct MessageSerializationError {
	/// Human-readable description of the failure.
	pub message: String,
}

impl std::fmt::Display for MessageSerializationError {
	/// Writes the stored `message` verbatim.
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		f.write_str(&self.message)
	}
}

impl std::error::Error for MessageSerializationError {}

impl Message {
	/// Encodes this message into bytes using `bincode`.
	///
	/// # Errors
	/// Returns a [`MessageSerializationError`] holding the encoder's error text when encoding fails.
	pub fn serialize(&self) -> Result<Vec<u8>, MessageSerializationError> {
		match bincode::serialize(self) {
			Ok(bytes) => Ok(bytes),
			Err(failure) => Err(MessageSerializationError { message: failure.to_string() }),
		}
	}

	/// Decodes a message from `buffer` using `bincode`.
	///
	/// # Errors
	/// Returns a [`MessageSerializationError`] holding the decoder's error text when `buffer`
	/// cannot be decoded into a [`Message`].
	pub fn deserialize(buffer: &[u8]) -> Result<Message, MessageSerializationError> {
		match bincode::deserialize::<Message>(buffer) {
			Ok(decoded) => Ok(decoded),
			Err(failure) => Err(MessageSerializationError { message: failure.to_string() }),
		}
	}
}


impl std::fmt::Debug for Message {
	/// Compact form (`{:?}`): a one-line summary with both counters and the number of changes.
	/// Alternate form (`{:#?}`): a header line, then every change formatted with the same
	/// formatter, then a closing marker.
	fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		if !fmt.alternate() {
			return write!(fmt, "Message<seq:{},ack:{},changes:{}>", self.sequence_number, self.acknowledgment_number, self.changes.len());
		}

		write!(fmt, "┌───── Message seq:{} ack:{} changes:\n", self.sequence_number, self.acknowledgment_number)?;
		// The formatter is handed down unchanged, so each change sees the alternate flag too.
		self.changes.iter().try_for_each(|change| std::fmt::Debug::fmt(change, fmt))?;
		fmt.write_str("└╼")
	}
}

impl std::fmt::Debug for Change {
	fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		if fmt.alternate() {
			match &self {
				Change::Options(change) => fmt.write_fmt(format_args!(
					"│ Options -> name: {:?}  custom: {:?}  expose_capacity: {:?}  expose_load_limit: {:?}  diff_request: {:?}\n",
					change.name, change.custom, change.expose_capacity, change.expose_load_limit, change.diff_request
				))?,
				Change::Object(change) => fmt.write_fmt(format_args!(
					"│ Object -> {:?} {:?}  observer: {:?}  exposer: {:?}  data_format: {:?}  data_version: {:?} data: {:?} cost: {:?} last_synchronized: {:?}\n",
					change.descriptor_tags,
					change.descriptor_json,
					change.observer,
					change.exposer,
					change.data_format,
					change.data_version,
					change.data,
					change.cost,
					change.data_synchronized
				))?,
				Change::Tag(change) => fmt.write_fmt(format_args!(
					"│ Tag -> {:?}  observer: {:?}  exposer: {:?}  cost: {:?}\n",
					change.descriptor_json, change.observer, change.exposer, change.cost
				))?,
			};
		} else {
			match &self {
				Change::Options(_change) => fmt.write_fmt(format_args!("Change<Options>"))?,
				Change::Object(_change) => fmt.write_fmt(format_args!("Change<Object>"))?,
				Change::Tag(_change) => fmt.write_fmt(format_args!("Change<Tag>"))?,
			};
		};
		Ok(())
	}
}


impl std::fmt::Debug for DataChange {
	/// Prints the payload size instead of the payload itself: `Data:<len>B` for full data,
	/// `Diff:<old version>:<len>B` for diffs.
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		match self {
			DataChange::Data { data_bytes } => {
				let size = data_bytes.len();
				write!(f, "Data:{}B", size)
			}
			DataChange::Diff { old_data_version, diff } => {
				let size = diff.len();
				write!(f, "Diff:{:?}:{}B", old_data_version, size)
			}
		}
	}
}

impl From<Option<bool>> for Preference {
	/// Maps `Some(true)` to `PreferYes`, `Some(false)` to `PreferNo` and `None` to `NoPreference`.
	fn from(value: Option<bool>) -> Self {
		value.map_or(Preference::NoPreference, |wanted| if wanted { Preference::PreferYes } else { Preference::PreferNo })
	}
}

impl Preference {
	/// Returns `self` when it expresses a preference (`PreferYes` or `PreferNo`),
	/// otherwise returns `other`.
	pub fn or(&self, other: Preference) -> Preference {
		if *self == Preference::NoPreference {
			other
		} else {
			*self
		}
	}

	/// Converts to a `bool`: `PreferYes` is `true`, `PreferNo` is `false`,
	/// and `NoPreference` yields `default`.
	pub fn unwrap_or(&self, default: bool) -> bool {
		match *self {
			Preference::NoPreference => default,
			decided => decided == Preference::PreferYes,
		}
	}
}


#[cfg(test)]
mod tests {

	use std::sync::Arc;

	use super::*;

	/// Round-trips one message through JSON, YAML and the wire encoding,
	/// expecting the decoded message to equal the original each time.
	#[test]
	fn can_serialize_and_deserialize_message() {
		let object_change = ObjectChange {
			descriptor_tags: vec!["{}".to_string()],
			descriptor_json: "{}".to_string(),
			observer: Some(true),
			exposer: Some(true),
			data: Some(DataChange::Data { data_bytes: Arc::new(b"aaaaa".to_vec()) }),
			data_format: None,
			data_version: None,
			data_synchronized: None,
			cost: None,
		};
		let original = Message { sequence_number: 1, acknowledgment_number: 2, changes: vec![Change::Object(object_change)] };

		// JSON
		let as_json = serde_json::to_string(&original).unwrap();
		println!("{}", as_json);
		let from_json: Message = serde_json::from_str(&as_json).unwrap();
		assert_eq!(original, from_json);

		// YAML
		let as_yaml = serde_yaml::to_string(&original).unwrap();
		println!("{}", as_yaml);
		let from_yaml: Message = serde_yaml::from_str(&as_yaml).unwrap();
		assert_eq!(original, from_yaml);

		// Wire protocol
		let as_wire = original.serialize().unwrap();
		println!("{:?}", &as_wire);
		let from_wire = Message::deserialize(&as_wire).unwrap();
		assert_eq!(original, from_wire);
	}
}