extern crate alloc;
use alloc::{
string::{String, ToString},
vec::Vec,
};
#[derive(Clone, Debug)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "std", serde(rename_all = "snake_case"))]
pub enum RecordOrigin {
Source,
Link { protocol: String, address: String },
Transform { input: String },
TransformJoin { inputs: Vec<String> },
Passive,
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "std", derive(serde::Serialize))]
pub struct GraphNode {
pub key: String,
pub origin: RecordOrigin,
pub buffer_type: String,
pub buffer_capacity: Option<usize>,
pub tap_count: usize,
pub has_outbound_link: bool,
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "std", derive(serde::Serialize))]
pub struct GraphEdge {
pub from: Option<String>,
pub to: Option<String>,
pub edge_type: EdgeType,
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "std", derive(serde::Serialize))]
#[cfg_attr(feature = "std", serde(rename_all = "snake_case"))]
pub enum EdgeType {
Source,
Link { protocol: String },
Transform,
TransformJoin,
Tap { index: usize },
LinkOut { protocol: String },
}
#[derive(Clone, Debug)]
pub struct RecordGraphInfo {
pub key: String,
pub origin: RecordOrigin,
pub buffer_type: String,
pub buffer_capacity: Option<usize>,
pub tap_count: usize,
pub has_outbound_link: bool,
}
#[derive(Clone, Debug)]
pub struct DependencyGraph {
pub nodes: Vec<GraphNode>,
pub edges: Vec<GraphEdge>,
pub topo_order: Vec<String>,
}
impl DependencyGraph {
pub fn build_and_validate(record_infos: &[RecordGraphInfo]) -> crate::DbResult<Self> {
use alloc::collections::VecDeque;
let all_keys: hashbrown::HashSet<&str> =
record_infos.iter().map(|info| info.key.as_str()).collect();
let mut transform_inputs: Vec<(&str, Vec<&str>)> = Vec::new();
for info in record_infos {
match &info.origin {
RecordOrigin::Transform { input } => {
transform_inputs.push((info.key.as_str(), alloc::vec![input.as_str()]));
}
RecordOrigin::TransformJoin { inputs } => {
let input_refs: Vec<&str> = inputs.iter().map(|s| s.as_str()).collect();
transform_inputs.push((info.key.as_str(), input_refs));
}
_ => {}
}
}
#[allow(unused_variables)] for (output_key, input_keys) in &transform_inputs {
for input_key in input_keys {
if !all_keys.contains(input_key) {
#[cfg(feature = "std")]
return Err(crate::DbError::TransformInputNotFound {
output_key: output_key.to_string(),
input_key: input_key.to_string(),
});
#[cfg(not(feature = "std"))]
return Err(crate::DbError::TransformInputNotFound {
_output_key: (),
_input_key: (),
});
}
}
}
let mut in_degree: hashbrown::HashMap<&str, usize> = hashbrown::HashMap::new();
let mut adjacency: hashbrown::HashMap<&str, Vec<&str>> = hashbrown::HashMap::new();
for key in &all_keys {
in_degree.entry(*key).or_insert(0);
adjacency.entry(*key).or_default();
}
for (output_key, input_keys) in &transform_inputs {
for input_key in input_keys {
adjacency.entry(*input_key).or_default().push(*output_key);
*in_degree.entry(*output_key).or_insert(0) += 1;
}
}
let mut queue: VecDeque<&str> = in_degree
.iter()
.filter(|(_, °)| deg == 0)
.map(|(&node, _)| node)
.collect();
let mut topo_order = Vec::new();
while let Some(node) = queue.pop_front() {
topo_order.push(node.to_string());
if let Some(neighbors) = adjacency.get(node) {
for &neighbor in neighbors {
if let Some(deg) = in_degree.get_mut(neighbor) {
*deg -= 1;
if *deg == 0 {
queue.push_back(neighbor);
}
}
}
}
}
if topo_order.len() != all_keys.len() {
#[cfg(feature = "std")]
{
let cycle_records: Vec<String> = in_degree
.iter()
.filter(|(_, °)| deg > 0)
.map(|(&k, _)| k.to_string())
.collect();
return Err(crate::DbError::CyclicDependency {
records: cycle_records,
});
}
#[cfg(not(feature = "std"))]
return Err(crate::DbError::CyclicDependency { _records: () });
}
let nodes: Vec<GraphNode> = record_infos
.iter()
.map(|info| GraphNode {
key: info.key.clone(),
origin: info.origin.clone(),
buffer_type: info.buffer_type.clone(),
buffer_capacity: info.buffer_capacity,
tap_count: info.tap_count,
has_outbound_link: info.has_outbound_link,
})
.collect();
let mut edges: Vec<GraphEdge> = Vec::new();
for info in record_infos {
match &info.origin {
RecordOrigin::Source => {
edges.push(GraphEdge {
from: None,
to: Some(info.key.clone()),
edge_type: EdgeType::Source,
});
}
RecordOrigin::Link { protocol, .. } => {
edges.push(GraphEdge {
from: None,
to: Some(info.key.clone()),
edge_type: EdgeType::Link {
protocol: protocol.clone(),
},
});
}
RecordOrigin::Transform { input } => {
edges.push(GraphEdge {
from: Some(input.clone()),
to: Some(info.key.clone()),
edge_type: EdgeType::Transform,
});
}
RecordOrigin::TransformJoin { inputs } => {
for input in inputs {
edges.push(GraphEdge {
from: Some(input.clone()),
to: Some(info.key.clone()),
edge_type: EdgeType::TransformJoin,
});
}
}
RecordOrigin::Passive => {
}
}
if info.has_outbound_link {
edges.push(GraphEdge {
from: Some(info.key.clone()),
to: None,
edge_type: EdgeType::LinkOut {
protocol: "unknown".to_string(), },
});
}
for tap_idx in 0..info.tap_count {
edges.push(GraphEdge {
from: Some(info.key.clone()),
to: None,
edge_type: EdgeType::Tap { index: tap_idx },
});
}
}
Ok(DependencyGraph {
nodes,
edges,
topo_order,
})
}
pub fn node(&self, key: &str) -> Option<&GraphNode> {
self.nodes.iter().find(|n| n.key == key)
}
pub fn nodes(&self) -> &[GraphNode] {
&self.nodes
}
pub fn edges(&self) -> &[GraphEdge] {
&self.edges
}
pub fn topo_order(&self) -> &[String] {
&self.topo_order
}
}