1use std::sync::Arc;
2
3use chrono::Utc;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::{TS, common::to_value::ToValue, core::item::AnyItem, item::Eventable};
8
/// Kind of change an [`MEvent`] describes.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, TS)]
pub enum MEventType {
    /// The item is being set (written); produced by `MEvent::set_from_value`.
    SET,
    /// The item is being deleted; produced by `MEvent::del` and `MEvent::del_from_any`.
    DEL,
}
14
/// Optional flags that can accompany an [`MEvent`].
///
/// Field names are (de)serialized in camelCase, and every field falls back to
/// its `Default` value when it is missing from the input.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct EventOptions {
    /// Exposed through `MEvent::prevent_relationship_updates`; how consumers
    /// react to it is not visible in this file.
    #[serde(default)]
    pub prevent_relationship_updates: bool,
    /// NOTE(review): presumably asks consumers to skip persisting the event —
    /// nothing in this file reads it; confirm against the callers.
    #[serde(default)]
    pub prevent_persist: bool,
    /// Read by `MEvent::is_from_peer`, which treats `None` the same as `Some(false)`.
    #[serde(default)]
    pub from_peer: Option<bool>,
}
33
/// A change event describing a single item.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct MEvent {
    /// JSON representation of the item the event is about.
    pub item: Value,

    /// Whether the event is a `SET` or a `DEL`.
    pub change_type: MEventType,

    /// Entity type name of the item; the constructors fill it from `entity_type()`
    /// or from the caller-supplied type string.
    pub item_type: String,

    /// Creation timestamp as an RFC 3339 string. When the field is missing
    /// during deserialization it defaults to the current UTC time.
    #[serde(default = "utc_now_iso")]
    pub created_at: String,

    /// Per-event identifier (presumably a transaction id — confirm). When the
    /// field is missing during deserialization a fresh v4 UUID is generated.
    #[serde(default = "generate_random_uuid")]
    pub tx: String,

    /// Identifier of whatever emitted the event. Every constructor in this
    /// file sets it to `Some(..)`; it can only be `None` on deserialized events.
    pub source_id: Option<String>,

    /// Optional flags. Left out of the serialized form when `None`, and
    /// defaults to `None` when missing from the input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub options: Option<EventOptions>,
}
56
57fn generate_random_uuid() -> String {
58 uuid::Uuid::new_v4().to_string()
59}
60
61fn utc_now_iso() -> String {
62 Utc::now().to_rfc3339()
63}
64
65impl MEvent {
66 pub fn from_str_trim(s: &str) -> Result<MEvent, serde_json::Error> {
72 serde_json::from_str(s)
73 }
74
75 pub fn from_cbor(s: &[u8]) -> Result<MEvent, ciborium::de::Error<std::io::Error>> {
76 ciborium::de::from_reader(s)
77 }
78
79 pub fn item_json(&self) -> Value {
80 self.item.clone()
81 }
82
83 pub fn from_item(item: &impl Eventable, change_type: MEventType, source_id: &str) -> MEvent {
84 MEvent {
85 item: serde_json::to_value(item).unwrap(),
86 change_type,
87 item_type: item.entity_type().to_string(),
88 created_at: Utc::now().to_rfc3339(),
89 tx: uuid::Uuid::new_v4().to_string(),
90 source_id: Some(source_id.to_string()),
91 options: None,
92 }
93 }
94
95 pub fn from_item_with_options(
97 item: &impl Eventable,
98 change_type: MEventType,
99 source_id: &str,
100 options: Option<EventOptions>,
101 ) -> MEvent {
102 MEvent {
103 item: serde_json::to_value(item).unwrap(),
104 change_type,
105 item_type: item.entity_type().to_string(),
106 created_at: Utc::now().to_rfc3339(),
107 tx: uuid::Uuid::new_v4().to_string(),
108 source_id: Some(source_id.to_string()),
109 options,
110 }
111 }
112
113 pub fn del(item: &impl Eventable, source_id: &str) -> MEvent {
115 MEvent {
116 item: serde_json::to_value(item).unwrap(),
117 change_type: MEventType::DEL,
118 item_type: item.entity_type().to_string(),
119 created_at: Utc::now().to_rfc3339(),
120 tx: uuid::Uuid::new_v4().to_string(),
121 source_id: Some(source_id.to_string()),
122 options: None,
123 }
124 }
125
126 pub fn del_from_any(item: &Arc<dyn AnyItem>, source_id: &str) -> MEvent {
128 MEvent {
129 item: item.to_value(),
130 change_type: MEventType::DEL,
131 item_type: item.entity_type().to_string(),
132 created_at: Utc::now().to_rfc3339(),
133 tx: uuid::Uuid::new_v4().to_string(),
134 source_id: Some(source_id.to_string()),
135 options: None,
136 }
137 }
138
139 pub fn set_from_value(entity_type: &str, value: Value, source_id: &str) -> MEvent {
141 MEvent {
142 item: value,
143 change_type: MEventType::SET,
144 item_type: entity_type.to_string(),
145 created_at: Utc::now().to_rfc3339(),
146 tx: uuid::Uuid::new_v4().to_string(),
147 source_id: Some(source_id.to_string()),
148 options: None,
149 }
150 }
151
152 pub fn prevent_relationship_updates(&self) -> bool {
154 self.options
155 .as_ref()
156 .map(|o| o.prevent_relationship_updates)
157 .unwrap_or(false)
158 }
159
160 pub fn is_from_peer(&self) -> bool {
162 self.options
163 .as_ref()
164 .and_then(|o| o.from_peer)
165 .unwrap_or(false)
166 }
167
168 pub fn change_type(&self) -> MEventType {
169 self.change_type
170 }
171
172 pub fn item_type(&self) -> String {
173 self.item_type.to_string()
174 }
175
176 pub fn sanitize_null_bytes(&mut self) {
181 fn sanitize_string(s: &mut String) {
182 if s.as_bytes().contains(&0) {
183 *s = s.replace('\0', "");
184 }
185 }
186
187 fn sanitize_value(v: &mut Value) {
188 match v {
189 Value::String(s) => sanitize_string(s),
190 Value::Array(arr) => arr.iter_mut().for_each(sanitize_value),
191 Value::Object(map) => {
192 let has_bad_key = map.keys().any(|k| k.as_bytes().contains(&0));
195 if has_bad_key {
196 let entries: Vec<_> = std::mem::take(map)
197 .into_iter()
198 .map(|(k, mut v)| {
199 sanitize_value(&mut v);
200 (k.replace('\0', ""), v)
201 })
202 .collect();
203 *map = entries.into_iter().collect();
204 } else {
205 map.values_mut().for_each(sanitize_value);
206 }
207 }
208 _ => {}
209 }
210 }
211
212 sanitize_string(&mut self.item_type);
213 sanitize_string(&mut self.created_at);
214 sanitize_string(&mut self.tx);
215 if let Some(ref mut sid) = self.source_id {
216 sanitize_string(sid);
217 }
218 sanitize_value(&mut self.item);
219 }
220}