Skip to main content

myko/wire/event/
mod.rs

1use std::sync::Arc;
2
3use chrono::Utc;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::{TS, common::to_value::ToValue, core::item::AnyItem, item::Eventable};
8
9#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, TS)]
10pub enum MEventType {
11    SET,
12    DEL,
13}
14
15/// Options that can be attached to an event
16#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TS)]
17#[serde(rename_all = "camelCase")]
18#[ts(export)]
19pub struct EventOptions {
20    /// When true, relationship cascades are skipped for this event.
21    /// Used to prevent infinite loops during cascade processing.
22    #[serde(default)]
23    pub prevent_relationship_updates: bool,
24    /// When true, the event is not persisted to durable backend.
25    /// Used for events from durable backend (to avoid re-publishing).
26    #[serde(default)]
27    pub prevent_persist: bool,
28    /// When true, this event was replicated from a peer server.
29    /// Used to prevent re-broadcasting to peers and avoid cascade loops.
30    #[serde(default)]
31    pub from_peer: Option<bool>,
32}
33
34#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TS)]
35#[serde(rename_all = "camelCase")]
36#[ts(export)]
37pub struct MEvent {
38    pub item: Value,
39
40    pub change_type: MEventType,
41
42    pub item_type: String,
43
44    #[serde(default = "utc_now_iso")]
45    pub created_at: String,
46
47    #[serde(default = "generate_random_uuid")]
48    pub tx: String,
49
50    pub source_id: Option<String>,
51
52    /// Optional event options (e.g., prevent_relationship_updates)
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub options: Option<EventOptions>,
55}
56
57fn generate_random_uuid() -> String {
58    uuid::Uuid::new_v4().to_string()
59}
60
61fn utc_now_iso() -> String {
62    Utc::now().to_rfc3339()
63}
64
65impl MEvent {
66    /// Parse an MEvent from a JSON string.
67    ///
68    /// NOTE: The name `from_str_trim` is historical - it no longer trims whitespace from
69    /// the input. JSON parsers handle structural whitespace correctly, and blindly removing
70    /// whitespace was destroying string values (e.g., "hello world" → "helloworld").
71    pub fn from_str_trim(s: &str) -> Result<MEvent, serde_json::Error> {
72        serde_json::from_str(s)
73    }
74
75    pub fn from_cbor(s: &[u8]) -> Result<MEvent, ciborium::de::Error<std::io::Error>> {
76        ciborium::de::from_reader(s)
77    }
78
79    pub fn item_json(&self) -> Value {
80        self.item.clone()
81    }
82
83    pub fn from_item(item: &impl Eventable, change_type: MEventType, source_id: &str) -> MEvent {
84        MEvent {
85            item: serde_json::to_value(item).unwrap(),
86            change_type,
87            item_type: item.entity_type().to_string(),
88            created_at: Utc::now().to_rfc3339(),
89            tx: uuid::Uuid::new_v4().to_string(),
90            source_id: Some(source_id.to_string()),
91            options: None,
92        }
93    }
94
95    /// Create an event with options
96    pub fn from_item_with_options(
97        item: &impl Eventable,
98        change_type: MEventType,
99        source_id: &str,
100        options: Option<EventOptions>,
101    ) -> MEvent {
102        MEvent {
103            item: serde_json::to_value(item).unwrap(),
104            change_type,
105            item_type: item.entity_type().to_string(),
106            created_at: Utc::now().to_rfc3339(),
107            tx: uuid::Uuid::new_v4().to_string(),
108            source_id: Some(source_id.to_string()),
109            options,
110        }
111    }
112
113    /// Create a DEL event from a typed entity.
114    pub fn del(item: &impl Eventable, source_id: &str) -> MEvent {
115        MEvent {
116            item: serde_json::to_value(item).unwrap(),
117            change_type: MEventType::DEL,
118            item_type: item.entity_type().to_string(),
119            created_at: Utc::now().to_rfc3339(),
120            tx: uuid::Uuid::new_v4().to_string(),
121            source_id: Some(source_id.to_string()),
122            options: None,
123        }
124    }
125
126    /// Create a DEL event from a dynamic item.
127    pub fn del_from_any(item: &Arc<dyn AnyItem>, source_id: &str) -> MEvent {
128        MEvent {
129            item: item.to_value(),
130            change_type: MEventType::DEL,
131            item_type: item.entity_type().to_string(),
132            created_at: Utc::now().to_rfc3339(),
133            tx: uuid::Uuid::new_v4().to_string(),
134            source_id: Some(source_id.to_string()),
135            options: None,
136        }
137    }
138
139    /// Create a SET event from a JSON value
140    pub fn set_from_value(entity_type: &str, value: Value, source_id: &str) -> MEvent {
141        MEvent {
142            item: value,
143            change_type: MEventType::SET,
144            item_type: entity_type.to_string(),
145            created_at: Utc::now().to_rfc3339(),
146            tx: uuid::Uuid::new_v4().to_string(),
147            source_id: Some(source_id.to_string()),
148            options: None,
149        }
150    }
151
152    /// Check if relationship updates should be prevented for this event
153    pub fn prevent_relationship_updates(&self) -> bool {
154        self.options
155            .as_ref()
156            .map(|o| o.prevent_relationship_updates)
157            .unwrap_or(false)
158    }
159
160    /// Check if this event was replicated from a peer server
161    pub fn is_from_peer(&self) -> bool {
162        self.options
163            .as_ref()
164            .and_then(|o| o.from_peer)
165            .unwrap_or(false)
166    }
167
168    pub fn change_type(&self) -> MEventType {
169        self.change_type
170    }
171
172    pub fn item_type(&self) -> String {
173        self.item_type.to_string()
174    }
175
176    /// Strip null bytes from all string fields and the item Value tree.
177    /// PostgreSQL rejects `\0` in text/jsonb columns, and binary protocol
178    /// executors (ATEM, Crestron, AMX, etc.) can produce strings containing
179    /// null bytes from raw protocol data.
180    pub fn sanitize_null_bytes(&mut self) {
181        fn sanitize_string(s: &mut String) {
182            if s.as_bytes().contains(&0) {
183                *s = s.replace('\0', "");
184            }
185        }
186
187        fn sanitize_value(v: &mut Value) {
188            match v {
189                Value::String(s) => sanitize_string(s),
190                Value::Array(arr) => arr.iter_mut().for_each(sanitize_value),
191                Value::Object(map) => {
192                    // NOTE(ts): serde_json::Map keys are Strings but we can't
193                    // mutate keys in place. Rebuild the map if any key has nulls.
194                    let has_bad_key = map.keys().any(|k| k.as_bytes().contains(&0));
195                    if has_bad_key {
196                        let entries: Vec<_> = std::mem::take(map)
197                            .into_iter()
198                            .map(|(k, mut v)| {
199                                sanitize_value(&mut v);
200                                (k.replace('\0', ""), v)
201                            })
202                            .collect();
203                        *map = entries.into_iter().collect();
204                    } else {
205                        map.values_mut().for_each(sanitize_value);
206                    }
207                }
208                _ => {}
209            }
210        }
211
212        sanitize_string(&mut self.item_type);
213        sanitize_string(&mut self.created_at);
214        sanitize_string(&mut self.tx);
215        if let Some(ref mut sid) = self.source_id {
216            sanitize_string(sid);
217        }
218        sanitize_value(&mut self.item);
219    }
220}