use std::{io::Cursor, sync::Arc};
use chrono::Utc;
use rmp_serde::Deserializer;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use ts_rs::TS;
use crate::{common::to_value::ToValue, core::item::AnyItem, item::Eventable};
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, TS)]
pub enum MEventType {
SET,
DEL,
}
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct EventOptions {
#[serde(default)]
pub prevent_relationship_updates: bool,
#[serde(default)]
pub prevent_persist: bool,
#[serde(default)]
pub from_peer: Option<bool>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct MEvent {
pub item: Value,
pub change_type: MEventType,
pub item_type: String,
#[serde(default = "utc_now_iso")]
pub created_at: String,
#[serde(default = "generate_random_uuid")]
pub tx: String,
pub source_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub options: Option<EventOptions>,
}
fn generate_random_uuid() -> String {
uuid::Uuid::new_v4().to_string()
}
fn utc_now_iso() -> String {
Utc::now().to_rfc3339()
}
impl MEvent {
pub fn from_str_trim(s: &str) -> Result<MEvent, serde_json::Error> {
serde_json::from_str(s)
}
pub fn from_mp(s: &[u8]) -> Result<MEvent, rmp_serde::decode::Error> {
let cur = Cursor::new(s);
let mut de = Deserializer::new(cur);
Deserialize::deserialize(&mut de)
}
pub fn item_json(&self) -> Value {
self.item.clone()
}
pub fn from_item(item: &impl Eventable, change_type: MEventType, source_id: &str) -> MEvent {
MEvent {
item: serde_json::to_value(item).unwrap(),
change_type,
item_type: item.entity_type().to_string(),
created_at: Utc::now().to_rfc3339(),
tx: uuid::Uuid::new_v4().to_string(),
source_id: Some(source_id.to_string()),
options: None,
}
}
pub fn from_item_with_options(
item: &impl Eventable,
change_type: MEventType,
source_id: &str,
options: Option<EventOptions>,
) -> MEvent {
MEvent {
item: serde_json::to_value(item).unwrap(),
change_type,
item_type: item.entity_type().to_string(),
created_at: Utc::now().to_rfc3339(),
tx: uuid::Uuid::new_v4().to_string(),
source_id: Some(source_id.to_string()),
options,
}
}
pub fn del(item: &impl Eventable, source_id: &str) -> MEvent {
MEvent {
item: serde_json::to_value(item).unwrap(),
change_type: MEventType::DEL,
item_type: item.entity_type().to_string(),
created_at: Utc::now().to_rfc3339(),
tx: uuid::Uuid::new_v4().to_string(),
source_id: Some(source_id.to_string()),
options: None,
}
}
pub fn del_from_any(item: &Arc<dyn AnyItem>, source_id: &str) -> MEvent {
MEvent {
item: item.to_value(),
change_type: MEventType::DEL,
item_type: item.entity_type().to_string(),
created_at: Utc::now().to_rfc3339(),
tx: uuid::Uuid::new_v4().to_string(),
source_id: Some(source_id.to_string()),
options: None,
}
}
pub fn set_from_value(entity_type: &str, value: Value, source_id: &str) -> MEvent {
MEvent {
item: value,
change_type: MEventType::SET,
item_type: entity_type.to_string(),
created_at: Utc::now().to_rfc3339(),
tx: uuid::Uuid::new_v4().to_string(),
source_id: Some(source_id.to_string()),
options: None,
}
}
pub fn prevent_relationship_updates(&self) -> bool {
self.options
.as_ref()
.map(|o| o.prevent_relationship_updates)
.unwrap_or(false)
}
pub fn is_from_peer(&self) -> bool {
self.options
.as_ref()
.and_then(|o| o.from_peer)
.unwrap_or(false)
}
pub fn change_type(&self) -> MEventType {
self.change_type
}
pub fn item_type(&self) -> String {
self.item_type.to_string()
}
pub fn sanitize_null_bytes(&mut self) {
fn sanitize_string(s: &mut String) {
if s.as_bytes().contains(&0) {
*s = s.replace('\0', "");
}
}
fn sanitize_value(v: &mut Value) {
match v {
Value::String(s) => sanitize_string(s),
Value::Array(arr) => arr.iter_mut().for_each(sanitize_value),
Value::Object(map) => {
let has_bad_key = map.keys().any(|k| k.as_bytes().contains(&0));
if has_bad_key {
let entries: Vec<_> = std::mem::take(map)
.into_iter()
.map(|(k, mut v)| {
sanitize_value(&mut v);
(k.replace('\0', ""), v)
})
.collect();
*map = entries.into_iter().collect();
} else {
map.values_mut().for_each(sanitize_value);
}
}
_ => {}
}
}
sanitize_string(&mut self.item_type);
sanitize_string(&mut self.created_at);
sanitize_string(&mut self.tx);
if let Some(ref mut sid) = self.source_id {
sanitize_string(sid);
}
sanitize_value(&mut self.item);
}
}