use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use crate::{
metadata::{loader::MetadataLoader, tables::TableId},
utils::graph::IndexedGraph,
Error::GraphError,
Result,
};
/// Key under which a loader is registered in a `LoaderGraph`.
///
/// A loader whose `table_id()` reports a table is keyed by that table; a loader that reports
/// no table gets a `Special` key numbered by registration order.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum LoaderKey {
    /// Loader whose `table_id()` returned this table.
    Table(TableId),
    /// Loader not associated with a table.
    Special {
        /// Zero-based position of this loader among the table-less loaders, in the order
        /// they were added to the graph.
        sequence: usize,
    },
}
/// Dependency graph over metadata loaders, used to derive a level-by-level execution order.
///
/// Loaders are registered with `add_loader`, edges are derived from each loader's
/// `dependencies()` by `build_relationships`, and `topological_levels` then returns batches
/// of loaders whose table dependencies are satisfied by earlier batches.
#[derive(Default)]
pub(crate) struct LoaderGraph<'a> {
    /// Every registered loader, keyed by its `LoaderKey`.
    loaders: HashMap<LoaderKey, &'a dyn MetadataLoader>,
    /// Reverse edges: for each table that has a loader, the keys of the loaders depending on it.
    dependents: HashMap<TableId, HashSet<LoaderKey>>,
    /// Forward edges: for each registered loader key, the tables that loader depends on.
    dependencies: HashMap<LoaderKey, HashSet<TableId>>,
    /// Sequence number that the next `LoaderKey::Special` will receive.
    special_counter: usize,
}
impl<'a> LoaderGraph<'a> {
    /// Creates an empty graph with no registered loaders.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `loader` in the graph.
    ///
    /// A loader that reports a table via `table_id()` is keyed as `LoaderKey::Table`;
    /// registering a second loader for the same table replaces the first one. A loader that
    /// reports no table is keyed as `LoaderKey::Special` with the next sequence number.
    ///
    /// No edges are computed here; call `build_relationships` once all loaders are added.
    pub fn add_loader(&mut self, loader: &'a dyn MetadataLoader) {
        let loader_key = if let Some(table_id) = loader.table_id() {
            LoaderKey::Table(table_id)
        } else {
            let key = LoaderKey::Special {
                sequence: self.special_counter,
            };
            self.special_counter += 1;
            key
        };

        // Only table loaders can be depended upon, so only they get a reverse-edge entry.
        if let LoaderKey::Table(table_id) = &loader_key {
            self.dependents.entry(*table_id).or_default();
        }
        self.loaders.insert(loader_key.clone(), loader);
        self.dependencies.entry(loader_key).or_default();
    }

    /// Recomputes the dependency edges from every registered loader's `dependencies()`.
    ///
    /// Edges from any previous call are discarded first. In debug builds the resulting graph
    /// is additionally checked for cycles and the execution plan is generated once as a
    /// sanity check.
    ///
    /// # Errors
    ///
    /// Returns `GraphError` if a loader depends on a table for which no loader is registered,
    /// or (debug builds only) if a circular dependency is detected or no execution order can
    /// be derived. Because validation happens while edges are being inserted, the edge maps
    /// may be only partially rebuilt when an error is returned.
    pub fn build_relationships(&mut self) -> Result<()> {
        self.dependencies.values_mut().for_each(HashSet::clear);
        self.dependents.values_mut().for_each(HashSet::clear);

        for (loader_key, loader) in &self.loaders {
            for dep_table_id in loader.dependencies() {
                // Direct key lookup instead of scanning every registered key per dependency.
                if !self.loaders.contains_key(&LoaderKey::Table(*dep_table_id)) {
                    return Err(GraphError(format!(
                        "Loader {loader_key:?} depends on table {dep_table_id:?}, but no loader for that table exists"
                    )));
                }

                self.dependencies
                    .get_mut(loader_key)
                    .ok_or_else(|| {
                        GraphError(format!(
                            "Internal error: loader {loader_key:?} not found in dependencies map"
                        ))
                    })?
                    .insert(*dep_table_id);

                self.dependents
                    .get_mut(dep_table_id)
                    .ok_or_else(|| {
                        GraphError(format!(
                            "Internal error: table {dep_table_id:?} not found in dependents map"
                        ))
                    })?
                    .insert(loader_key.clone());
            }
        }

        #[cfg(debug_assertions)]
        {
            self.check_circular_dependencies()?;
            // Generate (and discard) the plan so scheduling failures surface in debug builds.
            let _plan = self.dump_execution_plan()?;
        }

        Ok(())
    }

    /// Fails with `GraphError` if the table-to-table dependency edges contain a cycle.
    ///
    /// Only `LoaderKey::Table` loaders contribute edges (see `build_indexed_graph`); nothing
    /// can depend on a special loader, so special loaders cannot be part of a cycle. When the
    /// cycle reported by `find_any_cycle` is non-empty, its first element is named in the
    /// error message.
    // Only called from a `cfg(debug_assertions)` block, so it is unused in release builds.
    #[cfg_attr(not(debug_assertions), allow(dead_code))]
    fn check_circular_dependencies(&self) -> Result<()> {
        let graph = self.build_indexed_graph()?;
        if graph.is_empty() {
            return Ok(());
        }

        if let Some(cycle) = graph.find_any_cycle() {
            if let Some(table_id) = cycle.first() {
                return Err(GraphError(format!(
                    "Circular dependency detected involving table {table_id:?}"
                )));
            }
            return Err(GraphError(
                "Circular dependency detected in loader graph".to_string(),
            ));
        }

        Ok(())
    }

