Skip to main content

selene_graph/
mutator.rs

1//! Typed mutation funnel per spec 03 section 4.3.
2
3mod assignment;
4mod catalog;
5mod composite_property_index;
6mod delete;
7mod delete_set;
8mod factory_reset;
9mod property_index;
10mod remove;
11mod text_index;
12mod vector_index;
13
14use std::sync::Arc;
15
16use roaring::RoaringBitmap;
17use selene_core::{
18    Change, DbString, EdgeId, GraphId, LabelDiff, LabelSet, NodeId, PropertyDiff, PropertyMap,
19    SchemaChange,
20};
21
22use crate::adjacency::AdjacencyEdge;
23use crate::error::{GraphError, GraphResult};
24use crate::graph_types::{GraphTypeDef, PropertyTypeDef};
25use crate::index_provider::{IndexProvider, ProviderTag};
26use crate::store::RowIndex;
27use crate::type_validator::{EntityId, TypeViolation};
28use crate::write_txn::WriteTxn;
29
30/// Borrowed mutation builder for one write transaction.
31pub struct Mutator<'tx, 'g> {
32    txn: &'tx mut WriteTxn<'g>,
33}
34
35impl<'tx, 'g> Mutator<'tx, 'g> {
36    pub(crate) fn new(txn: &'tx mut WriteTxn<'g>) -> Self {
37        Self { txn }
38    }
39
40    /// Create a node, emit `Change::NodeCreated`, and return its ID.
41    ///
42    /// # Errors
43    ///
44    /// Returns [`GraphError::RowSpaceExhausted`] when the dense row store fills
45    /// the v1 row-index range (max 2^32 rows).
46    pub fn create_node(&mut self, labels: LabelSet, mut props: PropertyMap) -> GraphResult<NodeId> {
47        fill_node_defaults(self.txn.read(), &labels, &mut props)?;
48        assignment::coerce_node_properties(self.txn.read(), &labels, &mut props)?;
49        let id = self.txn.allocator.allocate_node();
50        {
51            let graph = self.txn.guard_mut();
52            // BRIEF-Item-4c: append at the dense end (row = current row count)
53            // instead of `id - 1` arithmetic. After 4b compaction the monotonic
54            // high-water id far exceeds the dense row count, so an arith row would
55            // re-pad exactly the holes compaction reclaimed; append keeps the store
56            // dense and never resurrects a reclaimed slot. The u32 row-space cap
57            // therefore moves from the id value to the row count.
58            let row = u32::try_from(graph.node_store.len())
59                .ok()
60                // u32::MAX is reserved as RowIndex::TOMBSTONE; the last real row
61                // is u32::MAX - 1, so a live row never aliases the sentinel.
62                .filter(|&row| row != u32::MAX)
63                .ok_or(GraphError::RowSpaceExhausted {
64                    kind: "node",
65                    rows: graph.node_store.len() as u64,
66                    max_rows: u32::MAX as u64,
67                })?;
68            // BRIEF-153 fix-cycle C2: run property-index admission BEFORE
69            // mutating row state so a cap-exhaustion error rolls back
70            // cleanly with no half-written row. Index updates only touch
71            // their own maps; node_store stays untouched if any admission
72            // fails. The txn boundary publishes everything atomically on
73            // commit, so a partial-index publication is impossible.
74            crate::property_index::apply_node_create(
75                &mut graph.property_index,
76                &labels,
77                &props,
78                row,
79            )?;
80            crate::composite_property_index::apply_node_create(
81                &mut graph.composite_property_index,
82                &labels,
83                &props,
84                row,
85            )?;
86            crate::vector_index::apply_node_create(&mut graph.vector_index, &labels, &props, row)?;
87            crate::text_index::apply_node_create(&mut graph.text_index, &labels, &props, row, id);
88            graph.node_store.labels.push(labels.clone());
89            graph.node_store.properties.push(props.clone());
90            graph.node_store.row_to_id.push(id);
91            graph.node_store.alive_mut().insert(row);
92            // BRIEF-Item-4a: bind the external id to its row in both directions.
93            // The live commit path never re-runs `rebuild_id_maps`, so the
94            // `id -> row` map must be populated here. The row is remappable once
95            // 4b compaction renumbers rows under stable ids.
96            graph.node_id_to_row.insert(id, RowIndex::new(row));
97            insert_node_labels(&mut graph.idx_label, row, &labels);
98        }
99        self.txn.changes.push(Change::NodeCreated {
100            id,
101            labels,
102            properties: props,
103        });
104        Ok(id)
105    }
106
107    /// Create an edge between two alive nodes.
108    pub fn create_edge(
109        &mut self,
110        label: DbString,
111        source: NodeId,
112        target: NodeId,
113        mut props: PropertyMap,
114    ) -> GraphResult<EdgeId> {
115        self.require_live_node(source)?;
116        self.require_live_node(target)?;
117        fill_edge_defaults(self.txn.read(), label.clone(), source, target, &mut props)?;
118        assignment::coerce_edge_properties(
119            self.txn.read(),
120            label.clone(),
121            source,
122            target,
123            &mut props,
124        )?;
125        let id = self.txn.allocator.allocate_edge();
126        {
127            let graph = self.txn.guard_mut();
128            // BRIEF-Item-4c: append at the dense end (see create_node).
129            let row = u32::try_from(graph.edge_store.len())
130                .ok()
131                .filter(|&row| row != u32::MAX) // u32::MAX is RowIndex::TOMBSTONE
132                .ok_or(GraphError::RowSpaceExhausted {
133                    kind: "edge",
134                    rows: graph.edge_store.len() as u64,
135                    max_rows: u32::MAX as u64,
136                })?;
137            crate::property_index::apply_edge_create(
138                &mut graph.edge_property_index,
139                &label,
140                &props,
141                row,
142            )?;
143            graph.edge_store.label.push(label.clone());
144            graph.edge_store.source.push(source);
145            graph.edge_store.target.push(target);
146            graph.edge_store.properties.push(props.clone());
147            graph.edge_store.row_to_id.push(id);
148            graph.edge_store.alive_mut().insert(row);
149            // BRIEF-Item-4a: bind the external edge id to its row (live path).
150            graph.edge_id_to_row.insert(id, RowIndex::new(row));
151            insert_index_row(&mut graph.idx_edge_label, label.clone(), row);
152
153            graph
154                .adjacency_out
155                .entry(source)
156                .or_default()
157                .add(AdjacencyEdge {
158                    label: label.clone(),
159                    neighbor: target,
160                    edge_id: id,
161                });
162            graph
163                .adjacency_in
164                .entry(target)
165                .or_default()
166                .add(AdjacencyEdge {
167                    label: label.clone(),
168                    neighbor: source,
169                    edge_id: id,
170                });
171        }
172        self.txn.changes.push(Change::EdgeCreated {
173            id,
174            label,
175            source,
176            target,
177            properties: props,
178        });
179        Ok(id)
180    }
181
182    /// Update an alive node and emit `Change::NodeUpdated`.
183    pub fn update_node(
184        &mut self,
185        id: NodeId,
186        labels_diff: LabelDiff,
187        mut props_diff: PropertyDiff,
188    ) -> GraphResult<()> {
189        let row = self.require_live_node(id)?;
190
191        // Compute the new label set without mutating the working graph yet.
192        let old_labels = self
193            .txn
194            .read()
195            .node_store
196            .labels
197            .get(row)
198            .cloned()
199            .unwrap_or_default();
200        let mut labels = old_labels.clone();
201        for label in labels_diff.added.iter().cloned() {
202            labels.insert(label);
203        }
204        for label in labels_diff.removed.iter() {
205            labels.remove(label);
206        }
207        reject_immutable_node_update(self.txn.read(), id, &old_labels, &props_diff)?;
208        reject_immutable_node_update(self.txn.read(), id, &labels, &props_diff)?;
209        assignment::coerce_node_property_diff(self.txn.read(), &labels, &mut props_diff)?;
210
211        // Apply the property diff up front; if it errors we leave the working
212        // graph (including idx_label) untouched so the transaction can still
213        // be safely rolled back or aborted without leaking inconsistent state.
214        let old_props = self
215            .txn
216            .read()
217            .node_store
218            .properties
219            .get(row)
220            .cloned()
221            .unwrap_or_default();
222        let mut props = old_props.clone();
223        apply_property_diff(&mut props, &props_diff)?;
224
225        // BRIEF-153 fix-cycle C2: run property-index admission BEFORE
226        // mutating row state so a cap-exhaustion error rolls back cleanly
227        // with no half-written row. The index updates borrow `labels`/`props`;
228        // both move into the row stores only after every fallible step.
229        {
230            let graph = self.txn.guard_mut();
231            crate::property_index::apply_node_update(
232                &mut graph.property_index,
233                &old_labels,
234                &old_props,
235                &labels,
236                &props,
237                row as u32,
238            )?;
239            crate::composite_property_index::apply_node_update(
240                &mut graph.composite_property_index,
241                &old_labels,
242                &old_props,
243                &labels,
244                &props,
245                row as u32,
246            )?;
247            crate::vector_index::apply_node_update(
248                &mut graph.vector_index,
249                &old_labels,
250                &old_props,
251                &labels,
252                &props,
253                row as u32,
254            )?;
255            crate::text_index::apply_node_update(
256                &mut graph.text_index,
257                &old_labels,
258                &old_props,
259                &labels,
260                &props,
261                row as u32,
262                id,
263            );
264            graph.node_store.labels.set(row, labels);
265            graph.node_store.properties.set(row, props);
266            for label in labels_diff.added.iter().cloned() {
267                insert_index_row(&mut graph.idx_label, label, row as u32);
268            }
269            for label in labels_diff.removed.iter() {
270                remove_index_row(&mut graph.idx_label, label, row as u32);
271            }
272        }
273
274        self.txn.changes.push(Change::NodeUpdated {
275            id,
276            labels_diff,
277            properties_diff: props_diff,
278        });
279        Ok(())
280    }
281
282    /// Update an alive edge and emit `Change::EdgeUpdated`.
283    ///
284    /// Edge labels are immutable, so property updates do not touch
285    /// `idx_edge_label`.
286    pub fn update_edge(&mut self, id: EdgeId, mut props_diff: PropertyDiff) -> GraphResult<()> {
287        let row = self.require_live_edge(id)?;
288        reject_immutable_edge_update(self.txn.read(), id, &props_diff)?;
289        assignment::coerce_edge_property_diff(self.txn.read(), id, &mut props_diff)?;
290        let label = self
291            .txn
292            .read()
293            .edge_store
294            .label
295            .get(row)
296            .cloned()
297            .ok_or(GraphError::EdgeNotFound { id })?;
298        let old_props = self
299            .txn
300            .read()
301            .edge_store
302            .properties
303            .get(row)
304            .cloned()
305            .unwrap_or_default();
306        let mut props = old_props.clone();
307        apply_property_diff(&mut props, &props_diff)?;
308        {
309            let graph = self.txn.guard_mut();
310            graph.edge_store.properties.set(row, props.clone());
311            crate::property_index::apply_edge_update(
312                &mut graph.edge_property_index,
313                &label,
314                &old_props,
315                &props,
316                row as u32,
317            )?;
318        }
319        self.txn.changes.push(Change::EdgeUpdated {
320            id,
321            properties_diff: props_diff,
322        });
323        Ok(())
324    }
325
326    /// Append a raw [`SchemaChange`] WAL payload through the write funnel.
327    ///
328    /// This is a pass-through accumulator: catalog graph mutation and
329    /// closed-graph validation are handled by higher-level validation layers
330    /// (the typed catalog DDL methods on this `Mutator` — e.g. `create_node_type`
331    /// — call those layers and then funnel here).
332    ///
333    /// Why: this is the single, canonical funnel entry for a `SchemaChanged`
334    /// change record (hard rule 11 — every mutation routes through the one
335    /// `Mutator`). It is intentionally retained as a `pub` funnel surface even
336    /// though no GQL caller reaches it directly today: the catalog DDL methods
337    /// are the production producers, and keeping the low-level entry public means
338    /// any future schema-event producer routes through the same funnel rather
339    /// than re-implementing the write path. Tests and benches drive it directly
340    /// to exercise the raw funnel without the DDL validation layer on top.
341    pub fn schema_change(&mut self, graph: GraphId, change: SchemaChange) {
342        self.txn
343            .changes
344            .push(Change::SchemaChanged { graph, change });
345    }
346
347    /// Look up a registered index provider through the held write transaction.
348    #[must_use]
349    pub fn index_provider_by_tag(&self, tag: ProviderTag) -> Option<Arc<dyn IndexProvider>> {
350        self.txn
351            .providers
352            .iter()
353            .find(|provider| provider.provider_tag() == tag)
354            .map(Arc::clone)
355    }
356
357    /// Borrow the transaction-local working graph.
358    #[must_use]
359    pub fn read(&self) -> &crate::SeleneGraph {
360        self.txn.read()
361    }
362
363    fn require_live_node(&self, id: NodeId) -> GraphResult<usize> {
364        let graph = self.txn.read();
365        // Map-backed: a never-committed (aborted-tx hole) id is absent from the
366        // map -> NotFound. A deleted id stays mapped to its dead row -> NotAlive.
367        let row = graph
368            .row_for_node_id(id)
369            .ok_or(GraphError::NodeNotFound { id })?
370            .get();
371        if row as usize >= graph.node_store.len() {
372            return Err(GraphError::NodeNotFound { id });
373        }
374        if !graph.node_store.is_alive(row) {
375            return Err(GraphError::NodeNotAlive { id });
376        }
377        Ok(row as usize)
378    }
379
380    fn require_live_edge(&self, id: EdgeId) -> GraphResult<usize> {
381        let graph = self.txn.read();
382        let row = graph
383            .row_for_edge_id(id)
384            .ok_or(GraphError::EdgeNotFound { id })?
385            .get();
386        if row as usize >= graph.edge_store.len() {
387            return Err(GraphError::EdgeNotFound { id });
388        }
389        if !graph.edge_store.is_alive(row) {
390            return Err(GraphError::EdgeNotAlive { id });
391        }
392        Ok(row as usize)
393    }
394}
395
396fn insert_node_labels(
397    index: &mut imbl::HashMap<DbString, RoaringBitmap>,
398    row: u32,
399    labels: &LabelSet,
400) {
401    for label in labels.iter().cloned() {
402        insert_index_row(index, label, row);
403    }
404}
405
406fn remove_node_labels(
407    index: &mut imbl::HashMap<DbString, RoaringBitmap>,
408    row: u32,
409    labels: &LabelSet,
410) {
411    for label in labels.iter() {
412        remove_index_row(index, label, row);
413    }
414}
415
416fn insert_index_row(index: &mut imbl::HashMap<DbString, RoaringBitmap>, label: DbString, row: u32) {
417    // In-place insert via `entry().or_default()`: the rebuild path uses the same
418    // idiom (see `consistency.rs` / `typed_index.rs`). `guard_mut` already gives
419    // unique ownership of the bitmap (Arc::make_mut), so we never clone the whole
420    // RoaringBitmap per label per node — bulk-loading one label is O(N), not O(N²).
421    index.entry(label).or_default().insert(row);
422}
423
424fn remove_index_row(
425    index: &mut imbl::HashMap<DbString, RoaringBitmap>,
426    label: &DbString,
427    row: u32,
428) {
429    // Mirror `insert_index_row`: mutate the bitmap behind the imbl entry
430    // instead of cloning the whole RoaringBitmap for every row removed.
431    let now_empty = match index.get_mut(label) {
432        Some(bitmap) => {
433            bitmap.remove(row);
434            bitmap.is_empty()
435        }
436        None => false,
437    };
438    if now_empty {
439        index.remove(label);
440    }
441}
442
443fn fill_node_defaults(
444    graph: &crate::SeleneGraph,
445    labels: &LabelSet,
446    props: &mut PropertyMap,
447) -> GraphResult<()> {
448    let Some(graph_type) = graph.meta.bound_type.as_deref() else {
449        return Ok(());
450    };
451    let Some(node_type) = graph_type.find_node_type(labels) else {
452        return Ok(());
453    };
454    fill_property_defaults(&node_type.properties, props)
455}
456
457fn fill_edge_defaults(
458    graph: &crate::SeleneGraph,
459    label: DbString,
460    source: NodeId,
461    target: NodeId,
462    props: &mut PropertyMap,
463) -> GraphResult<()> {
464    let Some(graph_type) = graph.meta.bound_type.as_deref() else {
465        return Ok(());
466    };
467    let Some(source_type) = node_type_index_for_node(graph, graph_type, source) else {
468        return Ok(());
469    };
470    let Some(target_type) = node_type_index_for_node(graph, graph_type, target) else {
471        return Ok(());
472    };
473    let Some(edge_type) = graph_type.find_edge_type(label, source_type, target_type) else {
474        return Ok(());
475    };
476    fill_property_defaults(&edge_type.properties, props)
477}
478
479fn fill_property_defaults(
480    declarations: &[PropertyTypeDef],
481    props: &mut PropertyMap,
482) -> GraphResult<()> {
483    for declaration in declarations {
484        if props.contains_key(&declaration.name) {
485            continue;
486        }
487        if let Some(default) = &declaration.default {
488            props.set(declaration.name.clone(), default.to_value()?)?;
489        }
490    }
491    Ok(())
492}
493
494fn reject_immutable_node_update(
495    graph: &crate::SeleneGraph,
496    id: NodeId,
497    labels: &LabelSet,
498    diff: &PropertyDiff,
499) -> GraphResult<()> {
500    let Some(graph_type) = graph.meta.bound_type.as_deref() else {
501        return Ok(());
502    };
503    let Some(node_type) = graph_type.find_node_type(labels) else {
504        return Ok(());
505    };
506    reject_immutable_property_update(
507        EntityId::Node(id),
508        node_type.name.clone(),
509        &node_type.properties,
510        diff,
511    )
512}
513
514fn reject_immutable_edge_update(
515    graph: &crate::SeleneGraph,
516    id: EdgeId,
517    diff: &PropertyDiff,
518) -> GraphResult<()> {
519    let Some(graph_type) = graph.meta.bound_type.as_deref() else {
520        return Ok(());
521    };
522    let Some(label) = graph.edge_label(id).cloned() else {
523        return Ok(());
524    };
525    let Some((source, target)) = graph.edge_endpoints(id) else {
526        return Ok(());
527    };
528    let Some(source_type) = node_type_index_for_node(graph, graph_type, source) else {
529        return Ok(());
530    };
531    let Some(target_type) = node_type_index_for_node(graph, graph_type, target) else {
532        return Ok(());
533    };
534    let Some(edge_type) = graph_type.find_edge_type(label, source_type, target_type) else {
535        return Ok(());
536    };
537    reject_immutable_property_update(
538        EntityId::Edge(id),
539        edge_type.name.clone(),
540        &edge_type.properties,
541        diff,
542    )
543}
544
545fn node_type_index_for_node(
546    graph: &crate::SeleneGraph,
547    graph_type: &GraphTypeDef,
548    node: NodeId,
549) -> Option<u32> {
550    let labels = graph.node_labels(node)?;
551    graph_type.find_node_type_index(labels)
552}
553
554fn reject_immutable_property_update(
555    entity_id: EntityId,
556    declared_in: DbString,
557    declarations: &[PropertyTypeDef],
558    diff: &PropertyDiff,
559) -> GraphResult<()> {
560    for (key, _) in &diff.set {
561        reject_if_immutable(entity_id, declared_in.clone(), declarations, key.clone())?;
562    }
563    for key in &diff.removed {
564        reject_if_immutable(entity_id, declared_in.clone(), declarations, key.clone())?;
565    }
566    Ok(())
567}
568
569fn reject_if_immutable(
570    entity_id: EntityId,
571    declared_in: DbString,
572    declarations: &[PropertyTypeDef],
573    property: DbString,
574) -> GraphResult<()> {
575    if declarations
576        .iter()
577        .any(|declaration| declaration.name == property && declaration.immutable)
578    {
579        return Err(GraphError::TypeViolation(
580            TypeViolation::ImmutablePropertyUpdate {
581                entity_id,
582                property,
583                declared_in,
584            },
585        ));
586    }
587    Ok(())
588}
589
590fn apply_property_diff(map: &mut PropertyMap, diff: &PropertyDiff) -> GraphResult<()> {
591    for (key, value) in diff.set.iter() {
592        map.set(key.clone(), value.clone())?;
593    }
594    for key in diff.removed.iter() {
595        map.remove(key);
596    }
597    Ok(())
598}
599
600#[cfg(test)]
601mod tests;
602
603#[cfg(test)]
604mod id_map_tests;
605
606#[cfg(test)]
607mod row_cap_tests;
608
609#[cfg(test)]
610mod truncate_tests;
611
612#[cfg(test)]
613mod factory_reset_tests;
614
615#[cfg(test)]
616mod hub_delete_tests;
617
618#[cfg(test)]
619mod delete_set_tests;
620
621#[cfg(test)]
622mod payload_clear_tests;