// teaql_runtime/event.rs

use std::sync::Arc;

use teaql_core::{Record, Value};

use crate::{RuntimeError, UserContext};

/// Classifies what a raw audit event reports.
///
/// The first four variants describe changes to a single entity record; the
/// remaining ones describe schema/bootstrap activity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RawAuditEventKind {
    /// Emitted when an entity record is created.
    Created,
    /// Emitted when an entity record is updated.
    Updated,
    /// Emitted when an entity record is deleted.
    Deleted,
    /// Emitted when a previously deleted entity record is recovered.
    Recovered,
    /// Emitted when a new table is created during schema bootstrap.
    SchemaCreated,
    /// Emitted when an existing table is verified during schema bootstrap.
    SchemaVerified,
    /// Emitted when a new column is added to an existing table (schema migration).
    FieldAdded,
    /// Emitted when initial seed data is inserted or updated during bootstrap.
    DataSeeded,
}

23#[derive(Debug, Clone, PartialEq)]
24pub struct EntityPropertyChange {
25    pub field: String,
26    pub old_value: Option<Value>,
27    pub new_value: Option<Value>,
28}
29
30impl EntityPropertyChange {
31    pub fn new(
32        field: impl Into<String>,
33        old_value: Option<Value>,
34        new_value: Option<Value>,
35    ) -> Self {
36        Self {
37            field: field.into(),
38            old_value,
39            new_value,
40        }
41    }
42}
43
44#[derive(Debug, Clone, PartialEq)]
45pub struct RawAuditEvent {
46    pub kind: RawAuditEventKind,
47    pub entity: String,
48    pub values: Record,
49    pub updated_fields: Vec<String>,
50    pub old_values: Option<Record>,
51    pub new_values: Option<Record>,
52    pub changes: Vec<EntityPropertyChange>,
53    /// Annotation trace chain from the graph save scope chain.
54    pub trace_chain: Vec<teaql_core::TraceNode>,
55}
56
57impl RawAuditEvent {
58    pub fn created(entity: impl Into<String>, values: Record) -> Self {
59        let changes = values
60            .iter()
61            .map(|(field, value)| {
62                EntityPropertyChange::new(field.clone(), None, Some(value.clone()))
63            })
64            .collect();
65        Self {
66            kind: RawAuditEventKind::Created,
67            entity: entity.into(),
68            values: values.clone(),
69            updated_fields: Vec::new(),
70            old_values: None,
71            new_values: Some(values),
72            changes,
73            trace_chain: Vec::new(),
74        }
75    }
76
77    pub fn updated(entity: impl Into<String>, values: Record) -> Self {
78        let updated_fields = values.keys().cloned().collect::<Vec<_>>();
79        let changes = Self::changes_for_fields(None, Some(&values), &updated_fields);
80        Self {
81            kind: RawAuditEventKind::Updated,
82            entity: entity.into(),
83            values: values.clone(),
84            updated_fields,
85            old_values: None,
86            new_values: Some(values),
87            changes,
88            trace_chain: Vec::new(),
89        }
90    }
91
92    pub fn updated_with_old_values(
93        entity: impl Into<String>,
94        values: Record,
95        old_values: Option<Record>,
96        new_values: Record,
97        updated_fields: Vec<String>,
98    ) -> Self {
99        let changes =
100            Self::changes_for_fields(old_values.as_ref(), Some(&new_values), &updated_fields);
101        Self {
102            kind: RawAuditEventKind::Updated,
103            entity: entity.into(),
104            values,
105            updated_fields,
106            old_values,
107            new_values: Some(new_values),
108            changes,
109            trace_chain: Vec::new(),
110        }
111    }
112
113    pub fn deleted(entity: impl Into<String>, id: Value, expected_version: Option<i64>) -> Self {
114        let mut values = Record::from([("id".to_owned(), id)]);
115        if let Some(version) = expected_version {
116            values.insert("version".to_owned(), Value::I64(version));
117        }
118        Self {
119            kind: RawAuditEventKind::Deleted,
120            entity: entity.into(),
121            values,
122            updated_fields: Vec::new(),
123            old_values: None,
124            new_values: None,
125            changes: Vec::new(),
126            trace_chain: Vec::new(),
127        }
128    }
129
130    pub fn deleted_with_old_values(
131        entity: impl Into<String>,
132        id: Value,
133        expected_version: Option<i64>,
134        old_values: Option<Record>,
135    ) -> Self {
136        let mut event = Self::deleted(entity, id, expected_version);
137        event.changes = old_values
138            .as_ref()
139            .map(|values| {
140                values
141                    .iter()
142                    .map(|(field, value)| {
143                        EntityPropertyChange::new(field.clone(), Some(value.clone()), None)
144                    })
145                    .collect()
146            })
147            .unwrap_or_default();
148        event.old_values = old_values;
149        event
150    }
151
152    pub fn recovered(entity: impl Into<String>, id: Value, expected_version: i64) -> Self {
153        let values = Record::from([
154            ("id".to_owned(), id),
155            ("version".to_owned(), Value::I64(expected_version)),
156        ]);
157        Self {
158            kind: RawAuditEventKind::Recovered,
159            entity: entity.into(),
160            values,
161            updated_fields: Vec::new(),
162            old_values: None,
163            new_values: None,
164            changes: Vec::new(),
165            trace_chain: Vec::new(),
166        }
167    }
168
169    pub fn recovered_with_old_values(
170        entity: impl Into<String>,
171        id: Value,
172        expected_version: i64,
173        old_values: Option<Record>,
174    ) -> Self {
175        let recovered_version = -expected_version + 1;
176        let mut new_values = old_values.clone().unwrap_or_default();
177        new_values.insert("id".to_owned(), id.clone());
178        new_values.insert("version".to_owned(), Value::I64(recovered_version));
179        let mut event = Self::recovered(entity, id, expected_version);
180        event.old_values = old_values;
181        event.new_values = Some(new_values.clone());
182        event.changes = Self::changes_for_fields(
183            event.old_values.as_ref(),
184            Some(&new_values),
185            &["version".to_owned()],
186        );
187        event
188    }
189
190    /// A new table was created during schema bootstrap.
191    pub fn schema_created(
192        entity: impl Into<String>,
193        table_name: impl Into<String>,
194        field_count: usize,
195    ) -> Self {
196        let entity = entity.into();
197        let values = Record::from([
198            ("table_name".to_owned(), Value::Text(table_name.into())),
199            ("field_count".to_owned(), Value::I64(field_count as i64)),
200        ]);
201        let changes = values.iter().map(|(k, v)| EntityPropertyChange::new(k.clone(), None, Some(v.clone()))).collect();
202        Self {
203            kind: RawAuditEventKind::SchemaCreated,
204            entity,
205            values,
206            updated_fields: Vec::new(),
207            old_values: None,
208            new_values: None,
209            changes,
210            trace_chain: Vec::new(),
211        }
212    }
213
214    /// An existing table was verified during schema bootstrap.
215    pub fn schema_verified(
216        entity: impl Into<String>,
217        table_name: impl Into<String>,
218        field_count: usize,
219    ) -> Self {
220        let entity = entity.into();
221        let values = Record::from([
222            ("table_name".to_owned(), Value::Text(table_name.into())),
223            ("field_count".to_owned(), Value::I64(field_count as i64)),
224        ]);
225        let changes = values.iter().map(|(k, v)| EntityPropertyChange::new(k.clone(), None, Some(v.clone()))).collect();
226        Self {
227            kind: RawAuditEventKind::SchemaVerified,
228            entity,
229            values,
230            updated_fields: Vec::new(),
231            old_values: None,
232            new_values: None,
233            changes,
234            trace_chain: Vec::new(),
235        }
236    }
237
238    /// A new column was added to an existing table (schema migration).
239    pub fn field_added(
240        entity: impl Into<String>,
241        table_name: impl Into<String>,
242        field_name: impl Into<String>,
243    ) -> Self {
244        let entity = entity.into();
245        let values = Record::from([
246            ("table_name".to_owned(), Value::Text(table_name.into())),
247            ("field_name".to_owned(), Value::Text(field_name.into())),
248        ]);
249        let changes = values.iter().map(|(k, v)| EntityPropertyChange::new(k.clone(), None, Some(v.clone()))).collect();
250        Self {
251            kind: RawAuditEventKind::FieldAdded,
252            entity,
253            values,
254            updated_fields: Vec::new(),
255            old_values: None,
256            new_values: None,
257            changes,
258            trace_chain: Vec::new(),
259        }
260    }
261
262    /// Initial seed data was inserted or updated during bootstrap.
263    pub fn data_seeded(
264        entity: impl Into<String>,
265        table_name: impl Into<String>,
266        inserted: usize,
267        updated: usize,
268    ) -> Self {
269        let entity = entity.into();
270        let values = Record::from([
271            ("table_name".to_owned(), Value::Text(table_name.into())),
272            ("inserted".to_owned(), Value::I64(inserted as i64)),
273            ("updated".to_owned(), Value::I64(updated as i64)),
274        ]);
275        let changes = values.iter().map(|(k, v)| EntityPropertyChange::new(k.clone(), None, Some(v.clone()))).collect();
276        Self {
277            kind: RawAuditEventKind::DataSeeded,
278            entity,
279            values,
280            updated_fields: Vec::new(),
281            old_values: None,
282            new_values: None,
283            changes,
284            trace_chain: Vec::new(),
285        }
286    }
287
288    fn changes_for_fields(
289        old_values: Option<&Record>,
290        new_values: Option<&Record>,
291        fields: &[String],
292    ) -> Vec<EntityPropertyChange> {
293        fields
294            .iter()
295            .map(|field| {
296                EntityPropertyChange::new(
297                    field.clone(),
298                    old_values.and_then(|values| values.get(field).cloned()),
299                    new_values.and_then(|values| values.get(field).cloned()),
300                )
301            })
302            .collect()
303    }
304
305    pub fn build_safe_event(
306        &self,
307        audit_mask_fields: &[String],
308        audit_value_max_len: Option<usize>,
309    ) -> SafeAuditEvent {
310        let mut safe_fields = Vec::new();
311        for change in &self.changes {
312            // For audit, if it's masked or we just want the new/old values, we should represent it stringified.
313            // Usually we care about the new value in SafeAuditEvent. Or maybe we want to represent the change.
314            // Based on design doc, we stringify the value and apply masks.
315            let raw_val_str = change.new_value.as_ref().map(|v| format!("{:?}", v));
316            let safe_field = build_safe_audit_field(
317                &change.field,
318                raw_val_str.as_deref(),
319                audit_mask_fields,
320                audit_value_max_len,
321            );
322            safe_fields.push(safe_field);
323        }
324
325        SafeAuditEvent {
326            kind: self.kind,
327            entity: self.entity.clone(),
328            fields: safe_fields,
329            trace_chain: self.trace_chain.clone(),
330        }
331    }
332}
333
/// Masks `value` for inclusion in an audit record.
///
/// Lengths are counted in characters, not bytes.
///
/// * Values shorter than eight characters, and values made up only of ASCII
///   digits, are replaced entirely by `*` (one per character).
/// * Any other value keeps its first two and last two characters; everything
///   in between becomes `*`.
///
/// The result always has the same number of characters as the input.
pub fn mask_audit_value(value: &str) -> String {
    /// Number of characters left readable at each end of a long value.
    const VISIBLE_EDGE: usize = 2;
    /// Values shorter than this are hidden completely.
    const MIN_PARTIAL_LEN: usize = 8;

    let chars: Vec<char> = value.chars().collect();
    let len = chars.len();

    let hide_everything = len < MIN_PARTIAL_LEN || chars.iter().all(|c| c.is_ascii_digit());
    if hide_everything {
        return "*".repeat(len);
    }

    let mut masked = String::with_capacity(value.len());
    masked.extend(&chars[..VISIBLE_EDGE]);
    masked.push_str(&"*".repeat(len - 2 * VISIBLE_EDGE));
    masked.extend(&chars[len - VISIBLE_EDGE..]);
    masked
}

