oxgraph_db/database/writer.rs
1//! The single writer transaction: mutators, typed surface, and commit.
2
3use std::{collections::BTreeSet, sync::Arc};
4
5use super::{Db, open::open_log_for_append};
6use crate::{
7 Bound, CommitSeq, DbError, ElementId, GraphProjectionDefinition, GraphProjectionSpec,
8 IncidenceId, IndexId, LabelId, ProjectionDefinition, ProjectionId, PropertyKeyId,
9 PropertySubject, PropertyType, PropertyValue, RelationId, RelationTypeId, RoleId, Schema,
10 TransactionId,
11 catalog::{IndexDefinition, PropertyFamily},
12 lock::WriterLock,
13 overlay::{Snapshot, StateView, WriteOverlay},
14 typed::{Assignable, EqualityIndex, Key, ValueType},
15 wal,
16};
17
/// Single writer transaction.
///
/// Mutations accumulate into a private write overlay layered over the parent
/// snapshot; reads fall through the overlay then the base. `commit` appends the
/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
/// `rollback` drops the overlay and appends nothing.
///
/// The `'db` lifetime is the exclusive (`&mut`) borrow of the [`Db`] this writer
/// commits into, so a writer keeps its `Db` mutably borrowed for as long as it
/// lives.
///
/// # Performance
///
/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
pub struct Writer<'db> {
    /// Db receiving the commit. A dirty commit replaces its current snapshot and
    /// records this writer's transaction id on it.
    pub(super) database: &'db mut Db,
    /// Parent snapshot the writer layers over (its base + frozen overlay).
    /// Every mutator reads the base records through this snapshot.
    pub(super) parent: Arc<Snapshot>,
    /// Private mutable delta this writer accumulates. Nothing in it is visible
    /// outside the writer until `commit` freezes and publishes it.
    pub(super) delta: WriteOverlay,
    /// Writer transaction id (session-local until a dirty commit makes it
    /// durable).
    pub(super) transaction_id: TransactionId,
    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
    /// transaction ends (on `rollback`, or on any early-return error path); a
    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
    /// triggered auto-checkpoint can re-acquire it.
    pub(super) lock: WriterLock,
}
44
45impl Writer<'_> {
46 /// Registers a structural incidence role.
47 ///
48 /// # Errors
49 ///
50 /// Returns [`DbError`] when the name already exists or ID allocation fails.
51 ///
52 /// # Performance
53 ///
54 /// This method is `O(log role count + name length)`.
55 pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
56 self.delta.register_role(name.into())
57 }
58
59 /// Registers an element or relation label.
60 ///
61 /// # Errors
62 ///
63 /// Returns [`DbError`] when the name already exists or ID allocation fails.
64 ///
65 /// # Performance
66 ///
67 /// This method is `O(log label count + name length)`.
68 pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
69 self.delta.register_label(name.into())
70 }
71
72 /// Registers a relation type.
73 ///
74 /// # Errors
75 ///
76 /// Returns [`DbError`] when the name already exists or ID allocation fails.
77 ///
78 /// # Performance
79 ///
80 /// This method is `O(log relation type count + name length)`.
81 pub fn register_relation_type(
82 &mut self,
83 name: impl Into<String>,
84 ) -> Result<RelationTypeId, DbError> {
85 self.delta.register_relation_type(name.into())
86 }
87
88 /// Registers a typed property key.
89 ///
90 /// # Errors
91 ///
92 /// Returns [`DbError`] when the name already exists or ID allocation fails.
93 ///
94 /// # Performance
95 ///
96 /// This method is `O(log property key count + name length)`.
97 pub fn register_property_key(
98 &mut self,
99 name: impl Into<String>,
100 family: PropertyFamily,
101 value_type: PropertyType,
102 ) -> Result<PropertyKeyId, DbError> {
103 self.delta
104 .register_property_key(name.into(), family, value_type)
105 }
106
107 /// Defines a physical projection.
108 ///
109 /// # Errors
110 ///
111 /// Returns [`DbError`] when referenced catalog IDs are unknown, the
112 /// projection name already exists, or ID allocation fails.
113 ///
114 /// # Performance
115 ///
116 /// This method is `O(definition size + catalog lookup cost)`.
117 pub fn define_projection(
118 &mut self,
119 definition: ProjectionDefinition,
120 ) -> Result<ProjectionId, DbError> {
121 self.validate_projection_definition(&definition)?;
122 self.delta.register_projection(definition)
123 }
124
125 /// Defines an index.
126 ///
127 /// # Errors
128 ///
129 /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
130 /// name already exists, or ID allocation fails.
131 ///
132 /// # Performance
133 ///
134 /// This method is `O(definition size + catalog lookup cost)`.
135 pub fn define_index(
136 &mut self,
137 name: impl Into<String>,
138 definition: IndexDefinition,
139 ) -> Result<IndexId, DbError> {
140 self.validate_index_definition(&definition)?;
141 self.delta.register_index(name.into(), definition)
142 }
143
144 /// Applies a declarative [`Schema`] idempotently (register-or-get every
145 /// declared item), returning the resolved [`Bound`] handle bag. Re-applying
146 /// the same schema reuses existing ids; a name that already exists with a
147 /// conflicting shape is a [`DbError::SchemaConflict`].
148 ///
149 /// # Errors
150 ///
151 /// Returns [`DbError`] on a shape conflict, an undeclared referenced name (an
152 /// index's key, a projection's role/type), or id-allocation failure.
153 ///
154 /// # Performance
155 ///
156 /// This method is `O(declared items × log catalog)`.
157 pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
158 let mut bound = Bound::default();
159 for name in &schema.roles {
160 let id = match self.merged().catalog().role_id(name) {
161 Some(id) => id,
162 None => self.register_role(name.clone())?,
163 };
164 bound.roles.insert(name.clone(), id);
165 }
166 for name in &schema.labels {
167 let id = match self.merged().catalog().label_id(name) {
168 Some(id) => id,
169 None => self.register_label(name.clone())?,
170 };
171 bound.labels.insert(name.clone(), id);
172 }
173 for name in &schema.relation_types {
174 let id = match self.merged().catalog().relation_type_id(name) {
175 Some(id) => id,
176 None => self.register_relation_type(name.clone())?,
177 };
178 bound.relation_types.insert(name.clone(), id);
179 }
180 for (name, family, value_type) in &schema.keys {
181 let id = self.register_key_or_get(name, *family, *value_type)?;
182 bound.keys.insert(name.clone(), (id, *value_type));
183 }
184 for (name, key_name) in &schema.equality_indexes {
185 let (key_id, value_type) = *bound.keys.get(key_name).ok_or_else(|| {
186 DbError::Catalog(crate::error::CatalogError::UnknownName {
187 kind: "property key",
188 name: key_name.clone(),
189 })
190 })?;
191 let id = match self.merged().catalog().index_id(name) {
192 Some(id) => id,
193 None => self.define_index(
194 name.clone(),
195 IndexDefinition::PropertyEquality { key: key_id },
196 )?,
197 };
198 bound
199 .equality_indexes
200 .insert(name.clone(), (id, value_type));
201 }
202 for spec in &schema.graph_projections {
203 let id = match self.merged().catalog().projection_id(&spec.name) {
204 Some(id) => id,
205 None => self.define_graph_projection(spec, &bound)?,
206 };
207 bound.projections.insert(spec.name.clone(), id);
208 }
209 Ok(bound)
210 }
211
212 /// Registers a property key, or returns the existing id when the name is
213 /// already present with a matching family and value type.
214 ///
215 /// # Errors
216 ///
217 /// Returns [`DbError::SchemaConflict`] when the name exists with a different
218 /// family or value type.
219 ///
220 /// # Performance
221 ///
222 /// This method is `O(log catalog)`.
223 fn register_key_or_get(
224 &mut self,
225 name: &str,
226 family: PropertyFamily,
227 value_type: PropertyType,
228 ) -> Result<PropertyKeyId, DbError> {
229 let Some(existing) = self.merged().catalog().property_key_id(name) else {
230 return self.register_property_key(name.to_owned(), family, value_type);
231 };
232 let matches = self
233 .merged()
234 .catalog()
235 .property_key(existing)
236 .is_some_and(|def| def.family == family && def.value_type == value_type);
237 if matches {
238 Ok(existing)
239 } else {
240 Err(DbError::Catalog(
241 crate::error::CatalogError::SchemaConflict {
242 name: name.to_owned(),
243 reason: "property key family/value type differs from the existing catalog entry",
244 },
245 ))
246 }
247 }
248
249 /// Defines a graph projection from a spec, resolving its relation-type and
250 /// role names through `bound`.
251 ///
252 /// # Errors
253 ///
254 /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
255 /// a definition error.
256 ///
257 /// # Performance
258 ///
259 /// This method is `O(relation-type count × log catalog)`.
260 fn define_graph_projection(
261 &mut self,
262 spec: &GraphProjectionSpec,
263 bound: &Bound,
264 ) -> Result<ProjectionId, DbError> {
265 let mut relation_types = BTreeSet::new();
266 for name in &spec.relation_types {
267 relation_types.insert(bound.relation_type(name)?);
268 }
269 let source_role = bound.role(&spec.source_role)?;
270 let target_role = bound.role(&spec.target_role)?;
271 self.define_projection(ProjectionDefinition::Graph(GraphProjectionDefinition {
272 name: spec.name.clone(),
273 relation_types,
274 source_role,
275 target_role,
276 }))
277 }
278
279 /// Creates a canonical element.
280 ///
281 /// # Errors
282 ///
283 /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
284 ///
285 /// # Performance
286 ///
287 /// This method is `O(log element change)`.
288 pub fn create_element(&mut self) -> Result<ElementId, DbError> {
289 self.delta.create_element()
290 }
291
292 /// Creates a canonical relation.
293 ///
294 /// # Errors
295 ///
296 /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
297 ///
298 /// # Performance
299 ///
300 /// This method is `O(log relation change)`.
301 pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
302 self.delta.create_relation()
303 }
304
305 /// Creates a canonical incidence.
306 ///
307 /// # Errors
308 ///
309 /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
310 /// exhausted.
311 ///
312 /// # Performance
313 ///
314 /// This method is `O(log incidence change + reference lookup cost)`.
315 pub fn create_incidence(
316 &mut self,
317 relation: RelationId,
318 element: ElementId,
319 role: RoleId,
320 ) -> Result<IncidenceId, DbError> {
321 self.require_relation(relation)?;
322 self.require_element(element)?;
323 self.require_role(role)?;
324 self.delta.create_incidence(relation, element, role)
325 }
326
327 /// Tombstones a canonical element and its incidences.
328 ///
329 /// # Errors
330 ///
331 /// Returns [`DbError::UnknownElement`] when the element is not visible.
332 ///
333 /// # Performance
334 ///
335 /// This method is `O(log n + degree)` via the reverse-adjacency index.
336 pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
337 self.require_element(id)?;
338 // Cascade: every incidence on the element — resolved in O(log n + degree)
339 // through the reverse-adjacency index, not a full incidence scan — is
340 // tombstoned too.
341 let incidences: Vec<IncidenceId> = self
342 .merged()
343 .element_incidences(id)
344 .into_iter()
345 .map(|record| record.id)
346 .collect();
347 let base = self.parent.base_records();
348 self.delta.tombstone_element(base, id);
349 for incidence in incidences {
350 self.delta
351 .tombstone_incidence(self.parent.base_records(), incidence);
352 }
353 Ok(())
354 }
355
356 /// Tombstones a canonical relation and its incidences.
357 ///
358 /// # Errors
359 ///
360 /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
361 ///
362 /// # Performance
363 ///
364 /// This method is `O(log n + degree)` via the reverse-adjacency index.
365 pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
366 self.require_relation(id)?;
367 // Cascade: every incidence in the relation — resolved in O(log n + degree)
368 // through the reverse-adjacency index, not a full incidence scan.
369 let incidences: Vec<IncidenceId> = self
370 .merged()
371 .relation_incidences(id)
372 .into_iter()
373 .map(|record| record.id)
374 .collect();
375 let base = self.parent.base_records();
376 self.delta.tombstone_relation(base, id);
377 for incidence in incidences {
378 self.delta
379 .tombstone_incidence(self.parent.base_records(), incidence);
380 }
381 Ok(())
382 }
383
384 /// Tombstones a canonical incidence.
385 ///
386 /// # Errors
387 ///
388 /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
389 ///
390 /// # Performance
391 ///
392 /// This method is `O(log incidence change)`.
393 pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
394 self.require_incidence(id)?;
395 self.delta
396 .tombstone_incidence(self.parent.base_records(), id);
397 Ok(())
398 }
399
400 /// Adds a label to an element.
401 ///
402 /// # Errors
403 ///
404 /// Returns [`DbError`] when the element or label is unknown.
405 ///
406 /// # Performance
407 ///
408 /// This method is `O(log element change + log label count)`.
409 pub(crate) fn add_element_label(
410 &mut self,
411 element: ElementId,
412 label: LabelId,
413 ) -> Result<(), DbError> {
414 self.require_element(element)?;
415 self.require_label(label)?;
416 self.delta
417 .add_element_label(self.parent.base_records(), element, label);
418 Ok(())
419 }
420
421 /// Adds a label to a relation.
422 ///
423 /// # Errors
424 ///
425 /// Returns [`DbError`] when the relation or label is unknown.
426 ///
427 /// # Performance
428 ///
429 /// This method is `O(log relation change + log label count)`.
430 pub(crate) fn add_relation_label(
431 &mut self,
432 relation: RelationId,
433 label: LabelId,
434 ) -> Result<(), DbError> {
435 self.require_relation(relation)?;
436 self.require_label(label)?;
437 self.delta
438 .add_relation_label(self.parent.base_records(), relation, label);
439 Ok(())
440 }
441
442 /// Sets a relation type.
443 ///
444 /// # Errors
445 ///
446 /// Returns [`DbError`] when the relation or relation type is unknown.
447 ///
448 /// # Performance
449 ///
450 /// This method is `O(log relation change + log relation type count)`.
451 pub fn set_relation_type(
452 &mut self,
453 relation: RelationId,
454 relation_type: RelationTypeId,
455 ) -> Result<(), DbError> {
456 self.require_relation(relation)?;
457 self.require_relation_type(relation_type)?;
458 self.delta
459 .set_relation_type(self.parent.base_records(), relation, relation_type);
460 Ok(())
461 }
462
463 /// Sets a property value.
464 ///
465 /// # Errors
466 ///
467 /// Returns [`DbError`] when the subject or key is unknown, or the value
468 /// does not match the key schema.
469 ///
470 /// # Performance
471 ///
472 /// This method is `O(log subject change + log key count)`.
473 pub(crate) fn set_property(
474 &mut self,
475 subject: PropertySubject,
476 key: PropertyKeyId,
477 value: PropertyValue,
478 ) -> Result<(), DbError> {
479 // Referential integrity: the subject must be visible (this rejects an
480 // orphan property against a tombstoned/absent subject at the transaction
481 // boundary — the overlay layer is permissive by design).
482 self.require_subject(subject)?;
483 let definition = self
484 .merged()
485 .catalog()
486 .property_key(key)
487 .cloned()
488 .ok_or_else(|| DbError::unknown(key))?;
489 if definition.family != subject.family() {
490 return Err(DbError::Query(
491 crate::error::QueryError::WrongPropertyFamily {
492 expected: definition.family,
493 actual: subject.family(),
494 },
495 ));
496 }
497 if definition.value_type != value.value_type() {
498 return Err(DbError::Query(
499 crate::error::QueryError::PropertyTypeMismatch {
500 expected: definition.value_type,
501 actual: value.value_type(),
502 },
503 ));
504 }
505 self.delta
506 .set_property(self.parent.base_records(), subject, key, value);
507 Ok(())
508 }
509
510 /// Removes a property value.
511 ///
512 /// # Errors
513 ///
514 /// Returns [`DbError`] when the subject or key is unknown.
515 ///
516 /// # Performance
517 ///
518 /// This method is `O(log subject change + log key count)`.
519 pub(crate) fn remove_property(
520 &mut self,
521 subject: PropertySubject,
522 key: PropertyKeyId,
523 ) -> Result<(), DbError> {
524 self.require_subject(subject)?;
525 if self.merged().catalog().property_key(key).is_none() {
526 return Err(DbError::unknown(key));
527 }
528 self.delta
529 .remove_property(self.parent.base_records(), subject, key);
530 Ok(())
531 }
532
533 /// Resolves the property key an equality index covers.
534 ///
535 /// # Errors
536 ///
537 /// Returns [`DbError::UnknownIndex`] when `index` is unknown, or an
538 /// unsupported-query error when it is not a property-equality index.
539 ///
540 /// # Performance
541 ///
542 /// This method is `O(log index count)`.
543 fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
544 let view = self.merged();
545 let entry = view
546 .catalog()
547 .index(index)
548 .ok_or_else(|| DbError::unknown(index))?;
549 match &entry.definition {
550 IndexDefinition::PropertyEquality { key } => Ok(*key),
551 _other => Err(DbError::unsupported(
552 "reconcile requires a property-equality index",
553 )),
554 }
555 }
556
557 /// Inserts or updates the element whose value under `index` equals `value`,
558 /// returning its canonical id — reused when an element already carries that
559 /// identity value (id stable across reconcile), freshly minted (a never-reused
560 /// id, with the identity property set) otherwise.
561 ///
562 /// # Errors
563 ///
564 /// Returns [`DbError`] when `index` is not an equality index or the value
565 /// type mismatches the key schema.
566 ///
567 /// # Performance
568 ///
569 /// This method is `O(log n + value length)` — a probe plus, on a miss, a mint.
570 pub fn upsert_element<T: ValueType>(
571 &mut self,
572 index: EqualityIndex<T>,
573 value: impl Assignable<T>,
574 ) -> Result<ElementId, DbError> {
575 let value = value.into_value()?;
576 let key = self.equality_index_key(index.id())?;
577 let existing = self
578 .merged()
579 .property_equal(key, &value)
580 .into_iter()
581 .find_map(|subject| match subject {
582 PropertySubject::Element(id) => Some(id),
583 PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
584 });
585 if let Some(id) = existing {
586 return Ok(id);
587 }
588 let element = self.create_element()?;
589 self.set_property(PropertySubject::Element(element), key, value)?;
590 Ok(element)
591 }
592
593 /// Inserts or updates the relation whose value under `index` equals `value`,
594 /// returning its canonical id. On a miss it mints the relation, sets its type
595 /// and identity property, and creates one incidence per `(element, role)`
596 /// endpoint; on a hit the existing relation (with its endpoints) is reused
597 /// unchanged — the identity value encodes the endpoints, so they are immutable.
598 ///
599 /// # Errors
600 ///
601 /// Returns [`DbError`] when `index` is not an equality index, the value type
602 /// mismatches, or an endpoint element does not exist.
603 ///
604 /// # Performance
605 ///
606 /// This method is `O(log n + endpoints)` — a probe plus, on a miss, a mint.
607 pub fn upsert_relation<T: ValueType>(
608 &mut self,
609 index: EqualityIndex<T>,
610 value: impl Assignable<T>,
611 relation_type: RelationTypeId,
612 endpoints: &[(ElementId, RoleId)],
613 ) -> Result<RelationId, DbError> {
614 let value = value.into_value()?;
615 let key = self.equality_index_key(index.id())?;
616 let existing = self
617 .merged()
618 .property_equal(key, &value)
619 .into_iter()
620 .find_map(|subject| match subject {
621 PropertySubject::Relation(id) => Some(id),
622 PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
623 });
624 if let Some(id) = existing {
625 return Ok(id);
626 }
627 let relation = self.create_relation()?;
628 self.set_relation_type(relation, relation_type)?;
629 self.set_property(PropertySubject::Relation(relation), key, value)?;
630 for (element, role) in endpoints {
631 self.create_incidence(relation, *element, *role)?;
632 }
633 Ok(relation)
634 }
635
636 /// Tombstones every subject carried by `index` whose identity value is NOT in
637 /// `keep`, cascading each subject's incidences in `O(degree)` via the
638 /// reverse-adjacency index. The prune half of a reconcile: after upserting
639 /// every desired subject, `retain` removes the vanished complement.
640 ///
641 /// # Errors
642 ///
643 /// Returns [`DbError`] when `index` is not an equality index or a `keep` value
644 /// type mismatches the key schema.
645 ///
646 /// # Performance
647 ///
648 /// This method is `O(family size + removed × degree)`.
649 pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
650 &mut self,
651 index: EqualityIndex<T>,
652 keep: &[V],
653 ) -> Result<(), DbError> {
654 let key = self.equality_index_key(index.id())?;
655 let mut keep_values: BTreeSet<PropertyValue> = BTreeSet::new();
656 for value in keep {
657 keep_values.insert((*value).into_value()?);
658 }
659 let stale: Vec<PropertySubject> = self
660 .merged()
661 .property_key_subjects(key)
662 .into_iter()
663 .filter(|(_subject, value)| !keep_values.contains(value))
664 .map(|(subject, _value)| subject)
665 .collect();
666 for subject in stale {
667 match subject {
668 PropertySubject::Element(id) => self.tombstone_element(id)?,
669 PropertySubject::Relation(id) => self.tombstone_relation(id)?,
670 PropertySubject::Incidence(id) => self.tombstone_incidence(id)?,
671 }
672 }
673 Ok(())
674 }
675
676 /// Sets a typed property on a subject; the value type is checked at compile
677 /// time against the key.
678 ///
679 /// # Errors
680 ///
681 /// Returns [`DbError`] when the subject is absent, the value is out of range,
682 /// or the value type mismatches the key schema.
683 ///
684 /// # Performance
685 ///
686 /// This method is `O(log change + log keys)`.
687 pub fn set<T: ValueType>(
688 &mut self,
689 subject: impl Into<PropertySubject>,
690 key: Key<T>,
691 value: impl Assignable<T>,
692 ) -> Result<(), DbError> {
693 self.set_property(subject.into(), key.id(), value.into_value()?)
694 }
695
696 /// Removes a typed property from a subject.
697 ///
698 /// # Errors
699 ///
700 /// Returns [`DbError`] when the subject is absent or the key is unknown.
701 ///
702 /// # Performance
703 ///
704 /// This method is `O(log change + log keys)`.
705 pub fn unset<T: ValueType>(
706 &mut self,
707 subject: impl Into<PropertySubject>,
708 key: Key<T>,
709 ) -> Result<(), DbError> {
710 self.remove_property(subject.into(), key.id())
711 }
712
713 /// Adds a label to an element or relation subject.
714 ///
715 /// # Errors
716 ///
717 /// Returns [`DbError`] when the subject is absent, the label is unknown, or
718 /// the subject is an incidence (incidences carry no labels).
719 ///
720 /// # Performance
721 ///
722 /// This method is `O(log change + log labels)`.
723 pub fn add_label(
724 &mut self,
725 subject: impl Into<PropertySubject>,
726 label: LabelId,
727 ) -> Result<(), DbError> {
728 match subject.into() {
729 PropertySubject::Element(id) => self.add_element_label(id, label),
730 PropertySubject::Relation(id) => self.add_relation_label(id, label),
731 PropertySubject::Incidence(_) => {
732 Err(DbError::unsupported("incidences do not carry labels"))
733 }
734 }
735 }
736
737 /// Tombstones any subject by id, cascading a relation's or element's
738 /// incidences in `O(degree)` via the reverse-adjacency index.
739 ///
740 /// # Errors
741 ///
742 /// Returns [`DbError`] when the subject is not visible.
743 ///
744 /// # Performance
745 ///
746 /// This method is `O(log change + degree)`.
747 pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
748 match subject.into() {
749 PropertySubject::Element(id) => self.tombstone_element(id),
750 PropertySubject::Relation(id) => self.tombstone_relation(id),
751 PropertySubject::Incidence(id) => self.tombstone_incidence(id),
752 }
753 }
754
    /// Commits this write transaction durably.
    ///
    /// A non-dirty commit returns the parent's commit sequence without appending
    /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
    /// log into one WAL frame (with the watermark op last), appends it with an
    /// fsync (truncating back to the captured EOF on any write error so no
    /// interior torn record survives), THEN folds the delta into a fresh
    /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
    ///
    /// Every fallible step up to and including the append runs before the
    /// database's current snapshot is replaced, so an error from any of them
    /// returns with the previously published snapshot still in place.
    ///
    /// After publishing, a dirty commit consults the configured
    /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
    /// re-acquire it), then folds when the delta-log has outgrown the base. The
    /// committed frame is already durable, so an auto-fold failure does not lose
    /// data; it is surfaced to the caller.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
    /// durable append, or a triggered auto-checkpoint fold fails.
    ///
    /// # Performance
    ///
    /// This method is `O(change)` for the dirty path — flat as the base grows.
    /// The publish step shares the parent snapshot's already-materialized
    /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
    /// folds, so the base is byte-identical within the generation), so it neither
    /// re-decodes the base nor rebuilds the index. A triggered fold adds
    /// `O(visible state bytes)` on top.
    pub(crate) fn commit(mut self) -> Result<CommitSeq, DbError> {
        if self.delta.is_empty() {
            // Non-dirty commit: no append, no publish, no durable id advance.
            return Ok(self.parent.lsn());
        }
        // The new commit sequence is the parent's successor; running out of
        // sequence numbers is reported as an error instead of wrapping.
        let lsn = self
            .parent
            .lsn()
            .checked_next()
            .ok_or(DbError::Txn(crate::error::TxnError::CommitSeqOverflow))?;
        // Pull the pending ops and blob out of the overlay and encode them as a
        // single frame tagged with this commit's sequence, transaction id and
        // the base generation.
        let (ops, blob) = self.delta.take_frame();
        let frame = wal::encode_commit(
            lsn.get(),
            self.transaction_id.get(),
            self.database.base_generation,
            &ops,
            &blob,
        )?;
        // Durability point: nothing below runs unless the append succeeded.
        let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
        wal::append_commit(&mut log, &frame)?;

        // Durable: the delta was seeded from the parent overlay and only added
        // this writer's changes, so freezing it directly is the full new
        // published overlay (parent state + this commit). The parent overlay was
        // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
        // pinning the parent is unaffected.
        let new_overlay = Arc::new(self.delta.freeze());
        // A commit never folds, so the new snapshot pins the SAME base generation
        // as the parent — the base wire bytes are byte-identical, and so are the
        // owned records and the derived index built from them. Share the parent's
        // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
        // and rebuilding the index, which keeps a single-element commit `O(change)`
        // rather than `O(base)` regardless of how large the base has grown.
        let snapshot = Snapshot::with_shared_base_records(
            self.parent.generation(),
            lsn,
            Arc::clone(self.parent.base()),
            new_overlay,
            Arc::clone(self.parent.base_records()),
        );
        // Publish: swap in the new snapshot and record this transaction id as
        // the database's most recent one.
        self.database.current = Arc::new(snapshot);
        self.database.last_transaction_id = self.transaction_id;
        // Release the writer lock before any auto-fold so the fold can re-acquire
        // it (a partial move out of `self`, legal because `Writer` has
        // no `Drop` impl; the remaining `&mut Db` borrow stays live).
        drop(self.lock);
        // An error here is returned to the caller even though the commit above
        // has already been appended and published.
        self.database.maybe_auto_checkpoint()?;
        Ok(lsn)
    }
