Skip to main content

nodedb_types/
clone.rs

// SPDX-License-Identifier: Apache-2.0

//! Clone catalog types shared between the catalog layer and the control plane.
4
5use serde::{Deserialize, Serialize};
6
7use crate::{DatabaseId, Lsn};
8
/// Maximum clone depth (source → clone → clone-of-clone …).
///
/// A clone at depth 9 is rejected with `CLONE_DEPTH_EXCEEDED`.
///
/// This constant only defines the limit; the check that performs the
/// rejection is not part of this file.
pub const MAX_CLONE_DEPTH: u32 = 8;
13
14/// Identifies the source of a copy-on-write database clone.
15///
16/// Stored on each cloned `StoredCollection`; the read and write planners
17/// consult it to decide whether source delegation is needed.
18#[derive(
19    Debug,
20    Clone,
21    PartialEq,
22    Eq,
23    Serialize,
24    Deserialize,
25    zerompk::ToMessagePack,
26    zerompk::FromMessagePack,
27    rkyv::Archive,
28    rkyv::Serialize,
29    rkyv::Deserialize,
30)]
31#[msgpack(map)]
32pub struct CloneOrigin {
33    /// The source database this collection was cloned from.
34    pub source_database: DatabaseId,
35    /// Collection name in the source database (scoped to `source_database`).
36    pub source_collection: String,
37    /// WAL LSN up to which source rows are delegated on reads.
38    /// Rows in the source with LSN > `as_of_lsn` are invisible through
39    /// this clone regardless of query time.
40    pub as_of_lsn: Lsn,
41    /// WAL LSN at the moment this clone was created. Used to detect
42    /// bitemporal queries that pre-date the clone.
43    pub clone_created_at: Lsn,
44    /// Surrogate high-water captured from the source's `SurrogateAssigner`
45    /// at clone-create time.  KV bindings allocated AFTER this value
46    /// belong strictly to source-side writes that must not be visible
47    /// from the resulting clone — the lazy KV read path uses this
48    /// ceiling to filter source-delegated rows.  `None` on legacy clones
49    /// created before this field existed (no isolation enforced —
50    /// matches the prior behaviour).
51    #[serde(default)]
52    #[msgpack(default)]
53    pub kv_surrogate_ceiling: Option<u32>,
54}
55
56/// Materialization state of a copy-on-write clone.
57///
58/// Exhaustive matches are required everywhere this enum is matched — no
59/// `_ =>` arms.
60#[derive(
61    Debug,
62    Clone,
63    PartialEq,
64    Eq,
65    Serialize,
66    Deserialize,
67    zerompk::ToMessagePack,
68    zerompk::FromMessagePack,
69    rkyv::Archive,
70    rkyv::Serialize,
71    rkyv::Deserialize,
72    Default,
73)]
74pub enum CloneStatus {
75    /// Reads delegate to source up to `as_of_lsn`; writes go to target.
76    /// This is the initial state immediately after a `CLONE DATABASE`.
77    #[default]
78    Shadowed,
79    /// Background materializer is copying source rows into target storage.
80    /// Reads still delegate to source for rows not yet copied.
81    Materializing {
82        /// LSN watermark of the materializer's current position in the source.
83        progress_lsn: Lsn,
84        /// Bytes materialised so far (best-effort estimate).
85        bytes_done: u64,
86        /// Total bytes to materialise (best-effort estimate; 0 = unknown).
87        bytes_total: u64,
88    },
89    /// All source rows are physically present in target storage.
90    /// Source delegation is no longer needed.
91    Materialized,
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `val` with zerompk and decodes the bytes back into a fresh `T`.
    fn round_trip_msgpack<T>(val: &T) -> T
    where
        T: zerompk::ToMessagePack + for<'a> zerompk::FromMessagePack<'a>,
    {
        let bytes = zerompk::to_msgpack_vec(val).expect("serialize");
        zerompk::from_msgpack(&bytes).expect("deserialize")
    }

    /// Encodes `val` as JSON through serde (sonic_rs) and decodes it back.
    fn round_trip_serde<T>(val: &T) -> T
    where
        T: Serialize + for<'de> Deserialize<'de>,
    {
        let json = sonic_rs::to_string(val).expect("serde serialize");
        sonic_rs::from_str(&json).expect("serde deserialize")
    }

    /// A fully populated `CloneOrigin` shared by the origin tests.
    fn sample_origin() -> CloneOrigin {
        CloneOrigin {
            source_database: DatabaseId::DEFAULT,
            source_collection: "users".to_string(),
            as_of_lsn: Lsn::new(42_000),
            clone_created_at: Lsn::new(42_100),
            kv_surrogate_ceiling: Some(7),
        }
    }

    #[test]
    fn clone_status_shadowed_msgpack() {
        let s = CloneStatus::Shadowed;
        assert_eq!(round_trip_msgpack(&s), s);
    }

    #[test]
    fn clone_status_materializing_msgpack() {
        let s = CloneStatus::Materializing {
            progress_lsn: Lsn::new(1_000),
            bytes_done: 512,
            bytes_total: 1_024,
        };
        assert_eq!(round_trip_msgpack(&s), s);
    }

    #[test]
    fn clone_status_materialized_msgpack() {
        let s = CloneStatus::Materialized;
        assert_eq!(round_trip_msgpack(&s), s);
    }

    #[test]
    fn clone_status_shadowed_serde() {
        let s = CloneStatus::Shadowed;
        assert_eq!(round_trip_serde(&s), s);
    }

    #[test]
    fn clone_status_materializing_serde() {
        let s = CloneStatus::Materializing {
            progress_lsn: Lsn::new(1_000),
            bytes_done: 512,
            bytes_total: 1_024,
        };
        assert_eq!(round_trip_serde(&s), s);
    }

    #[test]
    fn clone_status_materialized_serde() {
        let s = CloneStatus::Materialized;
        assert_eq!(round_trip_serde(&s), s);
    }

    #[test]
    fn clone_origin_msgpack() {
        let o = sample_origin();
        assert_eq!(round_trip_msgpack(&o), o);
    }

    #[test]
    fn clone_origin_serde() {
        let o = sample_origin();
        assert_eq!(round_trip_serde(&o), o);
    }

    /// A payload written before `kv_surrogate_ceiling` existed must still
    /// decode, with the field coming back as `None`.
    #[test]
    fn clone_origin_serde_missing_ceiling_is_none() {
        let json = sonic_rs::to_string(&sample_origin()).expect("serde serialize");
        // The ceiling is the last declared field, so in the compact output it
        // appears as a trailing `,"kv_surrogate_ceiling":7` member; cutting
        // that member out imitates a legacy payload.
        let member = ",\"kv_surrogate_ceiling\":7";
        assert!(
            json.contains(member),
            "unexpected JSON shape, cannot build legacy payload: {json}"
        );
        let legacy = json.replace(member, "");

        let decoded: CloneOrigin = sonic_rs::from_str(&legacy).expect("serde deserialize");
        let expected = CloneOrigin {
            kv_surrogate_ceiling: None,
            ..sample_origin()
        };
        assert_eq!(decoded, expected);
    }

    #[test]
    fn max_clone_depth_value() {
        assert_eq!(MAX_CLONE_DEPTH, 8);
    }

    #[test]
    fn clone_status_default_is_shadowed() {
        assert_eq!(CloneStatus::default(), CloneStatus::Shadowed);
    }
}