use std::collections::HashSet;
use ipld_core::ipld::Ipld;
use crate::error::{Error, RepoError};
use crate::objects::{Edge, IndexSet, Node};
use crate::prolly::{self, Cursor, ProllyKey};
use crate::repo::readonly::{ReadonlyRepo, decode_from_store};
use super::adjacency::{load_incoming, load_outgoing};
use super::build::prop_value_hash;
/// Predicate applied to a single node property by [`Query::where_prop`].
///
/// `#[non_exhaustive]` so that further predicate kinds can be added without
/// breaking downstream `match`es.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum PropPredicate {
    /// Matches when the property is present on the node and equal to this value
    /// (a node lacking the property never matches).
    Eq(Ipld),
}

impl PropPredicate {
    /// Builds an equality predicate from anything convertible into [`Ipld`].
    pub fn eq(value: impl Into<Ipld>) -> Self {
        let ipld: Ipld = value.into();
        PropPredicate::Eq(ipld)
    }
}
/// Which side of a node an edge is attached on, relative to that node.
///
/// NOTE(review): not referenced in this section — presumably consumed by callers
/// elsewhere in the crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Edges leaving the node.
    Outgoing,
    /// Edges arriving at the node.
    Incoming,
}
/// One node matched by a [`Query`], together with the edges the query asked for.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct QueryHit {
    /// The matched node.
    pub node: Node,
    /// Outgoing edges loaded for the labels requested via `with_outgoing` /
    /// `with_any_direction`.
    pub edges: Vec<Edge>,
    /// Incoming edges loaded for the labels requested via `with_incoming` /
    /// `with_any_direction`. A self-loop already present in `edges` is not
    /// repeated here.
    pub incoming_edges: Vec<Edge>,
    /// `true` when either the outgoing or the incoming edge loader reported
    /// truncation (see `Query::adjacency_cap`).
    pub edges_truncated: bool,
}

