Skip to main content

selene_graph/
mutator.rs

1//! Typed mutation funnel per spec 03 section 4.3.
2
3mod assignment;
4mod catalog;
5mod composite_property_index;
6mod delete;
7mod delete_set;
8mod factory_reset;
9mod property_index;
10mod remove;
11mod text_index;
12mod vector_index;
13
14use std::sync::Arc;
15
16use roaring::RoaringBitmap;
17use selene_core::{
18    Change, DbString, EdgeId, GraphId, LabelDiff, LabelSet, NodeId, PropertyDiff, PropertyMap,
19    SchemaChange,
20};
21
22use crate::adjacency::AdjacencyEdge;
23use crate::error::{GraphError, GraphResult};
24use crate::graph_types::{GraphTypeDef, PropertyTypeDef};
25use crate::index_provider::{IndexProvider, ProviderTag};
26use crate::store::RowIndex;
27use crate::type_validator::{EntityId, TypeViolation};
28use crate::write_txn::WriteTxn;
29
30/// Borrowed mutation builder for one write transaction.
31pub struct Mutator<'tx, 'g> {
32    txn: &'tx mut WriteTxn<'g>,
33}
34
35impl<'tx, 'g> Mutator<'tx, 'g> {
36    pub(crate) fn new(txn: &'tx mut WriteTxn<'g>) -> Self {
37        Self { txn }
38    }
39
40    /// Create a node, emit `Change::NodeCreated`, and return its ID.
41    ///
42    /// # Errors
43    ///
44    /// Returns [`GraphError::RowSpaceExhausted`] when the dense row store fills
45    /// the v1 row-index range (max 2^32 rows).
46    pub fn create_node(&mut self, labels: LabelSet, mut props: PropertyMap) -> GraphResult<NodeId> {
47        fill_node_defaults(self.txn.read(), &labels, &mut props)?;
48        assignment::coerce_node_properties(self.txn.read(), &labels, &mut props)?;
49        let id = self.txn.allocator.allocate_node();
50        {
51            let graph = self.txn.guard_mut();
52            // BRIEF-Item-4c: append at the dense end (row = current row count)
53            // instead of `id - 1` arithmetic. After 4b compaction the monotonic
54            // high-water id far exceeds the dense row count, so an arith row would
55            // re-pad exactly the holes compaction reclaimed; append keeps the store
56            // dense and never resurrects a reclaimed slot. The u32 row-space cap
57            // therefore moves from the id value to the row count.
58            let row = u32::try_from(graph.node_store.len())
59                .ok()
60                // u32::MAX is reserved as RowIndex::TOMBSTONE; the last real row
61                // is u32::MAX - 1, so a live row never aliases the sentinel.
62                .filter(|&row| row != u32::MAX)
63                .ok_or(GraphError::RowSpaceExhausted {
64                    kind: "node",
65                    rows: graph.node_store.len() as u64,
66                    max_rows: u32::MAX as u64,
67                })?;
68            // BRIEF-153 fix-cycle C2: run property-index admission BEFORE
69            // mutating row state so a cap-exhaustion error rolls back
70            // cleanly with no half-written row. Index updates only touch
71            // their own maps; node_store stays untouched if any admission
72            // fails. The txn boundary publishes everything atomically on
73            // commit, so a partial-index publication is impossible.
74            crate::property_index::apply_node_create(
75                &mut graph.property_index,
76                &labels,
77                &props,
78                row,
79            )?;
80            crate::composite_property_index::apply_node_create(
81                &mut graph.composite_property_index,
82                &labels,
83                &props,
84                row,
85            )?;
86            crate::vector_index::apply_node_create(&mut graph.vector_index, &labels, &props, row)?;
87            crate::text_index::apply_node_create(&mut graph.text_index, &labels, &props, row, id);
88            graph.node_store.labels.push(labels.clone());
89            graph.node_store.properties.push(props.clone());
90            graph.node_store.row_to_id.push(id);
91            graph.node_store.alive_mut().insert(row);
92            // BRIEF-Item-4a: bind the external id to its row in both directions.
93            // The live commit path never re-runs `rebuild_id_maps`, so the
94            // `id -> row` map must be populated here. The row is remappable once
95            // 4b compaction renumbers rows under stable ids.
96            graph.node_id_to_row.insert(id, RowIndex::new(row));
97            insert_node_labels(&mut graph.idx_label, row, &labels);
98        }
99        self.txn.changes.push(Change::NodeCreated {
100            id,
101            labels,
102            properties: props,
103        });
104        Ok(id)
105    }
106
107    /// Create an edge between two alive nodes.
108    pub fn create_edge(
109        &mut self,
110        label: DbString,
111        source: NodeId,
112        target: NodeId,
113        mut props: PropertyMap,
114    ) -> GraphResult<EdgeId> {
115        self.require_live_node(source)?;
116        self.require_live_node(target)?;
117        fill_edge_defaults(self.txn.read(), label.clone(), source, target, &mut props)?;
118        assignment::coerce_edge_properties(
119            self.txn.read(),
120            label.clone(),
121            source,
122            target,
123            &mut props,
124        )?;
125        let id = self.txn.allocator.allocate_edge();
126        {
127            let graph = self.txn.guard_mut();
128            // BRIEF-Item-4c: append at the dense end (see create_node).
129            let row = u32::try_from(graph.edge_store.len())
130                .ok()
131                .filter(|&row| row != u32::MAX) // u32::MAX is RowIndex::TOMBSTONE
132                .ok_or(GraphError::RowSpaceExhausted {
133                    kind: "edge",
134                    rows: graph.edge_store.len() as u64,
135                    max_rows: u32::MAX as u64,
136                })?;
137            graph.edge_store.label.push(label.clone());
138            graph.edge_store.source.push(source);
139            graph.edge_store.target.push(target);
140            graph.edge_store.properties.push(props.clone());
141            graph.edge_store.row_to_id.push(id);
142            graph.edge_store.alive_mut().insert(row);
143            // BRIEF-Item-4a: bind the external edge id to its row (live path).
144            graph.edge_id_to_row.insert(id, RowIndex::new(row));
145            insert_index_row(&mut graph.idx_edge_label, label.clone(), row);
146
147            graph
148                .adjacency_out
149                .entry(source)
150                .or_default()
151                .add(AdjacencyEdge {
152                    label: label.clone(),
153                    neighbor: target,
154                    edge_id: id,
155                });
156            graph
157                .adjacency_in
158                .entry(target)
159                .or_default()
160                .add(AdjacencyEdge {
161                    label: label.clone(),
162                    neighbor: source,
163                    edge_id: id,
164                });
165        }
166        self.txn.changes.push(Change::EdgeCreated {
167            id,
168            label,
169            source,
170            target,
171            properties: props,
172        });
173        Ok(id)
174    }
175
176    /// Update an alive node and emit `Change::NodeUpdated`.
177    pub fn update_node(
178        &mut self,
179        id: NodeId,
180        labels_diff: LabelDiff,
181        mut props_diff: PropertyDiff,
182    ) -> GraphResult<()> {
183        let row = self.require_live_node(id)?;
184
185        // Compute the new label set without mutating the working graph yet.
186        let old_labels = self
187            .txn
188            .read()
189            .node_store
190            .labels
191            .get(row)
192            .cloned()
193            .unwrap_or_default();
194        let mut labels = old_labels.clone();
195        for label in labels_diff.added.iter().cloned() {
196            labels.insert(label);
197        }
198        for label in labels_diff.removed.iter() {
199            labels.remove(label);
200        }
201        reject_immutable_node_update(self.txn.read(), id, &old_labels, &props_diff)?;
202        reject_immutable_node_update(self.txn.read(), id, &labels, &props_diff)?;
203        assignment::coerce_node_property_diff(self.txn.read(), &labels, &mut props_diff)?;
204
205        // Apply the property diff up front; if it errors we leave the working
206        // graph (including idx_label) untouched so the transaction can still
207        // be safely rolled back or aborted without leaking inconsistent state.
208        let old_props = self
209            .txn
210            .read()
211            .node_store
212            .properties
213            .get(row)
214            .cloned()
215            .unwrap_or_default();
216        let mut props = old_props.clone();
217        apply_property_diff(&mut props, &props_diff)?;
218
219        // BRIEF-153 fix-cycle C2: run property-index admission BEFORE
220        // mutating row state so a cap-exhaustion error rolls back cleanly
221        // with no half-written row. The index updates borrow `labels`/`props`;
222        // both move into the row stores only after every fallible step.
223        {
224            let graph = self.txn.guard_mut();
225            crate::property_index::apply_node_update(
226                &mut graph.property_index,
227                &old_labels,
228                &old_props,
229                &labels,
230                &props,
231                row as u32,
232            )?;
233            crate::composite_property_index::apply_node_update(
234                &mut graph.composite_property_index,
235                &old_labels,
236                &old_props,
237                &labels,
238                &props,
239                row as u32,
240            )?;
241            crate::vector_index::apply_node_update(
242                &mut graph.vector_index,
243                &old_labels,
244                &old_props,
245                &labels,
246                &props,
247                row as u32,
248            )?;
249            crate::text_index::apply_node_update(
250                &mut graph.text_index,
251                &old_labels,
252                &old_props,
253                &labels,
254                &props,
255                row as u32,
256                id,
257            );
258            graph.node_store.labels.set(row, labels);
259            graph.node_store.properties.set(row, props);
260            for label in labels_diff.added.iter().cloned() {
261                insert_index_row(&mut graph.idx_label, label, row as u32);
262            }
263            for label in labels_diff.removed.iter() {
264                remove_index_row(&mut graph.idx_label, label, row as u32);
265            }
266        }
267
268        self.txn.changes.push(Change::NodeUpdated {
269            id,
270            labels_diff,
271            properties_diff: props_diff,
272        });
273        Ok(())
274    }
275
276    /// Update an alive edge and emit `Change::EdgeUpdated`.
277    ///
278    /// Edge labels are immutable, so property updates do not touch
279    /// `idx_edge_label`.
280    pub fn update_edge(&mut self, id: EdgeId, mut props_diff: PropertyDiff) -> GraphResult<()> {
281        let row = self.require_live_edge(id)?;
282        reject_immutable_edge_update(self.txn.read(), id, &props_diff)?;
283        assignment::coerce_edge_property_diff(self.txn.read(), id, &mut props_diff)?;
284        let mut props = self
285            .txn
286            .read()
287            .edge_store
288            .properties
289            .get(row)
290            .cloned()
291            .unwrap_or_default();
292        apply_property_diff(&mut props, &props_diff)?;
293        self.txn.guard_mut().edge_store.properties.set(row, props);
294        self.txn.changes.push(Change::EdgeUpdated {
295            id,
296            properties_diff: props_diff,
297        });
298        Ok(())
299    }
300
301    /// Append a raw [`SchemaChange`] WAL payload through the write funnel.
302    ///
303    /// This is a pass-through accumulator: catalog graph mutation and
304    /// closed-graph validation are handled by higher-level validation layers
305    /// (the typed catalog DDL methods on this `Mutator` — e.g. `create_node_type`
306    /// — call those layers and then funnel here).
307    ///
308    /// Why: this is the single, canonical funnel entry for a `SchemaChanged`
309    /// change record (hard rule 11 — every mutation routes through the one
310    /// `Mutator`). It is intentionally retained as a `pub` funnel surface even
311    /// though no GQL caller reaches it directly today: the catalog DDL methods
312    /// are the production producers, and keeping the low-level entry public means
313    /// any future schema-event producer routes through the same funnel rather
314    /// than re-implementing the write path. Tests and benches drive it directly
315    /// to exercise the raw funnel without the DDL validation layer on top.
316    pub fn schema_change(&mut self, graph: GraphId, change: SchemaChange) {
317        self.txn
318            .changes
319            .push(Change::SchemaChanged { graph, change });
320    }
321
322    /// Look up a registered index provider through the held write transaction.
323    #[must_use]
324    pub fn index_provider_by_tag(&self, tag: ProviderTag) -> Option<Arc<dyn IndexProvider>> {
325        self.txn
326            .providers
327            .iter()
328            .find(|provider| provider.provider_tag() == tag)
329            .map(Arc::clone)
330    }
331
332    /// Borrow the transaction-local working graph.
333    #[must_use]
334    pub fn read(&self) -> &crate::SeleneGraph {
335        self.txn.read()
336    }
337
338    fn require_live_node(&self, id: NodeId) -> GraphResult<usize> {
339        let graph = self.txn.read();
340        // Map-backed: a never-committed (aborted-tx hole) id is absent from the
341        // map -> NotFound. A deleted id stays mapped to its dead row -> NotAlive.
342        let row = graph
343            .row_for_node_id(id)
344            .ok_or(GraphError::NodeNotFound { id })?
345            .get();
346        if row as usize >= graph.node_store.len() {
347            return Err(GraphError::NodeNotFound { id });
348        }
349        if !graph.node_store.is_alive(row) {
350            return Err(GraphError::NodeNotAlive { id });
351        }
352        Ok(row as usize)
353    }
354
355    fn require_live_edge(&self, id: EdgeId) -> GraphResult<usize> {
356        let graph = self.txn.read();
357        let row = graph
358            .row_for_edge_id(id)
359            .ok_or(GraphError::EdgeNotFound { id })?
360            .get();
361        if row as usize >= graph.edge_store.len() {
362            return Err(GraphError::EdgeNotFound { id });
363        }
364        if !graph.edge_store.is_alive(row) {
365            return Err(GraphError::EdgeNotAlive { id });
366        }
367        Ok(row as usize)
368    }
369}
370
371fn insert_node_labels(
372    index: &mut imbl::HashMap<DbString, RoaringBitmap>,
373    row: u32,
374    labels: &LabelSet,
375) {
376    for label in labels.iter().cloned() {
377        insert_index_row(index, label, row);
378    }
379}
380
381fn remove_node_labels(
382    index: &mut imbl::HashMap<DbString, RoaringBitmap>,
383    row: u32,
384    labels: &LabelSet,
385) {
386    for label in labels.iter() {
387        remove_index_row(index, label, row);
388    }
389}
390
391fn insert_index_row(index: &mut imbl::HashMap<DbString, RoaringBitmap>, label: DbString, row: u32) {
392    // In-place insert via `entry().or_default()`: the rebuild path uses the same
393    // idiom (see `consistency.rs` / `typed_index.rs`). `guard_mut` already gives
394    // unique ownership of the bitmap (Arc::make_mut), so we never clone the whole
395    // RoaringBitmap per label per node — bulk-loading one label is O(N), not O(N²).
396    index.entry(label).or_default().insert(row);
397}
398
399fn remove_index_row(
400    index: &mut imbl::HashMap<DbString, RoaringBitmap>,
401    label: &DbString,
402    row: u32,
403) {
404    // Mirror `insert_index_row`: mutate the bitmap behind the imbl entry
405    // instead of cloning the whole RoaringBitmap for every row removed.
406    let now_empty = match index.get_mut(label) {
407        Some(bitmap) => {
408            bitmap.remove(row);
409            bitmap.is_empty()
410        }
411        None => false,
412    };
413    if now_empty {
414        index.remove(label);
415    }
416}
417
418fn fill_node_defaults(
419    graph: &crate::SeleneGraph,
420    labels: &LabelSet,
421    props: &mut PropertyMap,
422) -> GraphResult<()> {
423    let Some(graph_type) = graph.meta.bound_type.as_deref() else {
424        return Ok(());
425    };
426    let Some(node_type) = graph_type.find_node_type(labels) else {
427        return Ok(());
428    };
429    fill_property_defaults(&node_type.properties, props)
430}
431
432fn fill_edge_defaults(
433    graph: &crate::SeleneGraph,
434    label: DbString,
435    source: NodeId,
436    target: NodeId,
437    props: &mut PropertyMap,
438) -> GraphResult<()> {
439    let Some(graph_type) = graph.meta.bound_type.as_deref() else {
440        return Ok(());
441    };
442    let Some(source_type) = node_type_index_for_node(graph, graph_type, source) else {
443        return Ok(());
444    };
445    let Some(target_type) = node_type_index_for_node(graph, graph_type, target) else {
446        return Ok(());
447    };
448    let Some(edge_type) = graph_type.find_edge_type(label, source_type, target_type) else {
449        return Ok(());
450    };
451    fill_property_defaults(&edge_type.properties, props)
452}
453
454fn fill_property_defaults(
455    declarations: &[PropertyTypeDef],
456    props: &mut PropertyMap,
457) -> GraphResult<()> {
458    for declaration in declarations {
459        if props.contains_key(&declaration.name) {
460            continue;
461        }
462        if let Some(default) = &declaration.default {
463            props.set(declaration.name.clone(), default.to_value()?)?;
464        }
465    }
466    Ok(())
467}
468
469fn reject_immutable_node_update(
470    graph: &crate::SeleneGraph,
471    id: NodeId,
472    labels: &LabelSet,
473    diff: &PropertyDiff,
474) -> GraphResult<()> {
475    let Some(graph_type) = graph.meta.bound_type.as_deref() else {
476        return Ok(());
477    };
478    let Some(node_type) = graph_type.find_node_type(labels) else {
479        return Ok(());
480    };
481    reject_immutable_property_update(
482        EntityId::Node(id),
483        node_type.name.clone(),
484        &node_type.properties,
485        diff,
486    )
487}
488
489fn reject_immutable_edge_update(
490    graph: &crate::SeleneGraph,
491    id: EdgeId,
492    diff: &PropertyDiff,
493) -> GraphResult<()> {
494    let Some(graph_type) = graph.meta.bound_type.as_deref() else {
495        return Ok(());
496    };
497    let Some(label) = graph.edge_label(id).cloned() else {
498        return Ok(());
499    };
500    let Some((source, target)) = graph.edge_endpoints(id) else {
501        return Ok(());
502    };
503    let Some(source_type) = node_type_index_for_node(graph, graph_type, source) else {
504        return Ok(());
505    };
506    let Some(target_type) = node_type_index_for_node(graph, graph_type, target) else {
507        return Ok(());
508    };
509    let Some(edge_type) = graph_type.find_edge_type(label, source_type, target_type) else {
510        return Ok(());
511    };
512    reject_immutable_property_update(
513        EntityId::Edge(id),
514        edge_type.name.clone(),
515        &edge_type.properties,
516        diff,
517    )
518}
519
520fn node_type_index_for_node(
521    graph: &crate::SeleneGraph,
522    graph_type: &GraphTypeDef,
523    node: NodeId,
524) -> Option<u32> {
525    let labels = graph.node_labels(node)?;
526    graph_type.find_node_type_index(labels)
527}
528
529fn reject_immutable_property_update(
530    entity_id: EntityId,
531    declared_in: DbString,
532    declarations: &[PropertyTypeDef],
533    diff: &PropertyDiff,
534) -> GraphResult<()> {
535    for (key, _) in &diff.set {
536        reject_if_immutable(entity_id, declared_in.clone(), declarations, key.clone())?;
537    }
538    for key in &diff.removed {
539        reject_if_immutable(entity_id, declared_in.clone(), declarations, key.clone())?;
540    }
541    Ok(())
542}
543
544fn reject_if_immutable(
545    entity_id: EntityId,
546    declared_in: DbString,
547    declarations: &[PropertyTypeDef],
548    property: DbString,
549) -> GraphResult<()> {
550    if declarations
551        .iter()
552        .any(|declaration| declaration.name == property && declaration.immutable)
553    {
554        return Err(GraphError::TypeViolation(
555            TypeViolation::ImmutablePropertyUpdate {
556                entity_id,
557                property,
558                declared_in,
559            },
560        ));
561    }
562    Ok(())
563}
564
565fn apply_property_diff(map: &mut PropertyMap, diff: &PropertyDiff) -> GraphResult<()> {
566    for (key, value) in diff.set.iter() {
567        map.set(key.clone(), value.clone())?;
568    }
569    for key in diff.removed.iter() {
570        map.remove(key);
571    }
572    Ok(())
573}
574
575#[cfg(test)]
576mod tests;
577
578#[cfg(test)]
579mod id_map_tests;
580
581#[cfg(test)]
582mod row_cap_tests;
583
584#[cfg(test)]
585mod truncate_tests;
586
587#[cfg(test)]
588mod factory_reset_tests;
589
590#[cfg(test)]
591mod hub_delete_tests;
592
593#[cfg(test)]
594mod delete_set_tests;
595
596#[cfg(test)]
597mod payload_clear_tests;