Skip to main content

mqdb_core/
events.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use serde::{Deserialize, Serialize};
5use std::sync::atomic::{AtomicU64, Ordering};
6
/// Counter that supplies `ChangeEvent::sequence`.
///
/// Starts at 0 whenever the program starts (it is a plain static, not
/// persisted) and is advanced by one on every `ChangeEvent::new` call.
static SEQUENCE: AtomicU64 = AtomicU64::new(0);
8
/// Kind of change a [`ChangeEvent`] describes.
///
/// `ChangeEvent::event_topic` maps the variants to the topic suffixes
/// `created`, `updated` and `deleted` when the event carries a scope.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Operation {
    /// Used by `ChangeEvent::create`.
    Create,
    /// Used by `ChangeEvent::update`.
    Update,
    /// Used by `ChangeEvent::delete`.
    Delete,
}
15
/// A single change to an entity record, together with optional routing
/// metadata.
///
/// Serialization notes: `operation_id`, `sender`, `client_id` and `scope` are
/// left out of the serialized form when they are `None`; `recipients` is never
/// serialized or deserialized; `data` carries no skip attribute and is
/// therefore always present in the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
    /// Value taken from the `SEQUENCE` counter when the event is built via
    /// `ChangeEvent::new`.
    pub sequence: u64,
    /// Entity name; used as a topic segment and as part of the partition key.
    pub entity: String,
    /// Record identifier; used as a topic segment (unscoped topics only) and
    /// as part of the partition key.
    pub id: String,
    /// Kind of change this event describes.
    pub operation: Operation,
    /// Optional JSON payload. The `create`, `update` and `delete`
    /// constructors always store `Some`.
    pub data: Option<serde_json::Value>,
    /// Optional operation identifier, set through `with_operation_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_id: Option<String>,
    /// Optional sender, set through `with_sender`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sender: Option<String>,
    /// Optional client identifier, set through `with_client_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    /// Optional `(scope_entity, scope_value)` pair. When present,
    /// `event_topic` builds a scope-prefixed topic instead of the
    /// per-record (optionally partitioned) topic.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scope: Option<(String, String)>,
    /// In-memory only (`#[serde(skip)]`). Nothing in this file assigns it a
    /// value other than `None`.
    // NOTE(review): presumably a delivery allow-list filled in by a caller — confirm.
    #[serde(skip)]
    pub recipients: Option<Vec<String>>,
}
34
35impl ChangeEvent {
36    pub fn new(
37        entity: String,
38        id: String,
39        operation: Operation,
40        data: Option<serde_json::Value>,
41    ) -> Self {
42        Self {
43            sequence: SEQUENCE.fetch_add(1, Ordering::SeqCst),
44            entity,
45            id,
46            operation,
47            data,
48            operation_id: None,
49            sender: None,
50            client_id: None,
51            scope: None,
52            recipients: None,
53        }
54    }
55
56    #[must_use]
57    pub fn with_operation_id(mut self, operation_id: String) -> Self {
58        self.operation_id = Some(operation_id);
59        self
60    }
61
62    #[must_use]
63    pub fn with_sender(mut self, sender: Option<String>) -> Self {
64        self.sender = sender;
65        self
66    }
67
68    #[must_use]
69    pub fn with_client_id(mut self, client_id: Option<String>) -> Self {
70        self.client_id = client_id;
71        self
72    }
73
74    #[must_use]
75    pub fn with_scope(mut self, scope: Option<(String, String)>) -> Self {
76        self.scope = scope;
77        self
78    }
79
80    #[must_use]
81    pub fn create(entity: String, id: String, data: serde_json::Value) -> Self {
82        Self::new(entity, id, Operation::Create, Some(data))
83    }
84
85    #[must_use]
86    pub fn update(entity: String, id: String, data: serde_json::Value) -> Self {
87        Self::new(entity, id, Operation::Update, Some(data))
88    }
89
90    #[must_use]
91    pub fn delete(entity: String, id: String, data: serde_json::Value) -> Self {
92        Self::new(entity, id, Operation::Delete, Some(data))
93    }
94
95    #[must_use]
96    pub fn event_topic(&self, num_partitions: u8) -> String {
97        let event_type = match self.operation {
98            Operation::Create => "created",
99            Operation::Update => "updated",
100            Operation::Delete => "deleted",
101        };
102
103        if let Some((ref scope_entity, ref scope_value)) = self.scope {
104            if *scope_entity == self.entity {
105                return format!("$DB/{scope_entity}/{scope_value}/events/{event_type}");
106            }
107            return format!(
108                "$DB/{scope_entity}/{scope_value}/{}/events/{event_type}",
109                self.entity
110            );
111        }
112
113        if num_partitions > 0 {
114            let partition = self.partition(num_partitions);
115            format!("$DB/{}/events/p{partition}/{}", self.entity, self.id)
116        } else {
117            format!("$DB/{}/events/{}", self.entity, self.id)
118        }
119    }
120
121    #[must_use]
122    #[allow(clippy::cast_possible_truncation)]
123    pub fn partition(&self, num_partitions: u8) -> u8 {
124        use std::collections::hash_map::DefaultHasher;
125        use std::hash::{Hash, Hasher};
126
127        if num_partitions == 0 {
128            return 0;
129        }
130
131        let key = format!("{}:{}", self.entity, self.id);
132        let mut hasher = DefaultHasher::new();
133        key.hash(&mut hasher);
134        (hasher.finish() % u64::from(num_partitions)) as u8
135    }
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a create event for `entity`/`id` with an empty JSON object as payload.
    fn sample(entity: &str, id: &str) -> ChangeEvent {
        ChangeEvent::create(entity.into(), id.into(), serde_json::json!({}))
    }

    #[test]
    fn test_partition_zero_partitions() {
        let event = sample("users", "1");
        assert_eq!(event.partition(0), 0);
    }

    #[test]
    fn test_partition_determinism() {
        let event = sample("orders", "123");
        let p1 = event.partition(8);
        let p2 = event.partition(8);
        assert_eq!(p1, p2);
    }

    #[test]
    fn test_partition_distribution() {
        let mut counts = [0u32; 8];
        for i in 0..100 {
            let event = sample("orders", &i.to_string());
            let partition = event.partition(8);
            counts[partition as usize] += 1;
        }
        for count in counts {
            assert!(count > 0, "partition should have at least one event");
        }
    }

    #[test]
    fn test_partition_same_entity_same_id() {
        let e1 = ChangeEvent::create("orders".into(), "42".into(), serde_json::json!({"a": 1}));
        let e2 = ChangeEvent::update("orders".into(), "42".into(), serde_json::json!({"b": 2}));
        let e3 = ChangeEvent::delete("orders".into(), "42".into(), serde_json::json!({"a": 1}));

        assert_eq!(e1.partition(8), e2.partition(8));
        assert_eq!(e2.partition(8), e3.partition(8));
    }

    #[test]
    fn test_partition_within_range() {
        // The result must always be a valid index for every partition count.
        let event = sample("orders", "42");
        for num_partitions in 1..=u8::MAX {
            assert!(event.partition(num_partitions) < num_partitions);
        }
    }

    #[test]
    fn test_sequence_increases() {
        // Other tests may build events concurrently, so only the ordering is
        // asserted, not the exact gap.
        let first = sample("orders", "1");
        let second = sample("orders", "2");
        assert!(second.sequence > first.sequence);
    }

    #[test]
    fn test_builders_set_fields() {
        let event = sample("orders", "42")
            .with_operation_id("op-1".into())
            .with_sender(Some("alice".into()))
            .with_client_id(Some("client-9".into()))
            .with_scope(Some(("tenants".into(), "t1".into())));

        assert_eq!(event.entity, "orders");
        assert_eq!(event.id, "42");
        assert_eq!(event.operation, Operation::Create);
        assert_eq!(event.operation_id.as_deref(), Some("op-1"));
        assert_eq!(event.sender.as_deref(), Some("alice"));
        assert_eq!(event.client_id.as_deref(), Some("client-9"));
        assert_eq!(event.scope, Some(("tenants".to_string(), "t1".to_string())));
        assert!(event.recipients.is_none());
    }

    #[test]
    fn test_event_topic_unpartitioned() {
        let event = sample("orders", "42");
        assert_eq!(event.event_topic(0), "$DB/orders/events/42");
    }

    #[test]
    fn test_event_topic_partitioned() {
        let event = sample("orders", "42");
        let partition = event.partition(8);
        assert_eq!(
            event.event_topic(8),
            format!("$DB/orders/events/p{partition}/42")
        );
    }

    #[test]
    fn test_event_topic_scope_same_entity() {
        let event = ChangeEvent::update("orders".into(), "42".into(), serde_json::json!({}))
            .with_scope(Some(("orders".into(), "42".into())));
        // A scope wins over partitioning, so the partition count is irrelevant.
        assert_eq!(event.event_topic(0), "$DB/orders/42/events/updated");
        assert_eq!(event.event_topic(8), "$DB/orders/42/events/updated");
    }

    #[test]
    fn test_event_topic_scope_other_entity() {
        let scope = Some(("tenants".to_string(), "t1".to_string()));
        let created = sample("orders", "42").with_scope(scope.clone());
        let deleted = ChangeEvent::delete("orders".into(), "42".into(), serde_json::json!({}))
            .with_scope(scope);

        assert_eq!(created.event_topic(4), "$DB/tenants/t1/orders/events/created");
        assert_eq!(deleted.event_topic(4), "$DB/tenants/t1/orders/events/deleted");
    }

    #[test]
    fn test_serialization_omits_unset_optional_fields() {
        let mut event = sample("orders", "42");
        event.recipients = Some(vec!["client-a".to_string()]);

        let value = serde_json::to_value(&event).expect("ChangeEvent serializes to JSON");
        for key in ["operation_id", "sender", "client_id", "scope", "recipients"] {
            assert!(value.get(key).is_none(), "{key} should be omitted");
        }
        assert!(value.get("data").is_some());
        assert_eq!(value["entity"], "orders");
        assert_eq!(value["id"], "42");
    }
}
178}