832
833 /// Returns the merged read view this writer sees (overlay over base).
834 ///
835 /// # Performance
836 ///
837 /// This method is `O(1)` to construct.
838 fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
839 crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
840 }
841
842 /// Requires an element to be visible in the writer's merged view.
843 ///
844 /// # Errors
845 ///
846 /// Returns [`DbError::UnknownElement`] when absent.
847 ///
848 /// # Performance
849 ///
850 /// This method is `O(log change + log n)`.
851 fn require_element(&self, id: ElementId) -> Result<(), DbError> {
852 if self.merged().contains_element(id) {
853 Ok(())
854 } else {
855 Err(DbError::unknown(id))
856 }
857 }
858
859 /// Requires a relation to be visible.
860 ///
861 /// # Errors
862 ///
863 /// Returns [`DbError::UnknownRelation`] when absent.
864 ///
865 /// # Performance
866 ///
867 /// This method is `O(log change + log n)`.
868 fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
869 if self.merged().contains_relation(id) {
870 Ok(())
871 } else {
872 Err(DbError::unknown(id))
873 }
874 }
875
876 /// Requires an incidence to be visible.
877 ///
878 /// # Errors
879 ///
880 /// Returns [`DbError::UnknownIncidence`] when absent.
881 ///
882 /// # Performance
883 ///
884 /// This method is `O(log change + log n)`.
885 fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
886 if self.merged().contains_incidence(id) {
887 Ok(())
888 } else {
889 Err(DbError::unknown(id))
890 }
891 }
892
893 /// Requires a role to exist in the merged catalog.
894 ///
895 /// # Errors
896 ///
897 /// Returns [`DbError::UnknownRole`] when absent.
898 ///
899 /// # Performance
900 ///
901 /// This method is `O(log role count)`.
902 fn require_role(&self, id: RoleId) -> Result<(), DbError> {
903 if self.delta.catalog().role(id).is_some() {
904 Ok(())
905 } else {
906 Err(DbError::unknown(id))
907 }
908 }
909
910 /// Requires a label to exist in the merged catalog.
911 ///
912 /// # Errors
913 ///
914 /// Returns [`DbError::UnknownLabel`] when absent.
915 ///
916 /// # Performance
917 ///
918 /// This method is `O(log label count)`.
919 fn require_label(&self, id: LabelId) -> Result<(), DbError> {
920 if self.delta.catalog().label(id).is_some() {
921 Ok(())
922 } else {
923 Err(DbError::unknown(id))
924 }
925 }
926
927 /// Requires a relation type to exist in the merged catalog.
928 ///
929 /// # Errors
930 ///
931 /// Returns [`DbError::UnknownRelationType`] when absent.
932 ///
933 /// # Performance
934 ///
935 /// This method is `O(log relation type count)`.
936 fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
937 if self.delta.catalog().relation_type(id).is_some() {
938 Ok(())
939 } else {
940 Err(DbError::unknown(id))
941 }
942 }
943
944 /// Requires a property subject to be visible.
945 ///
946 /// # Errors
947 ///
948 /// Returns the matching `Unknown*` error when the subject is absent.
949 ///
950 /// # Performance
951 ///
952 /// This method is `O(log change + log n)`.
953 fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
954 match subject {
955 PropertySubject::Element(id) => self.require_element(id),
956 PropertySubject::Relation(id) => self.require_relation(id),
957 PropertySubject::Incidence(id) => self.require_incidence(id),
958 }
959 }
960
961 /// Validates one projection definition against the merged catalog.
962 ///
963 /// # Errors
964 ///
965 /// Returns [`DbError`] when a referenced role or relation type is unknown.
966 ///
967 /// # Performance
968 ///
969 /// This method is `O(definition size)`.
970 fn validate_projection_definition(
971 &self,
972 definition: &ProjectionDefinition,
973 ) -> Result<(), DbError> {
974 match definition {
975 ProjectionDefinition::Graph(graph) => {
976 self.require_role(graph.source_role)?;
977 self.require_role(graph.target_role)?;
978 for relation_type in &graph.relation_types {
979 self.require_relation_type(*relation_type)?;
980 }
981 Ok(())
982 }
983 ProjectionDefinition::Hypergraph(hyper) => {
984 for role in &hyper.source_roles {
985 self.require_role(*role)?;
986 }
987 for role in &hyper.target_roles {
988 self.require_role(*role)?;
989 }
990 for relation_type in &hyper.relation_types {
991 self.require_relation_type(*relation_type)?;
992 }
993 Ok(())
994 }
995 }
996 }
997
998 /// Validates one index definition against the merged catalog.
999 ///
1000 /// # Errors
1001 ///
1002 /// Returns [`DbError`] when a referenced catalog id is unknown or a
1003 /// composite index has no keys.
1004 ///
1005 /// # Performance
1006 ///
1007 /// This method is `O(definition size)`.
1008 fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
1009 let catalog = self.delta.catalog();
1010 match definition {
1011 IndexDefinition::Label { label } => self.require_label(*label),
1012 IndexDefinition::RelationType { relation_type } => {
1013 self.require_relation_type(*relation_type)
1014 }
1015 IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
1016 self.require_property_key(*key)
1017 }
1018 IndexDefinition::CompositeEquality { keys } => {
1019 if keys.is_empty() {
1020 return Err(DbError::unsupported(
1021 "composite equality index requires at least one key",
1022 ));
1023 }
1024 for key in keys {
1025 self.require_property_key(*key)?;
1026 }
1027 Ok(())
1028 }
1029 IndexDefinition::Projection { projection } => catalog
1030 .projection(*projection)
1031 .is_some()
1032 .then_some(())
1033 .ok_or_else(|| DbError::unknown(*projection)),
1034 }
1035 }
1036
1037 /// Requires a property key to exist in the merged catalog.
1038 ///
1039 /// # Errors
1040 ///
1041 /// Returns [`DbError::UnknownPropertyKey`] when absent.
1042 ///
1043 /// # Performance
1044 ///
1045 /// This method is `O(log property key count)`.
1046 fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
1047 if self.delta.catalog().property_key(id).is_some() {
1048 Ok(())
1049 } else {
1050 Err(DbError::unknown(id))
1051 }
1052 }
1053}