1use std::sync::Arc;
2
3use teaql_core::{Record, Value};
4
5use crate::{RuntimeError, UserContext};
6
/// Identifies which lifecycle transition an [`EntityEvent`] reports.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EntityEventKind {
    /// Kind assigned by [`EntityEvent::created`].
    Created,
    /// Kind assigned by [`EntityEvent::updated`] and
    /// [`EntityEvent::updated_with_old_values`].
    Updated,
    /// Kind assigned by [`EntityEvent::deleted`] and
    /// [`EntityEvent::deleted_with_old_values`].
    Deleted,
    /// Kind assigned by [`EntityEvent::recovered`] and
    /// [`EntityEvent::recovered_with_old_values`].
    Recovered,
}
14
15#[derive(Debug, Clone, PartialEq)]
16pub struct EntityPropertyChange {
17 pub field: String,
18 pub old_value: Option<Value>,
19 pub new_value: Option<Value>,
20}
21
22impl EntityPropertyChange {
23 pub fn new(
24 field: impl Into<String>,
25 old_value: Option<Value>,
26 new_value: Option<Value>,
27 ) -> Self {
28 Self {
29 field: field.into(),
30 old_value,
31 new_value,
32 }
33 }
34}
35
36#[derive(Debug, Clone, PartialEq)]
37pub struct EntityEvent {
38 pub kind: EntityEventKind,
39 pub entity: String,
40 pub values: Record,
41 pub updated_fields: Vec<String>,
42 pub old_values: Option<Record>,
43 pub new_values: Option<Record>,
44 pub changes: Vec<EntityPropertyChange>,
45}
46
47impl EntityEvent {
48 pub fn created(entity: impl Into<String>, values: Record) -> Self {
49 let changes = values
50 .iter()
51 .map(|(field, value)| {
52 EntityPropertyChange::new(field.clone(), None, Some(value.clone()))
53 })
54 .collect();
55 Self {
56 kind: EntityEventKind::Created,
57 entity: entity.into(),
58 values: values.clone(),
59 updated_fields: Vec::new(),
60 old_values: None,
61 new_values: Some(values),
62 changes,
63 }
64 }
65
66 pub fn updated(entity: impl Into<String>, values: Record) -> Self {
67 let updated_fields = values.keys().cloned().collect::<Vec<_>>();
68 let changes = Self::changes_for_fields(None, Some(&values), &updated_fields);
69 Self {
70 kind: EntityEventKind::Updated,
71 entity: entity.into(),
72 values: values.clone(),
73 updated_fields,
74 old_values: None,
75 new_values: Some(values),
76 changes,
77 }
78 }
79
80 pub fn updated_with_old_values(
81 entity: impl Into<String>,
82 values: Record,
83 old_values: Option<Record>,
84 new_values: Record,
85 updated_fields: Vec<String>,
86 ) -> Self {
87 let changes =
88 Self::changes_for_fields(old_values.as_ref(), Some(&new_values), &updated_fields);
89 Self {
90 kind: EntityEventKind::Updated,
91 entity: entity.into(),
92 values,
93 updated_fields,
94 old_values,
95 new_values: Some(new_values),
96 changes,
97 }
98 }
99
100 pub fn deleted(entity: impl Into<String>, id: Value, expected_version: Option<i64>) -> Self {
101 let mut values = Record::from([("id".to_owned(), id)]);
102 if let Some(version) = expected_version {
103 values.insert("version".to_owned(), Value::I64(version));
104 }
105 Self {
106 kind: EntityEventKind::Deleted,
107 entity: entity.into(),
108 values,
109 updated_fields: Vec::new(),
110 old_values: None,
111 new_values: None,
112 changes: Vec::new(),
113 }
114 }
115
116 pub fn deleted_with_old_values(
117 entity: impl Into<String>,
118 id: Value,
119 expected_version: Option<i64>,
120 old_values: Option<Record>,
121 ) -> Self {
122 let mut event = Self::deleted(entity, id, expected_version);
123 event.changes = old_values
124 .as_ref()
125 .map(|values| {
126 values
127 .iter()
128 .map(|(field, value)| {
129 EntityPropertyChange::new(field.clone(), Some(value.clone()), None)
130 })
131 .collect()
132 })
133 .unwrap_or_default();
134 event.old_values = old_values;
135 event
136 }
137
138 pub fn recovered(entity: impl Into<String>, id: Value, expected_version: i64) -> Self {
139 let values = Record::from([
140 ("id".to_owned(), id),
141 ("version".to_owned(), Value::I64(expected_version)),
142 ]);
143 Self {
144 kind: EntityEventKind::Recovered,
145 entity: entity.into(),
146 values,
147 updated_fields: Vec::new(),
148 old_values: None,
149 new_values: None,
150 changes: Vec::new(),
151 }
152 }
153
154 pub fn recovered_with_old_values(
155 entity: impl Into<String>,
156 id: Value,
157 expected_version: i64,
158 old_values: Option<Record>,
159 ) -> Self {
160 let recovered_version = -expected_version + 1;
161 let mut new_values = old_values.clone().unwrap_or_default();
162 new_values.insert("id".to_owned(), id.clone());
163 new_values.insert("version".to_owned(), Value::I64(recovered_version));
164 let mut event = Self::recovered(entity, id, expected_version);
165 event.old_values = old_values;
166 event.new_values = Some(new_values.clone());
167 event.changes = Self::changes_for_fields(
168 event.old_values.as_ref(),
169 Some(&new_values),
170 &["version".to_owned()],
171 );
172 event
173 }
174
175 fn changes_for_fields(
176 old_values: Option<&Record>,
177 new_values: Option<&Record>,
178 fields: &[String],
179 ) -> Vec<EntityPropertyChange> {
180 fields
181 .iter()
182 .map(|field| {
183 EntityPropertyChange::new(
184 field.clone(),
185 old_values.and_then(|values| values.get(field).cloned()),
186 new_values.and_then(|values| values.get(field).cloned()),
187 )
188 })
189 .collect()
190 }
191}
192
193pub trait EntityEventSink: Send + Sync {
194 fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError>;
195}
196
197#[derive(Default, Clone)]
198pub struct InMemoryEntityEventSink {
199 sinks: Vec<Arc<dyn EntityEventSink>>,
200}
201
202impl InMemoryEntityEventSink {
203 pub fn new() -> Self {
204 Self::default()
205 }
206
207 pub fn register(&mut self, sink: impl EntityEventSink + 'static) {
208 self.sinks.push(Arc::new(sink));
209 }
210
211 pub fn with_sink(mut self, sink: impl EntityEventSink + 'static) -> Self {
212 self.register(sink);
213 self
214 }
215}
216
217impl EntityEventSink for InMemoryEntityEventSink {
218 fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
219 for sink in &self.sinks {
220 sink.on_event(ctx, event)?;
221 }
222 Ok(())
223 }
224}