Skip to main content

uni_common/core/
fork.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Type definitions for the fork feature.
5//!
6//! `Fork` = a named, durable, isolated branch of the graph. Each fork is
7//! backed by one Lance branch per dataset (vertex, edge-delta, adjacency).
8//! These types are persisted to `catalog/fork_registry.json` and
9//! `catalog/fork_schemas/{fork_id}.json`. Their lifecycle is governed by
10//! 2PC state machines.
11//!
12//! `SchemaDelta` is wired through Phase 1 with all instances empty —
13//! the merge infrastructure exists so Phase 2's on-the-fly label
14//! creation has a populated path to land into.
15
16// Rust guideline compliant
17
18use std::collections::BTreeMap;
19
20use chrono::{DateTime, Utc};
21use serde::{Deserialize, Serialize};
22use ulid::Ulid;
23
24use crate::core::schema::{DataType, EdgeTypeMeta, LabelMeta};
25
26/// Stable identifier for a fork. Display format is base32 ULID.
27///
28/// Newtype around [`ulid::Ulid`]; preserves time-ordering across
29/// processes and avoids the random-distribution drawbacks of UUIDv4.
30#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
31#[serde(transparent)]
32pub struct ForkId(pub Ulid);
33
34impl ForkId {
35    /// Allocate a fresh ForkId using the system clock.
36    #[must_use]
37    pub fn new() -> Self {
38        Self(Ulid::new())
39    }
40
41    /// Parse a ForkId from its canonical string form.
42    ///
43    /// # Errors
44    ///
45    /// Returns an error if `s` is not a valid 26-character base32 ULID.
46    pub fn parse(s: &str) -> Result<Self, ulid::DecodeError> {
47        Ulid::from_string(s).map(Self)
48    }
49}
50
51impl Default for ForkId {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl std::fmt::Display for ForkId {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        self.0.fmt(f)
60    }
61}
62
63/// Lifecycle status of a fork in the registry.
64///
65/// State machine: `Pending` → `Active` (create commit point); `Active` →
66/// `Tombstoned` → removed (drop commit point). Recovery resumes any
67/// non-`Active` state.
68#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
69#[serde(rename_all = "lowercase")]
70#[non_exhaustive]
71pub enum ForkStatus {
72    /// Registry entry persisted; some Lance branches not yet created.
73    Pending,
74    /// All Lance branches present; fork is reachable via `session.fork`.
75    Active,
76    /// Drop initiated; recovery will finish removing branches.
77    Tombstoned,
78}
79
80/// Metadata for a single fork.
81///
82/// One [`ForkInfo`] per fork in `catalog/fork_registry.json`. The
83/// `datasets` map is filled in step 4 of the create 2PC and is
84/// authoritative for which Lance branches the fork owns.
85#[derive(Clone, Debug, Serialize, Deserialize)]
86pub struct ForkInfo {
87    /// Stable identifier; persists across rename if rename ever lands.
88    pub id: ForkId,
89
90    /// Human-readable name (unique per database).
91    pub name: String,
92
93    /// Parent fork in a nested-fork chain. `None` ⇒ parent is primary.
94    /// Phase 1 always sets `None`.
95    #[serde(default)]
96    pub parent_fork_id: Option<ForkId>,
97
98    /// Snapshot id of primary at the moment the fork was created.
99    pub parent_snapshot_id: String,
100
101    /// Wall-clock UTC at fork creation.
102    pub created_at: DateTime<Utc>,
103
104    /// Wall-clock TTL expiry. `None` ⇒ never expires. Phase 1 always
105    /// `None`; the sweeper lands in Phase 4.
106    #[serde(default)]
107    pub ttl_expires_at: Option<DateTime<Utc>>,
108
109    /// Schema version (`Schema::schema_version`) at fork creation.
110    /// Captured day-one even though only Phase 7's schema-evolution
111    /// spike consumes it; backfilling later is impossible.
112    pub schema_version_at_creation: u32,
113
114    /// Map of `dataset_name` → `branch_name` for every Lance dataset
115    /// this fork owns. Branch names live under the dataset's `tree/`
116    /// directory in Lance's on-disk layout.
117    pub datasets: BTreeMap<String, String>,
118
119    /// Parent's MVCC version high-water-mark at the fork point. The fork
120    /// bootstraps its own version counter to this floor so a fork
121    /// transaction's `_version <= pin` read still sees inherited
122    /// (base_paths) rows, while fork writes get versions above it. Read on
123    /// every fork-session build; persisted so it is stable across re-opens
124    /// (it must be the *fork-point* version, never the live parent's).
125    /// `0` for legacy forks created before this field existed.
126    #[serde(default)]
127    pub fork_point_version_hwm: u64,
128
129    /// Lifecycle state. See [`ForkStatus`].
130    pub status: ForkStatus,
131}
132
133impl ForkInfo {
134    /// Convenience: build a `Pending` info ready for create 2PC step 2.
135    #[must_use]
136    pub fn new_pending(
137        id: ForkId,
138        name: impl Into<String>,
139        parent_snapshot_id: impl Into<String>,
140        schema_version: u32,
141    ) -> Self {
142        Self {
143            id,
144            name: name.into(),
145            parent_fork_id: None,
146            parent_snapshot_id: parent_snapshot_id.into(),
147            created_at: Utc::now(),
148            ttl_expires_at: None,
149            schema_version_at_creation: schema_version,
150            datasets: BTreeMap::new(),
151            fork_point_version_hwm: 0,
152            status: ForkStatus::Pending,
153        }
154    }
155}
156
157/// Adds a property to an existing label or edge type via the fork
158/// schema overlay. Phase 1 has no producer; Phase 2 fills this when
159/// `tx.execute("CREATE (n:Person {phone: ...})")` introduces a
160/// previously-unknown property on a fork.
161#[derive(Clone, Debug, Serialize, Deserialize)]
162pub struct PropertyAddition {
163    /// Owning label or edge-type name.
164    pub owner: String,
165    /// Whether `owner` is a label or an edge type.
166    pub owner_kind: PropertyOwnerKind,
167    /// New property name.
168    pub property: String,
169    /// Declared type.
170    pub data_type: DataType,
171    /// Whether the new property may be null.
172    pub nullable: bool,
173}
174
175/// Discriminator for [`PropertyAddition::owner_kind`].
176#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
177#[serde(rename_all = "lowercase")]
178pub enum PropertyOwnerKind {
179    /// Property attaches to a vertex label.
180    Label,
181    /// Property attaches to an edge type.
182    EdgeType,
183}
184
185/// Schema additions on top of primary, owned by a single fork.
186///
187/// Only *additions* — renames, drops, and type changes are spec
188/// non-goals (§14). Always read together with primary's schema:
189/// `merged = primary ⊕ delta`. The merge implementation lives in
190/// [`crate::core::schema::SchemaManager::with_overlay`].
191///
192/// # Invariant: fork-origin ids are fork-local (L7)
193///
194/// A label/edge-type id minted in this delta can collide with a primary id
195/// allocated after the fork point (both use `max(existing)+1` over their own
196/// view). This is benign: promote and storage resolve by NAME, never by a
197/// fork-origin numeric id. Do not trust these ids across the fork↔primary
198/// boundary.
199#[derive(Clone, Debug, Default, Serialize, Deserialize)]
200pub struct SchemaDelta {
201    /// Vertex labels new to this fork's schema.
202    #[serde(default)]
203    pub added_labels: Vec<(String, LabelMeta)>,
204
205    /// Edge types new to this fork's schema.
206    #[serde(default)]
207    pub added_edge_types: Vec<(String, EdgeTypeMeta)>,
208
209    /// Properties added to existing labels or edge types.
210    #[serde(default)]
211    pub added_properties: Vec<PropertyAddition>,
212}
213
214impl SchemaDelta {
215    /// Convenience: empty delta (the only valid Phase 1 value).
216    #[must_use]
217    pub fn empty() -> Self {
218        Self::default()
219    }
220
221    /// `true` if the delta contributes nothing on top of primary.
222    #[must_use]
223    pub fn is_empty(&self) -> bool {
224        self.added_labels.is_empty()
225            && self.added_edge_types.is_empty()
226            && self.added_properties.is_empty()
227    }
228
229    /// Compose `self` atop `base`: returns `base ⊕ self`.
230    ///
231    /// Phase 3 (nested forks): the effective schema for a child fork is
232    /// `primary ⊕ parent_overlay ⊕ child_overlay`. This helper folds the
233    /// chain bottom-up so the final result can be merged into primary
234    /// in a single [`crate::core::schema::SchemaManager::with_overlay`]
235    /// call.
236    ///
237    /// Collision policy: `self` wins. A child fork that re-declares the
238    /// same label or edge type as its parent overrides the parent's
239    /// entry. Property additions are deduplicated by `(owner, property)`,
240    /// with `self`'s entry winning.
241    #[must_use]
242    pub fn merge_atop(&self, base: &SchemaDelta) -> SchemaDelta {
243        use std::collections::BTreeMap;
244
245        // Labels: base first, then self overrides.
246        let mut labels: BTreeMap<String, LabelMeta> = BTreeMap::new();
247        for (name, meta) in &base.added_labels {
248            labels.insert(name.clone(), meta.clone());
249        }
250        for (name, meta) in &self.added_labels {
251            labels.insert(name.clone(), meta.clone());
252        }
253
254        let mut edge_types: BTreeMap<String, EdgeTypeMeta> = BTreeMap::new();
255        for (name, meta) in &base.added_edge_types {
256            edge_types.insert(name.clone(), meta.clone());
257        }
258        for (name, meta) in &self.added_edge_types {
259            edge_types.insert(name.clone(), meta.clone());
260        }
261
262        let mut properties: BTreeMap<(String, String), PropertyAddition> = BTreeMap::new();
263        for add in &base.added_properties {
264            properties.insert((add.owner.clone(), add.property.clone()), add.clone());
265        }
266        for add in &self.added_properties {
267            properties.insert((add.owner.clone(), add.property.clone()), add.clone());
268        }
269
270        SchemaDelta {
271            added_labels: labels.into_iter().collect(),
272            added_edge_types: edge_types.into_iter().collect(),
273            added_properties: properties.into_values().collect(),
274        }
275    }
276}
277
278/// Top-level on-disk shape of `catalog/fork_registry.json`.
279///
280/// Concurrent updates are serialized at the registry-handle layer in
281/// `uni-store`; this struct is just the wire format.
282#[derive(Clone, Debug, Default, Serialize, Deserialize)]
283pub struct ForkRegistryFile {
284    /// Fork name → metadata.
285    #[serde(default)]
286    pub forks: BTreeMap<String, ForkInfo>,
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::schema::SchemaElementState;

    /// Fixture: an active label carrying the given id.
    fn label_meta(id: u16) -> LabelMeta {
        LabelMeta {
            id,
            created_at: chrono::Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        }
    }

    /// Fixture: an active `A -> A` edge type carrying the given id.
    fn edge_type_meta(id: u32) -> EdgeTypeMeta {
        EdgeTypeMeta {
            id,
            src_labels: vec!["A".into()],
            dst_labels: vec!["A".into()],
            state: SchemaElementState::Active,
            description: None,
        }
    }

    /// Fixture: a delta whose only content is one label.
    fn single_label_delta(name: &str, id: u16) -> SchemaDelta {
        SchemaDelta {
            added_labels: vec![(name.into(), label_meta(id))],
            ..Default::default()
        }
    }

    /// Fixture: a delta whose only content is one property addition.
    fn single_property_delta(addition: PropertyAddition) -> SchemaDelta {
        SchemaDelta {
            added_properties: vec![addition],
            ..Default::default()
        }
    }

    // A freshly minted id must survive a string round-trip.
    #[test]
    fn fork_id_roundtrip() {
        let original = ForkId::new();
        let text = original.to_string();
        let reparsed = ForkId::parse(&text).unwrap();
        assert_eq!(original, reparsed);
    }

    // A pending record must survive a JSON round-trip field by field.
    #[test]
    fn fork_info_serde_roundtrip() {
        let original = ForkInfo::new_pending(ForkId::new(), "scenario_1", "snap-abc", 17);
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ForkInfo = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, original.id);
        assert_eq!(decoded.name, "scenario_1");
        assert_eq!(decoded.parent_snapshot_id, "snap-abc");
        assert_eq!(decoded.schema_version_at_creation, 17);
        assert_eq!(decoded.status, ForkStatus::Pending);
        assert!(decoded.datasets.is_empty());
        assert!(decoded.parent_fork_id.is_none());
        assert!(decoded.ttl_expires_at.is_none());
    }

    // The default registry file round-trips to an empty fork map.
    #[test]
    fn registry_file_default_empty() {
        let encoded = serde_json::to_string(&ForkRegistryFile::default()).unwrap();
        let decoded: ForkRegistryFile = serde_json::from_str(&encoded).unwrap();
        assert!(decoded.forks.is_empty());
    }

    #[test]
    fn schema_delta_default_is_empty() {
        assert!(SchemaDelta::default().is_empty());
    }

    // Disjoint names from both sides must all appear in the merge.
    #[test]
    fn merge_atop_unions_disjoint_labels_and_edge_types() {
        let base = SchemaDelta {
            added_labels: vec![("A".into(), label_meta(1))],
            added_edge_types: vec![("E1".into(), edge_type_meta(10))],
            ..Default::default()
        };
        let top = SchemaDelta {
            added_labels: vec![("B".into(), label_meta(2))],
            added_edge_types: vec![("E2".into(), edge_type_meta(20))],
            ..Default::default()
        };

        let merged = top.merge_atop(&base);

        let has_label = |wanted: &str| merged.added_labels.iter().any(|(n, _)| n == wanted);
        assert!(has_label("A") && has_label("B"));

        let has_edge = |wanted: &str| merged.added_edge_types.iter().any(|(n, _)| n == wanted);
        assert!(has_edge("E1") && has_edge("E2"));
    }

    // When both sides declare the same label, the top (self) entry survives.
    #[test]
    fn merge_atop_self_wins_on_collision() {
        let base = single_label_delta("A", 100);
        let top = single_label_delta("A", 200);

        let merged = top.merge_atop(&base);

        assert_eq!(merged.added_labels.len(), 1);
        assert_eq!(merged.added_labels[0].1.id, 200, "self must win");
    }

    #[test]
    fn merge_atop_empty_base_is_self() {
        let top = single_label_delta("A", 1);

        let merged = top.merge_atop(&SchemaDelta::empty());

        assert_eq!(merged.added_labels.len(), 1);
        assert_eq!(merged.added_labels[0].0, "A");
    }

    #[test]
    fn merge_atop_empty_self_is_base() {
        let base = single_label_delta("A", 1);

        let merged = SchemaDelta::empty().merge_atop(&base);

        assert_eq!(merged.added_labels.len(), 1);
        assert_eq!(merged.added_labels[0].0, "A");
    }

    // Same owner + property on both sides collapses to the top entry.
    #[test]
    fn merge_atop_dedupes_properties_by_owner_and_name() {
        let base = single_property_delta(PropertyAddition {
            owner: "Person".into(),
            owner_kind: PropertyOwnerKind::Label,
            property: "age".into(),
            data_type: DataType::Int64,
            nullable: true,
        });
        let top = single_property_delta(PropertyAddition {
            owner: "Person".into(),
            owner_kind: PropertyOwnerKind::Label,
            property: "age".into(),
            data_type: DataType::String, // self wins; type differs.
            nullable: false,
        });

        let merged = top.merge_atop(&base);

        assert_eq!(merged.added_properties.len(), 1);
        let survivor = &merged.added_properties[0];
        assert!(matches!(survivor.data_type, DataType::String));
        assert!(!survivor.nullable);
    }
}