impl QueryHit {
    /// Collects references to the edges in `edges` whose `etype` equals `label`,
    /// preserving their order.
    fn filter_by_label<'e>(edges: &'e [Edge], label: &str) -> Vec<&'e Edge> {
        edges.iter().filter(|edge| edge.etype == label).collect()
    }

    /// Outgoing edges whose `etype` equals `label`, in load order.
    pub fn edges_by_label(&self, label: &str) -> Vec<&Edge> {
        Self::filter_by_label(&self.edges, label)
    }

    /// Lazy counterpart of [`QueryHit::edges_by_label`]: yields the outgoing
    /// edges whose `etype` equals `label` without allocating.
    pub fn edges_by_label_iter<'a>(
        &'a self,
        label: &'a str,
    ) -> impl Iterator<Item = &'a Edge> + 'a {
        let wanted = label;
        self.edges.iter().filter(move |edge| edge.etype == wanted)
    }

    /// Incoming edges whose `etype` equals `label`, in load order.
    pub fn incoming_by_label(&self, label: &str) -> Vec<&Edge> {
        Self::filter_by_label(&self.incoming_edges, label)
    }
}
/// Builder for a node query against a [`ReadonlyRepo`]'s head commit.
///
/// Configure it with the builder methods, then run it with
/// [`Query::execute`], [`Query::first`], or [`Query::one`].
#[derive(Clone, Debug)]
pub struct Query<'a> {
    /// Repository whose head commit (and its indexes, if any) is queried.
    repo: &'a ReadonlyRepo,
    /// Required node `ntype`, if any.
    label: Option<String>,
    /// The single property filter as (property name, predicate); `where_prop`
    /// overwrites any previous value.
    prop_filter: Option<(String, PropPredicate)>,
    /// Edge labels handed to the outgoing-edge loader for each hit.
    with_outgoing: Vec<String>,
    /// Edge labels handed to the incoming-edge loader for each hit.
    with_incoming: Vec<String>,
    /// Maximum number of hits to return; `None` means unlimited.
    limit: Option<usize>,
    /// Cap passed to both per-node edge loaders.
    adjacency_cap: usize,
}
impl<'a> Query<'a> {
pub const DEFAULT_ADJACENCY_CAP: usize = 10_000;
#[must_use]
pub const fn new(repo: &'a ReadonlyRepo) -> Self {
Self {
repo,
label: None,
prop_filter: None,
with_outgoing: Vec::new(),
with_incoming: Vec::new(),
limit: None,
adjacency_cap: Self::DEFAULT_ADJACENCY_CAP,
}
}
#[must_use]
pub fn label(mut self, label: impl Into<String>) -> Self {
self.label = Some(label.into());
self
}
#[must_use]
pub fn where_prop(mut self, name: impl Into<String>, pred: PropPredicate) -> Self {
self.prop_filter = Some((name.into(), pred));
self
}
#[must_use]
pub fn where_eq(self, name: impl Into<String>, value: impl Into<Ipld>) -> Self {
self.where_prop(name, PropPredicate::eq(value))
}
#[must_use]
pub fn with_outgoing(mut self, edge_label: impl Into<String>) -> Self {
self.with_outgoing.push(edge_label.into());
self
}
#[must_use]
pub fn with_incoming(mut self, edge_label: impl Into<String>) -> Self {
self.with_incoming.push(edge_label.into());
self
}
#[must_use]
pub fn with_any_direction(mut self, edge_label: impl Into<String>) -> Self {
let l = edge_label.into();
self.with_outgoing.push(l.clone());
self.with_incoming.push(l);
self
}
#[must_use]
pub const fn adjacency_cap(mut self, cap: usize) -> Self {
self.adjacency_cap = cap;
self
}
#[must_use]
pub const fn limit(mut self, n: usize) -> Self {
self.limit = Some(n);
self
}
pub fn first(mut self) -> Result<Option<QueryHit>, Error> {
self.limit = Some(1);
let mut hits = self.execute()?;
Ok(hits.pop())
}
pub fn one(mut self) -> Result<QueryHit, Error> {
self.limit = Some(2);
let hits = self.execute()?;
match hits.len() {
0 => Err(RepoError::NotFound.into()),
1 => Ok(hits.into_iter().next().expect("checked len")),
_ => Err(RepoError::AmbiguousMatch.into()),
}
}
pub fn execute(self) -> Result<Vec<QueryHit>, Error> {
let bs = self.repo.blockstore().clone();
let Some(commit) = self.repo.head_commit() else {
return Err(RepoError::Uninitialized.into());
};
let indexes = match &commit.indexes {
Some(idx_cid) => Some(decode_from_store::<IndexSet, _>(&*bs, idx_cid)?),
None => None,
};
let want_out: HashSet<&str> = self.with_outgoing.iter().map(String::as_str).collect();
let want_in: HashSet<&str> = self.with_incoming.iter().map(String::as_str).collect();
let adj_cap = self.adjacency_cap;
let mut hits: Vec<QueryHit> = Vec::new();
let cap = self.limit.unwrap_or(usize::MAX);
let build_hit = |node: Node, indexes: Option<&IndexSet>| -> Result<QueryHit, Error> {
let (out_edges, out_trunc) = load_outgoing(&*bs, indexes, node.id, &want_out, adj_cap)?;
let (mut in_edges, in_trunc) =
load_incoming(&*bs, indexes, node.id, &want_in, adj_cap)?;
if !in_edges.is_empty() && !out_edges.is_empty() {
let out_ids: HashSet<_> = out_edges.iter().map(|e| e.id).collect();
in_edges.retain(|e| {
!(e.src == e.dst && out_ids.contains(&e.id))
});
}
Ok(QueryHit {
node,
edges: out_edges,
incoming_edges: in_edges,
edges_truncated: out_trunc || in_trunc,
})
};
match (&self.label, &self.prop_filter, indexes.as_ref()) {
(Some(label), Some((prop, PropPredicate::Eq(value))), Some(idx)) => {
if let Some(tree_root) = idx.nodes_by_prop.get(label).and_then(|m| m.get(prop)) {
let key = ProllyKey::new(prop_value_hash(value)?);
if let Some(node_cid) = prolly::lookup(&*bs, tree_root, &key)? {
let node: Node = decode_from_store(&*bs, &node_cid)?;
if node.ntype == *label && node.props.get(prop) == Some(value) {
hits.push(build_hit(node, indexes.as_ref())?);
}
}
}
}
(Some(label), None, Some(idx)) => {
if let Some(tree_root) = idx.nodes_by_label.get(label) {
let cursor = Cursor::new(&*bs, tree_root)?;
for entry in cursor {
let (_k, node_cid) = entry?;
let node: Node = decode_from_store(&*bs, &node_cid)?;
hits.push(build_hit(node, indexes.as_ref())?);
if hits.len() >= cap {
break;
}
}
}
}
_ => {
let cursor = Cursor::new(&*bs, &commit.nodes)?;
for entry in cursor {
let (_k, node_cid) = entry?;
let node: Node = decode_from_store(&*bs, &node_cid)?;
if let Some(ref lbl) = self.label
&& &node.ntype != lbl
{
continue;
}
if let Some((ref prop, PropPredicate::Eq(ref value))) = self.prop_filter
&& node.props.get(prop) != Some(value)
{
continue;
}
hits.push(build_hit(node, indexes.as_ref())?);
if hits.len() >= cap {
break;
}
}
}
}
Ok(hits)
}
}