/// Shortens `value` to at most `max_len` characters.
///
/// Returns the resulting text and a flag telling whether truncation happened.
/// Lengths are counted in characters, not bytes.
///
/// * A value that already fits is returned unchanged with `false`.
/// * When `max_len` is three or less there is no room for an ellipsis, so the
///   result is `max_len` asterisks.
/// * Otherwise the middle of the value is replaced by `...`; the remaining
///   budget is split between head and tail, the tail getting the extra
///   character when the budget is odd.
pub fn limit_audit_value(value: &str, max_len: usize) -> (String, bool) {
    const MARKER: &str = "...";

    let chars: Vec<char> = value.chars().collect();
    let len = chars.len();

    if len <= max_len {
        return (value.to_string(), false);
    }
    if max_len <= MARKER.len() {
        return ("*".repeat(max_len), true);
    }

    let keep_len = max_len - MARKER.len();
    let head_len = keep_len / 2;
    let tail_len = keep_len - head_len;

    let mut limited = String::new();
    limited.extend(&chars[..head_len]);
    limited.push_str(MARKER);
    limited.extend(&chars[len - tail_len..]);
    (limited, true)
}

380pub fn build_safe_audit_field(
381    field_name: &str,
382    raw_value: Option<&str>,
383    audit_mask_fields: &[String],
384    audit_value_max_len: Option<usize>,
385) -> SafeAuditField {
386    match raw_value {
387        None => SafeAuditField {
388            name: field_name.to_string(),
389            value: None,
390            masked: false,
391            truncated: false,
392            raw_length: None,
393            output_length: None,
394            mask_reason: None,
395            truncate_reason: None,
396        },
397        Some(raw) => {
398            let raw_length = raw.chars().count();
399            let should_mask = audit_mask_fields.iter().any(|f| f == field_name);
400
401            let mut value = if should_mask {
402                mask_audit_value(raw)
403            } else {
404                raw.to_string()
405            };
406
407            let mut truncated = false;
408            if let Some(max_len) = audit_value_max_len {
409                let result = limit_audit_value(&value, max_len);
410                value = result.0;
411                truncated = result.1;
412            }
413
414            let output_length = value.chars().count();
415
416            SafeAuditField {
417                name: field_name.to_string(),
418                value: Some(value),
419                masked: should_mask,
420                truncated,
421                raw_length: Some(raw_length),
422                output_length: Some(output_length),
423                mask_reason: if should_mask {
424                    Some("_audit_mask_fields".to_string())
425                } else {
426                    None
427                },
428                truncate_reason: if truncated {
429                    Some("_audit_value_max_len".to_string())
430                } else {
431                    None
432                },
433            }
434        }
435    }
436}
437
438pub trait RawAuditEventSink: Send + Sync {
439    fn on_event(&self, ctx: &UserContext, event: &RawAuditEvent) -> Result<(), RuntimeError>;
440}
441
442#[derive(Default, Clone)]
443pub struct InMemoryRawAuditEventSink {
444    sinks: Vec<Arc<dyn RawAuditEventSink>>,
445}
446
447impl InMemoryRawAuditEventSink {
448    pub fn new() -> Self {
449        Self::default()
450    }
451
452    pub fn register(&mut self, sink: impl RawAuditEventSink + 'static) {
453        self.sinks.push(Arc::new(sink));
454    }
455
456    pub fn with_sink(mut self, sink: impl RawAuditEventSink + 'static) -> Self {
457        self.register(sink);
458        self
459    }
460}
461
462impl RawAuditEventSink for InMemoryRawAuditEventSink {
463    fn on_event(&self, ctx: &UserContext, event: &RawAuditEvent) -> Result<(), RuntimeError> {
464        for sink in &self.sinks {
465            sink.on_event(ctx, event)?;
466        }
467        Ok(())
468    }
469}
470
/// One field of an audit event after masking and length limiting.
#[derive(Debug, Clone, PartialEq)]
pub struct SafeAuditField {
    /// Name of the field.
    pub name: String,
    /// Stringified value after masking/truncation, or `None` when the field
    /// had no value.
    pub value: Option<String>,
    /// Whether the value was masked.
    pub masked: bool,
    /// Whether the value was shortened to satisfy the length limit.
    pub truncated: bool,
    /// Character count of the value before masking/truncation.
    pub raw_length: Option<usize>,
    /// Character count of `value` as stored here.
    pub output_length: Option<usize>,
    /// Why the value was masked; `None` when it was not.
    pub mask_reason: Option<String>,
    /// Why the value was truncated; `None` when it was not.
    pub truncate_reason: Option<String>,
}

483#[derive(Debug, Clone, PartialEq)]
484pub struct SafeAuditEvent {
485    pub kind: RawAuditEventKind,
486    pub entity: String,
487    pub fields: Vec<SafeAuditField>,
488    pub trace_chain: Vec<teaql_core::TraceNode>,
489}
490
491pub trait SafeAuditEventSink: Send + Sync {
492    fn on_safe_event(&self, ctx: &crate::UserContext, event: &SafeAuditEvent) -> Result<(), crate::RuntimeError>;
493}