Skip to main content

nodedb_types/
mirror.rs

1// SPDX-License-Identifier: Apache-2.0
2
//! Mirror catalog types shared between the catalog layer and the control plane.
//!
//! A mirror database is a continuously updated, read-only replica of a source
//! database that lives in another cluster. Promoting a mirror to a writable
//! database is one-way and permanent.
//!
//! Every `match` on the enums in this module must be exhaustive — no `_ =>`
//! arms — so that adding a variant is a compile error at each match site.
9
10use serde::{Deserialize, Serialize};
11
12use crate::{DatabaseId, Lsn};
13
14/// Identifies the source of a mirror database.
15///
16/// Stored on `DatabaseDescriptor.mirror_origin`; the read planner and write
17/// rejector consult it to enforce mirror semantics.
18#[derive(
19    Debug,
20    Clone,
21    PartialEq,
22    Eq,
23    Serialize,
24    Deserialize,
25    zerompk::ToMessagePack,
26    zerompk::FromMessagePack,
27)]
28#[msgpack(map)]
29pub struct MirrorOrigin {
30    /// The cluster that the source database lives in.
31    pub source_cluster: String,
32    /// The source database being mirrored.
33    pub source_database: DatabaseId,
34    /// Replication mode: whether the source waits for mirror ack.
35    pub mode: MirrorMode,
36    /// WAL LSN last applied on this mirror.
37    pub last_applied: Lsn,
38    /// Current mirror lifecycle status.
39    pub status: MirrorStatus,
40}
41
42/// Replication mode for a mirror database.
43///
44/// Exhaustive matches are required everywhere this enum is matched — no
45/// `_ =>` arms.
46#[derive(
47    Debug,
48    Clone,
49    Copy,
50    PartialEq,
51    Eq,
52    Serialize,
53    Deserialize,
54    zerompk::ToMessagePack,
55    zerompk::FromMessagePack,
56)]
57pub enum MirrorMode {
58    /// Source waits for mirror ack before commit. Strict latency cost;
59    /// not recommended cross-region.
60    Sync,
61    /// Mirror trails source; lag is observable via `MirrorStatus::Degraded`.
62    Async,
63}
64
65/// Lifecycle status of a mirror database.
66///
67/// Exhaustive matches are required everywhere this enum is matched — no
68/// `_ =>` arms.
69#[derive(
70    Debug,
71    Clone,
72    PartialEq,
73    Eq,
74    Serialize,
75    Deserialize,
76    zerompk::ToMessagePack,
77    zerompk::FromMessagePack,
78)]
79pub enum MirrorStatus {
80    /// Initial snapshot transfer is in progress.
81    Bootstrapping {
82        /// Bytes of snapshot data received so far.
83        bytes_done: u64,
84        /// Total snapshot size in bytes (0 = unknown).
85        bytes_total: u64,
86    },
87    /// Log replication is active; mirror is caught up within normal lag bounds.
88    Following,
89    /// Mirror is receiving entries but has fallen behind the lag threshold.
90    Degraded {
91        /// Observed replication lag in milliseconds.
92        lag_ms: u64,
93    },
94    /// Source is unreachable; mirror is serving stale reads with growing lag.
95    Disconnected,
96    /// Mirror was promoted to a writable database. Source link is severed.
97    /// `mirror_origin` is retained as a historical lineage record.
98    Promoted,
99}
100
101/// Per-mirror lag record persisted in `_system.mirror_lag`.
102///
103/// Read by the metrics collector and the `SHOW DATABASE MIRROR STATUS` handler
104/// to produce the observable replication lag.
105#[derive(
106    Debug,
107    Clone,
108    PartialEq,
109    Eq,
110    Serialize,
111    Deserialize,
112    zerompk::ToMessagePack,
113    zerompk::FromMessagePack,
114)]
115#[msgpack(map)]
116pub struct MirrorLagRecord {
117    /// WAL LSN of the last entry successfully applied on this mirror.
118    pub last_applied_lsn: Lsn,
119    /// Wall-clock milliseconds (UNIX epoch) when `last_applied_lsn` was applied.
120    pub last_apply_ms: u64,
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `val` to MessagePack with zerompk and decodes the bytes back.
    ///
    /// Panics if either direction fails, which fails the calling test.
    fn round_trip_msgpack<T>(val: &T) -> T
    where
        T: zerompk::ToMessagePack + for<'a> zerompk::FromMessagePack<'a>,
    {
        let bytes = zerompk::to_msgpack_vec(val).expect("serialize");
        zerompk::from_msgpack(&bytes).expect("deserialize")
    }

    /// Encodes `val` to a JSON string through serde (sonic_rs) and parses it back.
    ///
    /// Panics if either direction fails, which fails the calling test.
    fn round_trip_serde<T>(val: &T) -> T
    where
        T: Serialize + for<'de> Deserialize<'de>,
    {
        let json = sonic_rs::to_string(val).expect("serde serialize");
        sonic_rs::from_str(&json).expect("serde deserialize")
    }

    /// Representative `MirrorOrigin` fixture shared by the origin tests.
    fn sample_origin() -> MirrorOrigin {
        MirrorOrigin {
            source_cluster: "prod-us".to_string(),
            source_database: DatabaseId::DEFAULT,
            mode: MirrorMode::Async,
            last_applied: Lsn::new(12_345),
            status: MirrorStatus::Following,
        }
    }

    /// Representative `MirrorLagRecord` fixture shared by the lag-record tests.
    fn sample_lag_record() -> MirrorLagRecord {
        MirrorLagRecord {
            last_applied_lsn: Lsn::new(12_345),
            last_apply_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn mirror_mode_msgpack_roundtrip() {
        assert_eq!(round_trip_msgpack(&MirrorMode::Sync), MirrorMode::Sync);
        assert_eq!(round_trip_msgpack(&MirrorMode::Async), MirrorMode::Async);
    }

    #[test]
    fn mirror_mode_serde_roundtrip() {
        assert_eq!(round_trip_serde(&MirrorMode::Sync), MirrorMode::Sync);
        assert_eq!(round_trip_serde(&MirrorMode::Async), MirrorMode::Async);
    }

    #[test]
    fn mirror_status_following_msgpack() {
        let s = MirrorStatus::Following;
        assert_eq!(round_trip_msgpack(&s), s);
    }

    #[test]
    fn mirror_status_bootstrapping_msgpack() {
        let s = MirrorStatus::Bootstrapping {
            bytes_done: 1024,
            bytes_total: 4096,
        };
        assert_eq!(round_trip_msgpack(&s), s);
    }

    #[test]
    fn mirror_status_degraded_msgpack() {
        let s = MirrorStatus::Degraded { lag_ms: 7500 };
        assert_eq!(round_trip_msgpack(&s), s);
    }

    #[test]
    fn mirror_status_disconnected_msgpack() {
        let s = MirrorStatus::Disconnected;
        assert_eq!(round_trip_msgpack(&s), s);
    }

    #[test]
    fn mirror_status_promoted_msgpack() {
        let s = MirrorStatus::Promoted;
        assert_eq!(round_trip_msgpack(&s), s);
    }

    #[test]
    fn mirror_status_serde_roundtrip() {
        for s in [
            MirrorStatus::Following,
            MirrorStatus::Bootstrapping {
                bytes_done: 0,
                bytes_total: 0,
            },
            MirrorStatus::Degraded { lag_ms: 5001 },
            MirrorStatus::Disconnected,
            MirrorStatus::Promoted,
        ] {
            assert_eq!(round_trip_serde(&s), s);
        }
    }

    #[test]
    fn mirror_origin_msgpack_roundtrip() {
        let o = sample_origin();
        assert_eq!(round_trip_msgpack(&o), o);
    }

    #[test]
    fn mirror_origin_serde_roundtrip() {
        let o = sample_origin();
        assert_eq!(round_trip_serde(&o), o);
    }

    // `MirrorLagRecord` is persisted, so both codecs must round-trip it
    // losslessly, just like the other catalog types above.
    #[test]
    fn mirror_lag_record_msgpack_roundtrip() {
        let r = sample_lag_record();
        assert_eq!(round_trip_msgpack(&r), r);
    }

    #[test]
    fn mirror_lag_record_serde_roundtrip() {
        let r = sample_lag_record();
        assert_eq!(round_trip_serde(&r), r);
    }
}