// floxide_core/distributed/event_log.rs

1use serde::{Deserialize, Serialize};
2use std::fmt::Debug;
3use std::sync::{Arc, Mutex};
4use std::time::{SystemTime, UNIX_EPOCH};
5use std::vec::Vec;
6use uuid::Uuid;
7
8use crate::merge::Merge;
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11pub struct LoggedEvent<E> {
12    pub uuid: Uuid,
13    pub timestamp: u128,
14    pub event: E,
15}
16
17/// In-memory append-only event log with interior mutability.
18#[derive(Clone)]
19pub struct EventLog<E> {
20    events: Arc<Mutex<Vec<LoggedEvent<E>>>>,
21}
22
23impl<E> Default for EventLog<E> {
24    fn default() -> Self {
25        Self {
26            events: Arc::new(Mutex::new(Vec::new())),
27        }
28    }
29}
30
31impl<E: Debug + Clone> Debug for EventLog<E> {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(
34            f,
35            "EventLog {{ {:?} }}",
36            self.events
37                .lock()
38                .unwrap()
39                .iter()
40                .map(|e| e.event.clone())
41                .collect::<Vec<_>>()
42        )
43    }
44}
45
46impl<E: Clone> Merge for EventLog<E> {
47    /// Merges another EventLog into this one, deduplicating by UUID and ordering by (timestamp, uuid).
48    ///
49    /// # Deadlock Prevention
50    /// - If `self` and `other` are the same Arc, merging is a no-op (prevents self-deadlock).
51    /// - Otherwise, always lock the two logs in a consistent order (by pointer address) to prevent lock order inversion deadlocks.
52    fn merge(&mut self, other: Self) {
53        let self_ptr = Arc::as_ptr(&self.events) as usize;
54        let other_ptr = Arc::as_ptr(&other.events) as usize;
55        if self_ptr == other_ptr {
56            // Prevent self-deadlock: merging a log with itself is a no-op
57            return;
58        }
59        // Lock in address order to prevent lock order inversion deadlocks
60        let (first, second) = if self_ptr < other_ptr {
61            (&self.events, &other.events)
62        } else {
63            (&other.events, &self.events)
64        };
65        let mut first_guard = first.lock().unwrap();
66        let mut second_guard = second.lock().unwrap();
67        // If self_ptr < other_ptr, first_guard is self, second_guard is other
68        // If self_ptr > other_ptr, first_guard is other, second_guard is self
69        // Always merge into self
70        if self_ptr < other_ptr {
71            first_guard.extend(second_guard.drain(..));
72            first_guard.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then(a.uuid.cmp(&b.uuid)));
73            first_guard.dedup_by_key(|e| e.uuid);
74        } else {
75            second_guard.extend(first_guard.drain(..));
76            second_guard.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then(a.uuid.cmp(&b.uuid)));
77            second_guard.dedup_by_key(|e| e.uuid);
78            // Move merged data back to self
79            *first_guard = second_guard.clone();
80        }
81    }
82}
83
84impl<E: Serialize> Serialize for EventLog<E> {
85    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
86    where
87        S: serde::Serializer,
88    {
89        let events = self.events.lock().unwrap();
90        events.serialize(serializer)
91    }
92}
93
94impl<'de, E: Deserialize<'de>> Deserialize<'de> for EventLog<E> {
95    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
96    where
97        D: serde::Deserializer<'de>,
98    {
99        let events = Vec::<LoggedEvent<E>>::deserialize(deserializer)?;
100        Ok(EventLog {
101            events: Arc::new(Mutex::new(events)),
102        })
103    }
104}
105
106impl<E> EventLog<E> {
107    pub fn new() -> Self {
108        Self {
109            events: Arc::new(Mutex::new(Vec::new())),
110        }
111    }
112
113    pub fn append(&self, event: E) {
114        let logged = LoggedEvent {
115            uuid: Uuid::new_v4(),
116            timestamp: SystemTime::now()
117                .duration_since(UNIX_EPOCH)
118                .unwrap()
119                .as_millis(),
120            event,
121        };
122        self.events.lock().unwrap().push(logged);
123    }
124
125    pub fn iter(&self) -> Vec<LoggedEvent<E>>
126    where
127        E: Clone,
128    {
129        self.events.lock().unwrap().clone()
130    }
131
132    /// Applies all events in order to the given state using the provided closure.
133    pub fn apply_all<S, F>(&self, state: &mut S, apply_fn: F)
134    where
135        F: Fn(&E, &mut S),
136        E: Clone,
137    {
138        for logged in self.iter() {
139            apply_fn(&logged.event, state);
140        }
141    }
142
143    /// Creates a state using Default, applies all events using the provided closure, and returns the resulting state.
144    pub fn apply_all_default<S, F>(&self, apply_fn: F) -> S
145    where
146        S: Default,
147        F: Fn(&E, &mut S),
148        E: Clone,
149    {
150        let mut state = S::default();
151        self.apply_all(&mut state, apply_fn);
152        state
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::{HashMap, HashSet};

    /// Events emitted by a toy workflow; used as the payload type in these tests.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum WorkflowEvent {
        WorkStarted { id: u64 },
        WorkCompleted { id: u64, result: i32 },
        WorkFailed { id: u64, reason: String },
    }

    /// State rebuilt by replaying `WorkflowEvent`s.
    #[derive(Debug, Default, PartialEq, Eq)]
    struct WorkflowState {
        running: HashSet<u64>,
        completed: HashMap<u64, i32>,
        failed: HashMap<u64, String>,
    }

    /// Reducer shared by the replay tests: folds a single event into the state.
    fn reduce(event: &WorkflowEvent, state: &mut WorkflowState) {
        match event {
            WorkflowEvent::WorkStarted { id } => {
                state.running.insert(*id);
            }
            WorkflowEvent::WorkCompleted { id, result } => {
                state.running.remove(id);
                state.completed.insert(*id, *result);
            }
            WorkflowEvent::WorkFailed { id, reason } => {
                state.running.remove(id);
                state.failed.insert(*id, reason.clone());
            }
        }
    }

    /// Builds a log in which job 1 completes with result 10 and job 2 fails with "error".
    fn two_job_log() -> EventLog<WorkflowEvent> {
        let log = EventLog::new();
        let script = vec![
            WorkflowEvent::WorkStarted { id: 1 },
            WorkflowEvent::WorkStarted { id: 2 },
            WorkflowEvent::WorkCompleted { id: 1, result: 10 },
            WorkflowEvent::WorkFailed {
                id: 2,
                reason: "error".to_string(),
            },
        ];
        for event in script {
            log.append(event);
        }
        log
    }

    /// Checks the outcome expected after replaying `two_job_log`.
    fn assert_both_jobs_settled(state: &WorkflowState) {
        assert!(!state.running.contains(&1));
        assert!(!state.running.contains(&2));
        assert_eq!(state.completed.get(&1), Some(&10));
        assert_eq!(state.failed.get(&2), Some(&"error".to_string()));
    }

    #[test]
    fn test_append_and_iter() {
        let log = EventLog::new();
        log.append(WorkflowEvent::WorkStarted { id: 1 });
        log.append(WorkflowEvent::WorkCompleted { id: 1, result: 42 });

        let events: Vec<_> = log.iter();
        assert_eq!(events.len(), 2);

        let started = WorkflowEvent::WorkStarted { id: 1 };
        let completed = WorkflowEvent::WorkCompleted { id: 1, result: 42 };
        assert_eq!(events[0].event, started);
        assert_eq!(events[1].event, completed);
    }

    #[test]
    fn test_apply_all() {
        let log = two_job_log();
        let mut state = WorkflowState::default();
        log.apply_all(&mut state, reduce);
        assert_both_jobs_settled(&state);
    }

    #[test]
    fn test_apply_all_default() {
        let log = two_job_log();
        let state: WorkflowState = log.apply_all_default(reduce);
        assert_both_jobs_settled(&state);
    }
}