Skip to main content

ormdb_proto/
replication.rs

1//! Replication and CDC protocol types.
2
3use crate::ChangeType;
4use rkyv::{Archive, Deserialize, Serialize};
5use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
6
7/// Persistent change log entry for CDC/replication.
8///
9/// Each mutation generates a changelog entry that is persisted with
10/// a monotonically increasing LSN (Log Sequence Number).
11#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
12pub struct ChangeLogEntry {
13    /// Log Sequence Number - monotonically increasing identifier.
14    pub lsn: u64,
15    /// Timestamp in microseconds since epoch.
16    pub timestamp: u64,
17    /// The entity type that was modified.
18    pub entity_type: String,
19    /// The ID of the modified entity.
20    pub entity_id: [u8; 16],
21    /// Type of change (Insert, Update, Delete).
22    pub change_type: ChangeType,
23    /// List of fields that changed.
24    pub changed_fields: Vec<String>,
25    /// Full before-state (rkyv serialized bytes), None for inserts.
26    pub before_data: Option<Vec<u8>>,
27    /// Full after-state (rkyv serialized bytes), None for deletes.
28    pub after_data: Option<Vec<u8>>,
29    /// Schema version when the change occurred.
30    pub schema_version: u64,
31}
32
33impl ChangeLogEntry {
34    /// Create a new changelog entry for an insert operation.
35    pub fn insert(
36        entity_type: impl Into<String>,
37        entity_id: [u8; 16],
38        after_data: Vec<u8>,
39        changed_fields: Vec<String>,
40        schema_version: u64,
41    ) -> Self {
42        Self {
43            lsn: 0, // Assigned by ChangeLog
44            timestamp: Self::current_timestamp(),
45            entity_type: entity_type.into(),
46            entity_id,
47            change_type: ChangeType::Insert,
48            changed_fields,
49            before_data: None,
50            after_data: Some(after_data),
51            schema_version,
52        }
53    }
54
55    /// Create a new changelog entry for an update operation.
56    pub fn update(
57        entity_type: impl Into<String>,
58        entity_id: [u8; 16],
59        before_data: Vec<u8>,
60        after_data: Vec<u8>,
61        changed_fields: Vec<String>,
62        schema_version: u64,
63    ) -> Self {
64        Self {
65            lsn: 0,
66            timestamp: Self::current_timestamp(),
67            entity_type: entity_type.into(),
68            entity_id,
69            change_type: ChangeType::Update,
70            changed_fields,
71            before_data: Some(before_data),
72            after_data: Some(after_data),
73            schema_version,
74        }
75    }
76
77    /// Create a new changelog entry for a delete operation.
78    pub fn delete(
79        entity_type: impl Into<String>,
80        entity_id: [u8; 16],
81        before_data: Vec<u8>,
82        schema_version: u64,
83    ) -> Self {
84        Self {
85            lsn: 0,
86            timestamp: Self::current_timestamp(),
87            entity_type: entity_type.into(),
88            entity_id,
89            change_type: ChangeType::Delete,
90            changed_fields: vec![],
91            before_data: Some(before_data),
92            after_data: None,
93            schema_version,
94        }
95    }
96
97    /// Get current timestamp in microseconds.
98    fn current_timestamp() -> u64 {
99        std::time::SystemTime::now()
100            .duration_since(std::time::UNIX_EPOCH)
101            .map(|d| d.as_micros() as u64)
102            .unwrap_or(0)
103    }
104}
105
106/// Request to stream changes from the changelog.
107#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
108pub struct StreamChangesRequest {
109    /// Starting LSN (inclusive).
110    pub from_lsn: u64,
111    /// Maximum number of entries to return.
112    pub batch_size: u32,
113    /// Optional filter by entity type.
114    pub entity_filter: Option<Vec<String>>,
115}
116
117impl StreamChangesRequest {
118    /// Create a new stream changes request.
119    pub fn new(from_lsn: u64, batch_size: u32) -> Self {
120        Self {
121            from_lsn,
122            batch_size,
123            entity_filter: None,
124        }
125    }
126
127    /// Add an entity type filter.
128    pub fn with_entity_filter(mut self, entities: Vec<String>) -> Self {
129        self.entity_filter = Some(entities);
130        self
131    }
132}
133
134/// Response containing a batch of changelog entries.
135#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
136pub struct StreamChangesResponse {
137    /// The changelog entries.
138    pub entries: Vec<ChangeLogEntry>,
139    /// The next LSN to request (for pagination).
140    pub next_lsn: u64,
141    /// Whether there are more entries available.
142    pub has_more: bool,
143}
144
145impl StreamChangesResponse {
146    /// Create a new stream changes response.
147    pub fn new(entries: Vec<ChangeLogEntry>, next_lsn: u64, has_more: bool) -> Self {
148        Self {
149            entries,
150            next_lsn,
151            has_more,
152        }
153    }
154
155    /// Create an empty response.
156    pub fn empty(from_lsn: u64) -> Self {
157        Self {
158            entries: vec![],
159            next_lsn: from_lsn,
160            has_more: false,
161        }
162    }
163}
164
165/// Replication role of a server instance.
166#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
167pub enum ReplicationRole {
168    /// Primary server - accepts writes.
169    Primary,
170    /// Replica server - read-only, replicates from primary.
171    Replica {
172        /// Address of the primary server.
173        primary_addr: String,
174    },
175    /// Standalone server - not participating in replication.
176    Standalone,
177}
178
179impl ReplicationRole {
180    /// Check if this role allows writes.
181    pub fn can_write(&self) -> bool {
182        !matches!(self, ReplicationRole::Replica { .. })
183    }
184
185    /// Check if this is a replica.
186    pub fn is_replica(&self) -> bool {
187        matches!(self, ReplicationRole::Replica { .. })
188    }
189
190    /// Check if this is the primary.
191    pub fn is_primary(&self) -> bool {
192        matches!(self, ReplicationRole::Primary)
193    }
194}
195
196impl Default for ReplicationRole {
197    fn default() -> Self {
198        ReplicationRole::Standalone
199    }
200}
201
202/// Current replication status of a server.
203#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
204pub struct ReplicationStatus {
205    /// Current replication role.
206    pub role: ReplicationRole,
207    /// Current LSN (highest written for primary, highest applied for replica).
208    pub current_lsn: u64,
209    /// Replication lag in number of entries (replica only).
210    pub lag_entries: u64,
211    /// Replication lag in milliseconds (replica only).
212    pub lag_ms: u64,
213}
214
215impl ReplicationStatus {
216    /// Create a new replication status.
217    pub fn new(role: ReplicationRole, current_lsn: u64) -> Self {
218        Self {
219            role,
220            current_lsn,
221            lag_entries: 0,
222            lag_ms: 0,
223        }
224    }
225
226    /// Create status for a primary server.
227    pub fn primary(current_lsn: u64) -> Self {
228        Self::new(ReplicationRole::Primary, current_lsn)
229    }
230
231    /// Create status for a standalone server.
232    pub fn standalone(current_lsn: u64) -> Self {
233        Self::new(ReplicationRole::Standalone, current_lsn)
234    }
235
236    /// Create status for a replica with lag information.
237    pub fn replica(primary_addr: String, applied_lsn: u64, lag_entries: u64, lag_ms: u64) -> Self {
238        Self {
239            role: ReplicationRole::Replica { primary_addr },
240            current_lsn: applied_lsn,
241            lag_entries,
242            lag_ms,
243        }
244    }
245}
246
247impl Default for ReplicationStatus {
248    fn default() -> Self {
249        Self::standalone(0)
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256
257    #[test]
258    fn test_changelog_entry_insert() {
259        let entry = ChangeLogEntry::insert(
260            "User",
261            [1u8; 16],
262            vec![1, 2, 3],
263            vec!["name".to_string(), "email".to_string()],
264            1,
265        );
266
267        assert_eq!(entry.lsn, 0); // Not assigned yet
268        assert_eq!(entry.entity_type, "User");
269        assert_eq!(entry.change_type, ChangeType::Insert);
270        assert!(entry.before_data.is_none());
271        assert!(entry.after_data.is_some());
272    }
273
274    #[test]
275    fn test_changelog_entry_update() {
276        let entry = ChangeLogEntry::update(
277            "User",
278            [1u8; 16],
279            vec![1, 2, 3],
280            vec![4, 5, 6],
281            vec!["name".to_string()],
282            1,
283        );
284
285        assert_eq!(entry.change_type, ChangeType::Update);
286        assert!(entry.before_data.is_some());
287        assert!(entry.after_data.is_some());
288        assert_eq!(entry.changed_fields, vec!["name"]);
289    }
290
291    #[test]
292    fn test_changelog_entry_delete() {
293        let entry = ChangeLogEntry::delete("User", [1u8; 16], vec![1, 2, 3], 1);
294
295        assert_eq!(entry.change_type, ChangeType::Delete);
296        assert!(entry.before_data.is_some());
297        assert!(entry.after_data.is_none());
298    }
299
300    #[test]
301    fn test_stream_changes_request() {
302        let request = StreamChangesRequest::new(100, 50)
303            .with_entity_filter(vec!["User".to_string(), "Post".to_string()]);
304
305        assert_eq!(request.from_lsn, 100);
306        assert_eq!(request.batch_size, 50);
307        assert_eq!(
308            request.entity_filter,
309            Some(vec!["User".to_string(), "Post".to_string()])
310        );
311    }
312
313    #[test]
314    fn test_replication_role() {
315        assert!(ReplicationRole::Primary.can_write());
316        assert!(ReplicationRole::Standalone.can_write());
317        assert!(!ReplicationRole::Replica {
318            primary_addr: "localhost:5432".to_string()
319        }
320        .can_write());
321    }
322
323    #[test]
324    fn test_replication_status() {
325        let status = ReplicationStatus::replica("localhost:5432".to_string(), 100, 5, 50);
326
327        assert!(status.role.is_replica());
328        assert_eq!(status.current_lsn, 100);
329        assert_eq!(status.lag_entries, 5);
330        assert_eq!(status.lag_ms, 50);
331    }
332
333    #[test]
334    fn test_serialization_roundtrip() {
335        let entry = ChangeLogEntry::insert(
336            "User",
337            [1u8; 16],
338            vec![1, 2, 3, 4],
339            vec!["name".to_string()],
340            1,
341        );
342
343        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&entry).unwrap();
344        let archived =
345            rkyv::access::<ArchivedChangeLogEntry, rkyv::rancor::Error>(&bytes).unwrap();
346        let deserialized: ChangeLogEntry =
347            rkyv::deserialize::<ChangeLogEntry, rkyv::rancor::Error>(archived).unwrap();
348
349        assert_eq!(entry.entity_type, deserialized.entity_type);
350        assert_eq!(entry.change_type, deserialized.change_type);
351    }
352}