1use std::sync::Arc;
2
3use teaql_core::{Record, Value};
4
5use crate::{RuntimeError, UserContext};
6
/// Discriminates what an [`EntityEvent`] reports.
///
/// The kind is a plain `Copy` tag; it is also `Hash` so it can be used as a
/// key in a `HashMap`/`HashSet` (for example to group handlers per kind).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityEventKind {
    /// Set by [`EntityEvent::created`].
    Created,
    /// Set by [`EntityEvent::updated`] and [`EntityEvent::updated_with_old_values`].
    Updated,
    /// Set by [`EntityEvent::deleted`] and [`EntityEvent::deleted_with_old_values`].
    Deleted,
    /// Set by [`EntityEvent::recovered`] and [`EntityEvent::recovered_with_old_values`].
    Recovered,
    /// Set by [`EntityEvent::schema_created`].
    SchemaCreated,
    /// Set by [`EntityEvent::schema_verified`].
    SchemaVerified,
    /// Set by [`EntityEvent::field_added`].
    FieldAdded,
    /// Set by [`EntityEvent::data_seeded`].
    DataSeeded,
}
22
23#[derive(Debug, Clone, PartialEq)]
24pub struct EntityPropertyChange {
25 pub field: String,
26 pub old_value: Option<Value>,
27 pub new_value: Option<Value>,
28}
29
30impl EntityPropertyChange {
31 pub fn new(
32 field: impl Into<String>,
33 old_value: Option<Value>,
34 new_value: Option<Value>,
35 ) -> Self {
36 Self {
37 field: field.into(),
38 old_value,
39 new_value,
40 }
41 }
42}
43
44#[derive(Debug, Clone, PartialEq)]
45pub struct EntityEvent {
46 pub kind: EntityEventKind,
47 pub entity: String,
48 pub values: Record,
49 pub updated_fields: Vec<String>,
50 pub old_values: Option<Record>,
51 pub new_values: Option<Record>,
52 pub changes: Vec<EntityPropertyChange>,
53 pub trace_chain: Vec<teaql_core::TraceNode>,
55}
56
57impl EntityEvent {
58 pub fn created(entity: impl Into<String>, values: Record) -> Self {
59 let changes = values
60 .iter()
61 .map(|(field, value)| {
62 EntityPropertyChange::new(field.clone(), None, Some(value.clone()))
63 })
64 .collect();
65 Self {
66 kind: EntityEventKind::Created,
67 entity: entity.into(),
68 values: values.clone(),
69 updated_fields: Vec::new(),
70 old_values: None,
71 new_values: Some(values),
72 changes,
73 trace_chain: Vec::new(),
74 }
75 }
76
77 pub fn updated(entity: impl Into<String>, values: Record) -> Self {
78 let updated_fields = values.keys().cloned().collect::<Vec<_>>();
79 let changes = Self::changes_for_fields(None, Some(&values), &updated_fields);
80 Self {
81 kind: EntityEventKind::Updated,
82 entity: entity.into(),
83 values: values.clone(),
84 updated_fields,
85 old_values: None,
86 new_values: Some(values),
87 changes,
88 trace_chain: Vec::new(),
89 }
90 }
91
92 pub fn updated_with_old_values(
93 entity: impl Into<String>,
94 values: Record,
95 old_values: Option<Record>,
96 new_values: Record,
97 updated_fields: Vec<String>,
98 ) -> Self {
99 let changes =
100 Self::changes_for_fields(old_values.as_ref(), Some(&new_values), &updated_fields);
101 Self {
102 kind: EntityEventKind::Updated,
103 entity: entity.into(),
104 values,
105 updated_fields,
106 old_values,
107 new_values: Some(new_values),
108 changes,
109 trace_chain: Vec::new(),
110 }
111 }
112
113 pub fn deleted(entity: impl Into<String>, id: Value, expected_version: Option<i64>) -> Self {
114 let mut values = Record::from([("id".to_owned(), id)]);
115 if let Some(version) = expected_version {
116 values.insert("version".to_owned(), Value::I64(version));
117 }
118 Self {
119 kind: EntityEventKind::Deleted,
120 entity: entity.into(),
121 values,
122 updated_fields: Vec::new(),
123 old_values: None,
124 new_values: None,
125 changes: Vec::new(),
126 trace_chain: Vec::new(),
127 }
128 }
129
130 pub fn deleted_with_old_values(
131 entity: impl Into<String>,
132 id: Value,
133 expected_version: Option<i64>,
134 old_values: Option<Record>,
135 ) -> Self {
136 let mut event = Self::deleted(entity, id, expected_version);
137 event.changes = old_values
138 .as_ref()
139 .map(|values| {
140 values
141 .iter()
142 .map(|(field, value)| {
143 EntityPropertyChange::new(field.clone(), Some(value.clone()), None)
144 })
145 .collect()
146 })
147 .unwrap_or_default();
148 event.old_values = old_values;
149 event
150 }
151
152 pub fn recovered(entity: impl Into<String>, id: Value, expected_version: i64) -> Self {
153 let values = Record::from([
154 ("id".to_owned(), id),
155 ("version".to_owned(), Value::I64(expected_version)),
156 ]);
157 Self {
158 kind: EntityEventKind::Recovered,
159 entity: entity.into(),
160 values,
161 updated_fields: Vec::new(),
162 old_values: None,
163 new_values: None,
164 changes: Vec::new(),
165 trace_chain: Vec::new(),
166 }
167 }
168
169 pub fn recovered_with_old_values(
170 entity: impl Into<String>,
171 id: Value,
172 expected_version: i64,
173 old_values: Option<Record>,
174 ) -> Self {
175 let recovered_version = -expected_version + 1;
176 let mut new_values = old_values.clone().unwrap_or_default();
177 new_values.insert("id".to_owned(), id.clone());
178 new_values.insert("version".to_owned(), Value::I64(recovered_version));
179 let mut event = Self::recovered(entity, id, expected_version);
180 event.old_values = old_values;
181 event.new_values = Some(new_values.clone());
182 event.changes = Self::changes_for_fields(
183 event.old_values.as_ref(),
184 Some(&new_values),
185 &["version".to_owned()],
186 );
187 event
188 }
189
190 pub fn schema_created(
192 entity: impl Into<String>,
193 table_name: impl Into<String>,
194 field_count: usize,
195 ) -> Self {
196 let entity = entity.into();
197 let values = Record::from([
198 ("table_name".to_owned(), Value::Text(table_name.into())),
199 ("field_count".to_owned(), Value::I64(field_count as i64)),
200 ]);
201 Self {
202 kind: EntityEventKind::SchemaCreated,
203 entity,
204 values,
205 updated_fields: Vec::new(),
206 old_values: None,
207 new_values: None,
208 changes: Vec::new(),
209 trace_chain: Vec::new(),
210 }
211 }
212
213 pub fn schema_verified(
215 entity: impl Into<String>,
216 table_name: impl Into<String>,
217 field_count: usize,
218 ) -> Self {
219 let entity = entity.into();
220 let values = Record::from([
221 ("table_name".to_owned(), Value::Text(table_name.into())),
222 ("field_count".to_owned(), Value::I64(field_count as i64)),
223 ]);
224 Self {
225 kind: EntityEventKind::SchemaVerified,
226 entity,
227 values,
228 updated_fields: Vec::new(),
229 old_values: None,
230 new_values: None,
231 changes: Vec::new(),
232 trace_chain: Vec::new(),
233 }
234 }
235
236 pub fn field_added(
238 entity: impl Into<String>,
239 table_name: impl Into<String>,
240 field_name: impl Into<String>,
241 ) -> Self {
242 let entity = entity.into();
243 let values = Record::from([
244 ("table_name".to_owned(), Value::Text(table_name.into())),
245 ("field_name".to_owned(), Value::Text(field_name.into())),
246 ]);
247 Self {
248 kind: EntityEventKind::FieldAdded,
249 entity,
250 values,
251 updated_fields: Vec::new(),
252 old_values: None,
253 new_values: None,
254 changes: Vec::new(),
255 trace_chain: Vec::new(),
256 }
257 }
258
259 pub fn data_seeded(
264 entity: impl Into<String>,
265 table_name: impl Into<String>,
266 inserted: usize,
267 updated: usize,
268 ) -> Self {
269 let entity = entity.into();
270 let values = Record::from([
271 ("table_name".to_owned(), Value::Text(table_name.into())),
272 ("inserted".to_owned(), Value::I64(inserted as i64)),
273 ("updated".to_owned(), Value::I64(updated as i64)),
274 ]);
275 Self {
276 kind: EntityEventKind::DataSeeded,
277 entity,
278 values,
279 updated_fields: Vec::new(),
280 old_values: None,
281 new_values: None,
282 changes: Vec::new(),
283 trace_chain: Vec::new(),
284 }
285 }
286
287 fn changes_for_fields(
288 old_values: Option<&Record>,
289 new_values: Option<&Record>,
290 fields: &[String],
291 ) -> Vec<EntityPropertyChange> {
292 fields
293 .iter()
294 .map(|field| {
295 EntityPropertyChange::new(
296 field.clone(),
297 old_values.and_then(|values| values.get(field).cloned()),
298 new_values.and_then(|values| values.get(field).cloned()),
299 )
300 })
301 .collect()
302 }
303}
304
305pub trait EntityEventSink: Send + Sync {
306 fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError>;
307}
308
309#[derive(Default, Clone)]
310pub struct InMemoryEntityEventSink {
311 sinks: Vec<Arc<dyn EntityEventSink>>,
312}
313
314impl InMemoryEntityEventSink {
315 pub fn new() -> Self {
316 Self::default()
317 }
318
319 pub fn register(&mut self, sink: impl EntityEventSink + 'static) {
320 self.sinks.push(Arc::new(sink));
321 }
322
323 pub fn with_sink(mut self, sink: impl EntityEventSink + 'static) -> Self {
324 self.register(sink);
325 self
326 }
327}
328
329impl EntityEventSink for InMemoryEntityEventSink {
330 fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
331 for sink in &self.sinks {
332 sink.on_event(ctx, event)?;
333 }
334 Ok(())
335 }
336}