oxgraph_db/database/writer.rs
1//! The single writer transaction: mutators, typed surface, and commit.
2
3use std::{collections::BTreeSet, sync::Arc};
4
5use super::{Db, open::open_log_for_append};
6use crate::{
7 Bound, CommitSeq, DbError, ElementId, GraphProjectionDefinition, GraphProjectionSpec,
8 HypergraphProjectionDefinition, HypergraphProjectionSpec, IncidenceId, IndexId, LabelId,
9 ProjectionDefinition, ProjectionId, PropertyKeyId, PropertySubject, PropertyType,
10 PropertyValue, RelationId, RelationTypeId, RoleId, Schema, TransactionId,
11 catalog::{IndexDefinition, PropertyFamily},
12 lock::WriterLock,
13 overlay::{Snapshot, StateView, WriteOverlay},
14 typed::{Assignable, EqualityIndex, Key, ValueType},
15 wal,
16};
17
/// Single writer transaction.
///
/// Mutations accumulate into a private write overlay layered over the parent
/// snapshot; reads fall through the overlay then the base. `commit` appends the
/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
/// `rollback` drops the overlay and appends nothing.
///
/// The type intentionally has no [`Drop`] impl of its own: `commit` moves the
/// `lock` field out of `self`, which is only legal for a type without `Drop`.
///
/// # Performance
///
/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
pub struct Writer<'db> {
    /// Db receiving the commit. Borrowed mutably, so the borrow checker allows
    /// at most one live writer per `Db` value.
    pub(super) database: &'db mut Db,
    /// Parent snapshot the writer layers over (its base + frozen overlay).
    /// Never mutated by the writer; `commit` only reads it to build the
    /// successor snapshot.
    pub(super) parent: Arc<Snapshot>,
    /// Private mutable delta this writer accumulates. Its catalog is what the
    /// `require_*` catalog checks consult.
    pub(super) delta: WriteOverlay,
    /// Writer transaction id (session-local until a dirty commit makes it
    /// durable).
    pub(super) transaction_id: TransactionId,
    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
    /// transaction ends (on `rollback`, or on any early-return error path); a
    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
    /// triggered auto-checkpoint can re-acquire it.
    pub(super) lock: WriterLock,
}
44
45impl Writer<'_> {
46 /// Registers a structural incidence role.
47 ///
48 /// # Errors
49 ///
50 /// Returns [`DbError`] when the name already exists or ID allocation fails.
51 ///
52 /// # Performance
53 ///
54 /// This method is `O(log role count + name length)`.
55 pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
56 self.delta.register_role(name.into())
57 }
58
59 /// Registers an element or relation label.
60 ///
61 /// # Errors
62 ///
63 /// Returns [`DbError`] when the name already exists or ID allocation fails.
64 ///
65 /// # Performance
66 ///
67 /// This method is `O(log label count + name length)`.
68 pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
69 self.delta.register_label(name.into())
70 }
71
72 /// Registers a relation type.
73 ///
74 /// # Errors
75 ///
76 /// Returns [`DbError`] when the name already exists or ID allocation fails.
77 ///
78 /// # Performance
79 ///
80 /// This method is `O(log relation type count + name length)`.
81 pub fn register_relation_type(
82 &mut self,
83 name: impl Into<String>,
84 ) -> Result<RelationTypeId, DbError> {
85 self.delta.register_relation_type(name.into())
86 }
87
88 /// Registers a typed property key.
89 ///
90 /// # Errors
91 ///
92 /// Returns [`DbError`] when the name already exists or ID allocation fails.
93 ///
94 /// # Performance
95 ///
96 /// This method is `O(log property key count + name length)`.
97 pub fn register_property_key(
98 &mut self,
99 name: impl Into<String>,
100 family: PropertyFamily,
101 value_type: PropertyType,
102 ) -> Result<PropertyKeyId, DbError> {
103 self.delta
104 .register_property_key(name.into(), family, value_type)
105 }
106
107 /// Defines a physical projection.
108 ///
109 /// # Errors
110 ///
111 /// Returns [`DbError`] when referenced catalog IDs are unknown, the
112 /// projection name already exists, or ID allocation fails.
113 ///
114 /// # Performance
115 ///
116 /// This method is `O(definition size + catalog lookup cost)`.
117 pub fn define_projection(
118 &mut self,
119 definition: ProjectionDefinition,
120 ) -> Result<ProjectionId, DbError> {
121 self.validate_projection_definition(&definition)?;
122 self.delta.register_projection(definition)
123 }
124
125 /// Defines an index.
126 ///
127 /// # Errors
128 ///
129 /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
130 /// name already exists, or ID allocation fails.
131 ///
132 /// # Performance
133 ///
134 /// This method is `O(definition size + catalog lookup cost)`.
135 pub fn define_index(
136 &mut self,
137 name: impl Into<String>,
138 definition: IndexDefinition,
139 ) -> Result<IndexId, DbError> {
140 self.validate_index_definition(&definition)?;
141 self.delta.register_index(name.into(), definition)
142 }
143
144 /// Applies a declarative [`Schema`] idempotently (register-or-get every
145 /// declared item), returning the resolved [`Bound`] handle bag. Re-applying
146 /// the same schema reuses existing ids; a name that already exists with a
147 /// conflicting shape is a [`DbError::SchemaConflict`].
148 ///
149 /// # Errors
150 ///
151 /// Returns [`DbError`] on a shape conflict, an undeclared referenced name (an
152 /// index's key, a projection's role/type), or id-allocation failure.
153 ///
154 /// # Performance
155 ///
156 /// This method is `O(declared items × log catalog)`.
157 pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
158 let mut bound = Bound::default();
159 for name in &schema.roles {
160 let id = match self.merged().catalog().role_id(name) {
161 Some(id) => id,
162 None => self.register_role(name.clone())?,
163 };
164 bound.roles.insert(name.clone(), id);
165 }
166 for name in &schema.labels {
167 let id = match self.merged().catalog().label_id(name) {
168 Some(id) => id,
169 None => self.register_label(name.clone())?,
170 };
171 bound.labels.insert(name.clone(), id);
172 }
173 for name in &schema.relation_types {
174 let id = match self.merged().catalog().relation_type_id(name) {
175 Some(id) => id,
176 None => self.register_relation_type(name.clone())?,
177 };
178 bound.relation_types.insert(name.clone(), id);
179 }
180 for (name, family, value_type) in &schema.keys {
181 let id = self.register_key_or_get(name, *family, *value_type)?;
182 bound.keys.insert(name.clone(), (id, *value_type));
183 }
184 for (name, key_name) in &schema.equality_indexes {
185 let (key_id, value_type) = *bound.keys.get(key_name).ok_or_else(|| {
186 DbError::Catalog(crate::error::CatalogError::UnknownName {
187 kind: "property key",
188 name: key_name.clone(),
189 })
190 })?;
191 let id = match self.merged().catalog().index_id(name) {
192 Some(id) => id,
193 None => self.define_index(
194 name.clone(),
195 IndexDefinition::PropertyEquality { key: key_id },
196 )?,
197 };
198 bound
199 .equality_indexes
200 .insert(name.clone(), (id, value_type));
201 }
202 for spec in &schema.graph_projections {
203 let id = match self.merged().catalog().projection_id(&spec.name) {
204 Some(id) => id,
205 None => self.define_graph_projection(spec, &bound)?,
206 };
207 bound.projections.insert(spec.name.clone(), id);
208 }
209 for spec in &schema.hypergraph_projections {
210 let id = match self.merged().catalog().projection_id(&spec.name) {
211 Some(id) => id,
212 None => self.define_hypergraph_projection(spec, &bound)?,
213 };
214 bound.projections.insert(spec.name.clone(), id);
215 }
216 Ok(bound)
217 }
218
219 /// Registers a property key, or returns the existing id when the name is
220 /// already present with a matching family and value type.
221 ///
222 /// # Errors
223 ///
224 /// Returns [`DbError::SchemaConflict`] when the name exists with a different
225 /// family or value type.
226 ///
227 /// # Performance
228 ///
229 /// This method is `O(log catalog)`.
230 fn register_key_or_get(
231 &mut self,
232 name: &str,
233 family: PropertyFamily,
234 value_type: PropertyType,
235 ) -> Result<PropertyKeyId, DbError> {
236 let Some(existing) = self.merged().catalog().property_key_id(name) else {
237 return self.register_property_key(name.to_owned(), family, value_type);
238 };
239 let matches = self
240 .merged()
241 .catalog()
242 .property_key(existing)
243 .is_some_and(|def| def.family == family && def.value_type == value_type);
244 if matches {
245 Ok(existing)
246 } else {
247 Err(DbError::Catalog(
248 crate::error::CatalogError::SchemaConflict {
249 name: name.to_owned(),
250 reason: "property key family/value type differs from the existing catalog entry",
251 },
252 ))
253 }
254 }
255
256 /// Defines a graph projection from a spec, resolving its relation-type and
257 /// role names through `bound`.
258 ///
259 /// # Errors
260 ///
261 /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
262 /// a definition error.
263 ///
264 /// # Performance
265 ///
266 /// This method is `O(relation-type count × log catalog)`.
267 fn define_graph_projection(
268 &mut self,
269 spec: &GraphProjectionSpec,
270 bound: &Bound,
271 ) -> Result<ProjectionId, DbError> {
272 let mut relation_types = BTreeSet::new();
273 for name in &spec.relation_types {
274 relation_types.insert(bound.relation_type(name)?);
275 }
276 let source_role = bound.role(&spec.source_role)?;
277 let target_role = bound.role(&spec.target_role)?;
278 self.define_projection(ProjectionDefinition::Graph(GraphProjectionDefinition {
279 name: spec.name.clone(),
280 relation_types,
281 source_role,
282 target_role,
283 }))
284 }
285
286 /// Defines a hypergraph projection from a spec, resolving its relation-type
287 /// and role names through `bound`.
288 ///
289 /// # Errors
290 ///
291 /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
292 /// a definition error.
293 ///
294 /// # Performance
295 ///
296 /// This method is `O((relation-type count + role count) × log catalog)`.
297 fn define_hypergraph_projection(
298 &mut self,
299 spec: &HypergraphProjectionSpec,
300 bound: &Bound,
301 ) -> Result<ProjectionId, DbError> {
302 let mut relation_types = BTreeSet::new();
303 for name in &spec.relation_types {
304 relation_types.insert(bound.relation_type(name)?);
305 }
306 let mut source_roles = BTreeSet::new();
307 for name in &spec.source_roles {
308 source_roles.insert(bound.role(name)?);
309 }
310 let mut target_roles = BTreeSet::new();
311 for name in &spec.target_roles {
312 target_roles.insert(bound.role(name)?);
313 }
314 self.define_projection(ProjectionDefinition::Hypergraph(
315 HypergraphProjectionDefinition {
316 name: spec.name.clone(),
317 relation_types,
318 source_roles,
319 target_roles,
320 },
321 ))
322 }
323
324 /// Creates a canonical element.
325 ///
326 /// # Errors
327 ///
328 /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
329 ///
330 /// # Performance
331 ///
332 /// This method is `O(log element change)`.
333 pub fn create_element(&mut self) -> Result<ElementId, DbError> {
334 self.delta.create_element()
335 }
336
337 /// Creates a canonical relation.
338 ///
339 /// # Errors
340 ///
341 /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
342 ///
343 /// # Performance
344 ///
345 /// This method is `O(log relation change)`.
346 pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
347 self.delta.create_relation()
348 }
349
350 /// Creates a canonical incidence.
351 ///
352 /// # Errors
353 ///
354 /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
355 /// exhausted.
356 ///
357 /// # Performance
358 ///
359 /// This method is `O(log incidence change + reference lookup cost)`.
360 pub fn create_incidence(
361 &mut self,
362 relation: RelationId,
363 element: ElementId,
364 role: RoleId,
365 ) -> Result<IncidenceId, DbError> {
366 self.require_relation(relation)?;
367 self.require_element(element)?;
368 self.require_role(role)?;
369 self.delta.create_incidence(relation, element, role)
370 }
371
372 /// Tombstones a canonical element and its incidences.
373 ///
374 /// # Errors
375 ///
376 /// Returns [`DbError::UnknownElement`] when the element is not visible.
377 ///
378 /// # Performance
379 ///
380 /// This method is `O(log n + degree)` via the reverse-adjacency index.
381 pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
382 self.require_element(id)?;
383 // Cascade: every incidence on the element — resolved in O(log n + degree)
384 // through the reverse-adjacency index, not a full incidence scan — is
385 // tombstoned too.
386 let incidences: Vec<IncidenceId> = self
387 .merged()
388 .element_incidences(id)
389 .into_iter()
390 .map(|record| record.id)
391 .collect();
392 let base = self.parent.base_records();
393 self.delta.tombstone_element(base, id);
394 for incidence in incidences {
395 self.delta
396 .tombstone_incidence(self.parent.base_records(), incidence);
397 }
398 Ok(())
399 }
400
401 /// Tombstones a canonical relation and its incidences.
402 ///
403 /// # Errors
404 ///
405 /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
406 ///
407 /// # Performance
408 ///
409 /// This method is `O(log n + degree)` via the reverse-adjacency index.
410 pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
411 self.require_relation(id)?;
412 // Cascade: every incidence in the relation — resolved in O(log n + degree)
413 // through the reverse-adjacency index, not a full incidence scan.
414 let incidences: Vec<IncidenceId> = self
415 .merged()
416 .relation_incidences(id)
417 .into_iter()
418 .map(|record| record.id)
419 .collect();
420 let base = self.parent.base_records();
421 self.delta.tombstone_relation(base, id);
422 for incidence in incidences {
423 self.delta
424 .tombstone_incidence(self.parent.base_records(), incidence);
425 }
426 Ok(())
427 }
428
429 /// Tombstones a canonical incidence.
430 ///
431 /// # Errors
432 ///
433 /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
434 ///
435 /// # Performance
436 ///
437 /// This method is `O(log incidence change)`.
438 pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
439 self.require_incidence(id)?;
440 self.delta
441 .tombstone_incidence(self.parent.base_records(), id);
442 Ok(())
443 }
444
445 /// Adds a label to an element.
446 ///
447 /// # Errors
448 ///
449 /// Returns [`DbError`] when the element or label is unknown.
450 ///
451 /// # Performance
452 ///
453 /// This method is `O(log element change + log label count)`.
454 pub(crate) fn add_element_label(
455 &mut self,
456 element: ElementId,
457 label: LabelId,
458 ) -> Result<(), DbError> {
459 self.require_element(element)?;
460 self.require_label(label)?;
461 self.delta
462 .add_element_label(self.parent.base_records(), element, label);
463 Ok(())
464 }
465
466 /// Adds a label to a relation.
467 ///
468 /// # Errors
469 ///
470 /// Returns [`DbError`] when the relation or label is unknown.
471 ///
472 /// # Performance
473 ///
474 /// This method is `O(log relation change + log label count)`.
475 pub(crate) fn add_relation_label(
476 &mut self,
477 relation: RelationId,
478 label: LabelId,
479 ) -> Result<(), DbError> {
480 self.require_relation(relation)?;
481 self.require_label(label)?;
482 self.delta
483 .add_relation_label(self.parent.base_records(), relation, label);
484 Ok(())
485 }
486
487 /// Sets a relation type.
488 ///
489 /// # Errors
490 ///
491 /// Returns [`DbError`] when the relation or relation type is unknown.
492 ///
493 /// # Performance
494 ///
495 /// This method is `O(log relation change + log relation type count)`.
496 pub fn set_relation_type(
497 &mut self,
498 relation: RelationId,
499 relation_type: RelationTypeId,
500 ) -> Result<(), DbError> {
501 self.require_relation(relation)?;
502 self.require_relation_type(relation_type)?;
503 self.delta
504 .set_relation_type(self.parent.base_records(), relation, relation_type);
505 Ok(())
506 }
507
508 /// Sets a property value.
509 ///
510 /// # Errors
511 ///
512 /// Returns [`DbError`] when the subject or key is unknown, or the value
513 /// does not match the key schema.
514 ///
515 /// # Performance
516 ///
517 /// This method is `O(log subject change + log key count)`.
518 pub(crate) fn set_property(
519 &mut self,
520 subject: PropertySubject,
521 key: PropertyKeyId,
522 value: PropertyValue,
523 ) -> Result<(), DbError> {
524 // Referential integrity: the subject must be visible (this rejects an
525 // orphan property against a tombstoned/absent subject at the transaction
526 // boundary — the overlay layer is permissive by design).
527 self.require_subject(subject)?;
528 let definition = self
529 .merged()
530 .catalog()
531 .property_key(key)
532 .cloned()
533 .ok_or_else(|| DbError::unknown(key))?;
534 if definition.family != subject.family() {
535 return Err(DbError::Query(
536 crate::error::QueryError::WrongPropertyFamily {
537 expected: definition.family,
538 actual: subject.family(),
539 },
540 ));
541 }
542 if definition.value_type != value.value_type() {
543 return Err(DbError::Query(
544 crate::error::QueryError::PropertyTypeMismatch {
545 expected: definition.value_type,
546 actual: value.value_type(),
547 },
548 ));
549 }
550 self.delta
551 .set_property(self.parent.base_records(), subject, key, value);
552 Ok(())
553 }
554
555 /// Removes a property value.
556 ///
557 /// # Errors
558 ///
559 /// Returns [`DbError`] when the subject or key is unknown.
560 ///
561 /// # Performance
562 ///
563 /// This method is `O(log subject change + log key count)`.
564 pub(crate) fn remove_property(
565 &mut self,
566 subject: PropertySubject,
567 key: PropertyKeyId,
568 ) -> Result<(), DbError> {
569 self.require_subject(subject)?;
570 if self.merged().catalog().property_key(key).is_none() {
571 return Err(DbError::unknown(key));
572 }
573 self.delta
574 .remove_property(self.parent.base_records(), subject, key);
575 Ok(())
576 }
577
578 /// Resolves the property key an equality index covers.
579 ///
580 /// # Errors
581 ///
582 /// Returns [`DbError::UnknownIndex`] when `index` is unknown, or an
583 /// unsupported-query error when it is not a property-equality index.
584 ///
585 /// # Performance
586 ///
587 /// This method is `O(log index count)`.
588 fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
589 let view = self.merged();
590 let entry = view
591 .catalog()
592 .index(index)
593 .ok_or_else(|| DbError::unknown(index))?;
594 match &entry.definition {
595 IndexDefinition::PropertyEquality { key } => Ok(*key),
596 _other => Err(DbError::unsupported(
597 "reconcile requires a property-equality index",
598 )),
599 }
600 }
601
602 /// Inserts or updates the element whose value under `index` equals `value`,
603 /// returning its canonical id — reused when an element already carries that
604 /// identity value (id stable across reconcile), freshly minted (a never-reused
605 /// id, with the identity property set) otherwise.
606 ///
607 /// # Errors
608 ///
609 /// Returns [`DbError`] when `index` is not an equality index or the value
610 /// type mismatches the key schema.
611 ///
612 /// # Performance
613 ///
614 /// This method is `O(log n + value length)` — a probe plus, on a miss, a mint.
615 pub fn upsert_element<T: ValueType>(
616 &mut self,
617 index: EqualityIndex<T>,
618 value: impl Assignable<T>,
619 ) -> Result<ElementId, DbError> {
620 let value = value.into_value()?;
621 let key = self.equality_index_key(index.id())?;
622 let existing = self
623 .merged()
624 .property_equal(key, &value)
625 .into_iter()
626 .find_map(|subject| match subject {
627 PropertySubject::Element(id) => Some(id),
628 PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
629 });
630 if let Some(id) = existing {
631 return Ok(id);
632 }
633 let element = self.create_element()?;
634 self.set_property(PropertySubject::Element(element), key, value)?;
635 Ok(element)
636 }
637
638 /// Inserts or updates the relation whose value under `index` equals `value`,
639 /// returning its canonical id. On a miss it mints the relation, sets its type
640 /// and identity property, and creates one incidence per `(element, role)`
641 /// endpoint; on a hit the existing relation (with its endpoints) is reused
642 /// unchanged — the identity value encodes the endpoints, so they are immutable.
643 ///
644 /// # Errors
645 ///
646 /// Returns [`DbError`] when `index` is not an equality index, the value type
647 /// mismatches, or an endpoint element does not exist.
648 ///
649 /// # Performance
650 ///
651 /// This method is `O(log n + endpoints)` — a probe plus, on a miss, a mint.
652 pub fn upsert_relation<T: ValueType>(
653 &mut self,
654 index: EqualityIndex<T>,
655 value: impl Assignable<T>,
656 relation_type: RelationTypeId,
657 endpoints: &[(ElementId, RoleId)],
658 ) -> Result<RelationId, DbError> {
659 let value = value.into_value()?;
660 let key = self.equality_index_key(index.id())?;
661 let existing = self
662 .merged()
663 .property_equal(key, &value)
664 .into_iter()
665 .find_map(|subject| match subject {
666 PropertySubject::Relation(id) => Some(id),
667 PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
668 });
669 if let Some(id) = existing {
670 return Ok(id);
671 }
672 let relation = self.create_relation()?;
673 self.set_relation_type(relation, relation_type)?;
674 self.set_property(PropertySubject::Relation(relation), key, value)?;
675 for (element, role) in endpoints {
676 self.create_incidence(relation, *element, *role)?;
677 }
678 Ok(relation)
679 }
680
681 /// Tombstones every subject carried by `index` whose identity value is NOT in
682 /// `keep`, cascading each subject's incidences in `O(degree)` via the
683 /// reverse-adjacency index. The prune half of a reconcile: after upserting
684 /// every desired subject, `retain` removes the vanished complement.
685 ///
686 /// # Errors
687 ///
688 /// Returns [`DbError`] when `index` is not an equality index or a `keep` value
689 /// type mismatches the key schema.
690 ///
691 /// # Performance
692 ///
693 /// This method is `O(family size + removed × degree)`.
694 pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
695 &mut self,
696 index: EqualityIndex<T>,
697 keep: &[V],
698 ) -> Result<(), DbError> {
699 let key = self.equality_index_key(index.id())?;
700 let mut keep_values: BTreeSet<PropertyValue> = BTreeSet::new();
701 for value in keep {
702 keep_values.insert((*value).into_value()?);
703 }
704 let stale: Vec<PropertySubject> = self
705 .merged()
706 .property_key_subjects(key)
707 .into_iter()
708 .filter(|(_subject, value)| !keep_values.contains(value))
709 .map(|(subject, _value)| subject)
710 .collect();
711 for subject in stale {
712 match subject {
713 PropertySubject::Element(id) => self.tombstone_element(id)?,
714 PropertySubject::Relation(id) => self.tombstone_relation(id)?,
715 PropertySubject::Incidence(id) => self.tombstone_incidence(id)?,
716 }
717 }
718 Ok(())
719 }
720
721 /// Sets a typed property on a subject; the value type is checked at compile
722 /// time against the key.
723 ///
724 /// # Errors
725 ///
726 /// Returns [`DbError`] when the subject is absent, the value is out of range,
727 /// or the value type mismatches the key schema.
728 ///
729 /// # Performance
730 ///
731 /// This method is `O(log change + log keys)`.
732 pub fn set<T: ValueType>(
733 &mut self,
734 subject: impl Into<PropertySubject>,
735 key: Key<T>,
736 value: impl Assignable<T>,
737 ) -> Result<(), DbError> {
738 self.set_property(subject.into(), key.id(), value.into_value()?)
739 }
740
741 /// Removes a typed property from a subject.
742 ///
743 /// # Errors
744 ///
745 /// Returns [`DbError`] when the subject is absent or the key is unknown.
746 ///
747 /// # Performance
748 ///
749 /// This method is `O(log change + log keys)`.
750 pub fn unset<T: ValueType>(
751 &mut self,
752 subject: impl Into<PropertySubject>,
753 key: Key<T>,
754 ) -> Result<(), DbError> {
755 self.remove_property(subject.into(), key.id())
756 }
757
758 /// Adds a label to an element or relation subject.
759 ///
760 /// # Errors
761 ///
762 /// Returns [`DbError`] when the subject is absent, the label is unknown, or
763 /// the subject is an incidence (incidences carry no labels).
764 ///
765 /// # Performance
766 ///
767 /// This method is `O(log change + log labels)`.
768 pub fn add_label(
769 &mut self,
770 subject: impl Into<PropertySubject>,
771 label: LabelId,
772 ) -> Result<(), DbError> {
773 match subject.into() {
774 PropertySubject::Element(id) => self.add_element_label(id, label),
775 PropertySubject::Relation(id) => self.add_relation_label(id, label),
776 PropertySubject::Incidence(_) => {
777 Err(DbError::unsupported("incidences do not carry labels"))
778 }
779 }
780 }
781
782 /// Tombstones any subject by id, cascading a relation's or element's
783 /// incidences in `O(degree)` via the reverse-adjacency index.
784 ///
785 /// # Errors
786 ///
787 /// Returns [`DbError`] when the subject is not visible.
788 ///
789 /// # Performance
790 ///
791 /// This method is `O(log change + degree)`.
792 pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
793 match subject.into() {
794 PropertySubject::Element(id) => self.tombstone_element(id),
795 PropertySubject::Relation(id) => self.tombstone_relation(id),
796 PropertySubject::Incidence(id) => self.tombstone_incidence(id),
797 }
798 }
799
800 /// Commits this write transaction durably.
801 ///
802 /// A non-dirty commit returns the parent's commit sequence without appending
803 /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
804 /// log into one WAL frame (with the watermark op last), appends it with an
805 /// fsync (truncating back to the captured EOF on any write error so no
806 /// interior torn record survives), THEN folds the delta into a fresh
807 /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
808 ///
809 /// After publishing, a dirty commit consults the configured
810 /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
811 /// re-acquire it), then folds when the delta-log has outgrown the base. The
812 /// committed frame is already durable, so an auto-fold failure does not lose
813 /// data; it is surfaced to the caller.
814 ///
815 /// # Errors
816 ///
817 /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
818 /// durable append, or a triggered auto-checkpoint fold fails.
819 ///
820 /// # Performance
821 ///
822 /// This method is `O(change)` for the dirty path — flat as the base grows.
823 /// The publish step shares the parent snapshot's already-materialized
824 /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
825 /// folds, so the base is byte-identical within the generation), so it neither
826 /// re-decodes the base nor rebuilds the index. A triggered fold adds
827 /// `O(visible state bytes)` on top.
828 pub(crate) fn commit(mut self) -> Result<CommitSeq, DbError> {
829 if self.delta.is_empty() {
830 // Non-dirty commit: no append, no publish, no durable id advance.
831 return Ok(self.parent.lsn());
832 }
833 let lsn = self
834 .parent
835 .lsn()
836 .checked_next()
837 .ok_or(DbError::Txn(crate::error::TxnError::CommitSeqOverflow))?;
838 let (ops, blob) = self.delta.take_frame();
839 let frame = wal::encode_commit(
840 lsn.get(),
841 self.transaction_id.get(),
842 self.database.base_generation,
843 &ops,
844 &blob,
845 )?;
846 let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
847 wal::append_commit(&mut log, &frame)?;
848
849 // Durable: the delta was seeded from the parent overlay and only added
850 // this writer's changes, so freezing it directly is the full new
851 // published overlay (parent state + this commit). The parent overlay was
852 // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
853 // pinning the parent is unaffected.
854 let new_overlay = Arc::new(self.delta.freeze());
855 // A commit never folds, so the new snapshot pins the SAME base generation
856 // as the parent — the base wire bytes are byte-identical, and so are the
857 // owned records and the derived index built from them. Share the parent's
858 // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
859 // and rebuilding the index, which keeps a single-element commit `O(change)`
860 // rather than `O(base)` regardless of how large the base has grown.
861 let snapshot = Snapshot::with_shared_base_records(
862 self.parent.generation(),
863 lsn,
864 Arc::clone(self.parent.base()),
865 new_overlay,
866 Arc::clone(self.parent.base_records()),
867 );
868 self.database.current = Arc::new(snapshot);
869 self.database.last_transaction_id = self.transaction_id;
870 // Release the writer lock before any auto-fold so the fold can re-acquire
871 // it (a partial move out of `self`, legal because `Writer` has
872 // no `Drop` impl; the remaining `&mut Db` borrow stays live).
873 drop(self.lock);
874 self.database.maybe_auto_checkpoint()?;
875 Ok(lsn)
876 }
877
878 /// Returns the merged read view this writer sees (overlay over base).
879 ///
880 /// # Performance
881 ///
882 /// This method is `O(1)` to construct.
883 fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
884 crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
885 }
886
887 /// Requires an element to be visible in the writer's merged view.
888 ///
889 /// # Errors
890 ///
891 /// Returns [`DbError::UnknownElement`] when absent.
892 ///
893 /// # Performance
894 ///
895 /// This method is `O(log change + log n)`.
896 fn require_element(&self, id: ElementId) -> Result<(), DbError> {
897 if self.merged().contains_element(id) {
898 Ok(())
899 } else {
900 Err(DbError::unknown(id))
901 }
902 }
903
904 /// Requires a relation to be visible.
905 ///
906 /// # Errors
907 ///
908 /// Returns [`DbError::UnknownRelation`] when absent.
909 ///
910 /// # Performance
911 ///
912 /// This method is `O(log change + log n)`.
913 fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
914 if self.merged().contains_relation(id) {
915 Ok(())
916 } else {
917 Err(DbError::unknown(id))
918 }
919 }
920
921 /// Requires an incidence to be visible.
922 ///
923 /// # Errors
924 ///
925 /// Returns [`DbError::UnknownIncidence`] when absent.
926 ///
927 /// # Performance
928 ///
929 /// This method is `O(log change + log n)`.
930 fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
931 if self.merged().contains_incidence(id) {
932 Ok(())
933 } else {
934 Err(DbError::unknown(id))
935 }
936 }
937
938 /// Requires a role to exist in the merged catalog.
939 ///
940 /// # Errors
941 ///
942 /// Returns [`DbError::UnknownRole`] when absent.
943 ///
944 /// # Performance
945 ///
946 /// This method is `O(log role count)`.
947 fn require_role(&self, id: RoleId) -> Result<(), DbError> {
948 if self.delta.catalog().role(id).is_some() {
949 Ok(())
950 } else {
951 Err(DbError::unknown(id))
952 }
953 }
954
955 /// Requires a label to exist in the merged catalog.
956 ///
957 /// # Errors
958 ///
959 /// Returns [`DbError::UnknownLabel`] when absent.
960 ///
961 /// # Performance
962 ///
963 /// This method is `O(log label count)`.
964 fn require_label(&self, id: LabelId) -> Result<(), DbError> {
965 if self.delta.catalog().label(id).is_some() {
966 Ok(())
967 } else {
968 Err(DbError::unknown(id))
969 }
970 }
971
972 /// Requires a relation type to exist in the merged catalog.
973 ///
974 /// # Errors
975 ///
976 /// Returns [`DbError::UnknownRelationType`] when absent.
977 ///
978 /// # Performance
979 ///
980 /// This method is `O(log relation type count)`.
981 fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
982 if self.delta.catalog().relation_type(id).is_some() {
983 Ok(())
984 } else {
985 Err(DbError::unknown(id))
986 }
987 }
988
989 /// Requires a property subject to be visible.
990 ///
991 /// # Errors
992 ///
993 /// Returns the matching `Unknown*` error when the subject is absent.
994 ///
995 /// # Performance
996 ///
997 /// This method is `O(log change + log n)`.
998 fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
999 match subject {
1000 PropertySubject::Element(id) => self.require_element(id),
1001 PropertySubject::Relation(id) => self.require_relation(id),
1002 PropertySubject::Incidence(id) => self.require_incidence(id),
1003 }
1004 }
1005
1006 /// Validates one projection definition against the merged catalog.
1007 ///
1008 /// # Errors
1009 ///
1010 /// Returns [`DbError`] when a referenced role or relation type is unknown.
1011 ///
1012 /// # Performance
1013 ///
1014 /// This method is `O(definition size)`.
1015 fn validate_projection_definition(
1016 &self,
1017 definition: &ProjectionDefinition,
1018 ) -> Result<(), DbError> {
1019 match definition {
1020 ProjectionDefinition::Graph(graph) => {
1021 self.require_role(graph.source_role)?;
1022 self.require_role(graph.target_role)?;
1023 for relation_type in &graph.relation_types {
1024 self.require_relation_type(*relation_type)?;
1025 }
1026 Ok(())
1027 }
1028 ProjectionDefinition::Hypergraph(hyper) => {
1029 for role in &hyper.source_roles {
1030 self.require_role(*role)?;
1031 }
1032 for role in &hyper.target_roles {
1033 self.require_role(*role)?;
1034 }
1035 for relation_type in &hyper.relation_types {
1036 self.require_relation_type(*relation_type)?;
1037 }
1038 Ok(())
1039 }
1040 }
1041 }
1042
1043 /// Validates one index definition against the merged catalog.
1044 ///
1045 /// # Errors
1046 ///
1047 /// Returns [`DbError`] when a referenced catalog id is unknown or a
1048 /// composite index has no keys.
1049 ///
1050 /// # Performance
1051 ///
1052 /// This method is `O(definition size)`.
1053 fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
1054 let catalog = self.delta.catalog();
1055 match definition {
1056 IndexDefinition::Label { label } => self.require_label(*label),
1057 IndexDefinition::RelationType { relation_type } => {
1058 self.require_relation_type(*relation_type)
1059 }
1060 IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
1061 self.require_property_key(*key)
1062 }
1063 IndexDefinition::CompositeEquality { keys } => {
1064 if keys.is_empty() {
1065 return Err(DbError::unsupported(
1066 "composite equality index requires at least one key",
1067 ));
1068 }
1069 for key in keys {
1070 self.require_property_key(*key)?;
1071 }
1072 Ok(())
1073 }
1074 IndexDefinition::Projection { projection } => catalog
1075 .projection(*projection)
1076 .is_some()
1077 .then_some(())
1078 .ok_or_else(|| DbError::unknown(*projection)),
1079 }
1080 }
1081
1082 /// Requires a property key to exist in the merged catalog.
1083 ///
1084 /// # Errors
1085 ///
1086 /// Returns [`DbError::UnknownPropertyKey`] when absent.
1087 ///
1088 /// # Performance
1089 ///
1090 /// This method is `O(log property key count)`.
1091 fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
1092 if self.delta.catalog().property_key(id).is_some() {
1093 Ok(())
1094 } else {
1095 Err(DbError::unknown(id))
1096 }
1097 }
1098}