Skip to main content

oxgraph_db/database/
writer.rs

1//! The single writer transaction: mutators, typed surface, and commit.
2
3use std::{collections::BTreeSet, sync::Arc};
4
5use super::{Db, open::open_log_for_append};
6use crate::{
7    Bound, CommitSeq, DbError, ElementId, GraphProjectionDefinition, GraphProjectionSpec,
8    HypergraphProjectionDefinition, HypergraphProjectionSpec, IncidenceId, IndexId, LabelId,
9    ProjectionDefinition, ProjectionId, PropertyKeyId, PropertySubject, PropertyType,
10    PropertyValue, RelationId, RelationTypeId, RoleId, Schema, TransactionId,
11    catalog::{IndexDefinition, PropertyFamily},
12    lock::WriterLock,
13    overlay::{Snapshot, StateView, WriteOverlay},
14    typed::{Assignable, EqualityIndex, Key, ValueType},
15    wal,
16};
17
/// Single writer transaction.
///
/// Mutations accumulate into a private write overlay layered over the parent
/// snapshot; reads fall through the overlay then the base. `commit` appends the
/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
/// `rollback` drops the overlay and appends nothing.
///
/// The writer borrows the [`Db`] mutably for `'db`, so the borrow checker
/// allows at most one `Writer` per `Db` value at a time; the advisory
/// [`WriterLock`] is held in addition to that borrow.
///
/// # Performance
///
/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
pub struct Writer<'db> {
    /// Db receiving the commit. A dirty commit replaces its current snapshot
    /// and records this writer's transaction id on it.
    pub(super) database: &'db mut Db,
    /// Parent snapshot the writer layers over (its base + frozen overlay).
    /// Mutators hand its base records to the overlay alongside each change.
    pub(super) parent: Arc<Snapshot>,
    /// Private mutable delta this writer accumulates. An empty delta makes
    /// `commit` a no-op that returns the parent's commit sequence.
    pub(super) delta: WriteOverlay,
    /// Writer transaction id (session-local until a dirty commit makes it
    /// durable).
    pub(super) transaction_id: TransactionId,
    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
    /// transaction ends (on `rollback`, or on any early-return error path); a
    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
    /// triggered auto-checkpoint can re-acquire it.
    pub(super) lock: WriterLock,
}
44
45impl Writer<'_> {
46    /// Registers a structural incidence role.
47    ///
48    /// # Errors
49    ///
50    /// Returns [`DbError`] when the name already exists or ID allocation fails.
51    ///
52    /// # Performance
53    ///
54    /// This method is `O(log role count + name length)`.
55    pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
56        self.delta.register_role(name.into())
57    }
58
59    /// Registers an element or relation label.
60    ///
61    /// # Errors
62    ///
63    /// Returns [`DbError`] when the name already exists or ID allocation fails.
64    ///
65    /// # Performance
66    ///
67    /// This method is `O(log label count + name length)`.
68    pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
69        self.delta.register_label(name.into())
70    }
71
72    /// Registers a relation type.
73    ///
74    /// # Errors
75    ///
76    /// Returns [`DbError`] when the name already exists or ID allocation fails.
77    ///
78    /// # Performance
79    ///
80    /// This method is `O(log relation type count + name length)`.
81    pub fn register_relation_type(
82        &mut self,
83        name: impl Into<String>,
84    ) -> Result<RelationTypeId, DbError> {
85        self.delta.register_relation_type(name.into())
86    }
87
88    /// Registers a typed property key.
89    ///
90    /// # Errors
91    ///
92    /// Returns [`DbError`] when the name already exists or ID allocation fails.
93    ///
94    /// # Performance
95    ///
96    /// This method is `O(log property key count + name length)`.
97    pub fn register_property_key(
98        &mut self,
99        name: impl Into<String>,
100        family: PropertyFamily,
101        value_type: PropertyType,
102    ) -> Result<PropertyKeyId, DbError> {
103        self.delta
104            .register_property_key(name.into(), family, value_type)
105    }
106
107    /// Defines a physical projection.
108    ///
109    /// # Errors
110    ///
111    /// Returns [`DbError`] when referenced catalog IDs are unknown, the
112    /// projection name already exists, or ID allocation fails.
113    ///
114    /// # Performance
115    ///
116    /// This method is `O(definition size + catalog lookup cost)`.
117    pub fn define_projection(
118        &mut self,
119        definition: ProjectionDefinition,
120    ) -> Result<ProjectionId, DbError> {
121        self.validate_projection_definition(&definition)?;
122        self.delta.register_projection(definition)
123    }
124
125    /// Defines an index.
126    ///
127    /// # Errors
128    ///
129    /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
130    /// name already exists, or ID allocation fails.
131    ///
132    /// # Performance
133    ///
134    /// This method is `O(definition size + catalog lookup cost)`.
135    pub fn define_index(
136        &mut self,
137        name: impl Into<String>,
138        definition: IndexDefinition,
139    ) -> Result<IndexId, DbError> {
140        self.validate_index_definition(&definition)?;
141        self.delta.register_index(name.into(), definition)
142    }
143
144    /// Applies a declarative [`Schema`] idempotently (register-or-get every
145    /// declared item), returning the resolved [`Bound`] handle bag. Re-applying
146    /// the same schema reuses existing ids; a name that already exists with a
147    /// conflicting shape is a [`DbError::SchemaConflict`].
148    ///
149    /// # Errors
150    ///
151    /// Returns [`DbError`] on a shape conflict, an undeclared referenced name (an
152    /// index's key, a projection's role/type), or id-allocation failure.
153    ///
154    /// # Performance
155    ///
156    /// This method is `O(declared items × log catalog)`.
157    pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
158        let mut bound = Bound::default();
159        for name in &schema.roles {
160            let id = match self.merged().catalog().role_id(name) {
161                Some(id) => id,
162                None => self.register_role(name.clone())?,
163            };
164            bound.roles.insert(name.clone(), id);
165        }
166        for name in &schema.labels {
167            let id = match self.merged().catalog().label_id(name) {
168                Some(id) => id,
169                None => self.register_label(name.clone())?,
170            };
171            bound.labels.insert(name.clone(), id);
172        }
173        for name in &schema.relation_types {
174            let id = match self.merged().catalog().relation_type_id(name) {
175                Some(id) => id,
176                None => self.register_relation_type(name.clone())?,
177            };
178            bound.relation_types.insert(name.clone(), id);
179        }
180        for (name, family, value_type) in &schema.keys {
181            let id = self.register_key_or_get(name, *family, *value_type)?;
182            bound.keys.insert(name.clone(), (id, *value_type));
183        }
184        for (name, key_name) in &schema.equality_indexes {
185            let (key_id, value_type) = *bound.keys.get(key_name).ok_or_else(|| {
186                DbError::Catalog(crate::error::CatalogError::UnknownName {
187                    kind: "property key",
188                    name: key_name.clone(),
189                })
190            })?;
191            let id = match self.merged().catalog().index_id(name) {
192                Some(id) => id,
193                None => self.define_index(
194                    name.clone(),
195                    IndexDefinition::PropertyEquality { key: key_id },
196                )?,
197            };
198            bound
199                .equality_indexes
200                .insert(name.clone(), (id, value_type));
201        }
202        for spec in &schema.graph_projections {
203            let id = match self.merged().catalog().projection_id(&spec.name) {
204                Some(id) => id,
205                None => self.define_graph_projection(spec, &bound)?,
206            };
207            bound.projections.insert(spec.name.clone(), id);
208        }
209        for spec in &schema.hypergraph_projections {
210            let id = match self.merged().catalog().projection_id(&spec.name) {
211                Some(id) => id,
212                None => self.define_hypergraph_projection(spec, &bound)?,
213            };
214            bound.projections.insert(spec.name.clone(), id);
215        }
216        Ok(bound)
217    }
218
219    /// Registers a property key, or returns the existing id when the name is
220    /// already present with a matching family and value type.
221    ///
222    /// # Errors
223    ///
224    /// Returns [`DbError::SchemaConflict`] when the name exists with a different
225    /// family or value type.
226    ///
227    /// # Performance
228    ///
229    /// This method is `O(log catalog)`.
230    fn register_key_or_get(
231        &mut self,
232        name: &str,
233        family: PropertyFamily,
234        value_type: PropertyType,
235    ) -> Result<PropertyKeyId, DbError> {
236        let Some(existing) = self.merged().catalog().property_key_id(name) else {
237            return self.register_property_key(name.to_owned(), family, value_type);
238        };
239        let matches = self
240            .merged()
241            .catalog()
242            .property_key(existing)
243            .is_some_and(|def| def.family == family && def.value_type == value_type);
244        if matches {
245            Ok(existing)
246        } else {
247            Err(DbError::Catalog(
248                crate::error::CatalogError::SchemaConflict {
249                    name: name.to_owned(),
250                    reason: "property key family/value type differs from the existing catalog entry",
251                },
252            ))
253        }
254    }
255
256    /// Defines a graph projection from a spec, resolving its relation-type and
257    /// role names through `bound`.
258    ///
259    /// # Errors
260    ///
261    /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
262    /// a definition error.
263    ///
264    /// # Performance
265    ///
266    /// This method is `O(relation-type count × log catalog)`.
267    fn define_graph_projection(
268        &mut self,
269        spec: &GraphProjectionSpec,
270        bound: &Bound,
271    ) -> Result<ProjectionId, DbError> {
272        let mut relation_types = BTreeSet::new();
273        for name in &spec.relation_types {
274            relation_types.insert(bound.relation_type(name)?);
275        }
276        let source_role = bound.role(&spec.source_role)?;
277        let target_role = bound.role(&spec.target_role)?;
278        self.define_projection(ProjectionDefinition::Graph(GraphProjectionDefinition {
279            name: spec.name.clone(),
280            relation_types,
281            source_role,
282            target_role,
283        }))
284    }
285
286    /// Defines a hypergraph projection from a spec, resolving its relation-type
287    /// and role names through `bound`.
288    ///
289    /// # Errors
290    ///
291    /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
292    /// a definition error.
293    ///
294    /// # Performance
295    ///
296    /// This method is `O((relation-type count + role count) × log catalog)`.
297    fn define_hypergraph_projection(
298        &mut self,
299        spec: &HypergraphProjectionSpec,
300        bound: &Bound,
301    ) -> Result<ProjectionId, DbError> {
302        let mut relation_types = BTreeSet::new();
303        for name in &spec.relation_types {
304            relation_types.insert(bound.relation_type(name)?);
305        }
306        let mut source_roles = BTreeSet::new();
307        for name in &spec.source_roles {
308            source_roles.insert(bound.role(name)?);
309        }
310        let mut target_roles = BTreeSet::new();
311        for name in &spec.target_roles {
312            target_roles.insert(bound.role(name)?);
313        }
314        self.define_projection(ProjectionDefinition::Hypergraph(
315            HypergraphProjectionDefinition {
316                name: spec.name.clone(),
317                relation_types,
318                source_roles,
319                target_roles,
320            },
321        ))
322    }
323
324    /// Creates a canonical element.
325    ///
326    /// # Errors
327    ///
328    /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
329    ///
330    /// # Performance
331    ///
332    /// This method is `O(log element change)`.
333    pub fn create_element(&mut self) -> Result<ElementId, DbError> {
334        self.delta.create_element()
335    }
336
337    /// Creates a canonical relation.
338    ///
339    /// # Errors
340    ///
341    /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
342    ///
343    /// # Performance
344    ///
345    /// This method is `O(log relation change)`.
346    pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
347        self.delta.create_relation()
348    }
349
350    /// Creates a canonical incidence.
351    ///
352    /// # Errors
353    ///
354    /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
355    /// exhausted.
356    ///
357    /// # Performance
358    ///
359    /// This method is `O(log incidence change + reference lookup cost)`.
360    pub fn create_incidence(
361        &mut self,
362        relation: RelationId,
363        element: ElementId,
364        role: RoleId,
365    ) -> Result<IncidenceId, DbError> {
366        self.require_relation(relation)?;
367        self.require_element(element)?;
368        self.require_role(role)?;
369        self.delta.create_incidence(relation, element, role)
370    }
371
372    /// Tombstones a canonical element and its incidences.
373    ///
374    /// # Errors
375    ///
376    /// Returns [`DbError::UnknownElement`] when the element is not visible.
377    ///
378    /// # Performance
379    ///
380    /// This method is `O(log n + degree)` via the reverse-adjacency index.
381    pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
382        self.require_element(id)?;
383        // Cascade: every incidence on the element — resolved in O(log n + degree)
384        // through the reverse-adjacency index, not a full incidence scan — is
385        // tombstoned too.
386        let incidences: Vec<IncidenceId> = self
387            .merged()
388            .element_incidences(id)
389            .into_iter()
390            .map(|record| record.id)
391            .collect();
392        let base = self.parent.base_records();
393        self.delta.tombstone_element(base, id);
394        for incidence in incidences {
395            self.delta
396                .tombstone_incidence(self.parent.base_records(), incidence);
397        }
398        Ok(())
399    }
400
401    /// Tombstones a canonical relation and its incidences.
402    ///
403    /// # Errors
404    ///
405    /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
406    ///
407    /// # Performance
408    ///
409    /// This method is `O(log n + degree)` via the reverse-adjacency index.
410    pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
411        self.require_relation(id)?;
412        // Cascade: every incidence in the relation — resolved in O(log n + degree)
413        // through the reverse-adjacency index, not a full incidence scan.
414        let incidences: Vec<IncidenceId> = self
415            .merged()
416            .relation_incidences(id)
417            .into_iter()
418            .map(|record| record.id)
419            .collect();
420        let base = self.parent.base_records();
421        self.delta.tombstone_relation(base, id);
422        for incidence in incidences {
423            self.delta
424                .tombstone_incidence(self.parent.base_records(), incidence);
425        }
426        Ok(())
427    }
428
429    /// Tombstones a canonical incidence.
430    ///
431    /// # Errors
432    ///
433    /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
434    ///
435    /// # Performance
436    ///
437    /// This method is `O(log incidence change)`.
438    pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
439        self.require_incidence(id)?;
440        self.delta
441            .tombstone_incidence(self.parent.base_records(), id);
442        Ok(())
443    }
444
445    /// Adds a label to an element.
446    ///
447    /// # Errors
448    ///
449    /// Returns [`DbError`] when the element or label is unknown.
450    ///
451    /// # Performance
452    ///
453    /// This method is `O(log element change + log label count)`.
454    pub(crate) fn add_element_label(
455        &mut self,
456        element: ElementId,
457        label: LabelId,
458    ) -> Result<(), DbError> {
459        self.require_element(element)?;
460        self.require_label(label)?;
461        self.delta
462            .add_element_label(self.parent.base_records(), element, label);
463        Ok(())
464    }
465
466    /// Adds a label to a relation.
467    ///
468    /// # Errors
469    ///
470    /// Returns [`DbError`] when the relation or label is unknown.
471    ///
472    /// # Performance
473    ///
474    /// This method is `O(log relation change + log label count)`.
475    pub(crate) fn add_relation_label(
476        &mut self,
477        relation: RelationId,
478        label: LabelId,
479    ) -> Result<(), DbError> {
480        self.require_relation(relation)?;
481        self.require_label(label)?;
482        self.delta
483            .add_relation_label(self.parent.base_records(), relation, label);
484        Ok(())
485    }
486
487    /// Sets a relation type.
488    ///
489    /// # Errors
490    ///
491    /// Returns [`DbError`] when the relation or relation type is unknown.
492    ///
493    /// # Performance
494    ///
495    /// This method is `O(log relation change + log relation type count)`.
496    pub fn set_relation_type(
497        &mut self,
498        relation: RelationId,
499        relation_type: RelationTypeId,
500    ) -> Result<(), DbError> {
501        self.require_relation(relation)?;
502        self.require_relation_type(relation_type)?;
503        self.delta
504            .set_relation_type(self.parent.base_records(), relation, relation_type);
505        Ok(())
506    }
507
508    /// Sets a property value.
509    ///
510    /// # Errors
511    ///
512    /// Returns [`DbError`] when the subject or key is unknown, or the value
513    /// does not match the key schema.
514    ///
515    /// # Performance
516    ///
517    /// This method is `O(log subject change + log key count)`.
518    pub(crate) fn set_property(
519        &mut self,
520        subject: PropertySubject,
521        key: PropertyKeyId,
522        value: PropertyValue,
523    ) -> Result<(), DbError> {
524        // Referential integrity: the subject must be visible (this rejects an
525        // orphan property against a tombstoned/absent subject at the transaction
526        // boundary — the overlay layer is permissive by design).
527        self.require_subject(subject)?;
528        let definition = self
529            .merged()
530            .catalog()
531            .property_key(key)
532            .cloned()
533            .ok_or_else(|| DbError::unknown(key))?;
534        if definition.family != subject.family() {
535            return Err(DbError::Query(
536                crate::error::QueryError::WrongPropertyFamily {
537                    expected: definition.family,
538                    actual: subject.family(),
539                },
540            ));
541        }
542        if definition.value_type != value.value_type() {
543            return Err(DbError::Query(
544                crate::error::QueryError::PropertyTypeMismatch {
545                    expected: definition.value_type,
546                    actual: value.value_type(),
547                },
548            ));
549        }
550        self.delta
551            .set_property(self.parent.base_records(), subject, key, value);
552        Ok(())
553    }
554
555    /// Removes a property value.
556    ///
557    /// # Errors
558    ///
559    /// Returns [`DbError`] when the subject or key is unknown.
560    ///
561    /// # Performance
562    ///
563    /// This method is `O(log subject change + log key count)`.
564    pub(crate) fn remove_property(
565        &mut self,
566        subject: PropertySubject,
567        key: PropertyKeyId,
568    ) -> Result<(), DbError> {
569        self.require_subject(subject)?;
570        if self.merged().catalog().property_key(key).is_none() {
571            return Err(DbError::unknown(key));
572        }
573        self.delta
574            .remove_property(self.parent.base_records(), subject, key);
575        Ok(())
576    }
577
578    /// Resolves the property key an equality index covers.
579    ///
580    /// # Errors
581    ///
582    /// Returns [`DbError::UnknownIndex`] when `index` is unknown, or an
583    /// unsupported-query error when it is not a property-equality index.
584    ///
585    /// # Performance
586    ///
587    /// This method is `O(log index count)`.
588    fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
589        let view = self.merged();
590        let entry = view
591            .catalog()
592            .index(index)
593            .ok_or_else(|| DbError::unknown(index))?;
594        match &entry.definition {
595            IndexDefinition::PropertyEquality { key } => Ok(*key),
596            _other => Err(DbError::unsupported(
597                "reconcile requires a property-equality index",
598            )),
599        }
600    }
601
602    /// Inserts or updates the element whose value under `index` equals `value`,
603    /// returning its canonical id — reused when an element already carries that
604    /// identity value (id stable across reconcile), freshly minted (a never-reused
605    /// id, with the identity property set) otherwise.
606    ///
607    /// # Errors
608    ///
609    /// Returns [`DbError`] when `index` is not an equality index or the value
610    /// type mismatches the key schema.
611    ///
612    /// # Performance
613    ///
614    /// This method is `O(log n + value length)` — a probe plus, on a miss, a mint.
615    pub fn upsert_element<T: ValueType>(
616        &mut self,
617        index: EqualityIndex<T>,
618        value: impl Assignable<T>,
619    ) -> Result<ElementId, DbError> {
620        let value = value.into_value()?;
621        let key = self.equality_index_key(index.id())?;
622        let existing = self
623            .merged()
624            .property_equal(key, &value)
625            .into_iter()
626            .find_map(|subject| match subject {
627                PropertySubject::Element(id) => Some(id),
628                PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
629            });
630        if let Some(id) = existing {
631            return Ok(id);
632        }
633        let element = self.create_element()?;
634        self.set_property(PropertySubject::Element(element), key, value)?;
635        Ok(element)
636    }
637
638    /// Inserts or updates the relation whose value under `index` equals `value`,
639    /// returning its canonical id. On a miss it mints the relation, sets its type
640    /// and identity property, and creates one incidence per `(element, role)`
641    /// endpoint; on a hit the existing relation (with its endpoints) is reused
642    /// unchanged — the identity value encodes the endpoints, so they are immutable.
643    ///
644    /// # Errors
645    ///
646    /// Returns [`DbError`] when `index` is not an equality index, the value type
647    /// mismatches, or an endpoint element does not exist.
648    ///
649    /// # Performance
650    ///
651    /// This method is `O(log n + endpoints)` — a probe plus, on a miss, a mint.
652    pub fn upsert_relation<T: ValueType>(
653        &mut self,
654        index: EqualityIndex<T>,
655        value: impl Assignable<T>,
656        relation_type: RelationTypeId,
657        endpoints: &[(ElementId, RoleId)],
658    ) -> Result<RelationId, DbError> {
659        let value = value.into_value()?;
660        let key = self.equality_index_key(index.id())?;
661        let existing = self
662            .merged()
663            .property_equal(key, &value)
664            .into_iter()
665            .find_map(|subject| match subject {
666                PropertySubject::Relation(id) => Some(id),
667                PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
668            });
669        if let Some(id) = existing {
670            return Ok(id);
671        }
672        let relation = self.create_relation()?;
673        self.set_relation_type(relation, relation_type)?;
674        self.set_property(PropertySubject::Relation(relation), key, value)?;
675        for (element, role) in endpoints {
676            self.create_incidence(relation, *element, *role)?;
677        }
678        Ok(relation)
679    }
680
681    /// Tombstones every subject carried by `index` whose identity value is NOT in
682    /// `keep`, cascading each subject's incidences in `O(degree)` via the
683    /// reverse-adjacency index. The prune half of a reconcile: after upserting
684    /// every desired subject, `retain` removes the vanished complement.
685    ///
686    /// # Errors
687    ///
688    /// Returns [`DbError`] when `index` is not an equality index or a `keep` value
689    /// type mismatches the key schema.
690    ///
691    /// # Performance
692    ///
693    /// This method is `O(family size + removed × degree)`.
694    pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
695        &mut self,
696        index: EqualityIndex<T>,
697        keep: &[V],
698    ) -> Result<(), DbError> {
699        let key = self.equality_index_key(index.id())?;
700        let mut keep_values: BTreeSet<PropertyValue> = BTreeSet::new();
701        for value in keep {
702            keep_values.insert((*value).into_value()?);
703        }
704        let stale: Vec<PropertySubject> = self
705            .merged()
706            .property_key_subjects(key)
707            .into_iter()
708            .filter(|(_subject, value)| !keep_values.contains(value))
709            .map(|(subject, _value)| subject)
710            .collect();
711        for subject in stale {
712            match subject {
713                PropertySubject::Element(id) => self.tombstone_element(id)?,
714                PropertySubject::Relation(id) => self.tombstone_relation(id)?,
715                PropertySubject::Incidence(id) => self.tombstone_incidence(id)?,
716            }
717        }
718        Ok(())
719    }
720
721    /// Sets a typed property on a subject; the value type is checked at compile
722    /// time against the key.
723    ///
724    /// # Errors
725    ///
726    /// Returns [`DbError`] when the subject is absent, the value is out of range,
727    /// or the value type mismatches the key schema.
728    ///
729    /// # Performance
730    ///
731    /// This method is `O(log change + log keys)`.
732    pub fn set<T: ValueType>(
733        &mut self,
734        subject: impl Into<PropertySubject>,
735        key: Key<T>,
736        value: impl Assignable<T>,
737    ) -> Result<(), DbError> {
738        self.set_property(subject.into(), key.id(), value.into_value()?)
739    }
740
741    /// Removes a typed property from a subject.
742    ///
743    /// # Errors
744    ///
745    /// Returns [`DbError`] when the subject is absent or the key is unknown.
746    ///
747    /// # Performance
748    ///
749    /// This method is `O(log change + log keys)`.
750    pub fn unset<T: ValueType>(
751        &mut self,
752        subject: impl Into<PropertySubject>,
753        key: Key<T>,
754    ) -> Result<(), DbError> {
755        self.remove_property(subject.into(), key.id())
756    }
757
758    /// Adds a label to an element or relation subject.
759    ///
760    /// # Errors
761    ///
762    /// Returns [`DbError`] when the subject is absent, the label is unknown, or
763    /// the subject is an incidence (incidences carry no labels).
764    ///
765    /// # Performance
766    ///
767    /// This method is `O(log change + log labels)`.
768    pub fn add_label(
769        &mut self,
770        subject: impl Into<PropertySubject>,
771        label: LabelId,
772    ) -> Result<(), DbError> {
773        match subject.into() {
774            PropertySubject::Element(id) => self.add_element_label(id, label),
775            PropertySubject::Relation(id) => self.add_relation_label(id, label),
776            PropertySubject::Incidence(_) => {
777                Err(DbError::unsupported("incidences do not carry labels"))
778            }
779        }
780    }
781
782    /// Tombstones any subject by id, cascading a relation's or element's
783    /// incidences in `O(degree)` via the reverse-adjacency index.
784    ///
785    /// # Errors
786    ///
787    /// Returns [`DbError`] when the subject is not visible.
788    ///
789    /// # Performance
790    ///
791    /// This method is `O(log change + degree)`.
792    pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
793        match subject.into() {
794            PropertySubject::Element(id) => self.tombstone_element(id),
795            PropertySubject::Relation(id) => self.tombstone_relation(id),
796            PropertySubject::Incidence(id) => self.tombstone_incidence(id),
797        }
798    }
799
    /// Commits this write transaction durably, consuming the writer.
    ///
    /// A non-dirty commit returns the parent's commit sequence without appending
    /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
    /// log into one WAL frame (with the watermark op last), appends it with an
    /// fsync (truncating back to the captured EOF on any write error so no
    /// interior torn record survives), THEN folds the delta into a fresh
    /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
    ///
    /// Nothing is published unless the append succeeds: the database's current
    /// snapshot and last transaction id are only replaced after
    /// `wal::append_commit` has returned `Ok`, so every earlier failure returns
    /// through `?` with the published state untouched.
    ///
    /// After publishing, a dirty commit consults the configured
    /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
    /// re-acquire it), then folds when the delta-log has outgrown the base. The
    /// committed frame is already durable, so an auto-fold failure does not lose
    /// data; it is surfaced to the caller. In that case the new snapshot has
    /// already been published and the lock already released — only the return
    /// value reports the fold failure.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
    /// durable append, or a triggered auto-checkpoint fold fails.
    ///
    /// # Performance
    ///
    /// This method is `O(change)` for the dirty path — flat as the base grows.
    /// The publish step shares the parent snapshot's already-materialized
    /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
    /// folds, so the base is byte-identical within the generation), so it neither
    /// re-decodes the base nor rebuilds the index. A triggered fold adds
    /// `O(visible state bytes)` on top.
    pub(crate) fn commit(mut self) -> Result<CommitSeq, DbError> {
        if self.delta.is_empty() {
            // Non-dirty commit: no append, no publish, no durable id advance.
            return Ok(self.parent.lsn());
        }
        // The new commit sequence is the parent's successor; running out of
        // sequence space is reported as an error rather than wrapping.
        let lsn = self
            .parent
            .lsn()
            .checked_next()
            .ok_or(DbError::Txn(crate::error::TxnError::CommitSeqOverflow))?;
        // Pull the frame payload (ops + blob) out of the delta for encoding.
        // NOTE(review): presumably this drains only the mutation log and leaves
        // the delta's state intact for `freeze` below — confirm in `WriteOverlay`.
        let (ops, blob) = self.delta.take_frame();
        let frame = wal::encode_commit(
            lsn.get(),
            self.transaction_id.get(),
            self.database.base_generation,
            &ops,
            &blob,
        )?;
        // The frame goes to the log of the database's current base generation.
        let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
        wal::append_commit(&mut log, &frame)?;

        // Durable: the delta was seeded from the parent overlay and only added
        // this writer's changes, so freezing it directly is the full new
        // published overlay (parent state + this commit). The parent overlay was
        // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
        // pinning the parent is unaffected.
        let new_overlay = Arc::new(self.delta.freeze());
        // A commit never folds, so the new snapshot pins the SAME base generation
        // as the parent — the base wire bytes are byte-identical, and so are the
        // owned records and the derived index built from them. Share the parent's
        // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
        // and rebuilding the index, which keeps a single-element commit `O(change)`
        // rather than `O(base)` regardless of how large the base has grown.
        let snapshot = Snapshot::with_shared_base_records(
            self.parent.generation(),
            lsn,
            Arc::clone(self.parent.base()),
            new_overlay,
            Arc::clone(self.parent.base_records()),
        );
        // Publish: from here on the database exposes the new snapshot and
        // records this writer's transaction id as the last one committed.
        self.database.current = Arc::new(snapshot);
        self.database.last_transaction_id = self.transaction_id;
        // Release the writer lock before any auto-fold so the fold can re-acquire
        // it (a partial move out of `self`, legal because `Writer` has
        // no `Drop` impl; the remaining `&mut Db` borrow stays live).
        drop(self.lock);
        // A fold failure is propagated even though the commit itself is already
        // durable and published at this point.
        self.database.maybe_auto_checkpoint()?;
        Ok(lsn)
    }
