Skip to main content

xaeroflux_crdt/
lib.rs

1use std::sync::Arc;
2
3use rkyv::rancor::Failure;
4use rusted_ring::{AllocationError, EventAllocator};
5use xaeroflux_core::{
6    date_time::emit_secs,
7    event::{
8        CRDT_COUNTER_DECREMENT, CRDT_COUNTER_INCREMENT, CRDT_COUNTER_STATE, CRDT_REGISTER_STATE,
9        CRDT_REGISTER_WRITE, CRDT_SET_ADD, CRDT_SET_REMOVE, CRDT_SET_STATE, EventType, XaeroEvent,
10    },
11};
12
13#[derive(Clone)]
14pub enum Sort {
15    /// Sort by vector clock (true causal ordering)
16    VectorClock,
17    /// Sort by operation ID (deterministic global order)
18    OperationId,
19    /// Sort by custom lamport timestamp
20    LamportTimestamp,
21}
22
23#[allow(clippy::type_complexity)]
24impl Sort {
25    pub fn to_operator(
26        self,
27        _ea: &'static EventAllocator,
28    ) -> Arc<dyn Fn(&Arc<XaeroEvent>, &Arc<XaeroEvent>) -> std::cmp::Ordering + Send + Sync> {
29        match self {
30            Sort::VectorClock => Arc::new(
31                move |e1: &Arc<XaeroEvent>, e2: &Arc<XaeroEvent>| -> std::cmp::Ordering {
32                    e1.latest_ts.cmp(&e2.latest_ts)
33                },
34            ),
35            _ => panic!("not supported yet"),
36        }
37    }
38}
39
40#[derive(Clone)]
41pub enum Fold {
42    /// Last-Writer-Wins Register
43    LWWRegister,
44    /// Observed-Remove Set
45    ORSet,
46    /// G-Counter (increment-only)
47    GCounter,
48    /// PN-Counter (increment/decrement)
49    PNCounter,
50}
51
52#[allow(clippy::type_complexity)]
53impl Fold {
54    pub fn to_operator(
55        self,
56        ea: &'static EventAllocator,
57    ) -> Arc<
58        dyn Fn(Arc<XaeroEvent>, Vec<Arc<XaeroEvent>>) -> Result<Arc<XaeroEvent>, AllocationError>
59            + Send
60            + Sync,
61    > {
62        match self {
63            Fold::LWWRegister => Arc::new(
64                move |acc: Arc<XaeroEvent>,
65                      events: Vec<Arc<XaeroEvent>>|
66                      -> Result<Arc<XaeroEvent>, AllocationError> {
67                    // Find the latest write operation
68                    let latest_write = events
69                        .into_iter()
70                        .filter(|e| {
71                            matches!(
72                                EventType::from_u8(e.event_type()),
73                                EventType::ApplicationEvent(CRDT_REGISTER_WRITE)
74                            )
75                        })
76                        .max_by_key(|event| event.latest_ts)
77                        .unwrap_or(acc);
78
79                    // Create new state event with latest write data
80                    let allocated_event =
81                        ea.allocate_event(latest_write.data(), CRDT_REGISTER_STATE as u32)?;
82                    let evt = allocated_event.into_pooled_event_ptr();
83
84                    Ok(Arc::new(XaeroEvent {
85                        evt,
86                        author_id: latest_write.author_id.clone(),
87                        merkle_proof: latest_write.merkle_proof.clone(),
88                        vector_clock: latest_write.vector_clock.clone(),
89                        latest_ts: latest_write.latest_ts,
90                    }))
91                },
92            ),
93
94            Fold::ORSet => Arc::new(
95                move |_acc: Arc<XaeroEvent>,
96                      events: Vec<Arc<XaeroEvent>>|
97                      -> Result<Arc<XaeroEvent>, AllocationError> {
98                    use std::collections::HashSet;
99
100                    // Use references to avoid copying data during processing
101                    let mut final_set: HashSet<&[u8]> = HashSet::new();
102                    let mut removed_elements: HashSet<&[u8]> = HashSet::new();
103
104                    for event in &events {
105                        match EventType::from_u8(event.event_type()) {
106                            EventType::ApplicationEvent(CRDT_SET_ADD) => {
107                                final_set.insert(event.data());
108                            }
109                            EventType::ApplicationEvent(CRDT_SET_REMOVE) => {
110                                removed_elements.insert(event.data());
111                            }
112                            _ => {}
113                        }
114                    }
115
116                    // Remove elements
117                    for removed in &removed_elements {
118                        final_set.remove(removed);
119                    }
120
121                    // Convert to owned data for serialization
122                    let final_elements: Vec<Vec<u8>> =
123                        final_set.iter().map(|data| data.to_vec()).collect();
124
125                    // Use rkyv for serialization
126                    let serialized = rkyv::to_bytes::<rkyv::rancor::Failure>(&final_elements)
127                        .map_err(|_| AllocationError::EventCreation("Serialization failed"))?;
128
129                    // Allocate in ring buffer using new API
130                    let allocated_event = ea.allocate_event(&serialized, CRDT_SET_STATE as u32)?;
131                    let evt = allocated_event.into_pooled_event_ptr();
132
133                    Ok(Arc::new(XaeroEvent {
134                        evt,
135                        author_id: None,
136                        merkle_proof: None,
137                        vector_clock: None,
138                        latest_ts: emit_secs(),
139                    }))
140                },
141            ),
142
143            Fold::GCounter => Arc::new(
144                move |_acc: Arc<XaeroEvent>,
145                      events: Vec<Arc<XaeroEvent>>|
146                      -> Result<Arc<XaeroEvent>, AllocationError> {
147                    let mut counter_value: i64 = 0;
148
149                    // Sum all increment operations
150                    for event in &events {
151                        match EventType::from_u8(event.event_type()) {
152                            EventType::ApplicationEvent(CRDT_COUNTER_INCREMENT) => {
153                                // Simple: treat event data as raw i64 bytes
154                                let data = event.data();
155                                if data.len() >= 8 {
156                                    let bytes: [u8; 8] = data[0..8].try_into().unwrap_or([0; 8]);
157                                    counter_value += i64::from_le_bytes(bytes);
158                                }
159                            }
160                            _ => {} // G-Counter only increments
161                        }
162                    }
163
164                    // Store as raw i64 bytes
165                    let serialized = counter_value.to_le_bytes().to_vec();
166                    let allocated_event =
167                        ea.allocate_event(&serialized, CRDT_COUNTER_STATE as u32)?;
168                    let evt = allocated_event.into_pooled_event_ptr();
169
170                    Ok(Arc::new(XaeroEvent {
171                        evt,
172                        author_id: None,
173                        merkle_proof: None,
174                        vector_clock: None,
175                        latest_ts: emit_secs(),
176                    }))
177                },
178            ),
179
180            Fold::PNCounter => Arc::new(
181                move |_acc: Arc<XaeroEvent>,
182                      events: Vec<Arc<XaeroEvent>>|
183                      -> Result<Arc<XaeroEvent>, AllocationError> {
184                    let mut counter_value: i64 = 0;
185
186                    // Process increment and decrement operations
187                    for event in &events {
188                        let data = event.data();
189                        match EventType::from_u8(event.event_type()) {
190                            EventType::ApplicationEvent(CRDT_COUNTER_INCREMENT) => {
191                                if data.len() >= 8 {
192                                    let bytes: [u8; 8] = data[0..8].try_into().unwrap_or([0; 8]);
193                                    counter_value += i64::from_le_bytes(bytes);
194                                }
195                            }
196                            EventType::ApplicationEvent(CRDT_COUNTER_DECREMENT) => {
197                                if data.len() >= 8 {
198                                    let bytes: [u8; 8] = data[0..8].try_into().unwrap_or([0; 8]);
199                                    counter_value -= i64::from_le_bytes(bytes);
200                                }
201                            }
202                            _ => {}
203                        }
204                    }
205
206                    // Store as raw i64 bytes
207                    let serialized = counter_value.to_le_bytes().to_vec();
208                    let allocated_event =
209                        ea.allocate_event(&serialized, CRDT_COUNTER_STATE as u32)?;
210                    let evt = allocated_event.into_pooled_event_ptr();
211
212                    Ok(Arc::new(XaeroEvent {
213                        evt,
214                        author_id: None,
215                        merkle_proof: None,
216                        vector_clock: None,
217                        latest_ts: emit_secs(),
218                    }))
219                },
220            ),
221        }
222    }
223}
224
225#[derive(Clone)]
226pub enum Reduce {
227    /// Reduce to final counter value
228    CounterValue,
229    /// Reduce to final set contents
230    SetContents,
231    /// Reduce to final register value
232    RegisterValue,
233}
234
235#[allow(clippy::type_complexity)]
236impl Reduce {
237    pub fn to_operator(
238        self,
239        ea: &'static EventAllocator,
240    ) -> Arc<dyn Fn(Vec<Arc<XaeroEvent>>) -> Result<Arc<XaeroEvent>, AllocationError> + Send + Sync>
241    {
242        match self {
243            Reduce::CounterValue => Arc::new(
244                move |events: Vec<Arc<XaeroEvent>>| -> Result<Arc<XaeroEvent>, AllocationError> {
245                    // Extract the final counter value from the state event
246                    for event in &events {
247                        if matches!(
248                            EventType::from_u8(event.event_type()),
249                            EventType::ApplicationEvent(CRDT_COUNTER_STATE)
250                        ) {
251                            // Return the counter state as a new event
252                            let allocated_event =
253                                ea.allocate_event(event.data(), CRDT_COUNTER_STATE as u32)?;
254                            let evt = allocated_event.into_pooled_event_ptr();
255
256                            return Ok(Arc::new(XaeroEvent {
257                                evt,
258                                author_id: None,
259                                merkle_proof: None,
260                                vector_clock: None,
261                                latest_ts: emit_secs(),
262                            }));
263                        }
264                    }
265
266                    // Default to 0 if no state found
267                    let default_value = 0i64.to_le_bytes().to_vec();
268                    let allocated_event =
269                        ea.allocate_event(&default_value, CRDT_COUNTER_STATE as u32)?;
270                    let evt = allocated_event.into_pooled_event_ptr();
271
272                    Ok(Arc::new(XaeroEvent {
273                        evt,
274                        author_id: None,
275                        merkle_proof: None,
276                        vector_clock: None,
277                        latest_ts: emit_secs(),
278                    }))
279                },
280            ),
281
282            Reduce::SetContents => Arc::new(
283                move |events: Vec<Arc<XaeroEvent>>| -> Result<Arc<XaeroEvent>, AllocationError> {
284                    // Extract the final set contents from the state event
285                    for event in &events {
286                        if matches!(
287                            EventType::from_u8(event.event_type()),
288                            EventType::ApplicationEvent(CRDT_SET_STATE)
289                        ) {
290                            // Return the set state as a new event
291                            let allocated_event =
292                                ea.allocate_event(event.data(), CRDT_SET_STATE as u32)?;
293                            let evt = allocated_event.into_pooled_event_ptr();
294
295                            return Ok(Arc::new(XaeroEvent {
296                                evt,
297                                author_id: None,
298                                merkle_proof: None,
299                                vector_clock: None,
300                                latest_ts: emit_secs(),
301                            }));
302                        }
303                    }
304
305                    // Default to empty set
306                    let empty_set: Vec<Vec<u8>> = Vec::new();
307                    let serialized = rkyv::to_bytes::<Failure>(&empty_set)
308                        .map_err(|_| AllocationError::EventCreation("Serialization failed"))?;
309
310                    let allocated_event = ea.allocate_event(&serialized, CRDT_SET_STATE as u32)?;
311                    let evt = allocated_event.into_pooled_event_ptr();
312
313                    Ok(Arc::new(XaeroEvent {
314                        evt,
315                        author_id: None,
316                        merkle_proof: None,
317                        vector_clock: None,
318                        latest_ts: emit_secs(),
319                    }))
320                },
321            ),
322
323            Reduce::RegisterValue => Arc::new(
324                move |events: Vec<Arc<XaeroEvent>>| -> Result<Arc<XaeroEvent>, AllocationError> {
325                    // Extract the final register value from the state event
326                    for event in &events {
327                        if matches!(
328                            EventType::from_u8(event.event_type()),
329                            EventType::ApplicationEvent(CRDT_REGISTER_STATE)
330                        ) {
331                            // Return the register state as a new event
332                            let allocated_event =
333                                ea.allocate_event(event.data(), CRDT_REGISTER_STATE as u32)?;
334                            let evt = allocated_event.into_pooled_event_ptr();
335
336                            return Ok(Arc::new(XaeroEvent {
337                                evt,
338                                author_id: None,
339                                merkle_proof: None,
340                                vector_clock: None,
341                                latest_ts: emit_secs(),
342                            }));
343                        }
344                    }
345
346                    // Default to empty value
347                    let allocated_event = ea.allocate_event(&[], CRDT_REGISTER_STATE as u32)?;
348                    let evt = allocated_event.into_pooled_event_ptr();
349
350                    Ok(Arc::new(XaeroEvent {
351                        evt,
352                        author_id: None,
353                        merkle_proof: None,
354                        vector_clock: None,
355                        latest_ts: emit_secs(),
356                    }))
357                },
358            ),
359        }
360    }
361}