Skip to main content

oxgraph_db/database/
writer.rs

1//! The single writer transaction: mutators, typed surface, and commit.
2
3use std::{collections::BTreeSet, sync::Arc};
4
5use super::{Db, open::open_log_for_append};
6use crate::{
7    Bound, CommitSeq, DbError, ElementId, GraphProjectionDefinition, GraphProjectionSpec,
8    IncidenceId, IndexId, LabelId, ProjectionDefinition, ProjectionId, PropertyKeyId,
9    PropertySubject, PropertyType, PropertyValue, RelationId, RelationTypeId, RoleId, Schema,
10    TransactionId,
11    catalog::{IndexDefinition, PropertyFamily},
12    lock::WriterLock,
13    overlay::{Snapshot, StateView, WriteOverlay},
14    typed::{Assignable, EqualityIndex, Key, ValueType},
15    wal,
16};
17
18/// Single writer transaction.
19///
20/// Mutations accumulate into a private write overlay layered over the parent
21/// snapshot; reads fall through the overlay then the base. `commit` appends the
22/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
23/// `rollback` drops the overlay and appends nothing.
24///
25/// # Performance
26///
27/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
28pub struct Writer<'db> {
29    /// Db receiving the commit.
30    pub(super) database: &'db mut Db,
31    /// Parent snapshot the writer layers over (its base + frozen overlay).
32    pub(super) parent: Arc<Snapshot>,
33    /// Private mutable delta this writer accumulates.
34    pub(super) delta: WriteOverlay,
35    /// Writer transaction id (session-local until a dirty commit makes it
36    /// durable).
37    pub(super) transaction_id: TransactionId,
38    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
39    /// transaction ends (on `rollback`, or on any early-return error path); a
40    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
41    /// triggered auto-checkpoint can re-acquire it.
42    pub(super) lock: WriterLock,
43}
44
45impl Writer<'_> {
46    /// Registers a structural incidence role.
47    ///
48    /// # Errors
49    ///
50    /// Returns [`DbError`] when the name already exists or ID allocation fails.
51    ///
52    /// # Performance
53    ///
54    /// This method is `O(log role count + name length)`.
55    pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
56        self.delta.register_role(name.into())
57    }
58
59    /// Registers an element or relation label.
60    ///
61    /// # Errors
62    ///
63    /// Returns [`DbError`] when the name already exists or ID allocation fails.
64    ///
65    /// # Performance
66    ///
67    /// This method is `O(log label count + name length)`.
68    pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
69        self.delta.register_label(name.into())
70    }
71
72    /// Registers a relation type.
73    ///
74    /// # Errors
75    ///
76    /// Returns [`DbError`] when the name already exists or ID allocation fails.
77    ///
78    /// # Performance
79    ///
80    /// This method is `O(log relation type count + name length)`.
81    pub fn register_relation_type(
82        &mut self,
83        name: impl Into<String>,
84    ) -> Result<RelationTypeId, DbError> {
85        self.delta.register_relation_type(name.into())
86    }
87
88    /// Registers a typed property key.
89    ///
90    /// # Errors
91    ///
92    /// Returns [`DbError`] when the name already exists or ID allocation fails.
93    ///
94    /// # Performance
95    ///
96    /// This method is `O(log property key count + name length)`.
97    pub fn register_property_key(
98        &mut self,
99        name: impl Into<String>,
100        family: PropertyFamily,
101        value_type: PropertyType,
102    ) -> Result<PropertyKeyId, DbError> {
103        self.delta
104            .register_property_key(name.into(), family, value_type)
105    }
106
107    /// Defines a physical projection.
108    ///
109    /// # Errors
110    ///
111    /// Returns [`DbError`] when referenced catalog IDs are unknown, the
112    /// projection name already exists, or ID allocation fails.
113    ///
114    /// # Performance
115    ///
116    /// This method is `O(definition size + catalog lookup cost)`.
117    pub fn define_projection(
118        &mut self,
119        definition: ProjectionDefinition,
120    ) -> Result<ProjectionId, DbError> {
121        self.validate_projection_definition(&definition)?;
122        self.delta.register_projection(definition)
123    }
124
125    /// Defines an index.
126    ///
127    /// # Errors
128    ///
129    /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
130    /// name already exists, or ID allocation fails.
131    ///
132    /// # Performance
133    ///
134    /// This method is `O(definition size + catalog lookup cost)`.
135    pub fn define_index(
136        &mut self,
137        name: impl Into<String>,
138        definition: IndexDefinition,
139    ) -> Result<IndexId, DbError> {
140        self.validate_index_definition(&definition)?;
141        self.delta.register_index(name.into(), definition)
142    }
143
144    /// Applies a declarative [`Schema`] idempotently (register-or-get every
145    /// declared item), returning the resolved [`Bound`] handle bag. Re-applying
146    /// the same schema reuses existing ids; a name that already exists with a
147    /// conflicting shape is a [`DbError::SchemaConflict`].
148    ///
149    /// # Errors
150    ///
151    /// Returns [`DbError`] on a shape conflict, an undeclared referenced name (an
152    /// index's key, a projection's role/type), or id-allocation failure.
153    ///
154    /// # Performance
155    ///
156    /// This method is `O(declared items × log catalog)`.
157    pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
158        let mut bound = Bound::default();
159        for name in &schema.roles {
160            let id = match self.merged().catalog().role_id(name) {
161                Some(id) => id,
162                None => self.register_role(name.clone())?,
163            };
164            bound.roles.insert(name.clone(), id);
165        }
166        for name in &schema.labels {
167            let id = match self.merged().catalog().label_id(name) {
168                Some(id) => id,
169                None => self.register_label(name.clone())?,
170            };
171            bound.labels.insert(name.clone(), id);
172        }
173        for name in &schema.relation_types {
174            let id = match self.merged().catalog().relation_type_id(name) {
175                Some(id) => id,
176                None => self.register_relation_type(name.clone())?,
177            };
178            bound.relation_types.insert(name.clone(), id);
179        }
180        for (name, family, value_type) in &schema.keys {
181            let id = self.register_key_or_get(name, *family, *value_type)?;
182            bound.keys.insert(name.clone(), (id, *value_type));
183        }
184        for (name, key_name) in &schema.equality_indexes {
185            let (key_id, value_type) = *bound.keys.get(key_name).ok_or_else(|| {
186                DbError::Catalog(crate::error::CatalogError::UnknownName {
187                    kind: "property key",
188                    name: key_name.clone(),
189                })
190            })?;
191            let id = match self.merged().catalog().index_id(name) {
192                Some(id) => id,
193                None => self.define_index(
194                    name.clone(),
195                    IndexDefinition::PropertyEquality { key: key_id },
196                )?,
197            };
198            bound
199                .equality_indexes
200                .insert(name.clone(), (id, value_type));
201        }
202        for spec in &schema.graph_projections {
203            let id = match self.merged().catalog().projection_id(&spec.name) {
204                Some(id) => id,
205                None => self.define_graph_projection(spec, &bound)?,
206            };
207            bound.projections.insert(spec.name.clone(), id);
208        }
209        Ok(bound)
210    }
211
212    /// Registers a property key, or returns the existing id when the name is
213    /// already present with a matching family and value type.
214    ///
215    /// # Errors
216    ///
217    /// Returns [`DbError::SchemaConflict`] when the name exists with a different
218    /// family or value type.
219    ///
220    /// # Performance
221    ///
222    /// This method is `O(log catalog)`.
223    fn register_key_or_get(
224        &mut self,
225        name: &str,
226        family: PropertyFamily,
227        value_type: PropertyType,
228    ) -> Result<PropertyKeyId, DbError> {
229        let Some(existing) = self.merged().catalog().property_key_id(name) else {
230            return self.register_property_key(name.to_owned(), family, value_type);
231        };
232        let matches = self
233            .merged()
234            .catalog()
235            .property_key(existing)
236            .is_some_and(|def| def.family == family && def.value_type == value_type);
237        if matches {
238            Ok(existing)
239        } else {
240            Err(DbError::Catalog(
241                crate::error::CatalogError::SchemaConflict {
242                    name: name.to_owned(),
243                    reason: "property key family/value type differs from the existing catalog entry",
244                },
245            ))
246        }
247    }
248
249    /// Defines a graph projection from a spec, resolving its relation-type and
250    /// role names through `bound`.
251    ///
252    /// # Errors
253    ///
254    /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
255    /// a definition error.
256    ///
257    /// # Performance
258    ///
259    /// This method is `O(relation-type count × log catalog)`.
260    fn define_graph_projection(
261        &mut self,
262        spec: &GraphProjectionSpec,
263        bound: &Bound,
264    ) -> Result<ProjectionId, DbError> {
265        let mut relation_types = BTreeSet::new();
266        for name in &spec.relation_types {
267            relation_types.insert(bound.relation_type(name)?);
268        }
269        let source_role = bound.role(&spec.source_role)?;
270        let target_role = bound.role(&spec.target_role)?;
271        self.define_projection(ProjectionDefinition::Graph(GraphProjectionDefinition {
272            name: spec.name.clone(),
273            relation_types,
274            source_role,
275            target_role,
276        }))
277    }
278
279    /// Creates a canonical element.
280    ///
281    /// # Errors
282    ///
283    /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
284    ///
285    /// # Performance
286    ///
287    /// This method is `O(log element change)`.
288    pub fn create_element(&mut self) -> Result<ElementId, DbError> {
289        self.delta.create_element()
290    }
291
292    /// Creates a canonical relation.
293    ///
294    /// # Errors
295    ///
296    /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
297    ///
298    /// # Performance
299    ///
300    /// This method is `O(log relation change)`.
301    pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
302        self.delta.create_relation()
303    }
304
305    /// Creates a canonical incidence.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
310    /// exhausted.
311    ///
312    /// # Performance
313    ///
314    /// This method is `O(log incidence change + reference lookup cost)`.
315    pub fn create_incidence(
316        &mut self,
317        relation: RelationId,
318        element: ElementId,
319        role: RoleId,
320    ) -> Result<IncidenceId, DbError> {
321        self.require_relation(relation)?;
322        self.require_element(element)?;
323        self.require_role(role)?;
324        self.delta.create_incidence(relation, element, role)
325    }
326
327    /// Tombstones a canonical element and its incidences.
328    ///
329    /// # Errors
330    ///
331    /// Returns [`DbError::UnknownElement`] when the element is not visible.
332    ///
333    /// # Performance
334    ///
335    /// This method is `O(log n + degree)` via the reverse-adjacency index.
336    pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
337        self.require_element(id)?;
338        // Cascade: every incidence on the element — resolved in O(log n + degree)
339        // through the reverse-adjacency index, not a full incidence scan — is
340        // tombstoned too.
341        let incidences: Vec<IncidenceId> = self
342            .merged()
343            .element_incidences(id)
344            .into_iter()
345            .map(|record| record.id)
346            .collect();
347        let base = self.parent.base_records();
348        self.delta.tombstone_element(base, id);
349        for incidence in incidences {
350            self.delta
351                .tombstone_incidence(self.parent.base_records(), incidence);
352        }
353        Ok(())
354    }
355
356    /// Tombstones a canonical relation and its incidences.
357    ///
358    /// # Errors
359    ///
360    /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
361    ///
362    /// # Performance
363    ///
364    /// This method is `O(log n + degree)` via the reverse-adjacency index.
365    pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
366        self.require_relation(id)?;
367        // Cascade: every incidence in the relation — resolved in O(log n + degree)
368        // through the reverse-adjacency index, not a full incidence scan.
369        let incidences: Vec<IncidenceId> = self
370            .merged()
371            .relation_incidences(id)
372            .into_iter()
373            .map(|record| record.id)
374            .collect();
375        let base = self.parent.base_records();
376        self.delta.tombstone_relation(base, id);
377        for incidence in incidences {
378            self.delta
379                .tombstone_incidence(self.parent.base_records(), incidence);
380        }
381        Ok(())
382    }
383
384    /// Tombstones a canonical incidence.
385    ///
386    /// # Errors
387    ///
388    /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
389    ///
390    /// # Performance
391    ///
392    /// This method is `O(log incidence change)`.
393    pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
394        self.require_incidence(id)?;
395        self.delta
396            .tombstone_incidence(self.parent.base_records(), id);
397        Ok(())
398    }
399
400    /// Adds a label to an element.
401    ///
402    /// # Errors
403    ///
404    /// Returns [`DbError`] when the element or label is unknown.
405    ///
406    /// # Performance
407    ///
408    /// This method is `O(log element change + log label count)`.
409    pub(crate) fn add_element_label(
410        &mut self,
411        element: ElementId,
412        label: LabelId,
413    ) -> Result<(), DbError> {
414        self.require_element(element)?;
415        self.require_label(label)?;
416        self.delta
417            .add_element_label(self.parent.base_records(), element, label);
418        Ok(())
419    }
420
421    /// Adds a label to a relation.
422    ///
423    /// # Errors
424    ///
425    /// Returns [`DbError`] when the relation or label is unknown.
426    ///
427    /// # Performance
428    ///
429    /// This method is `O(log relation change + log label count)`.
430    pub(crate) fn add_relation_label(
431        &mut self,
432        relation: RelationId,
433        label: LabelId,
434    ) -> Result<(), DbError> {
435        self.require_relation(relation)?;
436        self.require_label(label)?;
437        self.delta
438            .add_relation_label(self.parent.base_records(), relation, label);
439        Ok(())
440    }
441
442    /// Sets a relation type.
443    ///
444    /// # Errors
445    ///
446    /// Returns [`DbError`] when the relation or relation type is unknown.
447    ///
448    /// # Performance
449    ///
450    /// This method is `O(log relation change + log relation type count)`.
451    pub fn set_relation_type(
452        &mut self,
453        relation: RelationId,
454        relation_type: RelationTypeId,
455    ) -> Result<(), DbError> {
456        self.require_relation(relation)?;
457        self.require_relation_type(relation_type)?;
458        self.delta
459            .set_relation_type(self.parent.base_records(), relation, relation_type);
460        Ok(())
461    }
462
463    /// Sets a property value.
464    ///
465    /// # Errors
466    ///
467    /// Returns [`DbError`] when the subject or key is unknown, or the value
468    /// does not match the key schema.
469    ///
470    /// # Performance
471    ///
472    /// This method is `O(log subject change + log key count)`.
473    pub(crate) fn set_property(
474        &mut self,
475        subject: PropertySubject,
476        key: PropertyKeyId,
477        value: PropertyValue,
478    ) -> Result<(), DbError> {
479        // Referential integrity: the subject must be visible (this rejects an
480        // orphan property against a tombstoned/absent subject at the transaction
481        // boundary — the overlay layer is permissive by design).
482        self.require_subject(subject)?;
483        let definition = self
484            .merged()
485            .catalog()
486            .property_key(key)
487            .cloned()
488            .ok_or_else(|| DbError::unknown(key))?;
489        if definition.family != subject.family() {
490            return Err(DbError::Query(
491                crate::error::QueryError::WrongPropertyFamily {
492                    expected: definition.family,
493                    actual: subject.family(),
494                },
495            ));
496        }
497        if definition.value_type != value.value_type() {
498            return Err(DbError::Query(
499                crate::error::QueryError::PropertyTypeMismatch {
500                    expected: definition.value_type,
501                    actual: value.value_type(),
502                },
503            ));
504        }
505        self.delta
506            .set_property(self.parent.base_records(), subject, key, value);
507        Ok(())
508    }
509
510    /// Removes a property value.
511    ///
512    /// # Errors
513    ///
514    /// Returns [`DbError`] when the subject or key is unknown.
515    ///
516    /// # Performance
517    ///
518    /// This method is `O(log subject change + log key count)`.
519    pub(crate) fn remove_property(
520        &mut self,
521        subject: PropertySubject,
522        key: PropertyKeyId,
523    ) -> Result<(), DbError> {
524        self.require_subject(subject)?;
525        if self.merged().catalog().property_key(key).is_none() {
526            return Err(DbError::unknown(key));
527        }
528        self.delta
529            .remove_property(self.parent.base_records(), subject, key);
530        Ok(())
531    }
532
533    /// Resolves the property key an equality index covers.
534    ///
535    /// # Errors
536    ///
537    /// Returns [`DbError::UnknownIndex`] when `index` is unknown, or an
538    /// unsupported-query error when it is not a property-equality index.
539    ///
540    /// # Performance
541    ///
542    /// This method is `O(log index count)`.
543    fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
544        let view = self.merged();
545        let entry = view
546            .catalog()
547            .index(index)
548            .ok_or_else(|| DbError::unknown(index))?;
549        match &entry.definition {
550            IndexDefinition::PropertyEquality { key } => Ok(*key),
551            _other => Err(DbError::unsupported(
552                "reconcile requires a property-equality index",
553            )),
554        }
555    }
556
557    /// Inserts or updates the element whose value under `index` equals `value`,
558    /// returning its canonical id — reused when an element already carries that
559    /// identity value (id stable across reconcile), freshly minted (a never-reused
560    /// id, with the identity property set) otherwise.
561    ///
562    /// # Errors
563    ///
564    /// Returns [`DbError`] when `index` is not an equality index or the value
565    /// type mismatches the key schema.
566    ///
567    /// # Performance
568    ///
569    /// This method is `O(log n + value length)` — a probe plus, on a miss, a mint.
570    pub fn upsert_element<T: ValueType>(
571        &mut self,
572        index: EqualityIndex<T>,
573        value: impl Assignable<T>,
574    ) -> Result<ElementId, DbError> {
575        let value = value.into_value()?;
576        let key = self.equality_index_key(index.id())?;
577        let existing = self
578            .merged()
579            .property_equal(key, &value)
580            .into_iter()
581            .find_map(|subject| match subject {
582                PropertySubject::Element(id) => Some(id),
583                PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
584            });
585        if let Some(id) = existing {
586            return Ok(id);
587        }
588        let element = self.create_element()?;
589        self.set_property(PropertySubject::Element(element), key, value)?;
590        Ok(element)
591    }
592
593    /// Inserts or updates the relation whose value under `index` equals `value`,
594    /// returning its canonical id. On a miss it mints the relation, sets its type
595    /// and identity property, and creates one incidence per `(element, role)`
596    /// endpoint; on a hit the existing relation (with its endpoints) is reused
597    /// unchanged — the identity value encodes the endpoints, so they are immutable.
598    ///
599    /// # Errors
600    ///
601    /// Returns [`DbError`] when `index` is not an equality index, the value type
602    /// mismatches, or an endpoint element does not exist.
603    ///
604    /// # Performance
605    ///
606    /// This method is `O(log n + endpoints)` — a probe plus, on a miss, a mint.
607    pub fn upsert_relation<T: ValueType>(
608        &mut self,
609        index: EqualityIndex<T>,
610        value: impl Assignable<T>,
611        relation_type: RelationTypeId,
612        endpoints: &[(ElementId, RoleId)],
613    ) -> Result<RelationId, DbError> {
614        let value = value.into_value()?;
615        let key = self.equality_index_key(index.id())?;
616        let existing = self
617            .merged()
618            .property_equal(key, &value)
619            .into_iter()
620            .find_map(|subject| match subject {
621                PropertySubject::Relation(id) => Some(id),
622                PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
623            });
624        if let Some(id) = existing {
625            return Ok(id);
626        }
627        let relation = self.create_relation()?;
628        self.set_relation_type(relation, relation_type)?;
629        self.set_property(PropertySubject::Relation(relation), key, value)?;
630        for (element, role) in endpoints {
631            self.create_incidence(relation, *element, *role)?;
632        }
633        Ok(relation)
634    }
635
636    /// Tombstones every subject carried by `index` whose identity value is NOT in
637    /// `keep`, cascading each subject's incidences in `O(degree)` via the
638    /// reverse-adjacency index. The prune half of a reconcile: after upserting
639    /// every desired subject, `retain` removes the vanished complement.
640    ///
641    /// # Errors
642    ///
643    /// Returns [`DbError`] when `index` is not an equality index or a `keep` value
644    /// type mismatches the key schema.
645    ///
646    /// # Performance
647    ///
648    /// This method is `O(family size + removed × degree)`.
649    pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
650        &mut self,
651        index: EqualityIndex<T>,
652        keep: &[V],
653    ) -> Result<(), DbError> {
654        let key = self.equality_index_key(index.id())?;
655        let mut keep_values: BTreeSet<PropertyValue> = BTreeSet::new();
656        for value in keep {
657            keep_values.insert((*value).into_value()?);
658        }
659        let stale: Vec<PropertySubject> = self
660            .merged()
661            .property_key_subjects(key)
662            .into_iter()
663            .filter(|(_subject, value)| !keep_values.contains(value))
664            .map(|(subject, _value)| subject)
665            .collect();
666        for subject in stale {
667            match subject {
668                PropertySubject::Element(id) => self.tombstone_element(id)?,
669                PropertySubject::Relation(id) => self.tombstone_relation(id)?,
670                PropertySubject::Incidence(id) => self.tombstone_incidence(id)?,
671            }
672        }
673        Ok(())
674    }
675
676    /// Sets a typed property on a subject; the value type is checked at compile
677    /// time against the key.
678    ///
679    /// # Errors
680    ///
681    /// Returns [`DbError`] when the subject is absent, the value is out of range,
682    /// or the value type mismatches the key schema.
683    ///
684    /// # Performance
685    ///
686    /// This method is `O(log change + log keys)`.
687    pub fn set<T: ValueType>(
688        &mut self,
689        subject: impl Into<PropertySubject>,
690        key: Key<T>,
691        value: impl Assignable<T>,
692    ) -> Result<(), DbError> {
693        self.set_property(subject.into(), key.id(), value.into_value()?)
694    }
695
696    /// Removes a typed property from a subject.
697    ///
698    /// # Errors
699    ///
700    /// Returns [`DbError`] when the subject is absent or the key is unknown.
701    ///
702    /// # Performance
703    ///
704    /// This method is `O(log change + log keys)`.
705    pub fn unset<T: ValueType>(
706        &mut self,
707        subject: impl Into<PropertySubject>,
708        key: Key<T>,
709    ) -> Result<(), DbError> {
710        self.remove_property(subject.into(), key.id())
711    }
712
713    /// Adds a label to an element or relation subject.
714    ///
715    /// # Errors
716    ///
717    /// Returns [`DbError`] when the subject is absent, the label is unknown, or
718    /// the subject is an incidence (incidences carry no labels).
719    ///
720    /// # Performance
721    ///
722    /// This method is `O(log change + log labels)`.
723    pub fn add_label(
724        &mut self,
725        subject: impl Into<PropertySubject>,
726        label: LabelId,
727    ) -> Result<(), DbError> {
728        match subject.into() {
729            PropertySubject::Element(id) => self.add_element_label(id, label),
730            PropertySubject::Relation(id) => self.add_relation_label(id, label),
731            PropertySubject::Incidence(_) => {
732                Err(DbError::unsupported("incidences do not carry labels"))
733            }
734        }
735    }
736
737    /// Tombstones any subject by id, cascading a relation's or element's
738    /// incidences in `O(degree)` via the reverse-adjacency index.
739    ///
740    /// # Errors
741    ///
742    /// Returns [`DbError`] when the subject is not visible.
743    ///
744    /// # Performance
745    ///
746    /// This method is `O(log change + degree)`.
747    pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
748        match subject.into() {
749            PropertySubject::Element(id) => self.tombstone_element(id),
750            PropertySubject::Relation(id) => self.tombstone_relation(id),
751            PropertySubject::Incidence(id) => self.tombstone_incidence(id),
752        }
753    }
754
755    /// Commits this write transaction durably.
756    ///
757    /// A non-dirty commit returns the parent's commit sequence without appending
758    /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
759    /// log into one WAL frame (with the watermark op last), appends it with an
760    /// fsync (truncating back to the captured EOF on any write error so no
761    /// interior torn record survives), THEN folds the delta into a fresh
762    /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
763    ///
764    /// After publishing, a dirty commit consults the configured
765    /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
766    /// re-acquire it), then folds when the delta-log has outgrown the base. The
767    /// committed frame is already durable, so an auto-fold failure does not lose
768    /// data; it is surfaced to the caller.
769    ///
770    /// # Errors
771    ///
772    /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
773    /// durable append, or a triggered auto-checkpoint fold fails.
774    ///
775    /// # Performance
776    ///
777    /// This method is `O(change)` for the dirty path — flat as the base grows.
778    /// The publish step shares the parent snapshot's already-materialized
779    /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
780    /// folds, so the base is byte-identical within the generation), so it neither
781    /// re-decodes the base nor rebuilds the index. A triggered fold adds
782    /// `O(visible state bytes)` on top.
783    pub(crate) fn commit(mut self) -> Result<CommitSeq, DbError> {
784        if self.delta.is_empty() {
785            // Non-dirty commit: no append, no publish, no durable id advance.
786            return Ok(self.parent.lsn());
787        }
788        let lsn = self
789            .parent
790            .lsn()
791            .checked_next()
792            .ok_or(DbError::Txn(crate::error::TxnError::CommitSeqOverflow))?;
793        let (ops, blob) = self.delta.take_frame();
794        let frame = wal::encode_commit(
795            lsn.get(),
796            self.transaction_id.get(),
797            self.database.base_generation,
798            &ops,
799            &blob,
800        )?;
801        let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
802        wal::append_commit(&mut log, &frame)?;
803
804        // Durable: the delta was seeded from the parent overlay and only added
805        // this writer's changes, so freezing it directly is the full new
806        // published overlay (parent state + this commit). The parent overlay was
807        // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
808        // pinning the parent is unaffected.
809        let new_overlay = Arc::new(self.delta.freeze());
810        // A commit never folds, so the new snapshot pins the SAME base generation
811        // as the parent — the base wire bytes are byte-identical, and so are the
812        // owned records and the derived index built from them. Share the parent's
813        // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
814        // and rebuilding the index, which keeps a single-element commit `O(change)`
815        // rather than `O(base)` regardless of how large the base has grown.
816        let snapshot = Snapshot::with_shared_base_records(
817            self.parent.generation(),
818            lsn,
819            Arc::clone(self.parent.base()),
820            new_overlay,
821            Arc::clone(self.parent.base_records()),
822        );
823        self.database.current = Arc::new(snapshot);
824        self.database.last_transaction_id = self.transaction_id;
825        // Release the writer lock before any auto-fold so the fold can re-acquire
826        // it (a partial move out of `self`, legal because `Writer` has
827        // no `Drop` impl; the remaining `&mut Db` borrow stays live).
828        drop(self.lock);
829        self.database.maybe_auto_checkpoint()?;
830        Ok(lsn)
831    }
832
833    /// Returns the merged read view this writer sees (overlay over base).
834    ///
835    /// # Performance
836    ///
837    /// This method is `O(1)` to construct.
838    fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
839        crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
840    }
841
842    /// Requires an element to be visible in the writer's merged view.
843    ///
844    /// # Errors
845    ///
846    /// Returns [`DbError::UnknownElement`] when absent.
847    ///
848    /// # Performance
849    ///
850    /// This method is `O(log change + log n)`.
851    fn require_element(&self, id: ElementId) -> Result<(), DbError> {
852        if self.merged().contains_element(id) {
853            Ok(())
854        } else {
855            Err(DbError::unknown(id))
856        }
857    }
858
859    /// Requires a relation to be visible.
860    ///
861    /// # Errors
862    ///
863    /// Returns [`DbError::UnknownRelation`] when absent.
864    ///
865    /// # Performance
866    ///
867    /// This method is `O(log change + log n)`.
868    fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
869        if self.merged().contains_relation(id) {
870            Ok(())
871        } else {
872            Err(DbError::unknown(id))
873        }
874    }
875
876    /// Requires an incidence to be visible.
877    ///
878    /// # Errors
879    ///
880    /// Returns [`DbError::UnknownIncidence`] when absent.
881    ///
882    /// # Performance
883    ///
884    /// This method is `O(log change + log n)`.
885    fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
886        if self.merged().contains_incidence(id) {
887            Ok(())
888        } else {
889            Err(DbError::unknown(id))
890        }
891    }
892
893    /// Requires a role to exist in the merged catalog.
894    ///
895    /// # Errors
896    ///
897    /// Returns [`DbError::UnknownRole`] when absent.
898    ///
899    /// # Performance
900    ///
901    /// This method is `O(log role count)`.
902    fn require_role(&self, id: RoleId) -> Result<(), DbError> {
903        if self.delta.catalog().role(id).is_some() {
904            Ok(())
905        } else {
906            Err(DbError::unknown(id))
907        }
908    }
909
910    /// Requires a label to exist in the merged catalog.
911    ///
912    /// # Errors
913    ///
914    /// Returns [`DbError::UnknownLabel`] when absent.
915    ///
916    /// # Performance
917    ///
918    /// This method is `O(log label count)`.
919    fn require_label(&self, id: LabelId) -> Result<(), DbError> {
920        if self.delta.catalog().label(id).is_some() {
921            Ok(())
922        } else {
923            Err(DbError::unknown(id))
924        }
925    }
926
927    /// Requires a relation type to exist in the merged catalog.
928    ///
929    /// # Errors
930    ///
931    /// Returns [`DbError::UnknownRelationType`] when absent.
932    ///
933    /// # Performance
934    ///
935    /// This method is `O(log relation type count)`.
936    fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
937        if self.delta.catalog().relation_type(id).is_some() {
938            Ok(())
939        } else {
940            Err(DbError::unknown(id))
941        }
942    }
943
944    /// Requires a property subject to be visible.
945    ///
946    /// # Errors
947    ///
948    /// Returns the matching `Unknown*` error when the subject is absent.
949    ///
950    /// # Performance
951    ///
952    /// This method is `O(log change + log n)`.
953    fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
954        match subject {
955            PropertySubject::Element(id) => self.require_element(id),
956            PropertySubject::Relation(id) => self.require_relation(id),
957            PropertySubject::Incidence(id) => self.require_incidence(id),
958        }
959    }
960
961    /// Validates one projection definition against the merged catalog.
962    ///
963    /// # Errors
964    ///
965    /// Returns [`DbError`] when a referenced role or relation type is unknown.
966    ///
967    /// # Performance
968    ///
969    /// This method is `O(definition size)`.
970    fn validate_projection_definition(
971        &self,
972        definition: &ProjectionDefinition,
973    ) -> Result<(), DbError> {
974        match definition {
975            ProjectionDefinition::Graph(graph) => {
976                self.require_role(graph.source_role)?;
977                self.require_role(graph.target_role)?;
978                for relation_type in &graph.relation_types {
979                    self.require_relation_type(*relation_type)?;
980                }
981                Ok(())
982            }
983            ProjectionDefinition::Hypergraph(hyper) => {
984                for role in &hyper.source_roles {
985                    self.require_role(*role)?;
986                }
987                for role in &hyper.target_roles {
988                    self.require_role(*role)?;
989                }
990                for relation_type in &hyper.relation_types {
991                    self.require_relation_type(*relation_type)?;
992                }
993                Ok(())
994            }
995        }
996    }
997
998    /// Validates one index definition against the merged catalog.
999    ///
1000    /// # Errors
1001    ///
1002    /// Returns [`DbError`] when a referenced catalog id is unknown or a
1003    /// composite index has no keys.
1004    ///
1005    /// # Performance
1006    ///
1007    /// This method is `O(definition size)`.
1008    fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
1009        let catalog = self.delta.catalog();
1010        match definition {
1011            IndexDefinition::Label { label } => self.require_label(*label),
1012            IndexDefinition::RelationType { relation_type } => {
1013                self.require_relation_type(*relation_type)
1014            }
1015            IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
1016                self.require_property_key(*key)
1017            }
1018            IndexDefinition::CompositeEquality { keys } => {
1019                if keys.is_empty() {
1020                    return Err(DbError::unsupported(
1021                        "composite equality index requires at least one key",
1022                    ));
1023                }
1024                for key in keys {
1025                    self.require_property_key(*key)?;
1026                }
1027                Ok(())
1028            }
1029            IndexDefinition::Projection { projection } => catalog
1030                .projection(*projection)
1031                .is_some()
1032                .then_some(())
1033                .ok_or_else(|| DbError::unknown(*projection)),
1034        }
1035    }
1036
1037    /// Requires a property key to exist in the merged catalog.
1038    ///
1039    /// # Errors
1040    ///
1041    /// Returns [`DbError::UnknownPropertyKey`] when absent.
1042    ///
1043    /// # Performance
1044    ///
1045    /// This method is `O(log property key count)`.
1046    fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
1047        if self.delta.catalog().property_key(id).is_some() {
1048            Ok(())
1049        } else {
1050            Err(DbError::unknown(id))
1051        }
1052    }
1053}