877
878    /// Returns the merged read view this writer sees (overlay over base).
879    ///
880    /// # Performance
881    ///
882    /// This method is `O(1)` to construct.
883    fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
884        crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
885    }
886
887    /// Requires an element to be visible in the writer's merged view.
888    ///
889    /// # Errors
890    ///
891    /// Returns [`DbError::UnknownElement`] when absent.
892    ///
893    /// # Performance
894    ///
895    /// This method is `O(log change + log n)`.
896    fn require_element(&self, id: ElementId) -> Result<(), DbError> {
897        if self.merged().contains_element(id) {
898            Ok(())
899        } else {
900            Err(DbError::unknown(id))
901        }
902    }
903
904    /// Requires a relation to be visible.
905    ///
906    /// # Errors
907    ///
908    /// Returns [`DbError::UnknownRelation`] when absent.
909    ///
910    /// # Performance
911    ///
912    /// This method is `O(log change + log n)`.
913    fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
914        if self.merged().contains_relation(id) {
915            Ok(())
916        } else {
917            Err(DbError::unknown(id))
918        }
919    }
920
921    /// Requires an incidence to be visible.
922    ///
923    /// # Errors
924    ///
925    /// Returns [`DbError::UnknownIncidence`] when absent.
926    ///
927    /// # Performance
928    ///
929    /// This method is `O(log change + log n)`.
930    fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
931        if self.merged().contains_incidence(id) {
932            Ok(())
933        } else {
934            Err(DbError::unknown(id))
935        }
936    }
937
938    /// Requires a role to exist in the merged catalog.
939    ///
940    /// # Errors
941    ///
942    /// Returns [`DbError::UnknownRole`] when absent.
943    ///
944    /// # Performance
945    ///
946    /// This method is `O(log role count)`.
947    fn require_role(&self, id: RoleId) -> Result<(), DbError> {
948        if self.delta.catalog().role(id).is_some() {
949            Ok(())
950        } else {
951            Err(DbError::unknown(id))
952        }
953    }
954
955    /// Requires a label to exist in the merged catalog.
956    ///
957    /// # Errors
958    ///
959    /// Returns [`DbError::UnknownLabel`] when absent.
960    ///
961    /// # Performance
962    ///
963    /// This method is `O(log label count)`.
964    fn require_label(&self, id: LabelId) -> Result<(), DbError> {
965        if self.delta.catalog().label(id).is_some() {
966            Ok(())
967        } else {
968            Err(DbError::unknown(id))
969        }
970    }
971
972    /// Requires a relation type to exist in the merged catalog.
973    ///
974    /// # Errors
975    ///
976    /// Returns [`DbError::UnknownRelationType`] when absent.
977    ///
978    /// # Performance
979    ///
980    /// This method is `O(log relation type count)`.
981    fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
982        if self.delta.catalog().relation_type(id).is_some() {
983            Ok(())
984        } else {
985            Err(DbError::unknown(id))
986        }
987    }
988
989    /// Requires a property subject to be visible.
990    ///
991    /// # Errors
992    ///
993    /// Returns the matching `Unknown*` error when the subject is absent.
994    ///
995    /// # Performance
996    ///
997    /// This method is `O(log change + log n)`.
998    fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
999        match subject {
1000            PropertySubject::Element(id) => self.require_element(id),
1001            PropertySubject::Relation(id) => self.require_relation(id),
1002            PropertySubject::Incidence(id) => self.require_incidence(id),
1003        }
1004    }
1005
1006    /// Validates one projection definition against the merged catalog.
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns [`DbError`] when a referenced role or relation type is unknown.
1011    ///
1012    /// # Performance
1013    ///
1014    /// This method is `O(definition size)`.
1015    fn validate_projection_definition(
1016        &self,
1017        definition: &ProjectionDefinition,
1018    ) -> Result<(), DbError> {
1019        match definition {
1020            ProjectionDefinition::Graph(graph) => {
1021                self.require_role(graph.source_role)?;
1022                self.require_role(graph.target_role)?;
1023                for relation_type in &graph.relation_types {
1024                    self.require_relation_type(*relation_type)?;
1025                }
1026                Ok(())
1027            }
1028            ProjectionDefinition::Hypergraph(hyper) => {
1029                for role in &hyper.source_roles {
1030                    self.require_role(*role)?;
1031                }
1032                for role in &hyper.target_roles {
1033                    self.require_role(*role)?;
1034                }
1035                for relation_type in &hyper.relation_types {
1036                    self.require_relation_type(*relation_type)?;
1037                }
1038                Ok(())
1039            }
1040        }
1041    }
1042
1043    /// Validates one index definition against the merged catalog.
1044    ///
1045    /// # Errors
1046    ///
1047    /// Returns [`DbError`] when a referenced catalog id is unknown or a
1048    /// composite index has no keys.
1049    ///
1050    /// # Performance
1051    ///
1052    /// This method is `O(definition size)`.
1053    fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
1054        let catalog = self.delta.catalog();
1055        match definition {
1056            IndexDefinition::Label { label } => self.require_label(*label),
1057            IndexDefinition::RelationType { relation_type } => {
1058                self.require_relation_type(*relation_type)
1059            }
1060            IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
1061                self.require_property_key(*key)
1062            }
1063            IndexDefinition::CompositeEquality { keys } => {
1064                if keys.is_empty() {
1065                    return Err(DbError::unsupported(
1066                        "composite equality index requires at least one key",
1067                    ));
1068                }
1069                for key in keys {
1070                    self.require_property_key(*key)?;
1071                }
1072                Ok(())
1073            }
1074            IndexDefinition::Projection { projection } => catalog
1075                .projection(*projection)
1076                .is_some()
1077                .then_some(())
1078                .ok_or_else(|| DbError::unknown(*projection)),
1079        }
1080    }
1081
1082    /// Requires a property key to exist in the merged catalog.
1083    ///
1084    /// # Errors
1085    ///
1086    /// Returns [`DbError::UnknownPropertyKey`] when absent.
1087    ///
1088    /// # Performance
1089    ///
1090    /// This method is `O(log property key count)`.
1091    fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
1092        if self.delta.catalog().property_key(id).is_some() {
1093            Ok(())
1094        } else {
1095            Err(DbError::unknown(id))
1096        }
1097    }
1098}