1use std::sync::Arc;
2
3use rkyv::rancor::Failure;
4use rusted_ring::{AllocationError, EventAllocator};
5use xaeroflux_core::{
6 date_time::emit_secs,
7 event::{
8 CRDT_COUNTER_DECREMENT, CRDT_COUNTER_INCREMENT, CRDT_COUNTER_STATE, CRDT_REGISTER_STATE,
9 CRDT_REGISTER_WRITE, CRDT_SET_ADD, CRDT_SET_REMOVE, CRDT_SET_STATE, EventType, XaeroEvent,
10 },
11};
12
/// Ordering strategies that can be converted into an event comparator.
///
/// `Sort::to_operator` currently implements only [`Sort::VectorClock`]; the
/// remaining variants are placeholders and make it panic.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Sort {
    /// Orders events by comparing their `latest_ts` fields.
    VectorClock,
    /// Not supported yet: converting this variant into an operator panics.
    OperationId,
    /// Not supported yet: converting this variant into an operator panics.
    LamportTimestamp,
}
22
23#[allow(clippy::type_complexity)]
24impl Sort {
25 pub fn to_operator(
26 self,
27 _ea: &'static EventAllocator,
28 ) -> Arc<dyn Fn(&Arc<XaeroEvent>, &Arc<XaeroEvent>) -> std::cmp::Ordering + Send + Sync> {
29 match self {
30 Sort::VectorClock => Arc::new(
31 move |e1: &Arc<XaeroEvent>, e2: &Arc<XaeroEvent>| -> std::cmp::Ordering {
32 e1.latest_ts.cmp(&e2.latest_ts)
33 },
34 ),
35 _ => panic!("not supported yet"),
36 }
37 }
38}
39
/// Fold strategies that collapse a batch of CRDT operation events into a single
/// state event (see `Fold::to_operator`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Fold {
    /// Keeps the `CRDT_REGISTER_WRITE` event with the greatest `latest_ts`,
    /// falling back to the accumulator when the batch contains no write.
    LWWRegister,
    /// Collects `CRDT_SET_ADD` payloads and drops every payload that also
    /// appears in a `CRDT_SET_REMOVE` event.
    ORSet,
    /// Sums the little-endian `i64` payloads of `CRDT_COUNTER_INCREMENT` events.
    GCounter,
    /// Adds `CRDT_COUNTER_INCREMENT` payloads and subtracts
    /// `CRDT_COUNTER_DECREMENT` payloads.
    PNCounter,
}
51
52#[allow(clippy::type_complexity)]
53impl Fold {
54 pub fn to_operator(
55 self,
56 ea: &'static EventAllocator,
57 ) -> Arc<
58 dyn Fn(Arc<XaeroEvent>, Vec<Arc<XaeroEvent>>) -> Result<Arc<XaeroEvent>, AllocationError>
59 + Send
60 + Sync,
61 > {
62 match self {
63 Fold::LWWRegister => Arc::new(
64 move |acc: Arc<XaeroEvent>,
65 events: Vec<Arc<XaeroEvent>>|
66 -> Result<Arc<XaeroEvent>, AllocationError> {
67 let latest_write = events
69 .into_iter()
70 .filter(|e| {
71 matches!(
72 EventType::from_u8(e.event_type()),
73 EventType::ApplicationEvent(CRDT_REGISTER_WRITE)
74 )
75 })
76 .max_by_key(|event| event.latest_ts)
77 .unwrap_or(acc);
78
79 let allocated_event =
81 ea.allocate_event(latest_write.data(), CRDT_REGISTER_STATE as u32)?;
82 let evt = allocated_event.into_pooled_event_ptr();
83
84 Ok(Arc::new(XaeroEvent {
85 evt,
86 author_id: latest_write.author_id.clone(),
87 merkle_proof: latest_write.merkle_proof.clone(),
88 vector_clock: latest_write.vector_clock.clone(),
89 latest_ts: latest_write.latest_ts,
90 }))
91 },
92 ),
93
94 Fold::ORSet => Arc::new(
95 move |_acc: Arc<XaeroEvent>,
96 events: Vec<Arc<XaeroEvent>>|
97 -> Result<Arc<XaeroEvent>, AllocationError> {
98 use std::collections::HashSet;
99
100 let mut final_set: HashSet<&[u8]> = HashSet::new();
102 let mut removed_elements: HashSet<&[u8]> = HashSet::new();
103
104 for event in &events {
105 match EventType::from_u8(event.event_type()) {
106 EventType::ApplicationEvent(CRDT_SET_ADD) => {
107 final_set.insert(event.data());
108 }
109 EventType::ApplicationEvent(CRDT_SET_REMOVE) => {
110 removed_elements.insert(event.data());
111 }
112 _ => {}
113 }
114 }
115
116 for removed in &removed_elements {
118 final_set.remove(removed);
119 }
120
121 let final_elements: Vec<Vec<u8>> =
123 final_set.iter().map(|data| data.to_vec()).collect();
124
125 let serialized = rkyv::to_bytes::<rkyv::rancor::Failure>(&final_elements)
127 .map_err(|_| AllocationError::EventCreation("Serialization failed"))?;
128
129 let allocated_event = ea.allocate_event(&serialized, CRDT_SET_STATE as u32)?;
131 let evt = allocated_event.into_pooled_event_ptr();
132
133 Ok(Arc::new(XaeroEvent {
134 evt,
135 author_id: None,
136 merkle_proof: None,
137 vector_clock: None,
138 latest_ts: emit_secs(),
139 }))
140 },
141 ),
142
143 Fold::GCounter => Arc::new(
144 move |_acc: Arc<XaeroEvent>,
145 events: Vec<Arc<XaeroEvent>>|
146 -> Result<Arc<XaeroEvent>, AllocationError> {
147 let mut counter_value: i64 = 0;
148
149 for event in &events {
151 match EventType::from_u8(event.event_type()) {
152 EventType::ApplicationEvent(CRDT_COUNTER_INCREMENT) => {
153 let data = event.data();
155 if data.len() >= 8 {
156 let bytes: [u8; 8] = data[0..8].try_into().unwrap_or([0; 8]);
157 counter_value += i64::from_le_bytes(bytes);
158 }
159 }
160 _ => {} }
162 }
163
164 let serialized = counter_value.to_le_bytes().to_vec();
166 let allocated_event =
167 ea.allocate_event(&serialized, CRDT_COUNTER_STATE as u32)?;
168 let evt = allocated_event.into_pooled_event_ptr();
169
170 Ok(Arc::new(XaeroEvent {
171 evt,
172 author_id: None,
173 merkle_proof: None,
174 vector_clock: None,
175 latest_ts: emit_secs(),
176 }))
177 },
178 ),
179
180 Fold::PNCounter => Arc::new(
181 move |_acc: Arc<XaeroEvent>,
182 events: Vec<Arc<XaeroEvent>>|
183 -> Result<Arc<XaeroEvent>, AllocationError> {
184 let mut counter_value: i64 = 0;
185
186 for event in &events {
188 let data = event.data();
189 match EventType::from_u8(event.event_type()) {
190 EventType::ApplicationEvent(CRDT_COUNTER_INCREMENT) => {
191 if data.len() >= 8 {
192 let bytes: [u8; 8] = data[0..8].try_into().unwrap_or([0; 8]);
193 counter_value += i64::from_le_bytes(bytes);
194 }
195 }
196 EventType::ApplicationEvent(CRDT_COUNTER_DECREMENT) => {
197 if data.len() >= 8 {
198 let bytes: [u8; 8] = data[0..8].try_into().unwrap_or([0; 8]);
199 counter_value -= i64::from_le_bytes(bytes);
200 }
201 }
202 _ => {}
203 }
204 }
205
206 let serialized = counter_value.to_le_bytes().to_vec();
208 let allocated_event =
209 ea.allocate_event(&serialized, CRDT_COUNTER_STATE as u32)?;
210 let evt = allocated_event.into_pooled_event_ptr();
211
212 Ok(Arc::new(XaeroEvent {
213 evt,
214 author_id: None,
215 merkle_proof: None,
216 vector_clock: None,
217 latest_ts: emit_secs(),
218 }))
219 },
220 ),
221 }
222 }
223}
224
/// Reduce strategies that pick the current CRDT state out of a batch of events
/// (see `Reduce::to_operator`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Reduce {
    /// Re-emits the first `CRDT_COUNTER_STATE` event, or a zero counter when
    /// the batch contains none.
    CounterValue,
    /// Re-emits the first `CRDT_SET_STATE` event, or a serialized empty list
    /// when the batch contains none.
    SetContents,
    /// Re-emits the first `CRDT_REGISTER_STATE` event, or an empty payload when
    /// the batch contains none.
    RegisterValue,
}
234
235#[allow(clippy::type_complexity)]
236impl Reduce {
237 pub fn to_operator(
238 self,
239 ea: &'static EventAllocator,
240 ) -> Arc<dyn Fn(Vec<Arc<XaeroEvent>>) -> Result<Arc<XaeroEvent>, AllocationError> + Send + Sync>
241 {
242 match self {
243 Reduce::CounterValue => Arc::new(
244 move |events: Vec<Arc<XaeroEvent>>| -> Result<Arc<XaeroEvent>, AllocationError> {
245 for event in &events {
247 if matches!(
248 EventType::from_u8(event.event_type()),
249 EventType::ApplicationEvent(CRDT_COUNTER_STATE)
250 ) {
251 let allocated_event =
253 ea.allocate_event(event.data(), CRDT_COUNTER_STATE as u32)?;
254 let evt = allocated_event.into_pooled_event_ptr();
255
256 return Ok(Arc::new(XaeroEvent {
257 evt,
258 author_id: None,
259 merkle_proof: None,
260 vector_clock: None,
261 latest_ts: emit_secs(),
262 }));
263 }
264 }
265
266 let default_value = 0i64.to_le_bytes().to_vec();
268 let allocated_event =
269 ea.allocate_event(&default_value, CRDT_COUNTER_STATE as u32)?;
270 let evt = allocated_event.into_pooled_event_ptr();
271
272 Ok(Arc::new(XaeroEvent {
273 evt,
274 author_id: None,
275 merkle_proof: None,
276 vector_clock: None,
277 latest_ts: emit_secs(),
278 }))
279 },
280 ),
281
282 Reduce::SetContents => Arc::new(
283 move |events: Vec<Arc<XaeroEvent>>| -> Result<Arc<XaeroEvent>, AllocationError> {
284 for event in &events {
286 if matches!(
287 EventType::from_u8(event.event_type()),
288 EventType::ApplicationEvent(CRDT_SET_STATE)
289 ) {
290 let allocated_event =
292 ea.allocate_event(event.data(), CRDT_SET_STATE as u32)?;
293 let evt = allocated_event.into_pooled_event_ptr();
294
295 return Ok(Arc::new(XaeroEvent {
296 evt,
297 author_id: None,
298 merkle_proof: None,
299 vector_clock: None,
300 latest_ts: emit_secs(),
301 }));
302 }
303 }
304
305 let empty_set: Vec<Vec<u8>> = Vec::new();
307 let serialized = rkyv::to_bytes::<Failure>(&empty_set)
308 .map_err(|_| AllocationError::EventCreation("Serialization failed"))?;
309
310 let allocated_event = ea.allocate_event(&serialized, CRDT_SET_STATE as u32)?;
311 let evt = allocated_event.into_pooled_event_ptr();
312
313 Ok(Arc::new(XaeroEvent {
314 evt,
315 author_id: None,
316 merkle_proof: None,
317 vector_clock: None,
318 latest_ts: emit_secs(),
319 }))
320 },
321 ),
322
323 Reduce::RegisterValue => Arc::new(
324 move |events: Vec<Arc<XaeroEvent>>| -> Result<Arc<XaeroEvent>, AllocationError> {
325 for event in &events {
327 if matches!(
328 EventType::from_u8(event.event_type()),
329 EventType::ApplicationEvent(CRDT_REGISTER_STATE)
330 ) {
331 let allocated_event =
333 ea.allocate_event(event.data(), CRDT_REGISTER_STATE as u32)?;
334 let evt = allocated_event.into_pooled_event_ptr();
335
336 return Ok(Arc::new(XaeroEvent {
337 evt,
338 author_id: None,
339 merkle_proof: None,
340 vector_clock: None,
341 latest_ts: emit_secs(),
342 }));
343 }
344 }
345
346 let allocated_event = ea.allocate_event(&[], CRDT_REGISTER_STATE as u32)?;
348 let evt = allocated_event.into_pooled_event_ptr();
349
350 Ok(Arc::new(XaeroEvent {
351 evt,
352 author_id: None,
353 merkle_proof: None,
354 vector_clock: None,
355 latest_ts: emit_secs(),
356 }))
357 },
358 ),
359 }
360 }
361}