use crate::{
Aggregate, Aggregator, Meta, Metadata, MetadataFilter, Node, NodeNotInStoreSnafu, NodeStore,
};
use nalgebra::DMatrix;
use snafu::{ResultExt, Snafu};
use std::collections::{BTreeMap, BTreeSet};
use uuid::Uuid;
#[derive(Clone, Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum EdgeStoreError {
NodeStore { source: crate::node::NodeStoreError },
MissingNodeStore,
}
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Edge {
pub metadata: Metadata,
pub source: Uuid,
pub target: Uuid,
}
impl Edge {
pub fn new(metadata: Option<Metadata>, source: Uuid, target: Uuid) -> Self {
Edge {
metadata: metadata.unwrap_or_default(),
source: source.to_owned(),
target: target.to_owned(),
}
}
pub fn new_and_nodes(metadata: Option<Metadata>) -> (Self, Node, Node) {
let source = Node::default();
let target = Node::default();
(
Edge::new(metadata, source.id().to_owned(), target.id().to_owned()),
source,
target,
)
}
}
impl Meta for Edge {
fn get_meta(&self) -> &Metadata {
&self.metadata
}
}
#[derive(Clone, Debug, Default, PartialEq)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize),
serde(default)
)]
pub struct EdgeStoreData {
edges: BTreeMap<Uuid, Edge>,
forward: BTreeMap<Uuid, BTreeMap<Uuid, BTreeSet<Uuid>>>,
reverse: BTreeMap<Uuid, BTreeMap<Uuid, BTreeSet<Uuid>>>,
aggregate: Aggregate,
}
impl HasEdgeStore for EdgeStoreData {
fn edge_store(&self) -> &EdgeStoreData {
self
}
fn edge_store_mut(&mut self) -> &mut EdgeStoreData {
self
}
}
impl EdgeStore for EdgeStoreData {}
pub trait HasEdgeStore {
fn edge_store(&self) -> &EdgeStoreData;
fn edge_store_mut(&mut self) -> &mut EdgeStoreData;
}
pub trait EdgeStore: HasEdgeStore {
fn edges_len(&self) -> usize {
self.edge_store().edges.len()
}
fn is_edges_empty(&self) -> bool {
self.edges_len() == 0
}
fn add_edge<N: NodeStore>(
&mut self,
edge: Edge,
safe: bool,
nodes: Option<&N>,
) -> Result<Option<Edge>, EdgeStoreError> {
if safe {
match nodes {
None => MissingNodeStoreSnafu.fail(),
Some(nodes) => self.check_edge(&edge, nodes),
}?;
}
let id = edge.id().to_owned();
let replaced = self.del_edge(&id);
let source = edge.source.to_owned();
let target = edge.target.to_owned();
let store = self.edge_store_mut();
store.aggregate.add(edge.get_meta());
store.edges.insert(id.to_owned(), edge);
store
.forward
.entry(source.to_owned())
.or_default()
.entry(target.to_owned())
.or_default()
.insert(id.to_owned());
store
.reverse
.entry(target)
.or_default()
.entry(source)
.or_default()
.insert(id);
Ok(replaced)
}
fn extend_edges<N: NodeStore>(
&mut self,
edges: Vec<Edge>,
safe: bool,
nodes: Option<&N>,
) -> Result<Vec<Edge>, EdgeStoreError> {
let mut replaced = vec![];
let store = self.edge_store_mut();
for edge in edges.into_iter() {
if let Some(edge) = store.add_edge(edge, safe, nodes)? {
replaced.push(edge);
}
}
Ok(replaced)
}
fn check_edge<N: NodeStore>(&self, edge: &Edge, nodes: &N) -> Result<(), EdgeStoreError> {
if !nodes.has_node(&edge.source) {
return NodeNotInStoreSnafu { id: edge.source }
.fail()
.with_context(|_| NodeStoreSnafu);
}
if !nodes.has_node(&edge.target) {
return NodeNotInStoreSnafu { id: edge.target }
.fail()
.with_context(|_| NodeStoreSnafu);
}
Ok(())
}
fn del_edge(&mut self, id: &Uuid) -> Option<Edge> {
let store = self.edge_store_mut();
if let Some(edge) = store.edges.remove(id) {
store.aggregate.subtract(edge.get_meta());
let id = edge.id();
let source = edge.source;
let target = edge.target;
if let Some(tgt_map) = store.forward.get_mut(&source) {
if let Some(edge_ids) = tgt_map.get_mut(&target) {
edge_ids.remove(id);
if edge_ids.is_empty() {
tgt_map.remove(&target);
}
}
if tgt_map.is_empty() {
store.forward.remove(&source);
}
}
if let Some(src_map) = store.reverse.get_mut(&target) {
if let Some(edge_ids) = src_map.get_mut(&source) {
edge_ids.remove(id);
if edge_ids.is_empty() {
src_map.remove(&source);
}
}
if src_map.is_empty() {
store.reverse.remove(&target);
}
}
return Some(edge);
}
None
}
fn get_edge(&self, id: &Uuid) -> Option<&Edge> {
self.edge_store().edges.get(id)
}
fn all_edges(&self) -> impl Iterator<Item = &Edge> {
self.edge_store().edges.values()
}
fn edge_ids_between(&self, source: &Uuid, target: &Uuid) -> impl Iterator<Item = Uuid> {
self.edge_store()
.forward
.get(source)
.and_then(|tgt_map| tgt_map.get(target))
.map(|ids| ids.iter().copied())
.unwrap_or_default()
}
fn edges_between(&self, source: &Uuid, target: &Uuid) -> impl Iterator<Item = &Edge> {
self.edge_ids_between(source, target)
.filter_map(|id| self.get_edge(&id))
}
fn edge_ids_between_all(
&self,
sources: &BTreeSet<Uuid>,
targets: &BTreeSet<Uuid>,
) -> impl Iterator<Item = Uuid> {
sources
.iter()
.flat_map(|s| targets.iter().map(|t| self.edge_ids_between(s, t)))
.flatten()
}
fn edges_between_all(
&self,
sources: &BTreeSet<Uuid>,
targets: &BTreeSet<Uuid>,
) -> impl Iterator<Item = &Edge> {
self.edge_ids_between_all(sources, targets)
.filter_map(|id| self.get_edge(&id))
}
fn aggregate_between(
&self,
source: &Uuid,
target: &Uuid,
filter: Option<&MetadataFilter>,
) -> Aggregate {
let mut agg = Aggregate::default();
for edge in self.edges_between(source, target) {
if let Some(filter) = filter {
if !filter.satisfies(edge) {
continue;
}
}
agg.add(edge.get_meta());
}
agg
}
fn aggregate_between_all(
&self,
sources: &BTreeSet<Uuid>,
targets: &BTreeSet<Uuid>,
filter: Option<&MetadataFilter>,
) -> Aggregate {
let mut agg = Aggregate::default();
for source in sources.iter() {
for target in targets.iter() {
agg.extend(self.aggregate_between(source, target, filter));
}
}
agg
}
fn aggregate_value_between(
&self,
source: &Uuid,
target: &Uuid,
aggregator: &Aggregator,
filter: Option<&MetadataFilter>,
fields: Option<&BTreeSet<String>>,
absolute: bool,
) -> f64 {
let agg = self.aggregate_between(source, target, filter);
agg.sum(aggregator, fields, absolute)
}
fn aggregate_map(
&self,
sources: &BTreeSet<Uuid>,
targets: &BTreeSet<Uuid>,
filter: Option<&MetadataFilter>,
) -> BTreeMap<Uuid, BTreeMap<Uuid, Aggregate>> {
let mut map = BTreeMap::new();
for source in sources.iter() {
for target in targets.iter() {
let agg = self.aggregate_between(source, target, filter);
map.entry(source.to_owned())
.or_insert(BTreeMap::<Uuid, Aggregate>::new())
.insert(target.to_owned(), agg);
}
}
map
}
fn aggregate_matrix(
&self,
sources: &[&Uuid],
targets: &[&Uuid],
filter: Option<&MetadataFilter>,
) -> Vec<Vec<Aggregate>> {
targets
.iter()
.map(|&target| {
sources
.iter()
.map(|&source| self.aggregate_between(source, target, filter))
.collect()
})
.collect()
}
fn outgoing_ids_from(&self, source: &Uuid) -> impl Iterator<Item = Uuid> {
self.edge_store()
.forward
.get(source)
.into_iter()
.flat_map(|tgt_map| tgt_map.values())
.flatten()
.copied()
}
fn outgoing_edges_from(&self, source: &Uuid) -> impl Iterator<Item = &Edge> {
self.outgoing_ids_from(source)
.filter_map(|id| self.get_edge(&id))
}
fn incoming_ids_to(&self, target: &Uuid) -> impl Iterator<Item = Uuid> {
self.edge_store()
.reverse
.get(target)
.into_iter()
.flat_map(|src_map| src_map.values())
.flatten()
.copied()
}
fn incoming_edges_to(&self, target: &Uuid) -> impl Iterator<Item = &Edge> {
self.incoming_ids_to(target)
.filter_map(|id| self.get_edge(&id))
}
fn targets_of(&self, source: &Uuid) -> impl Iterator<Item = Uuid> {
self.edge_store()
.forward
.get(source)
.into_iter()
.flat_map(|tgt_map| {
tgt_map.iter().filter_map(|(target, edges)| {
if edges.is_empty() {
None
} else {
Some(*target)
}
})
})
}
fn sources_to(&self, target: &Uuid) -> impl Iterator<Item = Uuid> {
self.edge_store()
.reverse
.get(target)
.into_iter()
.flat_map(|src_map| {
src_map.iter().filter_map(|(source, edges)| {
if edges.is_empty() {
None
} else {
Some(*source)
}
})
})
}
fn is_connected_to(
&self,
node: &Uuid,
others: &BTreeSet<Uuid>,
edge_ids: Option<&BTreeSet<Uuid>>,
) -> bool {
match edge_ids {
None => {
self.targets_of(node).any(|id| others.contains(&id))
|| self.sources_to(node).any(|id| others.contains(&id))
}
Some(edge_ids) => self
.edge_ids_between_all(&BTreeSet::from([node.to_owned()]), others)
.any(|edge_id| edge_ids.contains(&edge_id)),
}
}
fn adjacency_matrix(
&self,
sources: &Vec<&Uuid>,
targets: &Vec<&Uuid>,
aggregator: &Aggregator,
filter: Option<&MetadataFilter>,
fields: Option<&BTreeSet<String>>,
absolute: bool,
) -> DMatrix<f64> {
let mut matrix = DMatrix::<f64>::zeros(targets.len(), sources.len());
for (col, &source) in sources.iter().enumerate() {
for (row, &target) in targets.iter().enumerate() {
matrix[(row, col)] = self
.aggregate_value_between(source, target, aggregator, filter, fields, absolute)
}
}
matrix
}
fn edge_aggregate(&self) -> &Aggregate {
&self.edge_store().aggregate
}
}