Skip to main content

uni_common/core/
fork.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Type definitions for the fork feature.
5//!
6//! `Fork` = a named, durable, isolated branch of the graph. Each fork is
7//! backed by one Lance branch per dataset (vertex, edge-delta, adjacency).
8//! These types are persisted to `catalog/fork_registry.json` and
9//! `catalog/fork_schemas/{fork_id}.json`. Their lifecycle is governed by
10//! 2PC state machines.
11//!
12//! `SchemaDelta` is wired through Phase 1 with all instances empty —
13//! the merge infrastructure exists so Phase 2's on-the-fly label
14//! creation has a populated path to land into.
15
16// Rust guideline compliant
17
18use std::collections::BTreeMap;
19
20use chrono::{DateTime, Utc};
21use serde::{Deserialize, Serialize};
22use ulid::Ulid;
23
24use crate::core::schema::{DataType, EdgeTypeMeta, LabelMeta};
25
26/// Stable identifier for a fork. Display format is base32 ULID.
27///
28/// Newtype around [`ulid::Ulid`]; preserves time-ordering across
29/// processes and avoids the random-distribution drawbacks of UUIDv4.
30#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
31#[serde(transparent)]
32pub struct ForkId(pub Ulid);
33
34impl ForkId {
35    /// Allocate a fresh ForkId using the system clock.
36    #[must_use]
37    pub fn new() -> Self {
38        Self(Ulid::new())
39    }
40
41    /// Parse a ForkId from its canonical string form.
42    ///
43    /// # Errors
44    ///
45    /// Returns an error if `s` is not a valid 26-character base32 ULID.
46    pub fn parse(s: &str) -> Result<Self, ulid::DecodeError> {
47        Ulid::from_string(s).map(Self)
48    }
49}
50
51impl Default for ForkId {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl std::fmt::Display for ForkId {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        self.0.fmt(f)
60    }
61}
62
63/// Lifecycle status of a fork in the registry.
64///
65/// State machine: `Pending` → `Active` (create commit point); `Active` →
66/// `Tombstoned` → removed (drop commit point). Recovery resumes any
67/// non-`Active` state.
68#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
69#[serde(rename_all = "lowercase")]
70#[non_exhaustive]
71pub enum ForkStatus {
72    /// Registry entry persisted; some Lance branches not yet created.
73    Pending,
74    /// All Lance branches present; fork is reachable via `session.fork`.
75    Active,
76    /// Drop initiated; recovery will finish removing branches.
77    Tombstoned,
78}
79
80/// Metadata for a single fork.
81///
82/// One [`ForkInfo`] per fork in `catalog/fork_registry.json`. The
83/// `datasets` map is filled in step 4 of the create 2PC and is
84/// authoritative for which Lance branches the fork owns.
85#[derive(Clone, Debug, Serialize, Deserialize)]
86pub struct ForkInfo {
87    /// Stable identifier; persists across rename if rename ever lands.
88    pub id: ForkId,
89
90    /// Human-readable name (unique per database).
91    pub name: String,
92
93    /// Parent fork in a nested-fork chain. `None` ⇒ parent is primary.
94    /// Phase 1 always sets `None`.
95    #[serde(default)]
96    pub parent_fork_id: Option<ForkId>,
97
98    /// Snapshot id of primary at the moment the fork was created.
99    pub parent_snapshot_id: String,
100
101    /// Wall-clock UTC at fork creation.
102    pub created_at: DateTime<Utc>,
103
104    /// Wall-clock TTL expiry. `None` ⇒ never expires. Phase 1 always
105    /// `None`; the sweeper lands in Phase 4.
106    #[serde(default)]
107    pub ttl_expires_at: Option<DateTime<Utc>>,
108
109    /// Schema version (`Schema::schema_version`) at fork creation.
110    /// Captured day-one even though only Phase 7's schema-evolution
111    /// spike consumes it; backfilling later is impossible.
112    pub schema_version_at_creation: u32,
113
114    /// Map of `dataset_name` → `branch_name` for every Lance dataset
115    /// this fork owns. Branch names live under the dataset's `tree/`
116    /// directory in Lance's on-disk layout.
117    pub datasets: BTreeMap<String, String>,
118
119    /// Lifecycle state. See [`ForkStatus`].
120    pub status: ForkStatus,
121}
122
123impl ForkInfo {
124    /// Convenience: build a `Pending` info ready for create 2PC step 2.
125    #[must_use]
126    pub fn new_pending(
127        id: ForkId,
128        name: impl Into<String>,
129        parent_snapshot_id: impl Into<String>,
130        schema_version: u32,
131    ) -> Self {
132        Self {
133            id,
134            name: name.into(),
135            parent_fork_id: None,
136            parent_snapshot_id: parent_snapshot_id.into(),
137            created_at: Utc::now(),
138            ttl_expires_at: None,
139            schema_version_at_creation: schema_version,
140            datasets: BTreeMap::new(),
141            status: ForkStatus::Pending,
142        }
143    }
144}
145
146/// Adds a property to an existing label or edge type via the fork
147/// schema overlay. Phase 1 has no producer; Phase 2 fills this when
148/// `tx.execute("CREATE (n:Person {phone: ...})")` introduces a
149/// previously-unknown property on a fork.
150#[derive(Clone, Debug, Serialize, Deserialize)]
151pub struct PropertyAddition {
152    /// Owning label or edge-type name.
153    pub owner: String,
154    /// Whether `owner` is a label or an edge type.
155    pub owner_kind: PropertyOwnerKind,
156    /// New property name.
157    pub property: String,
158    /// Declared type.
159    pub data_type: DataType,
160    /// Whether the new property may be null.
161    pub nullable: bool,
162}
163
164/// Discriminator for [`PropertyAddition::owner_kind`].
165#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
166#[serde(rename_all = "lowercase")]
167pub enum PropertyOwnerKind {
168    /// Property attaches to a vertex label.
169    Label,
170    /// Property attaches to an edge type.
171    EdgeType,
172}
173
174/// Schema additions on top of primary, owned by a single fork.
175///
176/// Only *additions* — renames, drops, and type changes are spec
177/// non-goals (§14). Always read together with primary's schema:
178/// `merged = primary ⊕ delta`. The merge implementation lives in
179/// [`crate::core::schema::SchemaManager::with_overlay`].
180#[derive(Clone, Debug, Default, Serialize, Deserialize)]
181pub struct SchemaDelta {
182    /// Vertex labels new to this fork's schema.
183    #[serde(default)]
184    pub added_labels: Vec<(String, LabelMeta)>,
185
186    /// Edge types new to this fork's schema.
187    #[serde(default)]
188    pub added_edge_types: Vec<(String, EdgeTypeMeta)>,
189
190    /// Properties added to existing labels or edge types.
191    #[serde(default)]
192    pub added_properties: Vec<PropertyAddition>,
193}
194
195impl SchemaDelta {
196    /// Convenience: empty delta (the only valid Phase 1 value).
197    #[must_use]
198    pub fn empty() -> Self {
199        Self::default()
200    }
201
202    /// `true` if the delta contributes nothing on top of primary.
203    #[must_use]
204    pub fn is_empty(&self) -> bool {
205        self.added_labels.is_empty()
206            && self.added_edge_types.is_empty()
207            && self.added_properties.is_empty()
208    }
209
210    /// Compose `self` atop `base`: returns `base ⊕ self`.
211    ///
212    /// Phase 3 (nested forks): the effective schema for a child fork is
213    /// `primary ⊕ parent_overlay ⊕ child_overlay`. This helper folds the
214    /// chain bottom-up so the final result can be merged into primary
215    /// in a single [`crate::core::schema::SchemaManager::with_overlay`]
216    /// call.
217    ///
218    /// Collision policy: `self` wins. A child fork that re-declares the
219    /// same label or edge type as its parent overrides the parent's
220    /// entry. Property additions are deduplicated by `(owner, property)`,
221    /// with `self`'s entry winning.
222    #[must_use]
223    pub fn merge_atop(&self, base: &SchemaDelta) -> SchemaDelta {
224        use std::collections::BTreeMap;
225
226        // Labels: base first, then self overrides.
227        let mut labels: BTreeMap<String, LabelMeta> = BTreeMap::new();
228        for (name, meta) in &base.added_labels {
229            labels.insert(name.clone(), meta.clone());
230        }
231        for (name, meta) in &self.added_labels {
232            labels.insert(name.clone(), meta.clone());
233        }
234
235        let mut edge_types: BTreeMap<String, EdgeTypeMeta> = BTreeMap::new();
236        for (name, meta) in &base.added_edge_types {
237            edge_types.insert(name.clone(), meta.clone());
238        }
239        for (name, meta) in &self.added_edge_types {
240            edge_types.insert(name.clone(), meta.clone());
241        }
242
243        let mut properties: BTreeMap<(String, String), PropertyAddition> = BTreeMap::new();
244        for add in &base.added_properties {
245            properties.insert((add.owner.clone(), add.property.clone()), add.clone());
246        }
247        for add in &self.added_properties {
248            properties.insert((add.owner.clone(), add.property.clone()), add.clone());
249        }
250
251        SchemaDelta {
252            added_labels: labels.into_iter().collect(),
253            added_edge_types: edge_types.into_iter().collect(),
254            added_properties: properties.into_values().collect(),
255        }
256    }
257}
258
259/// Top-level on-disk shape of `catalog/fork_registry.json`.
260///
261/// Concurrent updates are serialized at the registry-handle layer in
262/// `uni-store`; this struct is just the wire format.
263#[derive(Clone, Debug, Default, Serialize, Deserialize)]
264pub struct ForkRegistryFile {
265    /// Fork name → metadata.
266    #[serde(default)]
267    pub forks: BTreeMap<String, ForkInfo>,
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fork_id_roundtrip() {
        let id = ForkId::new();
        let s = id.to_string();
        // Canonical ULID text is always 26 characters.
        assert_eq!(s.len(), 26);
        let parsed = ForkId::parse(&s).unwrap();
        assert_eq!(id, parsed);
    }

    #[test]
    fn fork_id_parse_rejects_malformed_input() {
        assert!(ForkId::parse("").is_err());
        assert!(ForkId::parse("not-a-ulid").is_err());
    }

    #[test]
    fn fork_info_serde_roundtrip() {
        let info = ForkInfo::new_pending(ForkId::new(), "scenario_1", "snap-abc", 17);
        let json = serde_json::to_string(&info).unwrap();
        let parsed: ForkInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.id, info.id);
        assert_eq!(parsed.name, "scenario_1");
        assert_eq!(parsed.parent_snapshot_id, "snap-abc");
        assert_eq!(parsed.schema_version_at_creation, 17);
        assert_eq!(parsed.status, ForkStatus::Pending);
        assert!(parsed.datasets.is_empty());
        assert!(parsed.parent_fork_id.is_none());
        assert!(parsed.ttl_expires_at.is_none());
    }

    #[test]
    fn fork_status_wire_format_is_lowercase() {
        // Persisted in catalog/fork_registry.json: changing a variant name or
        // the rename rule would make existing registries unreadable.
        for (status, wire) in [
            (ForkStatus::Pending, "\"pending\""),
            (ForkStatus::Active, "\"active\""),
            (ForkStatus::Tombstoned, "\"tombstoned\""),
        ] {
            assert_eq!(serde_json::to_string(&status).unwrap(), wire);
            let back: ForkStatus = serde_json::from_str(wire).unwrap();
            assert_eq!(back, status);
        }
    }

    #[test]
    fn property_owner_kind_wire_format_is_lowercase() {
        assert_eq!(
            serde_json::to_string(&PropertyOwnerKind::Label).unwrap(),
            "\"label\""
        );
        assert_eq!(
            serde_json::to_string(&PropertyOwnerKind::EdgeType).unwrap(),
            "\"edgetype\""
        );
    }

    #[test]
    fn registry_file_default_empty() {
        let file = ForkRegistryFile::default();
        let json = serde_json::to_string(&file).unwrap();
        let parsed: ForkRegistryFile = serde_json::from_str(&json).unwrap();
        assert!(parsed.forks.is_empty());
    }

    #[test]
    fn registry_file_without_forks_key_is_empty() {
        // `#[serde(default)]` on `forks` must accept a bare object.
        let parsed: ForkRegistryFile = serde_json::from_str("{}").unwrap();
        assert!(parsed.forks.is_empty());
    }

    #[test]
    fn schema_delta_default_is_empty() {
        let d = SchemaDelta::default();
        assert!(d.is_empty());
    }

    #[test]
    fn schema_delta_without_keys_is_empty() {
        // Every field carries `#[serde(default)]`.
        let d: SchemaDelta = serde_json::from_str("{}").unwrap();
        assert!(d.is_empty());
    }

    #[test]
    fn schema_delta_with_any_addition_is_not_empty() {
        let d = SchemaDelta {
            added_labels: vec![("A".into(), label_meta(1))],
            ..Default::default()
        };
        assert!(!d.is_empty());
    }

    fn label_meta(id: u16) -> LabelMeta {
        use crate::core::schema::SchemaElementState;
        LabelMeta {
            id,
            created_at: chrono::Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        }
    }

    fn edge_type_meta(id: u32) -> EdgeTypeMeta {
        use crate::core::schema::SchemaElementState;
        EdgeTypeMeta {
            id,
            src_labels: vec!["A".into()],
            dst_labels: vec!["A".into()],
            state: SchemaElementState::Active,
            description: None,
        }
    }

    #[test]
    fn merge_atop_unions_disjoint_labels_and_edge_types() {
        let base = SchemaDelta {
            added_labels: vec![("A".into(), label_meta(1))],
            added_edge_types: vec![("E1".into(), edge_type_meta(10))],
            ..Default::default()
        };
        let top = SchemaDelta {
            added_labels: vec![("B".into(), label_meta(2))],
            added_edge_types: vec![("E2".into(), edge_type_meta(20))],
            ..Default::default()
        };
        let merged = top.merge_atop(&base);
        let label_names: Vec<&str> = merged
            .added_labels
            .iter()
            .map(|(n, _)| n.as_str())
            .collect();
        // Exactly the two disjoint entries — no duplicates, nothing dropped.
        assert_eq!(label_names.len(), 2);
        assert!(label_names.contains(&"A") && label_names.contains(&"B"));
        let edge_names: Vec<&str> = merged
            .added_edge_types
            .iter()
            .map(|(n, _)| n.as_str())
            .collect();
        assert_eq!(edge_names.len(), 2);
        assert!(edge_names.contains(&"E1") && edge_names.contains(&"E2"));
    }

    #[test]
    fn merge_atop_self_wins_on_collision() {
        let base = SchemaDelta {
            added_labels: vec![("A".into(), label_meta(100))],
            ..Default::default()
        };
        let top = SchemaDelta {
            added_labels: vec![("A".into(), label_meta(200))],
            ..Default::default()
        };
        let merged = top.merge_atop(&base);
        assert_eq!(merged.added_labels.len(), 1);
        assert_eq!(merged.added_labels[0].1.id, 200, "self must win");
    }

    #[test]
    fn merge_atop_empty_base_is_self() {
        let top = SchemaDelta {
            added_labels: vec![("A".into(), label_meta(1))],
            ..Default::default()
        };
        let merged = top.merge_atop(&SchemaDelta::empty());
        assert_eq!(merged.added_labels.len(), 1);
        assert_eq!(merged.added_labels[0].0, "A");
    }

    #[test]
    fn merge_atop_empty_self_is_base() {
        let base = SchemaDelta {
            added_labels: vec![("A".into(), label_meta(1))],
            ..Default::default()
        };
        let merged = SchemaDelta::empty().merge_atop(&base);
        assert_eq!(merged.added_labels.len(), 1);
        assert_eq!(merged.added_labels[0].0, "A");
    }

    #[test]
    fn merge_atop_dedupes_properties_by_owner_and_name() {
        let base_add = PropertyAddition {
            owner: "Person".into(),
            owner_kind: PropertyOwnerKind::Label,
            property: "age".into(),
            data_type: DataType::Int64,
            nullable: true,
        };
        let top_add = PropertyAddition {
            owner: "Person".into(),
            owner_kind: PropertyOwnerKind::Label,
            property: "age".into(),
            data_type: DataType::String, // self wins; type differs.
            nullable: false,
        };
        let base = SchemaDelta {
            added_properties: vec![base_add],
            ..Default::default()
        };
        let top = SchemaDelta {
            added_properties: vec![top_add],
            ..Default::default()
        };
        let merged = top.merge_atop(&base);
        assert_eq!(merged.added_properties.len(), 1);
        assert!(matches!(
            merged.added_properties[0].data_type,
            DataType::String
        ));
        assert!(!merged.added_properties[0].nullable);
    }
}