1use std::sync::Arc;
2
3use teaql_core::{Record, Value};
4
5use crate::{RuntimeError, UserContext};
6
/// Discriminates the lifecycle operation an [`EntityEvent`] describes.
///
/// Each variant is the `kind` set by the `EntityEvent` constructor of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntityEventKind {
    /// Set by `EntityEvent::created`.
    Created,
    /// Set by `EntityEvent::updated` and `EntityEvent::updated_with_old_values`.
    Updated,
    /// Set by `EntityEvent::deleted` and `EntityEvent::deleted_with_old_values`.
    Deleted,
    /// Set by `EntityEvent::recovered` and `EntityEvent::recovered_with_old_values`.
    Recovered,
}
14
15#[derive(Debug, Clone, PartialEq)]
16pub struct EntityPropertyChange {
17 pub field: String,
18 pub old_value: Option<Value>,
19 pub new_value: Option<Value>,
20}
21
22impl EntityPropertyChange {
23 pub fn new(
24 field: impl Into<String>,
25 old_value: Option<Value>,
26 new_value: Option<Value>,
27 ) -> Self {
28 Self {
29 field: field.into(),
30 old_value,
31 new_value,
32 }
33 }
34}
35
36#[derive(Debug, Clone, PartialEq)]
37pub struct EntityEvent {
38 pub kind: EntityEventKind,
39 pub entity: String,
40 pub values: Record,
41 pub updated_fields: Vec<String>,
42 pub old_values: Option<Record>,
43 pub new_values: Option<Record>,
44 pub changes: Vec<EntityPropertyChange>,
45 pub comment: Option<String>,
47}
48
49impl EntityEvent {
50 pub fn created(entity: impl Into<String>, values: Record) -> Self {
51 let changes = values
52 .iter()
53 .map(|(field, value)| {
54 EntityPropertyChange::new(field.clone(), None, Some(value.clone()))
55 })
56 .collect();
57 Self {
58 kind: EntityEventKind::Created,
59 entity: entity.into(),
60 values: values.clone(),
61 updated_fields: Vec::new(),
62 old_values: None,
63 new_values: Some(values),
64 changes,
65 comment: None,
66 }
67 }
68
69 pub fn updated(entity: impl Into<String>, values: Record) -> Self {
70 let updated_fields = values.keys().cloned().collect::<Vec<_>>();
71 let changes = Self::changes_for_fields(None, Some(&values), &updated_fields);
72 Self {
73 kind: EntityEventKind::Updated,
74 entity: entity.into(),
75 values: values.clone(),
76 updated_fields,
77 old_values: None,
78 new_values: Some(values),
79 changes,
80 comment: None,
81 }
82 }
83
84 pub fn updated_with_old_values(
85 entity: impl Into<String>,
86 values: Record,
87 old_values: Option<Record>,
88 new_values: Record,
89 updated_fields: Vec<String>,
90 ) -> Self {
91 let changes =
92 Self::changes_for_fields(old_values.as_ref(), Some(&new_values), &updated_fields);
93 Self {
94 kind: EntityEventKind::Updated,
95 entity: entity.into(),
96 values,
97 updated_fields,
98 old_values,
99 new_values: Some(new_values),
100 changes,
101 comment: None,
102 }
103 }
104
105 pub fn deleted(entity: impl Into<String>, id: Value, expected_version: Option<i64>) -> Self {
106 let mut values = Record::from([("id".to_owned(), id)]);
107 if let Some(version) = expected_version {
108 values.insert("version".to_owned(), Value::I64(version));
109 }
110 Self {
111 kind: EntityEventKind::Deleted,
112 entity: entity.into(),
113 values,
114 updated_fields: Vec::new(),
115 old_values: None,
116 new_values: None,
117 changes: Vec::new(),
118 comment: None,
119 }
120 }
121
122 pub fn deleted_with_old_values(
123 entity: impl Into<String>,
124 id: Value,
125 expected_version: Option<i64>,
126 old_values: Option<Record>,
127 ) -> Self {
128 let mut event = Self::deleted(entity, id, expected_version);
129 event.changes = old_values
130 .as_ref()
131 .map(|values| {
132 values
133 .iter()
134 .map(|(field, value)| {
135 EntityPropertyChange::new(field.clone(), Some(value.clone()), None)
136 })
137 .collect()
138 })
139 .unwrap_or_default();
140 event.old_values = old_values;
141 event
142 }
143
144 pub fn recovered(entity: impl Into<String>, id: Value, expected_version: i64) -> Self {
145 let values = Record::from([
146 ("id".to_owned(), id),
147 ("version".to_owned(), Value::I64(expected_version)),
148 ]);
149 Self {
150 kind: EntityEventKind::Recovered,
151 entity: entity.into(),
152 values,
153 updated_fields: Vec::new(),
154 old_values: None,
155 new_values: None,
156 changes: Vec::new(),
157 comment: None,
158 }
159 }
160
161 pub fn recovered_with_old_values(
162 entity: impl Into<String>,
163 id: Value,
164 expected_version: i64,
165 old_values: Option<Record>,
166 ) -> Self {
167 let recovered_version = -expected_version + 1;
168 let mut new_values = old_values.clone().unwrap_or_default();
169 new_values.insert("id".to_owned(), id.clone());
170 new_values.insert("version".to_owned(), Value::I64(recovered_version));
171 let mut event = Self::recovered(entity, id, expected_version);
172 event.old_values = old_values;
173 event.new_values = Some(new_values.clone());
174 event.changes = Self::changes_for_fields(
175 event.old_values.as_ref(),
176 Some(&new_values),
177 &["version".to_owned()],
178 );
179 event
180 }
181
182 fn changes_for_fields(
183 old_values: Option<&Record>,
184 new_values: Option<&Record>,
185 fields: &[String],
186 ) -> Vec<EntityPropertyChange> {
187 fields
188 .iter()
189 .map(|field| {
190 EntityPropertyChange::new(
191 field.clone(),
192 old_values.and_then(|values| values.get(field).cloned()),
193 new_values.and_then(|values| values.get(field).cloned()),
194 )
195 })
196 .collect()
197 }
198}
199
200pub trait EntityEventSink: Send + Sync {
201 fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError>;
202}
203
204#[derive(Default, Clone)]
205pub struct InMemoryEntityEventSink {
206 sinks: Vec<Arc<dyn EntityEventSink>>,
207}
208
209impl InMemoryEntityEventSink {
210 pub fn new() -> Self {
211 Self::default()
212 }
213
214 pub fn register(&mut self, sink: impl EntityEventSink + 'static) {
215 self.sinks.push(Arc::new(sink));
216 }
217
218 pub fn with_sink(mut self, sink: impl EntityEventSink + 'static) -> Self {
219 self.register(sink);
220 self
221 }
222}
223
224impl EntityEventSink for InMemoryEntityEventSink {
225 fn on_event(&self, ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
226 for sink in &self.sinks {
227 sink.on_event(ctx, event)?;
228 }
229 Ok(())
230 }
231}