oxgraph_db/database/reader.rs
1//! Read transactions: a pinned snapshot and the full read/query surface.
2
3use std::{borrow::Cow, collections::BTreeSet, sync::Arc};
4
5use oxgraph_algo::{
6 HypergraphPageRankWorkspace, PageRankConfig, PageRankError, PageRankWorkspace, Uniform,
7 longest_path_dag, pagerank_graph_with_workspace, pagerank_hypergraph_with_workspace,
8};
9use oxgraph_graph::{
10 CanonicalElementIdentity, CanonicalRelationIdentity, DenseElementIndex, DenseRelationIndex,
11 LocalElementIdentity,
12};
13
14use super::IndexProbe;
15use crate::{
16 Catalog, CheckpointGeneration, CommitSeq, DbError, Element, ElementId, IncidenceId,
17 IncidenceRecord, IndexId, PreparedQuery, ProjectionDefinition, ProjectionId, Properties,
18 PropertyKeyId, PropertySubject, PropertyValue, QueryResult, Relation, RelationId,
19 RelationTypeId,
20 catalog::IndexDefinition,
21 overlay::{Snapshot, StateView},
22 projection::{
23 self, GraphProjection, HypergraphProjection, ProjectionElementId, ProjectionRelationId,
24 },
25 traversal::{self, Direction, Subgraph, Walk},
26 typed::{Assignable, EqualityIndex, ValueType},
27};
28
29/// Reader pin identifying the visible database generation.
30///
31/// # Performance
32///
33/// Copying and comparing a pin is `O(1)`.
34#[derive(Clone, Copy, Debug, Eq, PartialEq)]
35pub struct ReadPin {
36 /// Pinned visible commit sequence.
37 pub visible_commit_seq: CommitSeq,
38 /// Pinned checkpoint generation.
39 pub generation: CheckpointGeneration,
40}
41
42/// Personalized hypergraph `PageRank` output.
43///
44/// Unlike binary [`Reader::personalized_pagerank`], a hypergraph ranks both
45/// participants and the hyperedges themselves: `elements` are the projection's
46/// participant vertices and `relations` are its hyperedges, each paired with its
47/// rank and ordered highest first.
48///
49/// # Performance
50///
51/// Cloning is `O(elements + relations)`.
52#[derive(Clone, Debug, Default, PartialEq)]
53pub struct HypergraphPageRank {
54 /// Participant elements paired with rank, highest first.
55 pub elements: Vec<(ElementId, f64)>,
56 /// Hyperedges (relations) paired with rank, highest first.
57 pub relations: Vec<(RelationId, f64)>,
58}
59
60/// Read transaction over a pinned snapshot.
61///
62/// A read transaction owns its own `Arc<Snapshot>` and never borrows the
63/// [`Db`], so it stays valid across a later `begin_write`/`checkpoint` on
64/// the same handle (it cloned the snapshot before the write borrowed `&mut`). It
65/// is [`Send`] + [`Sync`] (asserted below).
66///
67/// # Performance
68///
69/// Creating and cloning a read transaction is `O(1)`: it shares the pinned
70/// snapshot through an `Arc`, not by copying.
71pub struct Reader {
72 /// The pinned snapshot this reader observes.
73 pub(super) snapshot: Arc<Snapshot>,
74}
75
76/// Returns whether a [`Reader::neighbors`] walk should follow the edge from the
77/// incidence `from` (the queried element's incidence) to the incidence `to`
78/// (a candidate neighbor's incidence) under `direction`.
79///
80/// Endpoint roles are encoded by incidence-creation order: the source endpoint
81/// has the lower incidence id. `Outgoing` follows source→target (the queried
82/// element is the source, so `from < to`), `Incoming` follows target→source, and
83/// `Both` follows either side.
84///
85/// # Performance
86///
87/// This function is `O(1)`.
88const fn follow_direction(direction: Direction, from: IncidenceId, to: IncidenceId) -> bool {
89 match direction {
90 Direction::Outgoing => from.get() < to.get(),
91 Direction::Incoming => from.get() > to.get(),
92 Direction::Both => true,
93 }
94}
95
96/// `Reader` MUST be `Send + Sync`: it pins only an `Arc<Snapshot>`,
97/// which holds `Arc`-shared `Send + Sync` data (no `Rc`/`RefCell` reachable).
98const fn assert_send_sync<T: Send + Sync>() {}
99const _: () = assert_send_sync::<Reader>();
100const _: () = assert_send_sync::<Arc<Snapshot>>();
101
102impl Reader {
103 /// Returns this transaction's reader pin.
104 ///
105 /// # Performance
106 ///
107 /// This method is `O(1)`.
108 #[must_use]
109 pub fn pin(&self) -> ReadPin {
110 ReadPin {
111 visible_commit_seq: self.snapshot.lsn(),
112 generation: self.snapshot.generation(),
113 }
114 }
115
116 /// Returns catalog metadata.
117 ///
118 /// # Performance
119 ///
120 /// This method is `O(1)`.
121 #[must_use]
122 pub fn catalog(&self) -> &Catalog {
123 self.snapshot.view().catalog_ref()
124 }
125
126 /// Returns visible element count.
127 ///
128 /// # Performance
129 ///
130 /// This method is `O(base + overlay change)`.
131 #[must_use]
132 pub fn element_count(&self) -> usize {
133 self.snapshot.view().element_count()
134 }
135
136 /// Returns visible relation count.
137 ///
138 /// # Performance
139 ///
140 /// This method is `O(base + overlay change)`.
141 #[must_use]
142 pub fn relation_count(&self) -> usize {
143 self.snapshot.view().relation_count()
144 }
145
146 /// Returns visible incidence count.
147 ///
148 /// # Performance
149 ///
150 /// This method is `O(base + overlay change)`.
151 #[must_use]
152 pub fn incidence_count(&self) -> usize {
153 self.snapshot.view().incidence_count()
154 }
155
156 /// Returns every visible element id in id order.
157 ///
158 /// # Performance
159 ///
160 /// This method is `O(element count)`.
161 #[must_use]
162 pub fn element_ids(&self) -> Vec<ElementId> {
163 self.snapshot
164 .view()
165 .elements()
166 .map(|record| record.id)
167 .collect()
168 }
169
170 /// Returns every visible relation id in id order.
171 ///
172 /// # Performance
173 ///
174 /// This method is `O(relation count)`.
175 #[must_use]
176 pub fn relation_ids(&self) -> Vec<RelationId> {
177 self.snapshot
178 .view()
179 .relations()
180 .map(|record| record.id)
181 .collect()
182 }
183
184 /// Returns whether an element exists.
185 ///
186 /// # Performance
187 ///
188 /// This method is `O(log change + log n)`.
189 #[must_use]
190 pub fn contains_element(&self, id: ElementId) -> bool {
191 self.snapshot.view().contains_element(id)
192 }
193
194 /// Returns whether a relation exists.
195 ///
196 /// # Performance
197 ///
198 /// This method is `O(log change + log n)`.
199 #[must_use]
200 pub fn contains_relation(&self, id: RelationId) -> bool {
201 self.snapshot.view().contains_relation(id)
202 }
203
204 /// Returns whether an incidence exists.
205 ///
206 /// # Performance
207 ///
208 /// This method is `O(log change + log n)`.
209 #[must_use]
210 pub fn contains_incidence(&self, id: IncidenceId) -> bool {
211 self.snapshot.view().contains_incidence(id)
212 }
213
214 /// Returns an owned element view — id, labels, and all properties read in one
215 /// call.
216 ///
217 /// # Performance
218 ///
219 /// This method is `O(log n + label count + property count)`.
220 #[must_use]
221 pub fn element(&self, id: ElementId) -> Option<Element> {
222 let view = self.snapshot.view();
223 let record = view.element_ref(id)?;
224 let labels = record.labels.iter().collect();
225 let properties =
226 Properties::from_pairs(view.subject_properties(PropertySubject::Element(id)));
227 Some(Element::new(id, labels, properties))
228 }
229
230 /// Returns an owned relation view — id, type, labels, and all properties read
231 /// in one call.
232 ///
233 /// # Performance
234 ///
235 /// This method is `O(log n + label count + property count)`.
236 #[must_use]
237 pub fn relation(&self, id: RelationId) -> Option<Relation> {
238 let view = self.snapshot.view();
239 let record = view.relation_ref(id)?;
240 let labels = record.labels.iter().collect();
241 let properties =
242 Properties::from_pairs(view.subject_properties(PropertySubject::Relation(id)));
243 Some(Relation::new(id, record.relation_type, labels, properties))
244 }
245
246 /// Returns an owned incidence record.
247 ///
248 /// # Performance
249 ///
250 /// This method is `O(log change + log n)`.
251 #[must_use]
252 pub fn incidence(&self, id: IncidenceId) -> Option<IncidenceRecord> {
253 self.snapshot.view().incidence_ref(id).map(Cow::into_owned)
254 }
255
256 /// Returns every visible incidence attached to an element, in ascending
257 /// incidence-id order.
258 ///
259 /// The merged set mixes overlay-owned and base-borrowed records, so this
260 /// returns an owned [`Vec`] ([`IncidenceRecord`] is [`Copy`], so the copy is
261 /// cheap).
262 ///
263 /// # Performance
264 ///
265 /// This method is `O(base incidences + overlay incidence change)`.
266 #[must_use]
267 pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
268 self.snapshot.view().element_incidences(id)
269 }
270
271 /// Returns every visible incidence carried by a relation, in ascending
272 /// incidence-id order — the n-ary analogue of [`Self::endpoints`].
273 ///
274 /// Each record pairs a participant element with the structural role it plays,
275 /// so a consumer can read back a hyperedge's full roled participant set.
276 ///
277 /// # Performance
278 ///
279 /// This method is `O(degree)` over the relation's incidences.
280 #[must_use]
281 pub fn relation_incidences(&self, relation: RelationId) -> Vec<IncidenceRecord> {
282 self.snapshot.view().relation_incidences(relation)
283 }
284
285 /// Returns a binary relation's two endpoint elements, ordered by ascending
286 /// incidence id.
287 ///
288 /// Reads the relation's incidences from the reverse-adjacency index and
289 /// returns the elements carried by its first two incidences in id order. A
290 /// relation with fewer than two visible incidences returns `None`. This
291 /// reports endpoints structurally, without consulting any projection's
292 /// source/target roles — use [`Self::neighbors`] when role direction matters.
293 ///
294 /// # Performance
295 ///
296 /// This method is `O(degree)` over the relation's incidences.
297 #[must_use]
298 pub fn endpoints(&self, relation: RelationId) -> Option<(ElementId, ElementId)> {
299 let incidences = self.snapshot.view().relation_incidences(relation);
300 match incidences.as_slice() {
301 [first, second, ..] => Some((first.element, second.element)),
302 _too_few => None,
303 }
304 }
305
306 /// Returns the elements reachable from `element` along relations of
307 /// `relation_type`, in ascending element-id order.
308 ///
309 /// Direction selects the role `element` must play on each relation. Endpoint
310 /// roles are encoded by incidence-creation order: a binary relation's source
311 /// is its lower incidence id and its target the higher (see
312 /// [`Self::endpoints`]). `Outgoing` requires `element` to be the source (and
313 /// yields the target), `Incoming` requires it to be the target (and yields
314 /// the source), and `Both` yields the opposite endpoint either way. Resolved
315 /// over the reverse-adjacency index — each incidence of `element` whose
316 /// relation has the requested type contributes that relation's other
317 /// endpoint — so this works for any binary relation without a materialized
318 /// projection.
319 ///
320 /// # Performance
321 ///
322 /// This method is `O(degree of element + sum of touched relation degrees)`.
323 #[must_use]
324 pub fn neighbors(
325 &self,
326 element: ElementId,
327 relation_type: RelationTypeId,
328 direction: Direction,
329 ) -> Vec<ElementId> {
330 let view = self.snapshot.view();
331 let mut neighbors = BTreeSet::new();
332 for incidence in view.element_incidences(element) {
333 let matches_type = view
334 .relation_ref(incidence.relation)
335 .is_some_and(|record| record.relation_type == Some(relation_type));
336 if !matches_type {
337 continue;
338 }
339 // The incidence id encodes the endpoint role: the source endpoint is
340 // created first (lower incidence id), the target second. Compare
341 // `element`'s incidence id against each other endpoint's to decide
342 // which side `element` is on, then follow per the requested direction.
343 neighbors.extend(
344 view.relation_incidences(incidence.relation)
345 .into_iter()
346 .filter(|other| other.element != element)
347 .filter(|other| follow_direction(direction, incidence.id, other.id))
348 .map(|other| other.element),
349 );
350 }
351 neighbors.into_iter().collect()
352 }
353
354 /// Returns one owned property value.
355 ///
356 /// Prefer [`Self::value`] (borrowed `Cow`) or [`Self::text`] (borrowed
357 /// `&str`) when the value does not need to outlive this reader; owning is
358 /// correct only across commit boundaries.
359 ///
360 /// # Performance
361 ///
362 /// This method is `O(log subjects + log keys)` plus one value clone
363 /// (`O(1)` for every variant — text payloads are `Arc`-shared).
364 #[must_use]
365 pub fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<PropertyValue> {
366 self.snapshot
367 .view()
368 .property_ref(subject, key)
369 .map(Cow::into_owned)
370 }
371
372 /// Returns one property value borrowed from this reader's pinned snapshot:
373 /// `Cow::Borrowed` for any committed value, `Cow::Owned` only when the
374 /// value lives in the published overlay's delta.
375 ///
376 /// # Performance
377 ///
378 /// This method is `O(log subjects + log keys)`; the borrowed arm clones
379 /// nothing.
380 #[must_use]
381 pub fn value(
382 &self,
383 subject: PropertySubject,
384 key: PropertyKeyId,
385 ) -> Option<Cow<'_, PropertyValue>> {
386 self.snapshot.view().property_ref(subject, key)
387 }
388
389 /// Returns one text property's `Arc`-shared payload, or `None` when the
390 /// property is absent or not text.
391 ///
392 /// # Performance
393 ///
394 /// This method is `O(log subjects + log keys)`; the text bytes are never
395 /// copied (the shared payload is reference-counted for both base- and
396 /// overlay-resident values).
397 #[must_use]
398 pub fn text(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<Arc<str>> {
399 match self.snapshot.view().property_ref(subject, key)?.as_ref() {
400 PropertyValue::Text(text) => Some(Arc::clone(text)),
401 _other => None,
402 }
403 }
404
405 /// Returns the owned element whose value in `index` equals `value`, or `None`
406 /// when no element matches.
407 ///
408 /// # Errors
409 ///
410 /// Returns [`DbError`] when the index is unknown or is not an equality index.
411 ///
412 /// # Performance
413 ///
414 /// This method is `O(log n + label count + property count)`.
415 pub fn element_by_key<T: ValueType>(
416 &self,
417 index: EqualityIndex<T>,
418 value: impl Assignable<T>,
419 ) -> Result<Option<Element>, DbError> {
420 let value = value.into_value()?;
421 let matched = self
422 .lookup(index.id(), IndexProbe::Equal(&value))?
423 .into_iter()
424 .find_map(|subject| match subject {
425 PropertySubject::Element(id) => Some(id),
426 PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
427 });
428 Ok(matched.and_then(|id| self.element(id)))
429 }
430
431 /// Returns the number of subjects carried by a membership index (a label or
432 /// relation-type index).
433 ///
434 /// # Errors
435 ///
436 /// Returns [`DbError`] when the index is unknown or does not support
437 /// membership enumeration.
438 ///
439 /// # Performance
440 ///
441 /// This method is `O(indexed family size)`.
442 pub fn count(&self, index: IndexId) -> Result<usize, DbError> {
443 self.lookup(index, IndexProbe::All)
444 .map(|subjects| subjects.len())
445 }
446
447 /// Looks up subjects with a property value.
448 ///
449 /// # Errors
450 ///
451 /// Returns [`DbError`] when the property key is unknown or `value` does not
452 /// match the key schema.
453 ///
454 /// # Performance
455 ///
456 /// This method is `O(property subject count)`.
457 pub fn lookup_property_equal(
458 &self,
459 key: PropertyKeyId,
460 value: &PropertyValue,
461 ) -> Result<Vec<PropertySubject>, DbError> {
462 self.snapshot.view().typed_property_equal(key, value)
463 }
464
465 /// Looks up subjects with a property inside an inclusive range.
466 ///
467 /// # Errors
468 ///
469 /// Returns [`DbError`] when the property key is unknown or either bound
470 /// does not match the key schema.
471 ///
472 /// # Performance
473 ///
474 /// This method is `O(property subject count)`.
475 pub fn lookup_property_range(
476 &self,
477 key: PropertyKeyId,
478 min: &PropertyValue,
479 max: &PropertyValue,
480 ) -> Result<Vec<PropertySubject>, DbError> {
481 self.snapshot.view().typed_property_range(key, min, max)
482 }
483
484 /// Executes an index lookup.
485 ///
486 /// # Errors
487 ///
488 /// Returns [`DbError`] when the index is unknown, the lookup shape does not
489 /// match the index kind, or supplied property values do not match catalog
490 /// schemas.
491 ///
492 /// # Performance
493 ///
494 /// This method is `O(indexed family size)`.
495 pub fn lookup(
496 &self,
497 index: IndexId,
498 lookup: IndexProbe<'_>,
499 ) -> Result<Vec<PropertySubject>, DbError> {
500 let view = self.snapshot.view();
501 let entry = view
502 .catalog()
503 .index(index)
504 .ok_or_else(|| DbError::unknown(index))?;
505 match (&entry.definition, lookup) {
506 (IndexDefinition::Label { label }, IndexProbe::All) => Ok(view
507 .elements_with_label(*label)
508 .into_iter()
509 .map(PropertySubject::Element)
510 .collect()),
511 (IndexDefinition::Label { .. }, _lookup) => {
512 Err(DbError::unsupported("label index expects all lookup"))
513 }
514 (IndexDefinition::RelationType { relation_type }, IndexProbe::All) => Ok(view
515 .relations_with_type(*relation_type)
516 .into_iter()
517 .map(PropertySubject::Relation)
518 .collect()),
519 (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
520 "relation type index expects all lookup",
521 )),
522 (IndexDefinition::PropertyEquality { key }, IndexProbe::Equal(value)) => {
523 view.typed_property_equal(*key, value)
524 }
525 (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
526 "property equality index expects equality lookup",
527 )),
528 (IndexDefinition::PropertyRange { key }, IndexProbe::Range { min, max }) => {
529 view.typed_property_range(*key, min, max)
530 }
531 (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
532 "property range index expects range lookup",
533 )),
534 (IndexDefinition::CompositeEquality { keys }, IndexProbe::Composite(values)) => {
535 view.typed_property_composite_equal(keys, values)
536 }
537 (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
538 "composite equality index expects composite equality lookup",
539 )),
540 (IndexDefinition::Projection { projection }, IndexProbe::All) => {
541 self.projection_index_subjects(*projection)
542 }
543 (IndexDefinition::Projection { .. }, _lookup) => {
544 Err(DbError::unsupported("projection index expects all lookup"))
545 }
546 }
547 }
548
549 /// Materializes a graph projection.
550 ///
551 /// # Errors
552 ///
553 /// Returns [`DbError`] when the projection is unknown, is not a graph, or
554 /// fails validation against current topology.
555 ///
556 /// # Performance
557 ///
558 /// This method is `O(relation count * incidence count)`.
559 pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
560 let view = self.snapshot.view();
561 let entry = view
562 .catalog()
563 .projection(id)
564 .ok_or_else(|| DbError::unknown(id))?;
565 match &entry.definition {
566 ProjectionDefinition::Graph(definition) => {
567 projection::GraphProjection::from_state(&view, definition.clone())
568 }
569 ProjectionDefinition::Hypergraph(_definition) => {
570 Err(DbError::invalid_projection("projection is not a graph"))
571 }
572 }
573 }
574
575 /// Materializes a graph projection by catalog name.
576 ///
577 /// # Errors
578 ///
579 /// Returns [`DbError`] when the projection is unknown, is not a graph, or
580 /// fails validation against current topology.
581 ///
582 /// # Performance
583 ///
584 /// This method is `O(log projection count + relation count * incidence count)`.
585 pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
586 let id = self
587 .snapshot
588 .view()
589 .catalog()
590 .projection_id(name)
591 .ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
592 self.graph_projection(id)
593 }
594
595 /// Walks a cataloged graph projection from canonical seed elements,
596 /// returning the discovered nodes AND the projection edges among them.
597 ///
598 /// Nodes are unique canonical elements in BFS first-discovery order; depth is
599 /// the shortest discovered hop count from any seed. Edges connect two
600 /// discovered nodes, ordered deterministically and unique by relation, so the
601 /// [`Subgraph`] never references a node it omitted.
602 ///
603 /// # Errors
604 ///
605 /// Returns [`DbError`] when the projection is unknown, is not a graph,
606 /// cannot be materialized, or a seed element is not part of the projection.
607 ///
608 /// # Performance
609 ///
610 /// This method is `O(relation count * incidence count + visited edges)`.
611 pub fn walk(
612 &self,
613 projection: ProjectionId,
614 seeds: &[ElementId],
615 walk: Walk,
616 ) -> Result<Subgraph, DbError> {
617 if seeds.is_empty() || walk.limit == 0 {
618 return Ok(Subgraph::default());
619 }
620 let graph = self.graph_projection(projection)?;
621 traversal::walk_graph_projection(&graph, seeds, walk)
622 }
623
624 /// Ranks a cataloged graph projection by personalized `PageRank`, returning
625 /// every projection element paired with its rank, ordered highest first.
626 ///
627 /// `seeds` are the restart (teleport) set: rank mass returns to them on each
628 /// damping step, biasing the stationary distribution toward elements
629 /// reachable from the seeds (random walk with restart). The seed weights are
630 /// normalized internally, so passing the seed elements is sufficient. With no
631 /// seeds this is the uniform-teleport `PageRank` over the projection.
632 ///
633 /// # Errors
634 ///
635 /// Returns [`DbError`] when the projection is unknown, is not a graph, cannot
636 /// be materialized, or `PageRank` rejects the configuration (the
637 /// [`PageRankConfig`] was invalid or the power iteration did not converge).
638 /// Seeds absent from the projection are ignored rather than erroring; with no
639 /// resolvable seed this is the uniform-teleport rank.
640 ///
641 /// # Performance
642 ///
643 /// This method is `O(relation count * incidence count + iterations *
644 /// (visible elements + visible edges) + visible elements * log(visible
645 /// elements))` — the trailing term is the final rank sort.
646 pub fn personalized_pagerank(
647 &self,
648 projection: ProjectionId,
649 seeds: &[ElementId],
650 config: PageRankConfig<f64>,
651 ) -> Result<Vec<(ElementId, f64)>, DbError> {
652 let graph = self.graph_projection(projection)?;
653 let bound = graph.element_bound();
654 let element_count = u32::try_from(bound).map_err(|_| {
655 DbError::traversal("projection exceeds the personalized pagerank index bound")
656 })?;
657
658 let mut personalization = vec![0.0_f64; bound];
659 let mut seeded = false;
660 for &seed in seeds {
661 if let Some(local) = graph.local_element_id(seed) {
662 personalization[graph.element_index(local)] = 1.0;
663 seeded = true;
664 }
665 }
666
667 let mut ranks = vec![0.0_f64; bound];
668 let mut workspace = PageRankWorkspace::for_graph(&graph);
669 pagerank_graph_with_workspace(
670 &graph,
671 &Uniform,
672 (0..element_count).map(ProjectionElementId::new),
673 config,
674 seeded.then_some(personalization.as_slice()),
675 &mut ranks,
676 &mut workspace,
677 )
678 .map_err(|error| {
679 DbError::traversal(match error {
680 PageRankError::InvalidDamping { .. }
681 | PageRankError::InvalidTolerance { .. }
682 | PageRankError::InvalidMaxIterations => "invalid pagerank configuration",
683 PageRankError::NonConverged { .. } => "personalized pagerank did not converge",
684 _ => "personalized pagerank failed",
685 })
686 })?;
687
688 let mut ranked: Vec<(ElementId, f64)> = (0..element_count)
689 .map(|index| {
690 let local = ProjectionElementId::new(index);
691 (
692 graph.canonical_element_id(local),
693 ranks[graph.element_index(local)],
694 )
695 })
696 .collect();
697 ranked.sort_by(|left, right| right.1.total_cmp(&left.1));
698 Ok(ranked)
699 }
700
701 /// Ranks a cataloged hypergraph projection by personalized `PageRank`,
702 /// returning both participant elements and hyperedges paired with their
703 /// ranks, each ordered highest first.
704 ///
705 /// The hypergraph analogue of [`Self::personalized_pagerank`]. `seeds` are
706 /// the restart (teleport) set over participant elements; their weights are
707 /// normalized internally. With no resolvable seed this is the uniform-teleport
708 /// rank. Because the hypergraph walk alternates element → hyperedge →
709 /// element, ranking both surfaces central participants *and* central
710 /// hyperedges (e.g. impl groups or modules as whole units).
711 ///
712 /// # Errors
713 ///
714 /// Returns [`DbError`] when the projection is unknown, is not a hypergraph,
715 /// cannot be materialized, or `PageRank` rejects the configuration (invalid
716 /// [`PageRankConfig`] or non-convergence). Seeds absent from the projection
717 /// are ignored.
718 ///
719 /// # Performance
720 ///
721 /// This method is `O(relation count * incidence count + iterations *
722 /// (elements + relations + incidences) + (elements + relations) * log(...))`
723 /// — the trailing terms are the two final rank sorts.
724 pub fn personalized_hypergraph_pagerank(
725 &self,
726 projection: ProjectionId,
727 seeds: &[ElementId],
728 config: PageRankConfig<f64>,
729 ) -> Result<HypergraphPageRank, DbError> {
730 let hypergraph = self.hypergraph_projection(projection)?;
731 let element_bound = hypergraph.element_bound();
732 let relation_bound = hypergraph.relation_bound();
733 let bound_error =
734 || DbError::traversal("projection exceeds the personalized pagerank index bound");
735 let element_count = u32::try_from(element_bound).map_err(|_| bound_error())?;
736 let relation_count = u32::try_from(relation_bound).map_err(|_| bound_error())?;
737
738 // The teleport vector spans participants then hyperedges; seeds are
739 // participant elements, so only the element prefix is personalized.
740 let mut personalization = vec![0.0_f64; element_bound.saturating_add(relation_bound)];
741 let mut seeded = false;
742 for &seed in seeds {
743 if let Some(local) = hypergraph.local_element_id(seed) {
744 personalization[hypergraph.element_index(local)] = 1.0;
745 seeded = true;
746 }
747 }
748
749 let mut element_ranks = vec![0.0_f64; element_bound];
750 let mut relation_ranks = vec![0.0_f64; relation_bound];
751 let mut workspace = HypergraphPageRankWorkspace::for_hypergraph(&hypergraph);
752 pagerank_hypergraph_with_workspace(
753 &hypergraph,
754 &Uniform,
755 (0..element_count).map(ProjectionElementId::new),
756 (0..relation_count).map(ProjectionRelationId::new),
757 config,
758 seeded.then_some(personalization.as_slice()),
759 &mut element_ranks,
760 &mut relation_ranks,
761 &mut workspace,
762 )
763 .map_err(|error| {
764 DbError::traversal(match error {
765 PageRankError::InvalidDamping { .. }
766 | PageRankError::InvalidTolerance { .. }
767 | PageRankError::InvalidMaxIterations => "invalid pagerank configuration",
768 PageRankError::NonConverged { .. } => "personalized pagerank did not converge",
769 _ => "personalized pagerank failed",
770 })
771 })?;
772
773 let mut elements: Vec<(ElementId, f64)> = (0..element_count)
774 .map(|index| {
775 let local = ProjectionElementId::new(index);
776 (
777 hypergraph.canonical_element_id(local),
778 element_ranks[hypergraph.element_index(local)],
779 )
780 })
781 .collect();
782 elements.sort_by(|left, right| right.1.total_cmp(&left.1));
783
784 let mut relations: Vec<(RelationId, f64)> = (0..relation_count)
785 .map(|index| {
786 let local = ProjectionRelationId::new(index);
787 (
788 hypergraph.canonical_relation_id(local),
789 relation_ranks[hypergraph.relation_index(local)],
790 )
791 })
792 .collect();
793 relations.sort_by(|left, right| right.1.total_cmp(&left.1));
794
795 Ok(HypergraphPageRank {
796 elements,
797 relations,
798 })
799 }
800
801 /// Returns the longest chain of canonical elements along the projection's
802 /// outgoing edges within the subgraph induced by `elements`.
803 ///
804 /// Only edges whose endpoints are both in `elements` participate. The path
805 /// lists each element once from start to end; its length in edges is
806 /// `path.len() - 1`. An empty `elements` slice yields an empty path.
807 ///
808 /// # Errors
809 ///
810 /// Returns [`DbError`] when the projection is unknown, is not a graph, cannot
811 /// be materialized, or the induced subgraph contains a cycle. Elements absent
812 /// from the projection are ignored, so the chain is computed over the present
813 /// subset.
814 ///
815 /// # Performance
816 ///
817 /// This method is `O(relation count * incidence count + visible elements +
818 /// visible edges)`.
819 pub fn longest_path(
820 &self,
821 projection: ProjectionId,
822 elements: &[ElementId],
823 ) -> Result<Vec<ElementId>, DbError> {
824 if elements.is_empty() {
825 return Ok(Vec::new());
826 }
827 let graph = self.graph_projection(projection)?;
828 let locals = elements
829 .iter()
830 .filter_map(|&element| graph.local_element_id(element))
831 .collect::<Vec<ProjectionElementId>>();
832 let path = longest_path_dag(&graph, &locals)
833 .map_err(|_| DbError::traversal("longest path found a cycle"))?;
834 Ok(path
835 .into_iter()
836 .map(|local| graph.canonical_element_id(local))
837 .collect())
838 }
839
840 /// Materializes a hypergraph projection.
841 ///
842 /// # Errors
843 ///
844 /// Returns [`DbError`] when the projection is unknown, is not a hypergraph,
845 /// or fails validation against current topology.
846 ///
847 /// # Performance
848 ///
849 /// This method is `O(relation count * incidence count)`.
850 pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
851 let view = self.snapshot.view();
852 let entry = view
853 .catalog()
854 .projection(id)
855 .ok_or_else(|| DbError::unknown(id))?;
856 match &entry.definition {
857 ProjectionDefinition::Hypergraph(definition) => {
858 projection::HypergraphProjection::from_state(&view, definition.clone())
859 }
860 ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
861 "projection is not a hypergraph",
862 )),
863 }
864 }
865
866 /// Executes a prepared query.
867 ///
868 /// # Errors
869 ///
870 /// Returns [`DbError`] when execution cannot materialize a referenced
871 /// projection.
872 ///
873 /// # Performance
874 ///
875 /// This method is `O(plan output + projection build cost when used)`.
876 pub fn run(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
877 query.execute(&self.snapshot.view())
878 }
879
880 /// Explains a prepared query.
881 ///
882 /// # Performance
883 ///
884 /// This method is `O(plan size)`.
885 #[must_use]
886 pub fn explain(&self, query: &PreparedQuery) -> String {
887 query.explain()
888 }
889
890 /// Materializes subjects represented by a projection index.
891 ///
892 /// # Errors
893 ///
894 /// Returns [`DbError`] when the projection is unknown or cannot be
895 /// materialized.
896 ///
897 /// # Performance
898 ///
899 /// This method is `O(relation count * incidence count)`.
900 fn projection_index_subjects(
901 &self,
902 projection: ProjectionId,
903 ) -> Result<Vec<PropertySubject>, DbError> {
904 let view = self.snapshot.view();
905 let entry = view
906 .catalog()
907 .projection(projection)
908 .ok_or_else(|| DbError::unknown(projection))?;
909 match &entry.definition {
910 ProjectionDefinition::Graph(definition) => {
911 Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
912 }
913 ProjectionDefinition::Hypergraph(definition) => Ok(
914 projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
915 ),
916 }
917 }
918}