Skip to main content

nodedb_crdt/state/
core.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! CrdtState core: document handle, row CRUD, uniqueness probes.
4
5use std::collections::HashSet;
6
7use loro::{LoroDoc, LoroMap, LoroValue, ValueOrContainer};
8
9use crate::error::{CrdtError, Result};
10use crate::validator::bitemporal::{VALID_UNTIL, VALID_UNTIL_OPEN};
11
12/// A row is live when its `_ts_valid_until` field is absent, null, or the
13/// open sentinel (`i64::MAX`). Rows with any finite `_ts_valid_until` are
14/// treated as superseded, independent of wall-clock time — the write path
15/// sets finite `_ts_valid_until` only when explicitly terminating a version.
16fn row_is_live(row: &LoroMap) -> bool {
17    match row.get(VALID_UNTIL) {
18        None => true,
19        Some(ValueOrContainer::Value(LoroValue::Null)) => true,
20        Some(ValueOrContainer::Value(LoroValue::I64(n))) => n == VALID_UNTIL_OPEN,
21        _ => true,
22    }
23}
24
25/// A CRDT state for a single tenant/namespace.
26pub struct CrdtState {
27    pub(super) doc: LoroDoc,
28    pub(super) peer_id: u64,
29    /// Array surrogate IDs that are considered "live" referents for
30    /// `BiTemporalFK` / `ForeignKey` constraint checks. Populated by the
31    /// caller from the array catalog before validation. Empty by default,
32    /// which means array-surrogate references are invisible to the constraint
33    /// checker — callers must register them for cross-engine FK validation.
34    array_surrogate_ids: HashSet<String>,
35}
36
37impl CrdtState {
38    /// Create a new empty state for the given peer.
39    pub fn new(peer_id: u64) -> Result<Self> {
40        let doc = LoroDoc::new();
41        doc.set_peer_id(peer_id)
42            .map_err(|e| CrdtError::Loro(format!("failed to set peer_id {peer_id}: {e}")))?;
43        Ok(Self {
44            doc,
45            peer_id,
46            array_surrogate_ids: HashSet::new(),
47        })
48    }
49
50    /// Register an array-engine surrogate ID as a valid referent for FK checks.
51    ///
52    /// Call this before running constraint validation whenever the referent
53    /// collection is backed by the array engine rather than the document/graph
54    /// engines. The ID must be the string form of the surrogate (e.g., the
55    /// decimal representation of the `Surrogate` value or the composite key
56    /// used by the array catalog).
57    pub fn register_array_surrogate(&mut self, id: String) {
58        self.array_surrogate_ids.insert(id);
59    }
60
61    /// Insert or update a row in a collection.
62    pub fn upsert(
63        &self,
64        collection: &str,
65        row_id: &str,
66        fields: &[(&str, LoroValue)],
67    ) -> Result<()> {
68        let coll = self.doc.get_map(collection);
69        let row_container = coll
70            .insert_container(row_id, LoroMap::new())
71            .map_err(|e| CrdtError::Loro(e.to_string()))?;
72        for (field, value) in fields {
73            row_container
74                .insert(field, value.clone())
75                .map_err(|e| CrdtError::Loro(e.to_string()))?;
76        }
77        Ok(())
78    }
79
80    /// Delete a row from a collection.
81    pub fn delete(&self, collection: &str, row_id: &str) -> Result<()> {
82        let coll = self.doc.get_map(collection);
83        coll.delete(row_id)
84            .map_err(|e| CrdtError::Loro(e.to_string()))?;
85        Ok(())
86    }
87
88    /// Delete all rows in a collection. Returns the number of rows deleted.
89    pub fn clear_collection(&self, collection: &str) -> Result<usize> {
90        let coll = self.doc.get_map(collection);
91        let keys: Vec<String> = coll.keys().map(|k| k.to_string()).collect();
92        let count = keys.len();
93        for key in &keys {
94            coll.delete(key)
95                .map_err(|e| CrdtError::Loro(e.to_string()))?;
96        }
97        Ok(count)
98    }
99
100    /// Read a single row's fields as a `LoroValue::Map`.
101    ///
102    /// Navigates via `LoroMap::get()` to avoid the expensive recursive
103    /// `get_deep_value()` clone on the entire row container.
104    pub fn read_row(&self, collection: &str, row_id: &str) -> Option<LoroValue> {
105        let coll = self.doc.get_map(collection);
106        match coll.get(row_id)? {
107            ValueOrContainer::Container(loro::Container::Map(m)) => Some(m.get_value()),
108            ValueOrContainer::Container(loro::Container::List(l)) => Some(l.get_value()),
109            ValueOrContainer::Container(_) => Some(LoroValue::Null),
110            ValueOrContainer::Value(v) => Some(v),
111        }
112    }
113
114    /// Read a single field from a row without cloning the entire row.
115    ///
116    /// This is the fast path for KV-style access where only one field
117    /// is needed. Avoids allocating a full Map for single-field reads.
118    ///
119    /// Shares the same `doc.get_map(collection).get(row_id)` lookup pattern
120    /// as `read_row`, but returns a single field value instead of the whole
121    /// row map — different return granularity, intentionally kept separate.
122    pub fn read_field(&self, collection: &str, row_id: &str, field: &str) -> Option<LoroValue> {
123        let coll = self.doc.get_map(collection);
124        let row_map = match coll.get(row_id)? {
125            ValueOrContainer::Container(loro::Container::Map(m)) => m,
126            ValueOrContainer::Value(v) => return Some(v),
127            _ => return None,
128        };
129        match row_map.get(field)? {
130            ValueOrContainer::Value(v) => Some(v),
131            ValueOrContainer::Container(loro::Container::Map(m)) => Some(m.get_value()),
132            ValueOrContainer::Container(loro::Container::List(l)) => Some(l.get_value()),
133            ValueOrContainer::Container(_) => Some(LoroValue::Null),
134        }
135    }
136
137    /// Check if a row exists in a collection.
138    ///
139    /// Checks the Loro-backed document/graph collections first. If not found
140    /// there, falls back to the registered array surrogate set so that
141    /// `BiTemporalFK` and `ForeignKey` constraints can reference array-engine
142    /// cells as valid referents without requiring a full cross-engine query.
143    pub fn row_exists(&self, collection: &str, row_id: &str) -> bool {
144        let coll = self.doc.get_map(collection);
145        if coll.get(row_id).is_some() {
146            return true;
147        }
148        self.array_surrogate_ids.contains(row_id)
149    }
150
151    /// List all collection names (top-level map keys in the Loro doc).
152    pub fn collection_names(&self) -> Vec<String> {
153        let root = self.doc.get_deep_value();
154        match root {
155            LoroValue::Map(map) => map.keys().map(|k| k.to_string()).collect(),
156            _ => Vec::new(),
157        }
158    }
159
160    /// Get all row IDs in a collection.
161    pub fn row_ids(&self, collection: &str) -> Vec<String> {
162        let coll = self.doc.get_map(collection);
163        coll.keys().map(|k| k.to_string()).collect()
164    }
165
166    /// Check if a value exists for the given field across all rows in a collection.
167    /// Used for UNIQUE constraint checking.
168    pub fn field_value_exists(&self, collection: &str, field: &str, value: &LoroValue) -> bool {
169        let coll = self.doc.get_map(collection);
170        for key in coll.keys() {
171            let path = format!("{collection}/{key}/{field}");
172            if let Some(voc) = self.doc.get_by_str_path(&path) {
173                let field_val = match voc {
174                    ValueOrContainer::Value(v) => v,
175                    ValueOrContainer::Container(_) => {
176                        continue;
177                    }
178                };
179                if &field_val == value {
180                    return true;
181                }
182            }
183        }
184        false
185    }
186
187    /// Bitemporal variant of [`field_value_exists`]: only considers rows
188    /// whose `_ts_valid_until` is open (absent or `i64::MAX`).
189    ///
190    /// A UNIQUE collision between a superseded version and a new live row
191    /// is not a violation — both may share the same value because they
192    /// represent the same logical entity at different valid-times.
193    pub fn field_value_exists_live(
194        &self,
195        collection: &str,
196        field: &str,
197        value: &LoroValue,
198    ) -> bool {
199        let coll = self.doc.get_map(collection);
200        for key in coll.keys() {
201            let row_map = match coll.get(&key) {
202                Some(ValueOrContainer::Container(loro::Container::Map(m))) => m,
203                _ => continue,
204            };
205            if !row_is_live(&row_map) {
206                continue;
207            }
208            let field_val = match row_map.get(field) {
209                Some(ValueOrContainer::Value(v)) => v,
210                _ => continue,
211            };
212            if &field_val == value {
213                return true;
214            }
215        }
216        false
217    }
218
219    /// Return row IDs currently "live" in a bitemporal collection
220    /// (rows whose `_ts_valid_until` is open). For non-bitemporal
221    /// collections every row is returned.
222    pub fn live_row_ids(&self, collection: &str) -> Vec<String> {
223        let coll = self.doc.get_map(collection);
224        let mut out = Vec::new();
225        for key in coll.keys() {
226            let row_map = match coll.get(&key) {
227                Some(ValueOrContainer::Container(loro::Container::Map(m))) => m,
228                _ => continue,
229            };
230            if row_is_live(&row_map) {
231                out.push(key.to_string());
232            }
233        }
234        out
235    }
236
237    /// Get the underlying LoroDoc for advanced operations.
238    pub fn doc(&self) -> &LoroDoc {
239        &self.doc
240    }
241
242    /// Peer ID of this state.
243    pub fn peer_id(&self) -> u64 {
244        self.peer_id
245    }
246}