// teaql_runtime/event.rs

use std::sync::Arc;

use teaql_core::{Record, Value};

use crate::{RuntimeError, UserContext};

/// The kind of lifecycle change an [`EntityEvent`] reports.
///
/// Each variant corresponds to one family of `EntityEvent` constructors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityEventKind {
    /// Set by `EntityEvent::created`.
    Created,
    /// Set by `EntityEvent::updated` and `EntityEvent::updated_with_old_values`.
    Updated,
    /// Set by `EntityEvent::deleted` and `EntityEvent::deleted_with_old_values`.
    Deleted,
    /// Set by `EntityEvent::recovered` and `EntityEvent::recovered_with_old_values`.
    Recovered,
}

15#[derive(Debug, Clone, PartialEq)]
16pub struct EntityPropertyChange {
17    pub field: String,
18    pub old_value: Option<Value>,
19    pub new_value: Option<Value>,
20}
21
22impl EntityPropertyChange {
23    pub fn new(
24        field: impl Into<String>,
25        old_value: Option<Value>,
26        new_value: Option<Value>,
27    ) -> Self {
28        Self {
29            field: field.into(),
30            old_value,
31            new_value,
32        }
33    }
34}
35
36#[derive(Debug, Clone, PartialEq)]
37pub struct EntityEvent {
38    pub kind: EntityEventKind,
39    pub entity: String,
40    pub values: Record,
41    pub updated_fields: Vec<String>,
42    pub old_values: Option<Record>,
43    pub new_values: Option<Record>,
44    pub changes: Vec<EntityPropertyChange>,
45    /// Annotation trace chain from the graph save scope chain.
46    pub trace_chain: Vec<teaql_core::TraceNode>,
47}
48
49impl EntityEvent {
50    pub fn created(entity: impl Into<String>, values: Record) -> Self {
51        let changes = values
52            .iter()
53            .map(|(field, value)| {
54                EntityPropertyChange::new(field.clone(), None, Some(value.clone()))
55            })
56            .collect();
57        Self {
58            kind: EntityEventKind::Created,
59            entity: entity.into(),
60            values: values.clone(),
61            updated_fields: Vec::new(),
62            old_values: None,
63            new_values: Some(values),
64            changes,
65            trace_chain: Vec::new(),
66        }
67    }
68
69    pub fn updated(entity: impl Into<String>, values: Record) -> Self {
70        let updated_fields = values.keys().cloned().collect::<Vec<_>>();
71        let changes = Self::changes_for_fields(None, Some(&values), &updated_fields);
72        Self {
73            kind: EntityEventKind::Updated,
74            entity: entity.into(),
75            values: values.clone(),
76            updated_fields,
77            old_values: None,
78            new_values: Some(values),
79            changes,
80            trace_chain: Vec::new(),
81        }
82    }
83
84    pub fn updated_with_old_values(
85        entity: impl Into<String>,
86        values: Record,
87        old_values: Option<Record>,
88        new_values: Record,
89        updated_fields: Vec<String>,
90    ) -> Self {
91        let changes =
92            Self::changes_for_fields(old_values.as_ref(), Some(&new_values), &updated_fields);
93        Self {
94            kind: EntityEventKind::Updated,
95            entity: entity.into(),
96            values,
97            updated_fields,
98            old_values,
99            new_values: Some(new_values),
100            changes,
101            trace_chain: Vec::new(),
102        }
103    }
104
105    pub fn deleted(entity: impl Into<String>, id: Value, expected_version: Option<i64>) -> Self {
106        let mut values = Record::from([("id".to_owned(), id)]);
107        if let Some(version) = expected_version {
108            values.insert("version".to_owned(), Value::I64(version));
109        }
110        Self {
111            kind: EntityEventKind::Deleted,
112            entity: entity.into(),
113            values,
114            updated_fields: Vec::new(),
115            old_values: None,
116            new_values: None,
117            changes: Vec::new(),
118            trace_chain: Vec::new(),
119        }
120    }
121
122    pub fn deleted_with_old_values(
123        entity: impl Into<String>,
124        id: Value,
125        expected_version: Option<i64>,
126        old_values: Option<Record>,
127    ) -> Self {
128        let mut event = Self::deleted(entity, id, expected_version);
129        event.changes = old_values
130            .as_ref()
131            .map(|values| {
132                values
133                    .iter()
134                    .map(|(field, value)| {
135                        EntityPropertyChange::new(field.clone(), Some(value.clone()), None)
136                    })
137                    .collect()
138            })
139            .unwrap_or_default();
140        event.old_values = old_values;
141        event
142    }
143
144    pub fn recovered(entity: impl Into<String>, id: Value, expected_version: i64) -> Self {
145        let values = Record::from([
146            ("id".to_owned(), id),
147            ("version".to_owned(), Value::I64(expected_version)),
148        ]);
149        Self {
150            kind: EntityEventKind::Recovered,
151            entity: entity.into(),
152            values,
153            updated_fields: Vec::new(),
154            old_values: None,
155            new_values: None,
156            changes: Vec::new(),
157            trace_chain: Vec::new(),
158        }
159    }
160
161    pub fn recovered_with_old_values(
162        entity: impl Into<String>,
163        id: Value,
164        expected_version: i64,
165        old_values: Option<Record>,
166    ) -> Self {
167        let recovered_version = -expected_version + 1;
168        let mut new_values = old_values.clone().unwrap_or_default();
169        new_values.insert("id".to_owned(), id.clone());
170        new_values.insert("version".to_owned(), Value::I64(recovered_version));
171        let mut event = Self::recovered(entity, id, expected_version);
172        event.old_values = old_values;
173        event.new_values = Some(new_values.clone());
174        event.changes = Self::changes_for_fields(
175            event.old_values.as_ref(),
176            Some(&new_values),
177            &["version".to_owned()],
178        );
179        event
180    }
181
182    fn changes_for_fields(
183        old_values: Option<&Record>,
184        new_values: Option<&Record>,
185        fields: &[String],
186    ) -> Vec<EntityPropertyChange> {
187        fields
188            .iter()
189            .map(|field| {
190                EntityPropertyChange::new(
191                    field.clone(),
192                    old_values.and_then(|values| values.get(field).cloned()),
193                    new_values.and_then(|values| values.get(field).cloned()),
194                )
195            })
196            .collect()
197    }
198}
199
200pub trait EntityEventSink: Send + Sync {
201    fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError>;
202}
203
204#[derive(Default, Clone)]
205pub struct InMemoryEntityEventSink {
206    sinks: Vec<Arc<dyn EntityEventSink>>,
207}
208
209impl InMemoryEntityEventSink {
210    pub fn new() -> Self {
211        Self::default()
212    }
213
214    pub fn register(&mut self, sink: impl EntityEventSink + 'static) {
215        self.sinks.push(Arc::new(sink));
216    }
217
218    pub fn with_sink(mut self, sink: impl EntityEventSink + 'static) -> Self {
219        self.register(sink);
220        self
221    }
222}
223
224impl EntityEventSink for InMemoryEntityEventSink {
225    fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
226        for sink in &self.sinks {
227            sink.on_event(ctx, event)?;
228        }
229        Ok(())
230    }
231}