    /// Builds a table-level graph with an edge `source -> dependency` for every table loader.
    ///
    /// Nodes are added for every table loader and for every referenced dependency table;
    /// special loaders contribute neither nodes nor edges.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `IndexedGraph::add_edge`.
    // Only reachable through `check_circular_dependencies`, which is debug-only.
    #[cfg_attr(not(debug_assertions), allow(dead_code))]
    fn build_indexed_graph(&self) -> Result<IndexedGraph<TableId, ()>> {
        let mut graph: IndexedGraph<TableId, ()> = IndexedGraph::new();

        for loader_key in self.loaders.keys() {
            if let LoaderKey::Table(table_id) = loader_key {
                graph.add_node(*table_id);
            }
        }

        for deps in self.dependencies.values() {
            for dep_table_id in deps {
                graph.add_node(*dep_table_id);
            }
        }

        for (loader_key, deps) in &self.dependencies {
            if let LoaderKey::Table(source_table_id) = loader_key {
                for dep_table_id in deps {
                    graph.add_edge(*source_table_id, *dep_table_id, ())?;
                }
            }
        }

        Ok(graph)
    }

    /// Groups the registered loaders into execution levels.
    ///
    /// Every loader appears in a later level than all table loaders it depends on. In each
    /// round, the table loaders that became ready form one level, followed by a level of the
    /// special loaders that became ready. Order within a level is unspecified. Edges are those
    /// recorded by the last `build_relationships` call; without one, no dependencies are known.
    ///
    /// # Errors
    ///
    /// Returns `GraphError` if some loaders can never become ready (e.g. a dependency cycle).
    pub fn topological_levels(&self) -> Result<Vec<Vec<&'a dyn MetadataLoader>>> {
        let key_levels = self.topological_key_levels()?;
        Ok(key_levels
            .iter()
            .map(|level| {
                level
                    .iter()
                    .filter_map(|key| self.loaders.get(key).copied())
                    .collect()
            })
            .collect())
    }

    /// Computes the execution levels as loader keys; shared by `topological_levels` and
    /// `dump_execution_plan` so neither has to recover keys from loader references.
    fn topological_key_levels(&self) -> Result<Vec<Vec<LoaderKey>>> {
        let mut levels = Vec::new();
        let mut unscheduled: HashSet<LoaderKey> = self.loaders.keys().cloned().collect();
        let mut satisfied: HashSet<TableId> = HashSet::new();

        while !unscheduled.is_empty() {
            // Table loaders first: once scheduled, their tables satisfy later dependents.
            let ready_tables = self.find_ready_loaders(&unscheduled, &satisfied, |key| {
                matches!(key, LoaderKey::Table(_))
            });
            for key in &ready_tables {
                unscheduled.remove(key);
                if let LoaderKey::Table(table_id) = key {
                    satisfied.insert(*table_id);
                }
            }
            let table_progress = !ready_tables.is_empty();
            if table_progress {
                levels.push(ready_tables);
            }

            let ready_specials = self.find_ready_loaders(&unscheduled, &satisfied, |key| {
                matches!(key, LoaderKey::Special { .. })
            });
            for key in &ready_specials {
                unscheduled.remove(key);
            }
            let special_progress = !ready_specials.is_empty();
            if special_progress {
                levels.push(ready_specials);
            }

            // Nothing was scheduled this round, so the remaining loaders can never run.
            if !table_progress && !special_progress {
                return Err(GraphError(
                    "Unable to resolve dependency order, possible circular dependency".to_string(),
                ));
            }
        }

        Ok(levels)
    }

    /// Returns the keys in `unscheduled` accepted by `type_filter` whose recorded dependencies
    /// are all contained in `satisfied`. A key without a `dependencies` entry counts as ready.
    fn find_ready_loaders<F>(
        &self,
        unscheduled: &HashSet<LoaderKey>,
        satisfied: &HashSet<TableId>,
        type_filter: F,
    ) -> Vec<LoaderKey>
    where
        F: Fn(&LoaderKey) -> bool,
    {
        unscheduled
            .iter()
            .filter(|loader_key| {
                type_filter(loader_key)
                    && self
                        .dependencies
                        .get(loader_key)
                        .is_none_or(|deps| deps.iter().all(|dep| satisfied.contains(dep)))
            })
            .cloned()
            .collect()
    }

    /// Renders the execution plan as human-readable text.
    ///
    /// Each level is printed as a `Level N: [` ... `]` section listing every loader key and the
    /// tables it depends on (`None` when it has none). Entries and dependency lists are sorted
    /// by their `Debug` text so the output is stable across runs.
    ///
    /// # Errors
    ///
    /// Returns the same errors as `topological_levels`.
    pub fn dump_execution_plan(&self) -> Result<String> {
        let levels = self.topological_key_levels()?;
        let mut result = String::new();

        for (level_idx, level) in levels.iter().enumerate() {
            let _ = writeln!(result, "Level {level_idx}: [");

            let mut entries: Vec<String> = level
                .iter()
                .map(|loader_key| {
                    let deps = self.format_dependencies(loader_key);
                    format!(" {loader_key:?} (depends on: {deps})")
                })
                .collect();
            // Level membership comes from HashSet iteration; sort for deterministic output.
            entries.sort_unstable();
            for entry in &entries {
                let _ = writeln!(result, "{entry}");
            }

            let _ = writeln!(result, "]");
        }

        Ok(result)
    }

    /// Formats the tables `loader_key` depends on as a sorted, comma-separated list of their
    /// `Debug` text, or `"None"` when there are no dependencies.
    fn format_dependencies(&self, loader_key: &LoaderKey) -> String {
        let mut names: Vec<String> = self
            .dependencies
            .get(loader_key)
            .into_iter()
            .flatten()
            .map(|id| format!("{id:?}"))
            .collect();
        if names.is_empty() {
            return "None".to_string();
        }
        names.sort_unstable();
        names.join(", ")
    }
}