use std::{borrow::Cow, collections::BTreeSet, sync::Arc};
use oxgraph_algo::{
PageRankConfig, PageRankError, PageRankWorkspace, Uniform, longest_path_dag,
pagerank_graph_with_workspace,
};
use oxgraph_graph::{CanonicalElementIdentity, DenseElementIndex, LocalElementIdentity};
use super::IndexProbe;
use crate::{
Catalog, CheckpointGeneration, CommitSeq, DbError, Element, ElementId, IncidenceId,
IncidenceRecord, IndexId, PreparedQuery, ProjectionDefinition, ProjectionId, Properties,
PropertyKeyId, PropertySubject, PropertyValue, QueryResult, Relation, RelationId,
RelationTypeId,
catalog::IndexDefinition,
overlay::{Snapshot, StateView},
projection::{self, GraphProjection, HypergraphProjection, ProjectionElementId},
traversal::{self, Direction, Subgraph, Walk},
typed::{Assignable, EqualityIndex, ValueType},
};
/// Identifies the exact state a [`Reader`] observes.
///
/// Produced by `Reader::pin`; two readers with equal pins answer from the same
/// commit sequence and checkpoint generation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReadPin {
    /// Commit sequence visible to the reader (`Reader::pin` fills it from the
    /// snapshot's `lsn()`).
    pub visible_commit_seq: CommitSeq,
    /// Checkpoint generation of the pinned snapshot.
    pub generation: CheckpointGeneration,
}
/// Read handle backed by a shared [`Snapshot`].
///
/// Every query method on this type answers from the view of that single
/// snapshot, so repeated calls observe the same state.
pub struct Reader {
    /// Snapshot this reader answers from; shared through an `Arc`.
    pub(super) snapshot: Arc<Snapshot>,
}
/// Decides whether the incidence `to` is reachable from the incidence `from`
/// when traversing in `direction`.
///
/// Direction is derived purely from incidence-id order:
/// - `Outgoing` follows only incidences with a strictly larger id.
/// - `Incoming` follows only incidences with a strictly smaller id.
/// - `Both` follows everything.
const fn follow_direction(direction: Direction, from: IncidenceId, to: IncidenceId) -> bool {
    let (origin, candidate) = (from.get(), to.get());
    match direction {
        Direction::Both => true,
        Direction::Outgoing => candidate > origin,
        Direction::Incoming => candidate < origin,
    }
}
/// Compile-time thread-safety probe: instantiating it only type-checks when
/// `Candidate` is both `Send` and `Sync`.
const fn assert_send_sync<Candidate>()
where
    Candidate: Send + Sync,
{
}
// The shared snapshot and the reader wrapping it must both be shareable across threads.
const _: () = assert_send_sync::<Arc<Snapshot>>();
const _: () = assert_send_sync::<Reader>();
impl Reader {
#[must_use]
pub fn pin(&self) -> ReadPin {
ReadPin {
visible_commit_seq: self.snapshot.lsn(),
generation: self.snapshot.generation(),
}
}
#[must_use]
pub fn catalog(&self) -> &Catalog {
self.snapshot.view().catalog_ref()
}
#[must_use]
pub fn element_count(&self) -> usize {
self.snapshot.view().element_count()
}
#[must_use]
pub fn relation_count(&self) -> usize {
self.snapshot.view().relation_count()
}
#[must_use]
pub fn incidence_count(&self) -> usize {
self.snapshot.view().incidence_count()
}
#[must_use]
pub fn element_ids(&self) -> Vec<ElementId> {
self.snapshot
.view()
.elements()
.map(|record| record.id)
.collect()
}
#[must_use]
pub fn relation_ids(&self) -> Vec<RelationId> {
self.snapshot
.view()
.relations()
.map(|record| record.id)
.collect()
}
#[must_use]
pub fn contains_element(&self, id: ElementId) -> bool {
self.snapshot.view().contains_element(id)
}
#[must_use]
pub fn contains_relation(&self, id: RelationId) -> bool {
self.snapshot.view().contains_relation(id)
}
#[must_use]
pub fn contains_incidence(&self, id: IncidenceId) -> bool {
self.snapshot.view().contains_incidence(id)
}
#[must_use]
pub fn element(&self, id: ElementId) -> Option<Element> {
let view = self.snapshot.view();
let record = view.element_ref(id)?;
let labels = record.labels.iter().collect();
let properties =
Properties::from_pairs(view.subject_properties(PropertySubject::Element(id)));
Some(Element::new(id, labels, properties))
}
#[must_use]
pub fn relation(&self, id: RelationId) -> Option<Relation> {
let view = self.snapshot.view();
let record = view.relation_ref(id)?;
let labels = record.labels.iter().collect();
let properties =
Properties::from_pairs(view.subject_properties(PropertySubject::Relation(id)));
Some(Relation::new(id, record.relation_type, labels, properties))
}
#[must_use]
pub fn incidence(&self, id: IncidenceId) -> Option<IncidenceRecord> {
self.snapshot.view().incidence_ref(id).map(Cow::into_owned)
}
#[must_use]
pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
self.snapshot.view().element_incidences(id)
}
#[must_use]
pub fn endpoints(&self, relation: RelationId) -> Option<(ElementId, ElementId)> {
let incidences = self.snapshot.view().relation_incidences(relation);
match incidences.as_slice() {
[first, second, ..] => Some((first.element, second.element)),
_too_few => None,
}
}
#[must_use]
pub fn neighbors(
&self,
element: ElementId,
relation_type: RelationTypeId,
direction: Direction,
) -> Vec<ElementId> {
let view = self.snapshot.view();
let mut neighbors = BTreeSet::new();
for incidence in view.element_incidences(element) {
let matches_type = view
.relation_ref(incidence.relation)
.is_some_and(|record| record.relation_type == Some(relation_type));
if !matches_type {
continue;
}
neighbors.extend(
view.relation_incidences(incidence.relation)
.into_iter()
.filter(|other| other.element != element)
.filter(|other| follow_direction(direction, incidence.id, other.id))
.map(|other| other.element),
);
}
neighbors.into_iter().collect()
}
#[must_use]
pub fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<PropertyValue> {
self.snapshot
.view()
.property_ref(subject, key)
.map(Cow::into_owned)
}
#[must_use]
pub fn value(
&self,
subject: PropertySubject,
key: PropertyKeyId,
) -> Option<Cow<'_, PropertyValue>> {
self.snapshot.view().property_ref(subject, key)
}
#[must_use]
pub fn text(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<Arc<str>> {
match self.snapshot.view().property_ref(subject, key)?.as_ref() {
PropertyValue::Text(text) => Some(Arc::clone(text)),
_other => None,
}
}
pub fn element_by_key<T: ValueType>(
&self,
index: EqualityIndex<T>,
value: impl Assignable<T>,
) -> Result<Option<Element>, DbError> {
let value = value.into_value()?;
let matched = self
.lookup(index.id(), IndexProbe::Equal(&value))?
.into_iter()
.find_map(|subject| match subject {
PropertySubject::Element(id) => Some(id),
PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
});
Ok(matched.and_then(|id| self.element(id)))
}
pub fn count(&self, index: IndexId) -> Result<usize, DbError> {
self.lookup(index, IndexProbe::All)
.map(|subjects| subjects.len())
}
pub fn lookup_property_equal(
&self,
key: PropertyKeyId,
value: &PropertyValue,
) -> Result<Vec<PropertySubject>, DbError> {
self.snapshot.view().typed_property_equal(key, value)
}
pub fn lookup_property_range(
&self,
key: PropertyKeyId,
min: &PropertyValue,
max: &PropertyValue,
) -> Result<Vec<PropertySubject>, DbError> {
self.snapshot.view().typed_property_range(key, min, max)
}
pub fn lookup(
&self,
index: IndexId,
lookup: IndexProbe<'_>,
) -> Result<Vec<PropertySubject>, DbError> {
let view = self.snapshot.view();
let entry = view
.catalog()
.index(index)
.ok_or_else(|| DbError::unknown(index))?;
match (&entry.definition, lookup) {
(IndexDefinition::Label { label }, IndexProbe::All) => Ok(view
.elements_with_label(*label)
.into_iter()
.map(PropertySubject::Element)
.collect()),
(IndexDefinition::Label { .. }, _lookup) => {
Err(DbError::unsupported("label index expects all lookup"))
}
(IndexDefinition::RelationType { relation_type }, IndexProbe::All) => Ok(view
.relations_with_type(*relation_type)
.into_iter()
.map(PropertySubject::Relation)
.collect()),
(IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
"relation type index expects all lookup",
)),
(IndexDefinition::PropertyEquality { key }, IndexProbe::Equal(value)) => {
view.typed_property_equal(*key, value)
}
(IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
"property equality index expects equality lookup",
)),
(IndexDefinition::PropertyRange { key }, IndexProbe::Range { min, max }) => {
view.typed_property_range(*key, min, max)
}
(IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
"property range index expects range lookup",
)),
(IndexDefinition::CompositeEquality { keys }, IndexProbe::Composite(values)) => {
view.typed_property_composite_equal(keys, values)
}
(IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
"composite equality index expects composite equality lookup",
)),
(IndexDefinition::Projection { projection }, IndexProbe::All) => {
self.projection_index_subjects(*projection)
}
(IndexDefinition::Projection { .. }, _lookup) => {
Err(DbError::unsupported("projection index expects all lookup"))
}
}
}
pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
let view = self.snapshot.view();
let entry = view
.catalog()
.projection(id)
.ok_or_else(|| DbError::unknown(id))?;
match &entry.definition {
ProjectionDefinition::Graph(definition) => {
projection::GraphProjection::from_state(&view, definition.clone())
}
ProjectionDefinition::Hypergraph(_definition) => {
Err(DbError::invalid_projection("projection is not a graph"))
}
}
}
pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
let id = self
.snapshot
.view()
.catalog()
.projection_id(name)
.ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
self.graph_projection(id)
}
pub fn walk(
&self,
projection: ProjectionId,
seeds: &[ElementId],
walk: Walk,
) -> Result<Subgraph, DbError> {
if seeds.is_empty() || walk.limit == 0 {
return Ok(Subgraph::default());
}
let graph = self.graph_projection(projection)?;
traversal::walk_graph_projection(&graph, seeds, walk)
}
pub fn personalized_pagerank(
&self,
projection: ProjectionId,
seeds: &[ElementId],
config: PageRankConfig<f64>,
) -> Result<Vec<(ElementId, f64)>, DbError> {
let graph = self.graph_projection(projection)?;
let bound = graph.element_bound();
let element_count = u32::try_from(bound).map_err(|_| {
DbError::traversal("projection exceeds the personalized pagerank index bound")
})?;
let mut personalization = vec![0.0_f64; bound];
let mut seeded = false;
for &seed in seeds {
if let Some(local) = graph.local_element_id(seed) {
personalization[graph.element_index(local)] = 1.0;
seeded = true;
}
}
let mut ranks = vec![0.0_f64; bound];
let mut workspace = PageRankWorkspace::for_graph(&graph);
pagerank_graph_with_workspace(
&graph,
&Uniform,
(0..element_count).map(ProjectionElementId::new),
config,
seeded.then_some(personalization.as_slice()),
&mut ranks,
&mut workspace,
)
.map_err(|error| {
DbError::traversal(match error {
PageRankError::InvalidDamping { .. }
| PageRankError::InvalidTolerance { .. }
| PageRankError::InvalidMaxIterations => "invalid pagerank configuration",
PageRankError::NonConverged { .. } => "personalized pagerank did not converge",
_ => "personalized pagerank failed",
})
})?;
let mut ranked: Vec<(ElementId, f64)> = (0..element_count)
.map(|index| {
let local = ProjectionElementId::new(index);
(
graph.canonical_element_id(local),
ranks[graph.element_index(local)],
)
})
.collect();
ranked.sort_by(|left, right| right.1.total_cmp(&left.1));
Ok(ranked)
}
pub fn longest_path(
&self,
projection: ProjectionId,
elements: &[ElementId],
) -> Result<Vec<ElementId>, DbError> {
if elements.is_empty() {
return Ok(Vec::new());
}
let graph = self.graph_projection(projection)?;
let locals = elements
.iter()
.filter_map(|&element| graph.local_element_id(element))
.collect::<Vec<ProjectionElementId>>();
let path = longest_path_dag(&graph, &locals)
.map_err(|_| DbError::traversal("longest path found a cycle"))?;
Ok(path
.into_iter()
.map(|local| graph.canonical_element_id(local))
.collect())
}
pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
let view = self.snapshot.view();
let entry = view
.catalog()
.projection(id)
.ok_or_else(|| DbError::unknown(id))?;
match &entry.definition {
ProjectionDefinition::Hypergraph(definition) => {
projection::HypergraphProjection::from_state(&view, definition.clone())
}
ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
"projection is not a hypergraph",
)),
}
}
pub fn run(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
query.execute(&self.snapshot.view())
}
#[must_use]
pub fn explain(&self, query: &PreparedQuery) -> String {
query.explain()
}
fn projection_index_subjects(
&self,
projection: ProjectionId,
) -> Result<Vec<PropertySubject>, DbError> {
let view = self.snapshot.view();
let entry = view
.catalog()
.projection(projection)
.ok_or_else(|| DbError::unknown(projection))?;
match &entry.definition {
ProjectionDefinition::Graph(definition) => {
Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
}
ProjectionDefinition::Hypergraph(definition) => Ok(
projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
),
}
}
}