1use serde::{Deserialize, Serialize};
5use std::sync::atomic::{AtomicU64, Ordering};
6
7static SEQUENCE: AtomicU64 = AtomicU64::new(0);
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10pub enum Operation {
11 Create,
12 Update,
13 Delete,
14}
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct ChangeEvent {
18 pub sequence: u64,
19 pub entity: String,
20 pub id: String,
21 pub operation: Operation,
22 pub data: Option<serde_json::Value>,
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub operation_id: Option<String>,
25 #[serde(skip_serializing_if = "Option::is_none")]
26 pub sender: Option<String>,
27 #[serde(skip_serializing_if = "Option::is_none")]
28 pub client_id: Option<String>,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub scope: Option<(String, String)>,
31 #[serde(skip)]
32 pub recipients: Option<Vec<String>>,
33}
34
35impl ChangeEvent {
36 pub fn new(
37 entity: String,
38 id: String,
39 operation: Operation,
40 data: Option<serde_json::Value>,
41 ) -> Self {
42 Self {
43 sequence: SEQUENCE.fetch_add(1, Ordering::SeqCst),
44 entity,
45 id,
46 operation,
47 data,
48 operation_id: None,
49 sender: None,
50 client_id: None,
51 scope: None,
52 recipients: None,
53 }
54 }
55
56 #[must_use]
57 pub fn with_operation_id(mut self, operation_id: String) -> Self {
58 self.operation_id = Some(operation_id);
59 self
60 }
61
62 #[must_use]
63 pub fn with_sender(mut self, sender: Option<String>) -> Self {
64 self.sender = sender;
65 self
66 }
67
68 #[must_use]
69 pub fn with_client_id(mut self, client_id: Option<String>) -> Self {
70 self.client_id = client_id;
71 self
72 }
73
74 #[must_use]
75 pub fn with_scope(mut self, scope: Option<(String, String)>) -> Self {
76 self.scope = scope;
77 self
78 }
79
80 #[must_use]
81 pub fn create(entity: String, id: String, data: serde_json::Value) -> Self {
82 Self::new(entity, id, Operation::Create, Some(data))
83 }
84
85 #[must_use]
86 pub fn update(entity: String, id: String, data: serde_json::Value) -> Self {
87 Self::new(entity, id, Operation::Update, Some(data))
88 }
89
90 #[must_use]
91 pub fn delete(entity: String, id: String, data: serde_json::Value) -> Self {
92 Self::new(entity, id, Operation::Delete, Some(data))
93 }
94
95 #[must_use]
96 pub fn event_topic(&self, num_partitions: u8) -> String {
97 let event_type = match self.operation {
98 Operation::Create => "created",
99 Operation::Update => "updated",
100 Operation::Delete => "deleted",
101 };
102
103 if let Some((ref scope_entity, ref scope_value)) = self.scope {
104 if *scope_entity == self.entity {
105 return format!("$DB/{scope_entity}/{scope_value}/events/{event_type}");
106 }
107 return format!(
108 "$DB/{scope_entity}/{scope_value}/{}/events/{event_type}",
109 self.entity
110 );
111 }
112
113 if num_partitions > 0 {
114 let partition = self.partition(num_partitions);
115 format!("$DB/{}/events/p{partition}/{}", self.entity, self.id)
116 } else {
117 format!("$DB/{}/events/{}", self.entity, self.id)
118 }
119 }
120
121 #[must_use]
122 #[allow(clippy::cast_possible_truncation)]
123 pub fn partition(&self, num_partitions: u8) -> u8 {
124 use std::collections::hash_map::DefaultHasher;
125 use std::hash::{Hash, Hasher};
126
127 if num_partitions == 0 {
128 return 0;
129 }
130
131 let key = format!("{}:{}", self.entity, self.id);
132 let mut hasher = DefaultHasher::new();
133 key.hash(&mut hasher);
134 (hasher.finish() % u64::from(num_partitions)) as u8
135 }
136}
137
138#[cfg(test)]
139mod tests {
140 use super::*;
141
142 #[test]
143 fn test_partition_zero_partitions() {
144 let event = ChangeEvent::create("users".into(), "1".into(), serde_json::json!({}));
145 assert_eq!(event.partition(0), 0);
146 }
147
148 #[test]
149 fn test_partition_determinism() {
150 let event = ChangeEvent::create("orders".into(), "123".into(), serde_json::json!({}));
151 let p1 = event.partition(8);
152 let p2 = event.partition(8);
153 assert_eq!(p1, p2);
154 }
155
156 #[test]
157 fn test_partition_distribution() {
158 let mut counts = [0u32; 8];
159 for i in 0..100 {
160 let event = ChangeEvent::create("orders".into(), i.to_string(), serde_json::json!({}));
161 let partition = event.partition(8);
162 counts[partition as usize] += 1;
163 }
164 for count in counts {
165 assert!(count > 0, "partition should have at least one event");
166 }
167 }
168
169 #[test]
170 fn test_partition_same_entity_same_id() {
171 let e1 = ChangeEvent::create("orders".into(), "42".into(), serde_json::json!({"a": 1}));
172 let e2 = ChangeEvent::update("orders".into(), "42".into(), serde_json::json!({"b": 2}));
173 let e3 = ChangeEvent::delete("orders".into(), "42".into(), serde_json::json!({"a": 1}));
174
175 assert_eq!(e1.partition(8), e2.partition(8));
176 assert_eq!(e2.partition(8), e3.partition(8));
177 }
178}