1use crate::ChangeType;
4use rkyv::{Archive, Deserialize, Serialize};
5use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
6
7#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
12pub struct ChangeLogEntry {
13 pub lsn: u64,
15 pub timestamp: u64,
17 pub entity_type: String,
19 pub entity_id: [u8; 16],
21 pub change_type: ChangeType,
23 pub changed_fields: Vec<String>,
25 pub before_data: Option<Vec<u8>>,
27 pub after_data: Option<Vec<u8>>,
29 pub schema_version: u64,
31}
32
33impl ChangeLogEntry {
34 pub fn insert(
36 entity_type: impl Into<String>,
37 entity_id: [u8; 16],
38 after_data: Vec<u8>,
39 changed_fields: Vec<String>,
40 schema_version: u64,
41 ) -> Self {
42 Self {
43 lsn: 0, timestamp: Self::current_timestamp(),
45 entity_type: entity_type.into(),
46 entity_id,
47 change_type: ChangeType::Insert,
48 changed_fields,
49 before_data: None,
50 after_data: Some(after_data),
51 schema_version,
52 }
53 }
54
55 pub fn update(
57 entity_type: impl Into<String>,
58 entity_id: [u8; 16],
59 before_data: Vec<u8>,
60 after_data: Vec<u8>,
61 changed_fields: Vec<String>,
62 schema_version: u64,
63 ) -> Self {
64 Self {
65 lsn: 0,
66 timestamp: Self::current_timestamp(),
67 entity_type: entity_type.into(),
68 entity_id,
69 change_type: ChangeType::Update,
70 changed_fields,
71 before_data: Some(before_data),
72 after_data: Some(after_data),
73 schema_version,
74 }
75 }
76
77 pub fn delete(
79 entity_type: impl Into<String>,
80 entity_id: [u8; 16],
81 before_data: Vec<u8>,
82 schema_version: u64,
83 ) -> Self {
84 Self {
85 lsn: 0,
86 timestamp: Self::current_timestamp(),
87 entity_type: entity_type.into(),
88 entity_id,
89 change_type: ChangeType::Delete,
90 changed_fields: vec![],
91 before_data: Some(before_data),
92 after_data: None,
93 schema_version,
94 }
95 }
96
97 fn current_timestamp() -> u64 {
99 std::time::SystemTime::now()
100 .duration_since(std::time::UNIX_EPOCH)
101 .map(|d| d.as_micros() as u64)
102 .unwrap_or(0)
103 }
104}
105
106#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
108pub struct StreamChangesRequest {
109 pub from_lsn: u64,
111 pub batch_size: u32,
113 pub entity_filter: Option<Vec<String>>,
115}
116
117impl StreamChangesRequest {
118 pub fn new(from_lsn: u64, batch_size: u32) -> Self {
120 Self {
121 from_lsn,
122 batch_size,
123 entity_filter: None,
124 }
125 }
126
127 pub fn with_entity_filter(mut self, entities: Vec<String>) -> Self {
129 self.entity_filter = Some(entities);
130 self
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
136pub struct StreamChangesResponse {
137 pub entries: Vec<ChangeLogEntry>,
139 pub next_lsn: u64,
141 pub has_more: bool,
143}
144
145impl StreamChangesResponse {
146 pub fn new(entries: Vec<ChangeLogEntry>, next_lsn: u64, has_more: bool) -> Self {
148 Self {
149 entries,
150 next_lsn,
151 has_more,
152 }
153 }
154
155 pub fn empty(from_lsn: u64) -> Self {
157 Self {
158 entries: vec![],
159 next_lsn: from_lsn,
160 has_more: false,
161 }
162 }
163}
164
165#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
167pub enum ReplicationRole {
168 Primary,
170 Replica {
172 primary_addr: String,
174 },
175 Standalone,
177}
178
179impl ReplicationRole {
180 pub fn can_write(&self) -> bool {
182 !matches!(self, ReplicationRole::Replica { .. })
183 }
184
185 pub fn is_replica(&self) -> bool {
187 matches!(self, ReplicationRole::Replica { .. })
188 }
189
190 pub fn is_primary(&self) -> bool {
192 matches!(self, ReplicationRole::Primary)
193 }
194}
195
196impl Default for ReplicationRole {
197 fn default() -> Self {
198 ReplicationRole::Standalone
199 }
200}
201
202#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
204pub struct ReplicationStatus {
205 pub role: ReplicationRole,
207 pub current_lsn: u64,
209 pub lag_entries: u64,
211 pub lag_ms: u64,
213}
214
215impl ReplicationStatus {
216 pub fn new(role: ReplicationRole, current_lsn: u64) -> Self {
218 Self {
219 role,
220 current_lsn,
221 lag_entries: 0,
222 lag_ms: 0,
223 }
224 }
225
226 pub fn primary(current_lsn: u64) -> Self {
228 Self::new(ReplicationRole::Primary, current_lsn)
229 }
230
231 pub fn standalone(current_lsn: u64) -> Self {
233 Self::new(ReplicationRole::Standalone, current_lsn)
234 }
235
236 pub fn replica(primary_addr: String, applied_lsn: u64, lag_entries: u64, lag_ms: u64) -> Self {
238 Self {
239 role: ReplicationRole::Replica { primary_addr },
240 current_lsn: applied_lsn,
241 lag_entries,
242 lag_ms,
243 }
244 }
245}
246
247impl Default for ReplicationStatus {
248 fn default() -> Self {
249 Self::standalone(0)
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Entity id shared by every change-log entry built in these tests.
    const ENTITY_ID: [u8; 16] = [1u8; 16];

    /// An insert entry starts at LSN 0 and carries only an after-image.
    #[test]
    fn test_changelog_entry_insert() {
        let fields = vec!["name".to_string(), "email".to_string()];
        let inserted = ChangeLogEntry::insert("User", ENTITY_ID, vec![1, 2, 3], fields, 1);

        assert_eq!(inserted.lsn, 0);
        assert_eq!(inserted.entity_type, "User");
        assert_eq!(inserted.change_type, ChangeType::Insert);
        assert!(inserted.before_data.is_none());
        assert!(inserted.after_data.is_some());
    }

    /// An update entry carries both images and the changed field names.
    #[test]
    fn test_changelog_entry_update() {
        let before = vec![1, 2, 3];
        let after = vec![4, 5, 6];
        let updated = ChangeLogEntry::update(
            "User",
            ENTITY_ID,
            before,
            after,
            vec!["name".to_string()],
            1,
        );

        assert_eq!(updated.change_type, ChangeType::Update);
        assert!(updated.before_data.is_some());
        assert!(updated.after_data.is_some());
        assert_eq!(updated.changed_fields, vec!["name"]);
    }

    /// A delete entry carries only a before-image.
    #[test]
    fn test_changelog_entry_delete() {
        let deleted = ChangeLogEntry::delete("User", ENTITY_ID, vec![1, 2, 3], 1);

        assert_eq!(deleted.change_type, ChangeType::Delete);
        assert!(deleted.before_data.is_some());
        assert!(deleted.after_data.is_none());
    }

    /// The builder keeps the constructor arguments and records the filter.
    #[test]
    fn test_stream_changes_request() {
        let filter = vec!["User".to_string(), "Post".to_string()];
        let req = StreamChangesRequest::new(100, 50).with_entity_filter(filter);

        assert_eq!(req.from_lsn, 100);
        assert_eq!(req.batch_size, 50);
        assert_eq!(
            req.entity_filter,
            Some(vec!["User".to_string(), "Post".to_string()])
        );
    }

    /// Only replicas are denied writes.
    #[test]
    fn test_replication_role() {
        let replica = ReplicationRole::Replica {
            primary_addr: "localhost:5432".to_string(),
        };

        assert!(ReplicationRole::Primary.can_write());
        assert!(ReplicationRole::Standalone.can_write());
        assert!(!replica.can_write());
    }

    /// The replica constructor stores the applied LSN and both lag values.
    #[test]
    fn test_replication_status() {
        let addr = "localhost:5432".to_string();
        let replica_status = ReplicationStatus::replica(addr, 100, 5, 50);

        assert!(replica_status.role.is_replica());
        assert_eq!(replica_status.current_lsn, 100);
        assert_eq!(replica_status.lag_entries, 5);
        assert_eq!(replica_status.lag_ms, 50);
    }

    /// An entry survives an rkyv serialize / access / deserialize round trip.
    #[test]
    fn test_serialization_roundtrip() {
        let original = ChangeLogEntry::insert(
            "User",
            ENTITY_ID,
            vec![1, 2, 3, 4],
            vec!["name".to_string()],
            1,
        );

        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&original).unwrap();
        let archived =
            rkyv::access::<ArchivedChangeLogEntry, rkyv::rancor::Error>(&bytes).unwrap();
        let restored: ChangeLogEntry =
            rkyv::deserialize::<ChangeLogEntry, rkyv::rancor::Error>(archived).unwrap();

        assert_eq!(original.entity_type, restored.entity_type);
        assert_eq!(original.change_type, restored.change_type);
    }
}