floxide_core/distributed/
event_log.rs1use serde::{Deserialize, Serialize};
2use std::fmt::Debug;
3use std::sync::{Arc, Mutex};
4use std::time::{SystemTime, UNIX_EPOCH};
5use std::vec::Vec;
6use uuid::Uuid;
7
8use crate::merge::Merge;
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11pub struct LoggedEvent<E> {
12 pub uuid: Uuid,
13 pub timestamp: u128,
14 pub event: E,
15}
16
17#[derive(Clone)]
19pub struct EventLog<E> {
20 events: Arc<Mutex<Vec<LoggedEvent<E>>>>,
21}
22
23impl<E> Default for EventLog<E> {
24 fn default() -> Self {
25 Self {
26 events: Arc::new(Mutex::new(Vec::new())),
27 }
28 }
29}
30
31impl<E: Debug + Clone> Debug for EventLog<E> {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 write!(
34 f,
35 "EventLog {{ {:?} }}",
36 self.events
37 .lock()
38 .unwrap()
39 .iter()
40 .map(|e| e.event.clone())
41 .collect::<Vec<_>>()
42 )
43 }
44}
45
46impl<E: Clone> Merge for EventLog<E> {
47 fn merge(&mut self, other: Self) {
53 let self_ptr = Arc::as_ptr(&self.events) as usize;
54 let other_ptr = Arc::as_ptr(&other.events) as usize;
55 if self_ptr == other_ptr {
56 return;
58 }
59 let (first, second) = if self_ptr < other_ptr {
61 (&self.events, &other.events)
62 } else {
63 (&other.events, &self.events)
64 };
65 let mut first_guard = first.lock().unwrap();
66 let mut second_guard = second.lock().unwrap();
67 if self_ptr < other_ptr {
71 first_guard.extend(second_guard.drain(..));
72 first_guard.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then(a.uuid.cmp(&b.uuid)));
73 first_guard.dedup_by_key(|e| e.uuid);
74 } else {
75 second_guard.extend(first_guard.drain(..));
76 second_guard.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then(a.uuid.cmp(&b.uuid)));
77 second_guard.dedup_by_key(|e| e.uuid);
78 *first_guard = second_guard.clone();
80 }
81 }
82}
83
84impl<E: Serialize> Serialize for EventLog<E> {
85 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
86 where
87 S: serde::Serializer,
88 {
89 let events = self.events.lock().unwrap();
90 events.serialize(serializer)
91 }
92}
93
94impl<'de, E: Deserialize<'de>> Deserialize<'de> for EventLog<E> {
95 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
96 where
97 D: serde::Deserializer<'de>,
98 {
99 let events = Vec::<LoggedEvent<E>>::deserialize(deserializer)?;
100 Ok(EventLog {
101 events: Arc::new(Mutex::new(events)),
102 })
103 }
104}
105
106impl<E> EventLog<E> {
107 pub fn new() -> Self {
108 Self {
109 events: Arc::new(Mutex::new(Vec::new())),
110 }
111 }
112
113 pub fn append(&self, event: E) {
114 let logged = LoggedEvent {
115 uuid: Uuid::new_v4(),
116 timestamp: SystemTime::now()
117 .duration_since(UNIX_EPOCH)
118 .unwrap()
119 .as_millis(),
120 event,
121 };
122 self.events.lock().unwrap().push(logged);
123 }
124
125 pub fn iter(&self) -> Vec<LoggedEvent<E>>
126 where
127 E: Clone,
128 {
129 self.events.lock().unwrap().clone()
130 }
131
132 pub fn apply_all<S, F>(&self, state: &mut S, apply_fn: F)
134 where
135 F: Fn(&E, &mut S),
136 E: Clone,
137 {
138 for logged in self.iter() {
139 apply_fn(&logged.event, state);
140 }
141 }
142
143 pub fn apply_all_default<S, F>(&self, apply_fn: F) -> S
145 where
146 S: Default,
147 F: Fn(&E, &mut S),
148 E: Clone,
149 {
150 let mut state = S::default();
151 self.apply_all(&mut state, apply_fn);
152 state
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::{HashMap, HashSet};

    /// Sample event type used to exercise the log.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum WorkflowEvent {
        WorkStarted { id: u64 },
        WorkCompleted { id: u64, result: i32 },
        WorkFailed { id: u64, reason: String },
    }

    /// State rebuilt by replaying `WorkflowEvent`s.
    #[derive(Debug, Default, PartialEq, Eq)]
    struct WorkflowState {
        running: HashSet<u64>,
        completed: HashMap<u64, i32>,
        failed: HashMap<u64, String>,
    }

    /// Reducer shared by the replay tests: folds one event into the state.
    fn apply_event(event: &WorkflowEvent, state: &mut WorkflowState) {
        match event {
            WorkflowEvent::WorkStarted { id } => {
                state.running.insert(*id);
            }
            WorkflowEvent::WorkCompleted { id, result } => {
                state.running.remove(id);
                state.completed.insert(*id, *result);
            }
            WorkflowEvent::WorkFailed { id, reason } => {
                state.running.remove(id);
                state.failed.insert(*id, reason.clone());
            }
        }
    }

    /// Builds a log in which work 1 completes with `10` and work 2 fails with
    /// `"error"`.
    fn sample_log() -> EventLog<WorkflowEvent> {
        let log = EventLog::new();
        log.append(WorkflowEvent::WorkStarted { id: 1 });
        log.append(WorkflowEvent::WorkStarted { id: 2 });
        log.append(WorkflowEvent::WorkCompleted { id: 1, result: 10 });
        log.append(WorkflowEvent::WorkFailed {
            id: 2,
            reason: "error".to_string(),
        });
        log
    }

    /// Checks the state expected after replaying `sample_log()`.
    fn assert_sample_state(state: &WorkflowState) {
        assert!(!state.running.contains(&1));
        assert!(!state.running.contains(&2));
        assert_eq!(state.completed.get(&1), Some(&10));
        assert_eq!(state.failed.get(&2), Some(&"error".to_string()));
    }

    #[test]
    fn test_append_and_iter() {
        let log = EventLog::new();
        log.append(WorkflowEvent::WorkStarted { id: 1 });
        log.append(WorkflowEvent::WorkCompleted { id: 1, result: 42 });
        let events: Vec<_> = log.iter();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event, WorkflowEvent::WorkStarted { id: 1 });
        assert_eq!(
            events[1].event,
            WorkflowEvent::WorkCompleted { id: 1, result: 42 }
        );
    }

    #[test]
    fn test_apply_all() {
        let log = sample_log();
        let mut state = WorkflowState::default();
        log.apply_all(&mut state, apply_event);
        assert_sample_state(&state);
    }

    #[test]
    fn test_apply_all_default() {
        let log = sample_log();
        let state: WorkflowState = log.apply_all_default(apply_event);
        assert_sample_state(&state);
    }

    #[test]
    fn test_merge_combines_and_deduplicates() {
        let mut log = EventLog::new();
        log.append(WorkflowEvent::WorkStarted { id: 1 });
        log.append(WorkflowEvent::WorkCompleted { id: 1, result: 42 });

        // Independent storage holding the same entries (same UUIDs) plus one
        // event of its own.
        let replica = EventLog {
            events: Arc::new(Mutex::new(log.iter())),
        };
        replica.append(WorkflowEvent::WorkStarted { id: 2 });

        log.merge(replica);

        let merged = log.iter();
        assert_eq!(merged.len(), 3);
        let uuids: HashSet<Uuid> = merged.iter().map(|entry| entry.uuid).collect();
        assert_eq!(uuids.len(), 3);
        assert!(merged
            .windows(2)
            .all(|pair| pair[0].timestamp <= pair[1].timestamp));
    }

    #[test]
    fn test_merge_with_shared_handle_is_noop() {
        let mut log = EventLog::new();
        log.append(WorkflowEvent::WorkStarted { id: 1 });

        // A clone shares storage with the original, so merging it back must
        // neither duplicate nor drop entries.
        let alias = log.clone();
        log.merge(alias);

        assert_eq!(log.iter().len(), 1);
    }
}