Skip to main content

teaql_runtime/
event.rs

1use std::sync::Arc;
2
3use teaql_core::{Record, Value};
4
5use crate::{RuntimeError, UserContext};
6
/// The kind of lifecycle change an [`EntityEvent`] reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntityEventKind {
    /// Set by [`EntityEvent::created`].
    Created,
    /// Set by [`EntityEvent::updated`] and [`EntityEvent::updated_with_old_values`].
    Updated,
    /// Set by [`EntityEvent::deleted`] and [`EntityEvent::deleted_with_old_values`].
    Deleted,
    /// Set by [`EntityEvent::recovered`] and [`EntityEvent::recovered_with_old_values`].
    Recovered,
}
14
15#[derive(Debug, Clone, PartialEq)]
16pub struct EntityPropertyChange {
17    pub field: String,
18    pub old_value: Option<Value>,
19    pub new_value: Option<Value>,
20}
21
22impl EntityPropertyChange {
23    pub fn new(
24        field: impl Into<String>,
25        old_value: Option<Value>,
26        new_value: Option<Value>,
27    ) -> Self {
28        Self {
29            field: field.into(),
30            old_value,
31            new_value,
32        }
33    }
34}
35
36#[derive(Debug, Clone, PartialEq)]
37pub struct EntityEvent {
38    pub kind: EntityEventKind,
39    pub entity: String,
40    pub values: Record,
41    pub updated_fields: Vec<String>,
42    pub old_values: Option<Record>,
43    pub new_values: Option<Record>,
44    pub changes: Vec<EntityPropertyChange>,
45}
46
47impl EntityEvent {
48    pub fn created(entity: impl Into<String>, values: Record) -> Self {
49        let changes = values
50            .iter()
51            .map(|(field, value)| {
52                EntityPropertyChange::new(field.clone(), None, Some(value.clone()))
53            })
54            .collect();
55        Self {
56            kind: EntityEventKind::Created,
57            entity: entity.into(),
58            values: values.clone(),
59            updated_fields: Vec::new(),
60            old_values: None,
61            new_values: Some(values),
62            changes,
63        }
64    }
65
66    pub fn updated(entity: impl Into<String>, values: Record) -> Self {
67        let updated_fields = values.keys().cloned().collect::<Vec<_>>();
68        let changes = Self::changes_for_fields(None, Some(&values), &updated_fields);
69        Self {
70            kind: EntityEventKind::Updated,
71            entity: entity.into(),
72            values: values.clone(),
73            updated_fields,
74            old_values: None,
75            new_values: Some(values),
76            changes,
77        }
78    }
79
80    pub fn updated_with_old_values(
81        entity: impl Into<String>,
82        values: Record,
83        old_values: Option<Record>,
84        new_values: Record,
85        updated_fields: Vec<String>,
86    ) -> Self {
87        let changes =
88            Self::changes_for_fields(old_values.as_ref(), Some(&new_values), &updated_fields);
89        Self {
90            kind: EntityEventKind::Updated,
91            entity: entity.into(),
92            values,
93            updated_fields,
94            old_values,
95            new_values: Some(new_values),
96            changes,
97        }
98    }
99
100    pub fn deleted(entity: impl Into<String>, id: Value, expected_version: Option<i64>) -> Self {
101        let mut values = Record::from([("id".to_owned(), id)]);
102        if let Some(version) = expected_version {
103            values.insert("version".to_owned(), Value::I64(version));
104        }
105        Self {
106            kind: EntityEventKind::Deleted,
107            entity: entity.into(),
108            values,
109            updated_fields: Vec::new(),
110            old_values: None,
111            new_values: None,
112            changes: Vec::new(),
113        }
114    }
115
116    pub fn deleted_with_old_values(
117        entity: impl Into<String>,
118        id: Value,
119        expected_version: Option<i64>,
120        old_values: Option<Record>,
121    ) -> Self {
122        let mut event = Self::deleted(entity, id, expected_version);
123        event.changes = old_values
124            .as_ref()
125            .map(|values| {
126                values
127                    .iter()
128                    .map(|(field, value)| {
129                        EntityPropertyChange::new(field.clone(), Some(value.clone()), None)
130                    })
131                    .collect()
132            })
133            .unwrap_or_default();
134        event.old_values = old_values;
135        event
136    }
137
138    pub fn recovered(entity: impl Into<String>, id: Value, expected_version: i64) -> Self {
139        let values = Record::from([
140            ("id".to_owned(), id),
141            ("version".to_owned(), Value::I64(expected_version)),
142        ]);
143        Self {
144            kind: EntityEventKind::Recovered,
145            entity: entity.into(),
146            values,
147            updated_fields: Vec::new(),
148            old_values: None,
149            new_values: None,
150            changes: Vec::new(),
151        }
152    }
153
154    pub fn recovered_with_old_values(
155        entity: impl Into<String>,
156        id: Value,
157        expected_version: i64,
158        old_values: Option<Record>,
159    ) -> Self {
160        let recovered_version = -expected_version + 1;
161        let mut new_values = old_values.clone().unwrap_or_default();
162        new_values.insert("id".to_owned(), id.clone());
163        new_values.insert("version".to_owned(), Value::I64(recovered_version));
164        let mut event = Self::recovered(entity, id, expected_version);
165        event.old_values = old_values;
166        event.new_values = Some(new_values.clone());
167        event.changes = Self::changes_for_fields(
168            event.old_values.as_ref(),
169            Some(&new_values),
170            &["version".to_owned()],
171        );
172        event
173    }
174
175    fn changes_for_fields(
176        old_values: Option<&Record>,
177        new_values: Option<&Record>,
178        fields: &[String],
179    ) -> Vec<EntityPropertyChange> {
180        fields
181            .iter()
182            .map(|field| {
183                EntityPropertyChange::new(
184                    field.clone(),
185                    old_values.and_then(|values| values.get(field).cloned()),
186                    new_values.and_then(|values| values.get(field).cloned()),
187                )
188            })
189            .collect()
190    }
191}
192
193pub trait EntityEventSink: Send + Sync {
194    fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError>;
195}
196
197#[derive(Default, Clone)]
198pub struct InMemoryEntityEventSink {
199    sinks: Vec<Arc<dyn EntityEventSink>>,
200}
201
202impl InMemoryEntityEventSink {
203    pub fn new() -> Self {
204        Self::default()
205    }
206
207    pub fn register(&mut self, sink: impl EntityEventSink + 'static) {
208        self.sinks.push(Arc::new(sink));
209    }
210
211    pub fn with_sink(mut self, sink: impl EntityEventSink + 'static) -> Self {
212        self.register(sink);
213        self
214    }
215}
216
217impl EntityEventSink for InMemoryEntityEventSink {
218    fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
219        for sink in &self.sinks {
220            sink.on_event(ctx, event)?;
221        }
222        Ok(())
223    }
224}