Skip to main content

teaql_runtime/
event.rs

1use std::sync::Arc;
2
3use teaql_core::{Record, Value};
4
5use crate::{RuntimeError, UserContext};
6
/// Discriminates what a `RawAuditEvent` describes.
///
/// The first four variants are set by the entity-level constructors of
/// `RawAuditEvent`; the remaining four by its schema-bootstrap constructors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RawAuditEventKind {
    /// Set by `RawAuditEvent::created`.
    Created,
    /// Set by `RawAuditEvent::updated` and `RawAuditEvent::updated_with_old_values`.
    Updated,
    /// Set by `RawAuditEvent::deleted` and `RawAuditEvent::deleted_with_old_values`.
    Deleted,
    /// Set by `RawAuditEvent::recovered` and `RawAuditEvent::recovered_with_old_values`.
    Recovered,
    /// A new table was created while bootstrapping the schema.
    SchemaCreated,
    /// An already existing table was verified while bootstrapping the schema.
    SchemaVerified,
    /// A new column was added to an existing table (schema migration).
    FieldAdded,
    /// Initial seed data was inserted or updated during bootstrap.
    DataSeeded,
}
22
23#[derive(Debug, Clone, PartialEq)]
24pub struct EntityPropertyChange {
25    pub field: String,
26    pub old_value: Option<Value>,
27    pub new_value: Option<Value>,
28}
29
30impl EntityPropertyChange {
31    pub fn new(
32        field: impl Into<String>,
33        old_value: Option<Value>,
34        new_value: Option<Value>,
35    ) -> Self {
36        Self {
37            field: field.into(),
38            old_value,
39            new_value,
40        }
41    }
42}
43
44#[derive(Debug, Clone, PartialEq)]
45pub struct RawAuditEvent {
46    pub kind: RawAuditEventKind,
47    pub entity: String,
48    pub values: Record,
49    pub updated_fields: Vec<String>,
50    pub old_values: Option<Record>,
51    pub new_values: Option<Record>,
52    pub changes: Vec<EntityPropertyChange>,
53    /// Annotation trace chain from the graph save scope chain.
54    pub trace_chain: Vec<teaql_core::TraceNode>,
55}
56
57impl RawAuditEvent {
58    pub fn created(entity: impl Into<String>, values: Record) -> Self {
59        let changes = values
60            .iter()
61            .map(|(field, value)| {
62                EntityPropertyChange::new(field.clone(), None, Some(value.clone()))
63            })
64            .collect();
65        Self {
66            kind: RawAuditEventKind::Created,
67            entity: entity.into(),
68            values: values.clone(),
69            updated_fields: Vec::new(),
70            old_values: None,
71            new_values: Some(values),
72            changes,
73            trace_chain: Vec::new(),
74        }
75    }
76
77    pub fn updated(entity: impl Into<String>, values: Record) -> Self {
78        let updated_fields = values.keys().cloned().collect::<Vec<_>>();
79        let changes = Self::changes_for_fields(None, Some(&values), &updated_fields);
80        Self {
81            kind: RawAuditEventKind::Updated,
82            entity: entity.into(),
83            values: values.clone(),
84            updated_fields,
85            old_values: None,
86            new_values: Some(values),
87            changes,
88            trace_chain: Vec::new(),
89        }
90    }
91
92    pub fn updated_with_old_values(
93        entity: impl Into<String>,
94        values: Record,
95        old_values: Option<Record>,
96        new_values: Record,
97        updated_fields: Vec<String>,
98    ) -> Self {
99        let changes =
100            Self::changes_for_fields(old_values.as_ref(), Some(&new_values), &updated_fields);
101        Self {
102            kind: RawAuditEventKind::Updated,
103            entity: entity.into(),
104            values,
105            updated_fields,
106            old_values,
107            new_values: Some(new_values),
108            changes,
109            trace_chain: Vec::new(),
110        }
111    }
112
113    pub fn deleted(entity: impl Into<String>, id: Value, expected_version: Option<i64>) -> Self {
114        let mut values = Record::from([("id".to_owned(), id)]);
115        if let Some(version) = expected_version {
116            values.insert("version".to_owned(), Value::I64(version));
117        }
118        Self {
119            kind: RawAuditEventKind::Deleted,
120            entity: entity.into(),
121            values,
122            updated_fields: Vec::new(),
123            old_values: None,
124            new_values: None,
125            changes: Vec::new(),
126            trace_chain: Vec::new(),
127        }
128    }
129
130    pub fn deleted_with_old_values(
131        entity: impl Into<String>,
132        id: Value,
133        expected_version: Option<i64>,
134        old_values: Option<Record>,
135    ) -> Self {
136        let mut event = Self::deleted(entity, id, expected_version);
137        event.changes = old_values
138            .as_ref()
139            .map(|values| {
140                values
141                    .iter()
142                    .map(|(field, value)| {
143                        EntityPropertyChange::new(field.clone(), Some(value.clone()), None)
144                    })
145                    .collect()
146            })
147            .unwrap_or_default();
148        event.old_values = old_values;
149        event
150    }
151
152    pub fn recovered(entity: impl Into<String>, id: Value, expected_version: i64) -> Self {
153        let values = Record::from([
154            ("id".to_owned(), id),
155            ("version".to_owned(), Value::I64(expected_version)),
156        ]);
157        Self {
158            kind: RawAuditEventKind::Recovered,
159            entity: entity.into(),
160            values,
161            updated_fields: Vec::new(),
162            old_values: None,
163            new_values: None,
164            changes: Vec::new(),
165            trace_chain: Vec::new(),
166        }
167    }
168
169    pub fn recovered_with_old_values(
170        entity: impl Into<String>,
171        id: Value,
172        expected_version: i64,
173        old_values: Option<Record>,
174    ) -> Self {
175        let recovered_version = -expected_version + 1;
176        let mut new_values = old_values.clone().unwrap_or_default();
177        new_values.insert("id".to_owned(), id.clone());
178        new_values.insert("version".to_owned(), Value::I64(recovered_version));
179        let mut event = Self::recovered(entity, id, expected_version);
180        event.old_values = old_values;
181        event.new_values = Some(new_values.clone());
182        event.changes = Self::changes_for_fields(
183            event.old_values.as_ref(),
184            Some(&new_values),
185            &["version".to_owned()],
186        );
187        event
188    }
189
190    /// A new table was created during schema bootstrap.
191    pub fn schema_created(
192        entity: impl Into<String>,
193        table_name: impl Into<String>,
194        field_count: usize,
195    ) -> Self {
196        let entity = entity.into();
197        let values = Record::from([
198            ("table_name".to_owned(), Value::Text(table_name.into())),
199            ("field_count".to_owned(), Value::I64(field_count as i64)),
200        ]);
201        let changes = values.iter().map(|(k, v)| EntityPropertyChange::new(k.clone(), None, Some(v.clone()))).collect();
202        Self {
203            kind: RawAuditEventKind::SchemaCreated,
204            entity,
205            values,
206            updated_fields: Vec::new(),
207            old_values: None,
208            new_values: None,
209            changes,
210            trace_chain: Vec::new(),
211        }
212    }
213
214    /// An existing table was verified during schema bootstrap.
215    pub fn schema_verified(
216        entity: impl Into<String>,
217        table_name: impl Into<String>,
218        field_count: usize,
219    ) -> Self {
220        let entity = entity.into();
221        let values = Record::from([
222            ("table_name".to_owned(), Value::Text(table_name.into())),
223            ("field_count".to_owned(), Value::I64(field_count as i64)),
224        ]);
225        let changes = values.iter().map(|(k, v)| EntityPropertyChange::new(k.clone(), None, Some(v.clone()))).collect();
226        Self {
227            kind: RawAuditEventKind::SchemaVerified,
228            entity,
229            values,
230            updated_fields: Vec::new(),
231            old_values: None,
232            new_values: None,
233            changes,
234            trace_chain: Vec::new(),
235        }
236    }
237
238    /// A new column was added to an existing table (schema migration).
239    pub fn field_added(
240        entity: impl Into<String>,
241        table_name: impl Into<String>,
242        field_name: impl Into<String>,
243    ) -> Self {
244        let entity = entity.into();
245        let values = Record::from([
246            ("table_name".to_owned(), Value::Text(table_name.into())),
247            ("field_name".to_owned(), Value::Text(field_name.into())),
248        ]);
249        let changes = values.iter().map(|(k, v)| EntityPropertyChange::new(k.clone(), None, Some(v.clone()))).collect();
250        Self {
251            kind: RawAuditEventKind::FieldAdded,
252            entity,
253            values,
254            updated_fields: Vec::new(),
255            old_values: None,
256            new_values: None,
257            changes,
258            trace_chain: Vec::new(),
259        }
260    }
261
262    /// Initial seed data was inserted or updated during bootstrap.
263    pub fn data_seeded(
264        entity: impl Into<String>,
265        table_name: impl Into<String>,
266        inserted: usize,
267        updated: usize,
268    ) -> Self {
269        let entity = entity.into();
270        let values = Record::from([
271            ("table_name".to_owned(), Value::Text(table_name.into())),
272            ("inserted".to_owned(), Value::I64(inserted as i64)),
273            ("updated".to_owned(), Value::I64(updated as i64)),
274        ]);
275        let changes = values.iter().map(|(k, v)| EntityPropertyChange::new(k.clone(), None, Some(v.clone()))).collect();
276        Self {
277            kind: RawAuditEventKind::DataSeeded,
278            entity,
279            values,
280            updated_fields: Vec::new(),
281            old_values: None,
282            new_values: None,
283            changes,
284            trace_chain: Vec::new(),
285        }
286    }
287
288    fn changes_for_fields(
289        old_values: Option<&Record>,
290        new_values: Option<&Record>,
291        fields: &[String],
292    ) -> Vec<EntityPropertyChange> {
293        fields
294            .iter()
295            .map(|field| {
296                EntityPropertyChange::new(
297                    field.clone(),
298                    old_values.and_then(|values| values.get(field).cloned()),
299                    new_values.and_then(|values| values.get(field).cloned()),
300                )
301            })
302            .collect()
303    }
304
305    pub fn build_safe_event(
306        &self,
307        audit_mask_fields: &[String],
308        audit_value_max_len: Option<usize>,
309    ) -> SafeAuditEvent {
310        let mut safe_fields = Vec::new();
311        for change in &self.changes {
312            if change.field.starts_with('_') {
313                continue;
314            }
315            // For audit, if it's masked or we just want the new/old values, we should represent it stringified.
316            // Usually we care about the new value in SafeAuditEvent. Or maybe we want to represent the change.
317            // Based on design doc, we stringify the value and apply masks.
318            let raw_val_str = change.new_value.as_ref().map(|v| format!("{:?}", v));
319            let safe_field = build_safe_audit_field(
320                &change.field,
321                raw_val_str.as_deref(),
322                audit_mask_fields,
323                audit_value_max_len,
324            );
325            safe_fields.push(safe_field);
326        }
327
328        SafeAuditEvent {
329            kind: self.kind,
330            entity: self.entity.clone(),
331            fields: safe_fields,
332            trace_chain: self.trace_chain.clone(),
333        }
334    }
335}
336
/// Masks `value` for audit output.
///
/// Lengths are counted in characters, not bytes:
/// * values shorter than 8 characters, and values consisting only of ASCII
///   digits, are replaced entirely by `*` (an empty value stays empty);
/// * anything else keeps its first two and last two characters, with every
///   character in between replaced by `*`.
///
/// The result always has the same character count as the input.
pub fn mask_audit_value(value: &str) -> String {
    let total = value.chars().count();
    let digits_only = value.chars().all(|ch| ch.is_ascii_digit());

    // `"*".repeat(0)` is the empty string, which covers empty input as well.
    if total < 8 || digits_only {
        return "*".repeat(total);
    }

    let mut masked = String::with_capacity(value.len());
    masked.extend(value.chars().take(2));
    masked.push_str(&"*".repeat(total - 4));
    masked.extend(value.chars().skip(total - 2));
    masked
}
359
/// Shortens `value` to at most `max_len` characters.
///
/// Returns the resulting text and a flag telling whether shortening happened.
/// Lengths are counted in characters, not bytes.
///
/// * A value that already fits is returned unchanged with `false`.
/// * When `max_len` is 3 or less there is no room for an ellipsis, so the
///   result is `max_len` asterisks.
/// * Otherwise the head and tail of the value are kept around a `...`
///   marker; when the remaining budget is odd, the tail gets the extra
///   character.
pub fn limit_audit_value(value: &str, max_len: usize) -> (String, bool) {
    const ELLIPSIS: &str = "...";
    let total = value.chars().count();

    match max_len {
        limit if total <= limit => (value.to_string(), false),
        limit if limit <= ELLIPSIS.len() => ("*".repeat(limit), true),
        limit => {
            let budget = limit - ELLIPSIS.len();
            let front = budget / 2;
            let back = budget - front;

            let mut shortened = String::new();
            shortened.extend(value.chars().take(front));
            shortened.push_str(ELLIPSIS);
            shortened.extend(value.chars().skip(total - back));
            (shortened, true)
        }
    }
}
382
383pub fn build_safe_audit_field(
384    field_name: &str,
385    raw_value: Option<&str>,
386    audit_mask_fields: &[String],
387    audit_value_max_len: Option<usize>,
388) -> SafeAuditField {
389    match raw_value {
390        None => SafeAuditField {
391            name: field_name.to_string(),
392            value: None,
393            masked: false,
394            truncated: false,
395            raw_length: None,
396            output_length: None,
397            mask_reason: None,
398            truncate_reason: None,
399        },
400        Some(raw) => {
401            let raw_length = raw.chars().count();
402            let should_mask = audit_mask_fields.iter().any(|f| f == field_name);
403
404            let mut value = if should_mask {
405                mask_audit_value(raw)
406            } else {
407                raw.to_string()
408            };
409
410            let mut truncated = false;
411            if let Some(max_len) = audit_value_max_len {
412                let result = limit_audit_value(&value, max_len);
413                value = result.0;
414                truncated = result.1;
415            }
416
417            let output_length = value.chars().count();
418
419            SafeAuditField {
420                name: field_name.to_string(),
421                value: Some(value),
422                masked: should_mask,
423                truncated,
424                raw_length: Some(raw_length),
425                output_length: Some(output_length),
426                mask_reason: if should_mask {
427                    Some("_audit_mask_fields".to_string())
428                } else {
429                    None
430                },
431                truncate_reason: if truncated {
432                    Some("_audit_value_max_len".to_string())
433                } else {
434                    None
435                },
436            }
437        }
438    }
439}
440
441pub trait RawAuditEventSink: Send + Sync {
442    fn on_event(&self, ctx: &UserContext, event: &RawAuditEvent) -> Result<(), RuntimeError>;
443}
444
445#[derive(Default, Clone)]
446pub struct InMemoryRawAuditEventSink {
447    sinks: Vec<Arc<dyn RawAuditEventSink>>,
448}
449
450impl InMemoryRawAuditEventSink {
451    pub fn new() -> Self {
452        Self::default()
453    }
454
455    pub fn register(&mut self, sink: impl RawAuditEventSink + 'static) {
456        self.sinks.push(Arc::new(sink));
457    }
458
459    pub fn with_sink(mut self, sink: impl RawAuditEventSink + 'static) -> Self {
460        self.register(sink);
461        self
462    }
463}
464
465impl RawAuditEventSink for InMemoryRawAuditEventSink {
466    fn on_event(&self, ctx: &UserContext, event: &RawAuditEvent) -> Result<(), RuntimeError> {
467        for sink in &self.sinks {
468            sink.on_event(ctx, event)?;
469        }
470        Ok(())
471    }
472}
473
/// One field of a `SafeAuditEvent`, together with metadata describing how its
/// value was sanitised.
#[derive(Debug, Clone, PartialEq)]
pub struct SafeAuditField {
    /// Field name.
    pub name: String,
    /// Sanitised text of the value, or `None` when there was no value.
    pub value: Option<String>,
    /// Whether the value was masked.
    pub masked: bool,
    /// Whether the value was shortened to satisfy a length limit.
    pub truncated: bool,
    /// Character count of the value before masking/truncation.
    pub raw_length: Option<usize>,
    /// Character count of `value` as emitted.
    pub output_length: Option<usize>,
    /// Why the value was masked, when it was.
    pub mask_reason: Option<String>,
    /// Why the value was truncated, when it was.
    pub truncate_reason: Option<String>,
}
485
486#[derive(Debug, Clone, PartialEq)]
487pub struct SafeAuditEvent {
488    pub kind: RawAuditEventKind,
489    pub entity: String,
490    pub fields: Vec<SafeAuditField>,
491    pub trace_chain: Vec<teaql_core::TraceNode>,
492}
493
494pub trait SafeAuditEventSink: Send + Sync {
495    fn on_safe_event(&self, ctx: &crate::UserContext, event: &SafeAuditEvent) -> Result<(), crate::RuntimeError>;
496}