use crate::organization::god_object::types::{ModuleSplit, StageType};
use crate::organization::type_based_clustering::{MethodSignature, TypeInfo};
use std::collections::{HashMap, HashSet, VecDeque};
/// Stateless analyzer that builds a type-flow graph from method signatures
/// and groups the methods into pipeline stages.
///
/// The type carries no data; every operation is a method taking `&self`.
pub struct DataFlowAnalyzer;
/// Graph of how types flow through a set of methods.
///
/// Populated by `DataFlowAnalyzer::build_type_flow_graph`.
#[derive(Clone, Debug)]
pub struct TypeFlowGraph {
// Return-type name -> names of the methods that return that type.
pub producers: HashMap<String, Vec<String>>,
// Parameter-type name -> names of the methods that take that type.
pub consumers: HashMap<String, Vec<String>>,
// One edge per (parameter type, return type) pair of every method that
// has a return type.
pub edges: Vec<TypeFlowEdge>,
}
/// A single directed edge of a `TypeFlowGraph`: one method turning a
/// parameter type into its return type.
#[derive(Clone, Debug)]
pub struct TypeFlowEdge {
// Name of the parameter type the method takes.
pub from_type: String,
// Name of the type the method returns.
pub to_type: String,
// Name of the method that realises this edge.
pub via_method: String,
// Heuristic label assigned by `classify_transformation`.
pub transformation_type: TransformationType,
}
/// Heuristic label describing how a method relates its parameter type(s) to
/// its return type.
///
/// The labels are assigned by `DataFlowAnalyzer::classify_transformation`,
/// which inspects only the parameter count and the textual return-type name.
/// `Eq` and `Hash` are derived so the label can be used as a map key or set
/// member (for example to tally edges per kind).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum TransformationType {
    /// Fallback when none of the other heuristics match.
    Direct,
    /// The method takes more than one parameter.
    Aggregation,
    /// The return-type name contains `,` or starts with `(`.
    Decomposition,
    /// The return-type name starts with `Result` or `Option`.
    Enrichment,
    /// The return-type name starts with `Vec`.
    Expansion,
}
/// A group of methods that sit at the same depth of the type-flow graph.
///
/// Built by `DataFlowAnalyzer::create_stage_from_methods`.
#[derive(Clone, Debug)]
pub struct PipelineStage {
// Suggested snake_case name, derived from the stage type and output types.
pub stage_name: String,
// Names of all parameter types taken by the stage's methods.
pub input_types: Vec<String>,
// Names of all types returned by the stage's methods.
pub output_types: Vec<String>,
// Names of the methods assigned to this stage.
pub methods: Vec<String>,
// Role assigned to the stage (source, sink, transform, ...).
pub stage_type: StageType,
// Depth of the methods' return types as computed by `compute_type_depths`
// (0 when the return type has no recorded depth).
pub depth: usize,
}
impl DataFlowAnalyzer {
/// Builds the type-flow graph for a set of method signatures.
///
/// For every signature:
/// * each parameter type name is registered in `consumers`, mapped to the
///   method name;
/// * the return type name (if any) is registered in `producers`, mapped to
///   the method name;
/// * one [`TypeFlowEdge`] is emitted per (parameter, return type) pair,
///   labelled by `classify_transformation`.
///
/// Methods without a return type therefore contribute consumers but no
/// producers and no edges. `_call_graph` is accepted for interface
/// compatibility and is not read.
pub fn build_type_flow_graph(
    &self,
    signatures: &[MethodSignature],
    _call_graph: &HashMap<String, Vec<String>>,
) -> TypeFlowGraph {
    let mut graph = TypeFlowGraph {
        producers: HashMap::new(),
        consumers: HashMap::new(),
        edges: Vec::new(),
    };

    // A single pass is enough: the three collections are independent, and
    // visiting signatures/parameters in order keeps every Vec in the same
    // order as filling them in separate passes would.
    for sig in signatures {
        for param in &sig.param_types {
            graph
                .consumers
                .entry(param.name.clone())
                .or_default()
                .push(sig.name.clone());
        }

        if let Some(ret_type) = &sig.return_type {
            graph
                .producers
                .entry(ret_type.name.clone())
                .or_default()
                .push(sig.name.clone());

            for param in &sig.param_types {
                let transformation_type = self.classify_transformation(
                    &param.name,
                    &ret_type.name,
                    &sig.param_types,
                    ret_type,
                );
                graph.edges.push(TypeFlowEdge {
                    from_type: param.name.clone(),
                    to_type: ret_type.name.clone(),
                    via_method: sig.name.clone(),
                    transformation_type,
                });
            }
        }
    }

    graph
}
/// Labels a parameter-to-return-type edge using simple textual heuristics.
///
/// Checks are applied in priority order and the first match wins:
/// 1. more than one parameter -> `Aggregation`
/// 2. return type name starts with `Result` or `Option` -> `Enrichment`
/// 3. return type name starts with `Vec` -> `Expansion`
/// 4. return type name contains `,` or starts with `(` -> `Decomposition`
/// 5. otherwise -> `Direct`
///
/// `_from_type` and `_ret_type` are currently unused.
///
/// NOTE(review): the checks are prefix-based, so names such as `ResultSet`
/// or `VecDeque` also match rules 2/3 — confirm whether that is intended.
fn classify_transformation(
    &self,
    _from_type: &str,
    to_type: &str,
    all_params: &[TypeInfo],
    _ret_type: &TypeInfo,
) -> TransformationType {
    let is_wrapper = ["Result", "Option"]
        .iter()
        .any(|prefix| to_type.starts_with(*prefix));
    let is_tuple_like = to_type.contains(',') || to_type.starts_with('(');

    if all_params.len() > 1 {
        TransformationType::Aggregation
    } else if is_wrapper {
        TransformationType::Enrichment
    } else if to_type.starts_with("Vec") {
        TransformationType::Expansion
    } else if is_tuple_like {
        TransformationType::Decomposition
    } else {
        TransformationType::Direct
    }
}
/// Groups methods into pipeline stages, one stage per return-type depth.
///
/// Each method that has a return type is bucketed under the depth recorded
/// for that type by `compute_type_depths`; a type with no recorded depth is
/// treated as depth 0. Methods without a return type are not assigned to
/// any stage. The resulting stages are returned sorted by ascending depth.
///
/// # Errors
///
/// Propagates any error returned by `compute_type_depths`.
pub fn detect_pipeline_stages(
    &self,
    graph: &TypeFlowGraph,
    signatures: &[MethodSignature],
) -> Result<Vec<PipelineStage>, String> {
    let source_types = self.find_source_types(graph);
    let sink_types = self.find_sink_types(graph);
    let type_depths = self.compute_type_depths(graph, &source_types)?;

    // Bucket method names by the depth of the type they return.
    let mut methods_by_depth: HashMap<usize, Vec<String>> = HashMap::new();
    for sig in signatures {
        let Some(returned) = sig.return_type.as_ref() else {
            continue;
        };
        let depth = type_depths.get(&returned.name).copied().unwrap_or(0);
        methods_by_depth
            .entry(depth)
            .or_default()
            .push(sig.name.clone());
    }

    let mut stages: Vec<PipelineStage> = methods_by_depth
        .into_iter()
        .map(|(depth, names)| {
            self.create_stage_from_methods(
                depth,
                &names,
                signatures,
                graph,
                &source_types,
                &sink_types,
            )
        })
        .collect();

    // Depths are unique map keys, so this fully determines the order.
    stages.sort_by_key(|stage| stage.depth);
    Ok(stages)
}
/// Returns every type name that has a `producers` entry but no `consumers`
/// entry, i.e. types that some method returns and no method takes.
///
/// NOTE(review): in a graph built by `build_type_flow_graph`, edges start at
/// parameter (consumed) types, so the types returned here have no outgoing
/// edges. Seeding the depth search in `compute_type_depths` with them cannot
/// advance past depth 0 — confirm whether the producer/consumer roles are
/// meant to be the other way round.
fn find_source_types(&self, graph: &TypeFlowGraph) -> HashSet<String> {
    let mut found = HashSet::new();
    for produced in graph.producers.keys() {
        if graph.consumers.contains_key(produced) {
            continue;
        }
        found.insert(produced.clone());
    }
    found
}
/// Returns every type name that has a `consumers` entry but no `producers`
/// entry, i.e. types that some method takes and no method returns.
///
/// NOTE(review): `create_stage_from_methods` tests a stage's *output* types
/// against this set. Output types are return types, which always have a
/// `producers` entry when the graph was built from the same signatures, so
/// that test looks like it can never succeed — confirm the intended
/// definition of a sink.
fn find_sink_types(&self, graph: &TypeFlowGraph) -> HashSet<String> {
    let mut found = HashSet::new();
    for consumed in graph.consumers.keys() {
        if graph.producers.contains_key(consumed) {
            continue;
        }
        found.insert(consumed.clone());
    }
    found
}
/// Assigns a depth to every type reachable from `sources`.
///
/// The graph is first condensed into strongly connected components so that
/// cycles collapse into a single node. A breadth-first walk then starts at
/// every component containing a source type (depth 0) and records, for each
/// component reached, the smallest depth at which it was seen. All types in
/// a component share that component's depth.
///
/// Types that are not reachable from any source component are absent from
/// the returned map.
///
/// # Errors
///
/// Propagates any error from `find_strongly_connected_components`.
fn compute_type_depths(
    &self,
    graph: &TypeFlowGraph,
    sources: &HashSet<String>,
) -> Result<HashMap<String, usize>, String> {
    let components = self.find_strongly_connected_components(graph)?;
    let condensed = self.build_scc_graph(graph, &components);

    // Seed the queue with every component that holds at least one source.
    let mut pending: VecDeque<(usize, usize)> = components
        .iter()
        .enumerate()
        .filter_map(|(idx, members)| {
            members
                .iter()
                .any(|ty| sources.contains(ty))
                .then_some((idx, 0))
        })
        .collect();

    let mut component_depth: HashMap<usize, usize> = HashMap::new();
    let mut type_depth: HashMap<String, usize> = HashMap::new();

    while let Some((current, depth)) = pending.pop_front() {
        // Keep the shallowest depth already recorded for this component.
        if matches!(component_depth.get(&current), Some(&known) if known <= depth) {
            continue;
        }

        component_depth.insert(current, depth);
        for ty in &components[current] {
            type_depth.insert(ty.clone(), depth);
        }

        if let Some(next) = condensed.get(&current) {
            pending.extend(next.iter().map(|&succ| (succ, depth + 1)));
        }
    }

    Ok(type_depth)
}
/// Partitions the types that appear on any edge into strongly connected
/// components.
///
/// Only edge endpoints are considered nodes; a type that appears in
/// `producers`/`consumers` but on no edge is not part of any component.
/// The per-node work is delegated to `tarjan_visit`. The order of the
/// returned components follows hash-set iteration order and should not be
/// relied upon.
///
/// The function currently always returns `Ok`.
fn find_strongly_connected_components(
    &self,
    graph: &TypeFlowGraph,
) -> Result<Vec<Vec<String>>, String> {
    // Every distinct endpoint of every edge is a node.
    let all_types: HashSet<String> = graph
        .edges
        .iter()
        .flat_map(|edge| [edge.from_type.clone(), edge.to_type.clone()])
        .collect();

    let mut next_index = 0;
    let mut visit_stack = Vec::new();
    let mut visit_index: HashMap<String, usize> = HashMap::new();
    let mut low_link: HashMap<String, usize> = HashMap::new();
    let mut stacked: HashSet<String> = HashSet::new();
    let mut components = Vec::new();

    for ty in &all_types {
        if visit_index.contains_key(ty) {
            continue;
        }
        self.tarjan_visit(
            ty,
            graph,
            &mut next_index,
            &mut visit_stack,
            &mut visit_index,
            &mut low_link,
            &mut stacked,
            &mut components,
        );
    }

    Ok(components)
}
/// Recursive step of Tarjan's strongly-connected-components algorithm.
///
/// Visits `node`, recurses into every not-yet-visited successor, and pushes
/// a finished component onto `sccs` once `node` turns out to be the root of
/// one.
///
/// * `index`    - next visit number to hand out; incremented per visited node.
/// * `stack`    - nodes visited but not yet assigned to a component.
/// * `indices`  - visit number of each visited node.
/// * `lowlinks` - smallest visit number known to be reachable from each node.
/// * `on_stack` - set mirror of `stack` for membership tests.
/// * `sccs`     - output: completed components.
///
/// Successors are found by scanning every edge of `graph` for each visited
/// node, and the function recurses once per newly discovered successor.
#[allow(clippy::too_many_arguments, clippy::only_used_in_recursion)]
fn tarjan_visit(
&self,
node: &str,
graph: &TypeFlowGraph,
index: &mut usize,
stack: &mut Vec<String>,
indices: &mut HashMap<String, usize>,
lowlinks: &mut HashMap<String, usize>,
on_stack: &mut HashSet<String>,
sccs: &mut Vec<Vec<String>>,
) {
// Assign the next visit number to this node and put it on the stack.
indices.insert(node.to_string(), *index);
lowlinks.insert(node.to_string(), *index);
*index += 1;
stack.push(node.to_string());
on_stack.insert(node.to_string());
for edge in &graph.edges {
if edge.from_type == node {
let successor = &edge.to_type;
if !indices.contains_key(successor) {
// Unvisited successor: recurse, then inherit its lowlink.
self.tarjan_visit(
successor, graph, index, stack, indices, lowlinks, on_stack, sccs,
);
let succ_lowlink = lowlinks[successor];
if let Some(node_lowlink) = lowlinks.get_mut(node) {
*node_lowlink = (*node_lowlink).min(succ_lowlink);
}
} else if on_stack.contains(successor) {
// Successor is still on the stack, so it belongs to the component
// currently being built: use its visit number.
let succ_index = indices[successor];
if let Some(node_lowlink) = lowlinks.get_mut(node) {
*node_lowlink = (*node_lowlink).min(succ_index);
}
}
// A visited successor that is no longer on the stack already belongs
// to a finished component and is ignored.
}
}
// `node` is the root of a component when its lowlink equals its own index.
if lowlinks[node] == indices[node] {
let mut component = Vec::new();
// Pop the stack down to and including `node`.
loop {
let Some(w) = stack.pop() else {
break; };
on_stack.remove(&w);
component.push(w.clone());
if w == node {
break;
}
}
if !component.is_empty() {
sccs.push(component);
}
}
}
/// Builds the condensation of `graph`: a graph whose nodes are component
/// indices into `sccs`.
///
/// An edge `a -> b` is present when some type in component `a` has an edge
/// to a type in a different component `b`. Each successor list is sorted
/// and free of duplicates. Components without outgoing edges have no entry.
///
/// # Panics
///
/// Panics if an edge endpoint is not contained in any component of `sccs`.
fn build_scc_graph(
    &self,
    graph: &TypeFlowGraph,
    sccs: &[Vec<String>],
) -> HashMap<usize, Vec<usize>> {
    // Reverse lookup: type name -> index of the component that holds it.
    let component_of: HashMap<&str, usize> = sccs
        .iter()
        .enumerate()
        .flat_map(|(idx, members)| members.iter().map(move |ty| (ty.as_str(), idx)))
        .collect();

    let mut condensed: HashMap<usize, Vec<usize>> = HashMap::new();
    for edge in &graph.edges {
        let src = component_of[edge.from_type.as_str()];
        let dst = component_of[edge.to_type.as_str()];
        if src == dst {
            // Edges inside one component vanish in the condensation.
            continue;
        }
        condensed.entry(src).or_default().push(dst);
    }

    for targets in condensed.values_mut() {
        targets.sort_unstable();
        targets.dedup();
    }

    condensed
}
/// Assembles the [`PipelineStage`] for the methods found at one depth.
///
/// * `depth`      - depth shared by the methods' return types.
/// * `methods`    - names of the methods belonging to the stage.
/// * `signatures` - all known signatures; each method name is resolved to
///   the first signature with that name, unknown names are skipped.
/// * `graph`      - type-flow graph, used to infer the stage type.
/// * `_sources`   - currently unused.
/// * `sinks`      - sink type names; a stage that outputs any of them is a
///   `Sink` stage.
///
/// Stage type priority: depth 0 is always `Source`; otherwise a stage that
/// outputs a sink type is `Sink`; otherwise `infer_stage_type` decides.
///
/// `input_types` and `output_types` are returned sorted so the result does
/// not depend on hash-set iteration order, which varies between runs.
#[allow(clippy::too_many_arguments)]
fn create_stage_from_methods(
    &self,
    depth: usize,
    methods: &[String],
    signatures: &[MethodSignature],
    graph: &TypeFlowGraph,
    _sources: &HashSet<String>,
    sinks: &HashSet<String>,
) -> PipelineStage {
    // Sets de-duplicate types shared by several methods of the stage.
    let mut input_set: HashSet<String> = HashSet::new();
    let mut output_set: HashSet<String> = HashSet::new();

    for method_name in methods {
        let Some(sig) = signatures.iter().find(|s| s.name == *method_name) else {
            continue;
        };
        input_set.extend(sig.param_types.iter().map(|param| param.name.clone()));
        if let Some(ret) = &sig.return_type {
            output_set.insert(ret.name.clone());
        }
    }

    let stage_type = if depth == 0 {
        StageType::Source
    } else if output_set.iter().any(|t| sinks.contains(t)) {
        StageType::Sink
    } else {
        self.infer_stage_type(methods, signatures, graph)
    };

    let stage_name = self.suggest_stage_name(&stage_type, &output_set);

    // Fix the order: HashSet iteration order is arbitrary and would make
    // the reported type lists differ from run to run.
    let mut input_types: Vec<String> = input_set.into_iter().collect();
    input_types.sort_unstable();
    let mut output_types: Vec<String> = output_set.into_iter().collect();
    output_types.sort_unstable();

    PipelineStage {
        stage_name,
        input_types,
        output_types,
        methods: methods.to_vec(),
        stage_type,
        depth,
    }
}
/// Infers a stage type from the transformation labels of the stage's edges.
///
/// Only edges whose `via_method` is one of `methods` are counted. If strictly
/// more than half (integer division) of them are `Enrichment`, the stage is
/// `Validate`; otherwise, if strictly more than half are `Aggregation`, it is
/// `Aggregate`; otherwise it is `Transform`. A stage with no matching edges
/// is therefore `Transform`. `_signatures` is currently unused.
fn infer_stage_type(
    &self,
    methods: &[String],
    _signatures: &[MethodSignature],
    graph: &TypeFlowGraph,
) -> StageType {
    let mut total = 0usize;
    let mut enrichments = 0usize;
    let mut aggregations = 0usize;

    for edge in &graph.edges {
        if !methods.contains(&edge.via_method) {
            continue;
        }
        total += 1;
        match edge.transformation_type {
            TransformationType::Enrichment => enrichments += 1,
            TransformationType::Aggregation => aggregations += 1,
            TransformationType::Direct
            | TransformationType::Decomposition
            | TransformationType::Expansion => {}
        }
    }

    let half = total / 2;
    if enrichments > half {
        StageType::Validate
    } else if aggregations > half {
        StageType::Aggregate
    } else {
        StageType::Transform
    }
}
/// Suggests a snake_case stage name from the stage type and output types.
///
/// The primary type is chosen among the outputs that `is_generic_type` does
/// not reject. Each candidate scores its byte length, plus 100 if its name
/// ends in `Analysis`, `Metrics`, `Result` or `Data`; the highest score
/// wins. Equal scores are broken by comparing the names, so the choice does
/// not depend on hash-set iteration order. With no candidate, the primary
/// type is `"Unknown"`.
///
/// The snake_case primary type is returned as-is for `Source` stages and
/// suffixed with `_transform`, `_validation`, `_aggregation` or `_output`
/// for the other stage types.
fn suggest_stage_name(&self, stage_type: &StageType, output_types: &HashSet<String>) -> String {
    const DOMAIN_SUFFIXES: [&str; 4] = ["Analysis", "Metrics", "Result", "Data"];
    const DOMAIN_BONUS: usize = 100;

    let score = |name: &str| -> usize {
        let has_domain_suffix = DOMAIN_SUFFIXES
            .iter()
            .any(|suffix| name.ends_with(*suffix));
        name.len() + if has_domain_suffix { DOMAIN_BONUS } else { 0 }
    };

    let primary_type = output_types
        .iter()
        .filter(|name| !self.is_generic_type(name))
        .max_by(|a, b| {
            score(a.as_str())
                .cmp(&score(b.as_str()))
                // Deterministic tie-break: set members are distinct, so
                // this never reports two candidates as equal.
                .then_with(|| a.cmp(b))
        })
        .map(String::as_str)
        .unwrap_or("Unknown");

    let snake_case = self.to_snake_case(primary_type);
    match stage_type {
        StageType::Source => snake_case,
        StageType::Transform => format!("{}_transform", snake_case),
        StageType::Validate => format!("{}_validation", snake_case),
        StageType::Aggregate => format!("{}_aggregation", snake_case),
        StageType::Sink => format!("{}_output", snake_case),
    }
}
/// Reports whether `type_name` is exactly one of a fixed list of standard
/// library / primitive type names that carry no domain meaning.
///
/// The comparison is an exact string match, so a name with generic
/// arguments attached (e.g. `Vec<Foo>`) is not considered generic.
fn is_generic_type(&self, type_name: &str) -> bool {
    const GENERIC_TYPE_NAMES: &[&str] = &[
        "String", "str", "Vec", "Option", "Result", "HashMap", "HashSet", "BTreeMap",
        "BTreeSet", "usize", "isize", "u32", "i32", "u64", "i64", "f32", "f64", "bool",
        "char",
    ];
    GENERIC_TYPE_NAMES.contains(&type_name)
}
/// Converts a CamelCase identifier to snake_case.
///
/// An underscore is inserted before an uppercase character (other than the
/// first character) unless:
/// * it continues a run of capitals that is not followed by a lowercase
///   letter, so acronyms stay together (`HTTPServer` -> `http_server`,
///   `DataID` -> `data_id`); or
/// * the previous character is already `_` (`Parsed_Data` ->
///   `parsed_data`, not `parsed__data`).
///
/// Every character is lowercased using its full lowercase mapping, so
/// characters whose lowercase form spans several code points are kept whole.
fn to_snake_case(&self, s: &str) -> String {
    let chars: Vec<char> = s.chars().collect();
    let mut result = String::with_capacity(s.len() + 4);

    for (i, &ch) in chars.iter().enumerate() {
        if ch.is_uppercase() && i > 0 {
            let prev = chars[i - 1];
            let next_is_lower = matches!(chars.get(i + 1), Some(next) if next.is_lowercase());
            // Inside "HTTP" of "HTTPServer" no break is wanted; the break
            // belongs before the capital that starts the next word ("S").
            let continues_acronym = prev.is_uppercase() && !next_is_lower;
            if !continues_acronym && prev != '_' {
                result.push('_');
            }
        }
        result.extend(ch.to_lowercase());
    }

    result
}
/// Turns pipeline stages into module-split recommendations, one per stage
/// and in the same order.
///
/// Each recommendation is named `"{base_name}/{stage_name}"`, carries the
/// stage's methods, input/output types, stage type and depth, and a textual
/// responsibility from `describe_stage_responsibility`. All remaining
/// `ModuleSplit` fields take their default values.
pub fn generate_pipeline_recommendations(
    &self,
    stages: &[PipelineStage],
    base_name: &str,
) -> Vec<ModuleSplit> {
    let mut splits = Vec::with_capacity(stages.len());

    for stage in stages {
        splits.push(ModuleSplit {
            suggested_name: format!("{}/{}", base_name, stage.stage_name),
            responsibility: self.describe_stage_responsibility(stage),
            methods_to_move: stage.methods.clone(),
            data_flow_stage: Some(stage.stage_type.clone()),
            pipeline_position: Some(stage.depth),
            input_types: stage.input_types.clone(),
            output_types: stage.output_types.clone(),
            method_count: stage.methods.len(),
            ..Default::default()
        });
    }

    splits
}
/// Produces a one-line description of what a stage does.
///
/// Input and output type names are joined with `", "`. `Source` stages
/// mention only their outputs, `Sink` stages only their inputs, and the
/// remaining stage types describe inputs being turned into outputs.
fn describe_stage_responsibility(&self, stage: &PipelineStage) -> String {
    let consumed = stage.input_types.join(", ");
    let produced = stage.output_types.join(", ");

    match &stage.stage_type {
        StageType::Source => {
            format!(
                "Source stage: Produce {} for downstream processing",
                produced
            )
        }
        StageType::Sink => {
            format!("Sink stage: Consume {} for final output", consumed)
        }
        StageType::Transform => {
            format!("Transform {} into {}", consumed, produced)
        }
        StageType::Validate => {
            format!("Validate and enrich {} into {}", consumed, produced)
        }
        StageType::Aggregate => {
            format!("Aggregate {} into {}", consumed, produced)
        }
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TypeInfo` with the given name; all other fields are default.
    fn named_type(name: &str) -> TypeInfo {
        TypeInfo {
            name: name.to_string(),
            ..Default::default()
        }
    }

    /// Builds the signature of a free-standing method `name(input) -> output`.
    fn unary_method(name: &str, input: &str, output: &str) -> MethodSignature {
        MethodSignature {
            name: name.to_string(),
            param_types: vec![named_type(input)],
            return_type: Some(named_type(output)),
            self_type: None,
        }
    }

    /// Two-step fixture: `parse(String) -> ParsedData`, then
    /// `validate(ParsedData) -> Result`.
    fn parse_then_validate() -> Vec<MethodSignature> {
        vec![
            unary_method("parse", "String", "ParsedData"),
            unary_method("validate", "ParsedData", "Result"),
        ]
    }

    #[test]
    fn test_type_flow_graph_construction() {
        let signatures = parse_then_validate();
        let analyzer = DataFlowAnalyzer;

        let graph = analyzer.build_type_flow_graph(&signatures, &HashMap::new());

        assert_eq!(graph.producers.get("ParsedData").unwrap(), &vec!["parse"]);
        assert_eq!(
            graph.consumers.get("ParsedData").unwrap(),
            &vec!["validate"]
        );
        assert_eq!(graph.edges.len(), 2);
    }

    #[test]
    fn test_transformation_classification() {
        let analyzer = DataFlowAnalyzer;

        let result_transform =
            analyzer.classify_transformation("Data", "Result", &[], &named_type("Result"));
        assert_eq!(result_transform, TransformationType::Enrichment);

        let vec_transform =
            analyzer.classify_transformation("Item", "Vec", &[], &named_type("Vec"));
        assert_eq!(vec_transform, TransformationType::Expansion);
    }

    #[test]
    fn test_pipeline_stage_detection() {
        let signatures = parse_then_validate();
        let analyzer = DataFlowAnalyzer;

        let graph = analyzer.build_type_flow_graph(&signatures, &HashMap::new());
        let stages = analyzer
            .detect_pipeline_stages(&graph, &signatures)
            .unwrap();

        assert!(!stages.is_empty());
        assert_eq!(stages[0].depth, 0);
    }
}