//! Source file: `teaql_runtime/event.rs`.

use std::fmt;
use std::sync::Arc;

use teaql_core::{Record, Value};

use crate::{RuntimeError, UserContext};
6
/// Tag identifying what an [`EntityEvent`] reports.
///
/// The first four variants tag events about individual entities; the
/// remaining variants tag schema bootstrap and seeding notifications.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityEventKind {
    /// Tag set by `EntityEvent::created`.
    Created,
    /// Tag set by `EntityEvent::updated` and `EntityEvent::updated_with_old_values`.
    Updated,
    /// Tag set by `EntityEvent::deleted` and `EntityEvent::deleted_with_old_values`.
    Deleted,
    /// Tag set by `EntityEvent::recovered` and `EntityEvent::recovered_with_old_values`.
    Recovered,
    /// Emitted when a new table is created during schema bootstrap.
    SchemaCreated,
    /// Emitted when an existing table is verified during schema bootstrap.
    SchemaVerified,
    /// Emitted when a new column is added to an existing table (schema migration).
    FieldAdded,
    /// Emitted when initial seed data is inserted or updated during bootstrap.
    DataSeeded,
}
22
23#[derive(Debug, Clone, PartialEq)]
24pub struct EntityPropertyChange {
25    pub field: String,
26    pub old_value: Option<Value>,
27    pub new_value: Option<Value>,
28}
29
30impl EntityPropertyChange {
31    pub fn new(
32        field: impl Into<String>,
33        old_value: Option<Value>,
34        new_value: Option<Value>,
35    ) -> Self {
36        Self {
37            field: field.into(),
38            old_value,
39            new_value,
40        }
41    }
42}
43
44#[derive(Debug, Clone, PartialEq)]
45pub struct EntityEvent {
46    pub kind: EntityEventKind,
47    pub entity: String,
48    pub values: Record,
49    pub updated_fields: Vec<String>,
50    pub old_values: Option<Record>,
51    pub new_values: Option<Record>,
52    pub changes: Vec<EntityPropertyChange>,
53    /// Annotation trace chain from the graph save scope chain.
54    pub trace_chain: Vec<teaql_core::TraceNode>,
55}
56
57impl EntityEvent {
58    pub fn created(entity: impl Into<String>, values: Record) -> Self {
59        let changes = values
60            .iter()
61            .map(|(field, value)| {
62                EntityPropertyChange::new(field.clone(), None, Some(value.clone()))
63            })
64            .collect();
65        Self {
66            kind: EntityEventKind::Created,
67            entity: entity.into(),
68            values: values.clone(),
69            updated_fields: Vec::new(),
70            old_values: None,
71            new_values: Some(values),
72            changes,
73            trace_chain: Vec::new(),
74        }
75    }
76
77    pub fn updated(entity: impl Into<String>, values: Record) -> Self {
78        let updated_fields = values.keys().cloned().collect::<Vec<_>>();
79        let changes = Self::changes_for_fields(None, Some(&values), &updated_fields);
80        Self {
81            kind: EntityEventKind::Updated,
82            entity: entity.into(),
83            values: values.clone(),
84            updated_fields,
85            old_values: None,
86            new_values: Some(values),
87            changes,
88            trace_chain: Vec::new(),
89        }
90    }
91
92    pub fn updated_with_old_values(
93        entity: impl Into<String>,
94        values: Record,
95        old_values: Option<Record>,
96        new_values: Record,
97        updated_fields: Vec<String>,
98    ) -> Self {
99        let changes =
100            Self::changes_for_fields(old_values.as_ref(), Some(&new_values), &updated_fields);
101        Self {
102            kind: EntityEventKind::Updated,
103            entity: entity.into(),
104            values,
105            updated_fields,
106            old_values,
107            new_values: Some(new_values),
108            changes,
109            trace_chain: Vec::new(),
110        }
111    }
112
113    pub fn deleted(entity: impl Into<String>, id: Value, expected_version: Option<i64>) -> Self {
114        let mut values = Record::from([("id".to_owned(), id)]);
115        if let Some(version) = expected_version {
116            values.insert("version".to_owned(), Value::I64(version));
117        }
118        Self {
119            kind: EntityEventKind::Deleted,
120            entity: entity.into(),
121            values,
122            updated_fields: Vec::new(),
123            old_values: None,
124            new_values: None,
125            changes: Vec::new(),
126            trace_chain: Vec::new(),
127        }
128    }
129
130    pub fn deleted_with_old_values(
131        entity: impl Into<String>,
132        id: Value,
133        expected_version: Option<i64>,
134        old_values: Option<Record>,
135    ) -> Self {
136        let mut event = Self::deleted(entity, id, expected_version);
137        event.changes = old_values
138            .as_ref()
139            .map(|values| {
140                values
141                    .iter()
142                    .map(|(field, value)| {
143                        EntityPropertyChange::new(field.clone(), Some(value.clone()), None)
144                    })
145                    .collect()
146            })
147            .unwrap_or_default();
148        event.old_values = old_values;
149        event
150    }
151
152    pub fn recovered(entity: impl Into<String>, id: Value, expected_version: i64) -> Self {
153        let values = Record::from([
154            ("id".to_owned(), id),
155            ("version".to_owned(), Value::I64(expected_version)),
156        ]);
157        Self {
158            kind: EntityEventKind::Recovered,
159            entity: entity.into(),
160            values,
161            updated_fields: Vec::new(),
162            old_values: None,
163            new_values: None,
164            changes: Vec::new(),
165            trace_chain: Vec::new(),
166        }
167    }
168
169    pub fn recovered_with_old_values(
170        entity: impl Into<String>,
171        id: Value,
172        expected_version: i64,
173        old_values: Option<Record>,
174    ) -> Self {
175        let recovered_version = -expected_version + 1;
176        let mut new_values = old_values.clone().unwrap_or_default();
177        new_values.insert("id".to_owned(), id.clone());
178        new_values.insert("version".to_owned(), Value::I64(recovered_version));
179        let mut event = Self::recovered(entity, id, expected_version);
180        event.old_values = old_values;
181        event.new_values = Some(new_values.clone());
182        event.changes = Self::changes_for_fields(
183            event.old_values.as_ref(),
184            Some(&new_values),
185            &["version".to_owned()],
186        );
187        event
188    }
189
190    /// A new table was created during schema bootstrap.
191    pub fn schema_created(
192        entity: impl Into<String>,
193        table_name: impl Into<String>,
194        field_count: usize,
195    ) -> Self {
196        let entity = entity.into();
197        let values = Record::from([
198            ("table_name".to_owned(), Value::Text(table_name.into())),
199            ("field_count".to_owned(), Value::I64(field_count as i64)),
200        ]);
201        Self {
202            kind: EntityEventKind::SchemaCreated,
203            entity,
204            values,
205            updated_fields: Vec::new(),
206            old_values: None,
207            new_values: None,
208            changes: Vec::new(),
209            trace_chain: Vec::new(),
210        }
211    }
212
213    /// An existing table was verified during schema bootstrap.
214    pub fn schema_verified(
215        entity: impl Into<String>,
216        table_name: impl Into<String>,
217        field_count: usize,
218    ) -> Self {
219        let entity = entity.into();
220        let values = Record::from([
221            ("table_name".to_owned(), Value::Text(table_name.into())),
222            ("field_count".to_owned(), Value::I64(field_count as i64)),
223        ]);
224        Self {
225            kind: EntityEventKind::SchemaVerified,
226            entity,
227            values,
228            updated_fields: Vec::new(),
229            old_values: None,
230            new_values: None,
231            changes: Vec::new(),
232            trace_chain: Vec::new(),
233        }
234    }
235
236    /// A new column was added to an existing table (schema migration).
237    pub fn field_added(
238        entity: impl Into<String>,
239        table_name: impl Into<String>,
240        field_name: impl Into<String>,
241    ) -> Self {
242        let entity = entity.into();
243        let values = Record::from([
244            ("table_name".to_owned(), Value::Text(table_name.into())),
245            ("field_name".to_owned(), Value::Text(field_name.into())),
246        ]);
247        Self {
248            kind: EntityEventKind::FieldAdded,
249            entity,
250            values,
251            updated_fields: Vec::new(),
252            old_values: None,
253            new_values: None,
254            changes: Vec::new(),
255            trace_chain: Vec::new(),
256        }
257    }
258
259    /// Initial seed data was inserted or updated during bootstrap.
260    ///
261    /// - `inserted`: number of new records inserted
262    /// - `updated`: number of existing records updated
263    pub fn data_seeded(
264        entity: impl Into<String>,
265        table_name: impl Into<String>,
266        inserted: usize,
267        updated: usize,
268    ) -> Self {
269        let entity = entity.into();
270        let values = Record::from([
271            ("table_name".to_owned(), Value::Text(table_name.into())),
272            ("inserted".to_owned(), Value::I64(inserted as i64)),
273            ("updated".to_owned(), Value::I64(updated as i64)),
274        ]);
275        Self {
276            kind: EntityEventKind::DataSeeded,
277            entity,
278            values,
279            updated_fields: Vec::new(),
280            old_values: None,
281            new_values: None,
282            changes: Vec::new(),
283            trace_chain: Vec::new(),
284        }
285    }
286
287    fn changes_for_fields(
288        old_values: Option<&Record>,
289        new_values: Option<&Record>,
290        fields: &[String],
291    ) -> Vec<EntityPropertyChange> {
292        fields
293            .iter()
294            .map(|field| {
295                EntityPropertyChange::new(
296                    field.clone(),
297                    old_values.and_then(|values| values.get(field).cloned()),
298                    new_values.and_then(|values| values.get(field).cloned()),
299                )
300            })
301            .collect()
302    }
303}
304
305pub trait EntityEventSink: Send + Sync {
306    fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError>;
307}
308
309#[derive(Default, Clone)]
310pub struct InMemoryEntityEventSink {
311    sinks: Vec<Arc<dyn EntityEventSink>>,
312}
313
314impl InMemoryEntityEventSink {
315    pub fn new() -> Self {
316        Self::default()
317    }
318
319    pub fn register(&mut self, sink: impl EntityEventSink + 'static) {
320        self.sinks.push(Arc::new(sink));
321    }
322
323    pub fn with_sink(mut self, sink: impl EntityEventSink + 'static) -> Self {
324        self.register(sink);
325        self
326    }
327}
328
329impl EntityEventSink for InMemoryEntityEventSink {
330    fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
331        for sink in &self.sinks {
332            sink.on_event(ctx, event)?;
333        }
334        Ok(())
335    }
336}