use crate::error::{M1ndError, M1ndResult};
use crate::graph::Graph;
use crate::types::*;
use serde::Serialize;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::time::Instant;
/// Default maximum BFS depth a particle may travel from its entry point.
pub const DEFAULT_MAX_DEPTH: u8 = 15;
/// Default number of particles spawned per entry point.
pub const DEFAULT_NUM_PARTICLES: u32 = 2;
/// Hard cap on particles per entry point; caller-requested counts are clamped to this.
pub const MAX_PARTICLES: u32 = 100;
/// Hard cap on simultaneously-active particles within one spawn loop.
pub const MAX_ACTIVE_PARTICLES: usize = 10_000;
/// Edges with weight below this are never traversed by default.
pub const DEFAULT_MIN_EDGE_WEIGHT: f32 = 0.1;
/// Default minimum turbulence score for a node to be reported.
pub const DEFAULT_TURBULENCE_THRESHOLD: f32 = 0.5;
/// Built-in patterns identifying lock/serialization constructs.
/// NOTE: written in regex syntax but matched as plain substrings after
/// `flow_clean_pattern` strips the metacharacters.
pub const DEFAULT_LOCK_PATTERNS: &[&str] = &[
r"asyncio\.Lock",
r"threading\.Lock",
r"Mutex",
r"RwLock",
r"Semaphore",
r"asyncio\.Semaphore",
r"Lock\(\)",
r"\.acquire\(",
r"\.lock\(",
];
/// Built-in patterns identifying read-only accessors (getters, queries).
/// Same substring-after-cleaning semantics as `DEFAULT_LOCK_PATTERNS`.
pub const DEFAULT_READ_ONLY_PATTERNS: &[&str] = &[
r"get_", r"read_", r"fetch_", r"list_", r"is_", r"has_", r"check_", r"count_", r"len\(",
r"\bGET\b", r"select ", r"SELECT ",
];
/// Label prefixes used by `discover_entry_points` to heuristically spot
/// request/event handlers. Matched with `contains`, so a prefix anywhere in
/// the label qualifies.
const ENTRY_POINT_PATTERNS: &[&str] = &[
"handle_",
"route_",
"api_",
"endpoint_",
"on_",
"cmd_",
"tick_",
"daemon_",
];
/// Severity bucket assigned to a turbulence point from its score and context.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub enum TurbulenceSeverity {
/// Score >= 0.8 with 3+ origins and no lock protection anywhere upstream.
Critical,
/// Score >= 0.6.
High,
/// Score >= 0.3.
Medium,
/// Anything below 0.3 that still cleared the reporting threshold.
Low,
}
/// A node reached by particles from more than one entry point — a potential
/// concurrent-access hazard ("turbulence").
#[derive(Clone, Debug, Serialize)]
pub struct TurbulencePoint {
pub node: NodeId,
pub node_label: String,
// Number of distinct contributing origins (unserialized ones preferred).
pub particle_count: u32,
// True when the node itself matches a lock pattern/heuristic.
pub has_lock: bool,
pub is_read_only: bool,
pub turbulence_score: f32,
pub severity: TurbulenceSeverity,
// Pairs of entry-point labels that converge on this node.
pub entry_pairs: Vec<(String, String)>,
// Label of the closest lock found on any arriving path, if any.
pub nearest_upstream_lock: Option<String>,
// Full label paths of arrivals; empty when `FlowConfig::include_paths` is off.
pub paths: Vec<Vec<String>>,
}
/// A lock-like node ("valve") that serialized particle flow passing through it.
#[derive(Clone, Debug, Serialize)]
pub struct ValvePoint {
pub node: NodeId,
pub node_label: String,
// Pattern (or "heuristic:<kw>") that classified this node as a valve.
pub lock_type: String,
// How many particle visits were serialized at this node.
pub particles_serialized: u32,
// Count of nodes reachable downstream within the configured depth.
pub downstream_protected: u32,
}
/// A turbulent node covered by an advisory lock declared in `FlowConfig`.
#[derive(Clone, Debug, Serialize)]
pub struct ProtectedNode {
pub node_label: String,
// Advisory lock ids that cover this node.
pub lock_ids: Vec<String>,
// Turbulence score before the advisory-lock discount was applied.
pub original_score: Option<f32>,
// True when the discount pushed the score below the reporting threshold.
pub suppressed: bool,
}
/// Aggregate statistics for one simulation run.
#[derive(Clone, Debug, Serialize)]
pub struct FlowSummary {
pub total_entry_points: u32,
pub total_particles: u32,
pub total_nodes_visited: u32,
pub turbulence_count: u32,
pub valve_count: u32,
pub advisory_lock_protected_count: u32,
// Fraction (0.0..=1.0) of graph nodes touched by any particle.
pub coverage_pct: f32,
pub elapsed_ms: f64,
}
/// Full output of `FlowEngine::simulate`.
#[derive(Clone, Debug, Serialize)]
pub struct FlowSimulationResult {
// Sorted by descending score, then ascending node id.
pub turbulence_points: Vec<TurbulencePoint>,
// Sorted by ascending node id.
pub valve_points: Vec<ValvePoint>,
// Sorted by node label.
pub protected_nodes: Vec<ProtectedNode>,
pub summary: FlowSummary,
}
/// Tuning knobs for a flow simulation.
#[derive(Clone, Debug)]
pub struct FlowConfig {
// Substring-matched (after cleaning) patterns marking lock constructs.
pub lock_patterns: Vec<String>,
// Substring-matched patterns marking read-only accessors.
pub read_only_patterns: Vec<String>,
pub max_depth: u8,
// Minimum score for a node to be reported as turbulent.
pub turbulence_threshold: f32,
// When false, arrival paths are not recorded (saves memory).
pub include_paths: bool,
pub max_particles: u32,
pub min_edge_weight: f32,
// Global edge-traversal budget; scaled down further on dense graphs.
pub max_total_steps: usize,
// Case-insensitive label substring; nodes not matching are never entered.
pub scope_filter: Option<String>,
// node label -> ids of advisory locks declared to protect that node.
pub advisory_lock_protected_nodes: BTreeMap<String, Vec<String>>,
}
impl Default for FlowConfig {
/// Default config with *empty* pattern lists; use
/// `FlowConfig::with_defaults` to get the built-in patterns.
fn default() -> Self {
Self {
lock_patterns: Vec::new(),
read_only_patterns: Vec::new(),
max_depth: DEFAULT_MAX_DEPTH,
turbulence_threshold: DEFAULT_TURBULENCE_THRESHOLD,
include_paths: true,
max_particles: MAX_PARTICLES,
min_edge_weight: DEFAULT_MIN_EDGE_WEIGHT,
max_total_steps: 50_000,
scope_filter: None,
advisory_lock_protected_nodes: BTreeMap::new(),
}
}
}
impl FlowConfig {
    /// Builds a config preloaded with the built-in lock and read-only patterns.
    pub fn with_defaults() -> Self {
        let mut cfg = Self::default();
        cfg.lock_patterns = DEFAULT_LOCK_PATTERNS
            .iter()
            .map(|s| s.to_string())
            .collect();
        cfg.read_only_patterns = DEFAULT_READ_ONLY_PATTERNS
            .iter()
            .map(|s| s.to_string())
            .collect();
        cfg
    }

    /// Builds a config with caller-supplied patterns and default tuning.
    pub fn with_patterns(lock_patterns: &[String], read_only_patterns: &[String]) -> Self {
        let mut cfg = Self::default();
        cfg.lock_patterns = lock_patterns.to_vec();
        cfg.read_only_patterns = read_only_patterns.to_vec();
        cfg
    }
}
/// One BFS token moving through the graph from a single entry point.
#[derive(Clone)]
struct Particle {
// NOTE(review): `id` and `origin` are populated but never read back from the
// struct (the spawn loop uses its own `pid`/`entry` locals) — kept for
// debugging/future use.
id: u32,
origin: NodeId,
// Nodes traversed so far (only populated when `include_paths` is on).
path: Vec<NodeId>,
position: NodeId,
depth: u8,
// The valve node that most recently serialized this particle, if any.
serialized_by: Option<NodeId>,
// Per-particle visited bitmap; prevents a single particle from looping.
visited: Vec<bool>,
}
/// Per-node record of every particle arrival, indexed by node id.
struct NodeAccumulator {
arrivals: Vec<Vec<ParticleArrival>>,
}
/// Snapshot of a particle's state at the moment it reached a node.
#[derive(Clone)]
struct ParticleArrival {
origin: NodeId,
particle_id: u32,
serialized_by: Option<NodeId>,
// Empty when path recording is disabled.
path: Vec<NodeId>,
}
impl NodeAccumulator {
fn new(num_nodes: usize) -> Self {
Self {
arrivals: vec![Vec::new(); num_nodes],
}
}
#[inline]
fn record(&mut self, node: NodeId, arrival: ParticleArrival) {
let idx = node.as_usize();
if idx < self.arrivals.len() {
self.arrivals[idx].push(arrival);
}
}
fn flow_turbulent_nodes(&self) -> Vec<(NodeId, &Vec<ParticleArrival>)> {
self.arrivals
.iter()
.enumerate()
.filter_map(|(i, arrivals)| {
if arrivals.is_empty() {
return None;
}
let mut origins = BTreeSet::new();
for a in arrivals {
origins.insert(a.origin.0);
}
if origins.len() > 1 {
Some((NodeId::new(i as u32), arrivals))
} else {
None
}
})
.collect()
}
}
/// Tracks lock-like nodes: node id -> (lock type, times it serialized flow).
struct ValveTracker {
valves: BTreeMap<u32, (String, u32)>,
}
impl ValveTracker {
fn new() -> Self {
Self {
valves: BTreeMap::new(),
}
}
fn record_serialization(&mut self, node: NodeId, lock_type: &str) {
let entry = self
.valves
.entry(node.0)
.or_insert_with(|| (lock_type.to_string(), 0));
entry.1 += 1;
}
}
/// Returns the first pattern whose cleaned form occurs (case-insensitively)
/// in `text`. Matching is plain substring search, not regex evaluation.
fn flow_matches_any_pattern(text: &str, patterns: &[String]) -> Option<String> {
    let haystack = text.to_lowercase();
    patterns
        .iter()
        .find(|pat| haystack.contains(&flow_clean_pattern(pat)))
        .cloned()
}
/// Lowercases a regex-flavoured pattern and strips regex metacharacters so it
/// can be used for plain substring matching.
///
/// The word-boundary marker `\b` must be removed as a two-character unit
/// *before* lone backslashes and anchors are stripped. The previous order
/// stripped backslashes first, turning `\bGET\b` into `bgetb` (and making the
/// subsequent `"\\b"` replace dead code), so boundary-anchored patterns could
/// never match.
fn flow_clean_pattern(pat: &str) -> String {
    pat.to_lowercase()
        .replace("\\b", "")
        .replace(['\\', '^', '$'], "")
}
/// Resolves a node's human-readable label; falls back to a synthetic
/// "node_<id>" label for out-of-range ids.
fn flow_node_label(graph: &Graph, node: NodeId) -> String {
    let idx = node.as_usize();
    if idx >= graph.nodes.count as usize {
        return format!("node_{}", node.0);
    }
    graph.strings.resolve(graph.nodes.label[idx]).to_string()
}
/// Concatenates a node's label with its provenance excerpt (when present) to
/// form the text that lock/read-only patterns are matched against. Returns an
/// empty string for out-of-range ids.
fn flow_node_text(graph: &Graph, node: NodeId) -> String {
    let idx = node.as_usize();
    if idx >= graph.nodes.count as usize {
        return String::new();
    }
    let label = graph.strings.resolve(graph.nodes.label[idx]);
    let excerpt = match graph.nodes.provenance[idx].excerpt {
        Some(e) => graph.strings.resolve(e),
        None => "",
    };
    format!("{} {}", label, excerpt)
}
/// Decides whether `node` acts as a serialization valve (lock), returning the
/// matching pattern when it does.
///
/// Checks three tiers in order:
///   1. the node's own text against the configured lock patterns;
///   2. the text of each direct out-neighbor (a function that *calls* a lock
///      counts as a valve itself);
///   3. a hard-coded keyword heuristic ("lock", "mutex", ...), reported with
///      a "heuristic:" prefix so callers can tell it apart.
fn flow_is_valve(graph: &Graph, node: NodeId, config: &FlowConfig) -> Option<String> {
let text = flow_node_text(graph, node);
if let Some(pat) = flow_matches_any_pattern(&text, &config.lock_patterns) {
return Some(pat);
}
let idx = node.as_usize();
if idx < graph.nodes.count as usize {
let range = graph.csr.out_range(node);
for j in range {
let target = graph.csr.targets[j];
let target_text = flow_node_text(graph, target);
if let Some(pat) = flow_matches_any_pattern(&target_text, &config.lock_patterns) {
return Some(pat);
}
}
}
// Tier 3: keyword fallback, applied even when config patterns are empty.
let text_lower = text.to_lowercase();
let heuristic_keywords = [
"lock",
"mutex",
"guard",
"semaphore",
"synchronize",
"serialize",
];
for kw in &heuristic_keywords {
if text_lower.contains(kw) {
return Some(format!("heuristic:{}", kw));
}
}
None
}
/// Returns true when `node` looks read-only: its text matches a read-only
/// pattern AND none of its direct out-neighbors look like mutators.
fn flow_is_read_only(graph: &Graph, node: NodeId, config: &FlowConfig) -> bool {
    let text = flow_node_text(graph, node);
    if flow_matches_any_pattern(&text, &config.read_only_patterns).is_none() {
        return false;
    }
    let idx = node.as_usize();
    if idx >= graph.nodes.count as usize {
        // Out-of-range node: pattern matched and there are no neighbors to veto.
        return true;
    }
    // A neighbor containing any of these substrings vetoes the read-only call.
    let mutating_hints = [
        "set_", "write_", "update_", "delete_", "insert_", "put_", "remove_", "mutate",
    ];
    for j in graph.csr.out_range(node) {
        let target_text = flow_node_text(graph, graph.csr.targets[j]).to_lowercase();
        if mutating_hints.iter().any(|kw| target_text.contains(kw)) {
            return false;
        }
    }
    true
}
/// BFS count of distinct nodes reachable from `node` within `max_depth` hops,
/// excluding `node` itself. Returns 0 for out-of-range ids.
fn flow_count_downstream(graph: &Graph, node: NodeId, max_depth: u8) -> u32 {
    let total = graph.num_nodes() as usize;
    let start_idx = node.as_usize();
    if start_idx >= total {
        return 0;
    }
    let mut seen = vec![false; total];
    seen[start_idx] = true;
    let mut frontier: VecDeque<(NodeId, u8)> = VecDeque::new();
    frontier.push_back((node, 0u8));
    let mut reached = 0u32;
    while let Some((current, depth)) = frontier.pop_front() {
        // Nodes at the depth limit are counted but not expanded further.
        if depth >= max_depth {
            continue;
        }
        for j in graph.csr.out_range(current) {
            let next = graph.csr.targets[j];
            let next_idx = next.as_usize();
            if next_idx < total && !seen[next_idx] {
                seen[next_idx] = true;
                reached += 1;
                frontier.push_back((next, depth + 1));
            }
        }
    }
    reached
}
/// Walks an arrival path backwards (nearest hop first) and returns the first
/// node that qualifies as a valve, i.e. the closest upstream lock.
fn flow_find_nearest_upstream_lock(
    graph: &Graph,
    path: &[NodeId],
    config: &FlowConfig,
) -> Option<NodeId> {
    for &node in path.iter().rev() {
        if flow_is_valve(graph, node, config).is_some() {
            return Some(node);
        }
    }
    None
}
/// Number of incoming edges of `node`; 0 for out-of-range ids.
fn flow_in_degree(graph: &Graph, node: NodeId) -> u32 {
    if node.as_usize() >= graph.nodes.count as usize {
        return 0;
    }
    let r = graph.csr.in_range(node);
    (r.end - r.start) as u32
}
/// Maximum in-degree over all nodes, floored at 1 so it is always a safe
/// normalization divisor.
fn flow_max_in_degree(graph: &Graph) -> u32 {
    (0..graph.num_nodes())
        .map(|i| flow_in_degree(graph, NodeId::new(i)))
        .fold(1u32, u32::max)
}
/// Stateless particle-flow simulation engine.
pub struct FlowEngine;
impl Default for FlowEngine {
/// Delegates to `FlowEngine::new` (the engine carries no state).
fn default() -> Self {
Self::new()
}
}
impl FlowEngine {
/// Creates a new (stateless) flow engine.
pub fn new() -> Self {
Self
}
pub fn simulate(
&self,
graph: &Graph,
entry_nodes: &[NodeId],
num_particles: u32,
config: &FlowConfig,
) -> M1ndResult<FlowSimulationResult> {
let start = Instant::now();
let n = graph.num_nodes() as usize;
if n == 0 || entry_nodes.is_empty() {
return Err(M1ndError::NoEntryPoints);
}
let num_particles = num_particles.min(config.max_particles);
let max_depth = config.max_depth;
let edges = graph.num_edges() as f64;
let nodes = n as f64;
let density = if nodes > 0.0 { edges / nodes } else { 1.0 };
let budget_scale = if density > 10.0 {
(10.0 / density).max(0.1)
} else {
1.0
};
let effective_steps = ((config.max_total_steps as f64) * budget_scale) as usize;
let max_total_steps = effective_steps.max(1000);
let mut accumulator = NodeAccumulator::new(n);
let mut valve_tracker = ValveTracker::new();
let mut global_visited = vec![false; n];
let mut total_particles_spawned = 0u32;
let scope_allowed: Option<Vec<bool>> = config.scope_filter.as_ref().map(|filter| {
let filter_lower = filter.to_lowercase();
(0..n)
.map(|i| {
let label = graph.strings.resolve(graph.nodes.label[i]);
label.to_lowercase().contains(&filter_lower)
})
.collect()
});
let mut global_steps: usize = 0;
for &entry in entry_nodes {
let entry_idx = entry.as_usize();
if entry_idx >= n {
continue;
}
for p_idx in 0..num_particles {
total_particles_spawned += 1;
let pid = total_particles_spawned;
let mut visited = vec![false; n];
visited[entry_idx] = true;
global_visited[entry_idx] = true;
let mut queue: VecDeque<Particle> = VecDeque::new();
let initial = Particle {
id: pid,
origin: entry,
path: vec![entry],
position: entry,
depth: 0,
serialized_by: None,
visited,
};
accumulator.record(
entry,
ParticleArrival {
origin: entry,
particle_id: pid,
serialized_by: None,
path: vec![entry],
},
);
queue.push_back(initial);
let mut active_count = 1usize;
while let Some(particle) = queue.pop_front() {
if particle.depth >= max_depth {
continue;
}
let pos = particle.position;
let pos_idx = pos.as_usize();
if pos_idx >= n {
continue;
}
let mut serialized_by = particle.serialized_by;
if let Some(lock_type) = flow_is_valve(graph, pos, config) {
serialized_by = Some(pos);
valve_tracker.record_serialization(pos, &lock_type);
}
if global_steps >= max_total_steps {
break;
}
let range = graph.csr.out_range(pos);
for j in range {
global_steps += 1;
if global_steps >= max_total_steps {
break;
}
if graph.csr.inhibitory[j] {
continue;
}
let weight = graph.csr.read_weight(EdgeIdx::new(j as u32)).get();
if weight < config.min_edge_weight {
continue;
}
let target = graph.csr.targets[j];
let tidx = target.as_usize();
if tidx >= n {
continue;
}
if let Some(ref allowed) = scope_allowed {
if !allowed[tidx] {
continue;
}
}
if particle.visited[tidx] {
accumulator.record(
target,
ParticleArrival {
origin: entry,
particle_id: pid,
serialized_by,
path: if config.include_paths {
let mut p = particle.path.clone();
p.push(target);
p
} else {
Vec::new()
},
},
);
global_visited[tidx] = true;
continue;
}
if active_count >= MAX_ACTIVE_PARTICLES {
break;
}
let mut new_path = if config.include_paths {
let mut p = particle.path.clone();
p.push(target);
p
} else {
Vec::new()
};
accumulator.record(
target,
ParticleArrival {
origin: entry,
particle_id: pid,
serialized_by,
path: new_path.clone(),
},
);
global_visited[tidx] = true;
let mut child_visited = particle.visited.clone();
child_visited[tidx] = true;
let child = Particle {
id: pid,
origin: entry,
path: if config.include_paths {
new_path
} else {
Vec::new()
},
position: target,
depth: particle.depth + 1,
serialized_by,
visited: child_visited,
};
queue.push_back(child);
active_count += 1;
}
if active_count >= MAX_ACTIVE_PARTICLES {
break;
}
if global_steps >= max_total_steps {
break;
}
}
}
}
let max_in_deg = flow_max_in_degree(graph);
let turbulent = accumulator.flow_turbulent_nodes();
let mut turbulence_points = Vec::new();
let mut protected_nodes: Vec<ProtectedNode> = Vec::new();
for (node, arrivals) in &turbulent {
let node_label = flow_node_label(graph, *node);
let has_lock = flow_is_valve(graph, *node, config).is_some();
let is_read_only = flow_is_read_only(graph, *node, config);
let advisory_lock_ids = config
.advisory_lock_protected_nodes
.get(&node_label)
.cloned()
.unwrap_or_default();
let is_advisory_protected = !advisory_lock_ids.is_empty();
let mut origins_unserialized: BTreeSet<u32> = BTreeSet::new();
let mut all_origins: BTreeSet<u32> = BTreeSet::new();
for a in *arrivals {
all_origins.insert(a.origin.0);
if a.serialized_by.is_none() {
origins_unserialized.insert(a.origin.0);
}
}
let particle_count = if origins_unserialized.len() > 1 {
origins_unserialized.len() as u32
} else if all_origins.len() > 1 {
all_origins.len() as u32
} else {
continue;
};
let base_score = (particle_count as f32) / (num_particles as f32).max(1.0);
let base_score = base_score.min(1.0);
let nearest_lock = arrivals
.iter()
.find_map(|a| flow_find_nearest_upstream_lock(graph, &a.path, config));
let lock_factor = if has_lock {
0.0
} else if nearest_lock.is_some() {
0.3 } else {
1.0 };
let read_factor = if is_read_only { 0.2 } else { 1.0 };
let advisory_factor = if is_advisory_protected { 0.15 } else { 1.0 };
let in_deg = flow_in_degree(graph, *node);
let centrality_normalized = (in_deg as f32) / (max_in_deg as f32).max(1.0);
let centrality_factor = 0.5 + 0.5 * centrality_normalized;
let utility_factor = if is_read_only && centrality_normalized > 0.9 {
0.1
} else {
1.0
};
let pre_advisory_score =
base_score * lock_factor * read_factor * centrality_factor * utility_factor;
let turbulence_score = pre_advisory_score * advisory_factor;
if is_advisory_protected {
let suppressed = turbulence_score < config.turbulence_threshold
&& pre_advisory_score >= config.turbulence_threshold;
protected_nodes.push(ProtectedNode {
node_label: node_label.clone(),
lock_ids: advisory_lock_ids,
original_score: Some(pre_advisory_score),
suppressed,
});
}
if turbulence_score < config.turbulence_threshold {
continue;
}
let severity = if turbulence_score >= 0.8
&& particle_count >= 3
&& nearest_lock.is_none()
&& !has_lock
&& !is_advisory_protected
{
TurbulenceSeverity::Critical
} else if turbulence_score >= 0.6 {
TurbulenceSeverity::High
} else if turbulence_score >= 0.3 {
TurbulenceSeverity::Medium
} else {
TurbulenceSeverity::Low
};
let origin_list: Vec<u32> = all_origins.iter().copied().collect();
let mut entry_pairs = Vec::new();
for i in 0..origin_list.len() {
for j in (i + 1)..origin_list.len() {
let a_label = flow_node_label(graph, NodeId::new(origin_list[i]));
let b_label = flow_node_label(graph, NodeId::new(origin_list[j]));
entry_pairs.push((a_label, b_label));
}
}
let paths = if config.include_paths {
arrivals
.iter()
.filter(|a| !a.path.is_empty())
.map(|a| a.path.iter().map(|n| flow_node_label(graph, *n)).collect())
.collect()
} else {
Vec::new()
};
let nearest_upstream_lock_label = nearest_lock.map(|n| flow_node_label(graph, n));
turbulence_points.push(TurbulencePoint {
node: *node,
node_label,
particle_count,
has_lock,
is_read_only,
turbulence_score,
severity,
entry_pairs,
nearest_upstream_lock: nearest_upstream_lock_label,
paths,
});
}
turbulence_points.sort_by(|a, b| {
b.turbulence_score
.partial_cmp(&a.turbulence_score)
.unwrap_or(std::cmp::Ordering::Equal)
.then_with(|| a.node.0.cmp(&b.node.0))
});
let mut valve_points: Vec<ValvePoint> = valve_tracker
.valves
.iter()
.map(|(&node_id, (lock_type, serialized))| {
let node = NodeId::new(node_id);
let downstream = flow_count_downstream(graph, node, config.max_depth);
ValvePoint {
node,
node_label: flow_node_label(graph, node),
lock_type: lock_type.clone(),
particles_serialized: *serialized,
downstream_protected: downstream,
}
})
.collect();
valve_points.sort_by_key(|v| v.node.0);
let visited_count = global_visited.iter().filter(|&&v| v).count() as u32;
let coverage_pct = if n > 0 {
(visited_count as f32) / (n as f32)
} else {
0.0
};
let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
protected_nodes.sort_by(|a, b| a.node_label.cmp(&b.node_label));
let advisory_lock_protected_count = protected_nodes.len() as u32;
Ok(FlowSimulationResult {
summary: FlowSummary {
total_entry_points: entry_nodes.len() as u32,
total_particles: total_particles_spawned,
total_nodes_visited: visited_count,
turbulence_count: turbulence_points.len() as u32,
valve_count: valve_points.len() as u32,
advisory_lock_protected_count,
coverage_pct,
elapsed_ms,
},
turbulence_points,
valve_points,
protected_nodes,
})
}
/// Heuristically discovers entry points for a simulation.
///
/// First pass: `Function` nodes whose label contains an entry-point prefix
/// ("handle_", "route_", ...). If none match, a fallback pass accepts `File`
/// nodes matching the same prefixes or typical entry-file names. Candidates
/// are ranked by PageRank when available (otherwise in-degree) and capped at
/// `min(max_entries, 100)`.
pub fn discover_entry_points(&self, graph: &Graph, max_entries: usize) -> Vec<NodeId> {
    let n = graph.num_nodes();
    if n == 0 {
        return Vec::new();
    }
    // Ranking priority, shared by both passes (was duplicated verbatim):
    // PageRank if computed, raw in-degree otherwise.
    let priority_of = |node: NodeId, idx: usize| -> f32 {
        if graph.pagerank_computed {
            graph.nodes.pagerank[idx].get()
        } else {
            let range = graph.csr.in_range(node);
            (range.end - range.start) as f32
        }
    };
    let mut candidates: Vec<(NodeId, f32)> = Vec::new();
    for i in 0..n {
        let node = NodeId::new(i);
        let idx = i as usize;
        if !matches!(graph.nodes.node_type[idx], NodeType::Function) {
            continue;
        }
        let label = graph.strings.resolve(graph.nodes.label[idx]).to_lowercase();
        if ENTRY_POINT_PATTERNS.iter().any(|p| label.contains(p)) {
            candidates.push((node, priority_of(node, idx)));
        }
    }
    if candidates.is_empty() {
        // Fallback: look at File nodes for entry-ish names.
        for i in 0..n {
            let node = NodeId::new(i);
            let idx = i as usize;
            if !matches!(graph.nodes.node_type[idx], NodeType::File) {
                continue;
            }
            let label = graph.strings.resolve(graph.nodes.label[idx]).to_lowercase();
            let matches_pattern = ENTRY_POINT_PATTERNS.iter().any(|p| label.contains(p));
            let is_entry_file = label.contains("main")
                || label.contains("app.")
                || label.contains("server")
                || label.contains("__init__");
            if matches_pattern || is_entry_file {
                candidates.push((node, priority_of(node, idx)));
            }
        }
    }
    // Highest priority first; node id breaks ties deterministically.
    candidates.sort_by(|a, b| {
        b.1.partial_cmp(&a.1)
            .unwrap_or(std::cmp::Ordering::Equal)
            .then_with(|| a.0 .0.cmp(&b.0 .0))
    });
    candidates.truncate(max_entries.min(100));
    candidates.into_iter().map(|(node, _)| node).collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::Graph;
use crate::types::*;
// Builds a minimal a -> b graph with configurable labels and relation.
// NOTE(review): currently unused by any visible test (dead_code warning
// under `cargo test`); kept for future tests — confirm before removing.
fn two_node_graph(label_a: &str, label_b: &str, relation: &str) -> Graph {
let mut g = Graph::new();
g.add_node("a", label_a, NodeType::Function, &[], 1.0, 0.5)
.unwrap();
g.add_node("b", label_b, NodeType::Function, &[], 0.8, 0.3)
.unwrap();
g.add_edge(
NodeId::new(0),
NodeId::new(1),
relation,
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
g.finalize().unwrap();
g
}
// Fixture: two entry handlers (0, 1) converge on a shared node (2), which
// feeds a single sink (3).
fn convergent_graph() -> Graph {
    let mut g = Graph::new();
    g.add_node("entry1", "handle_alpha", NodeType::Function, &[], 1.0, 0.5)
        .unwrap();
    g.add_node("entry2", "handle_beta", NodeType::Function, &[], 1.0, 0.5)
        .unwrap();
    g.add_node("shared", "shared_state", NodeType::Function, &[], 0.9, 0.4)
        .unwrap();
    g.add_node("sink", "output", NodeType::Function, &[], 0.5, 0.2)
        .unwrap();
    for (src, dst, weight, extra) in [
        (0u32, 2u32, 0.9f32, 0.5f32),
        (1, 2, 0.9, 0.5),
        (2, 3, 0.8, 0.3),
    ] {
        g.add_edge(
            NodeId::new(src),
            NodeId::new(dst),
            "calls",
            FiniteF32::new(weight),
            EdgeDirection::Forward,
            false,
            FiniteF32::new(extra),
        )
        .unwrap();
    }
    g.finalize().unwrap();
    g
}
// An empty graph (and empty entry list) must fail with NoEntryPoints.
#[test]
fn empty_graph_returns_no_entry_points_error() {
let mut g = Graph::new();
g.finalize().unwrap();
let engine = FlowEngine::new();
let config = FlowConfig::default();
let result = engine.simulate(&g, &[], 2, &config);
assert!(matches!(
result,
Err(crate::error::M1ndError::NoEntryPoints)
));
}
// Two entries converging on one node must register at least one
// turbulence point when the reporting threshold is zero.
#[test]
fn turbulence_detected_on_convergent_graph() {
    let g = convergent_graph();
    let engine = FlowEngine::new();
    let mut config = FlowConfig::default();
    config.turbulence_threshold = 0.0;
    config.lock_patterns = Vec::new();
    config.read_only_patterns = Vec::new();
    let entry_nodes = [NodeId::new(0), NodeId::new(1)];
    let result = engine.simulate(&g, &entry_nodes, 1, &config).unwrap();
    assert!(
        result.summary.turbulence_count > 0,
        "expected turbulence at convergence node, got 0"
    );
}
// A node whose label hits the "guard"/"mutex" heuristic must be reported
// as a valve even with empty configured lock patterns.
#[test]
fn valve_detected_on_lock_node() {
let mut g = Graph::new();
g.add_node("ep", "handle_req", NodeType::Function, &[], 1.0, 0.5)
.unwrap();
g.add_node("lk", "mutex_guard", NodeType::Function, &[], 0.9, 0.4)
.unwrap();
g.add_node("wr", "write_state", NodeType::Function, &[], 0.8, 0.3)
.unwrap();
g.add_edge(
NodeId::new(0),
NodeId::new(1),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
g.add_edge(
NodeId::new(1),
NodeId::new(2),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
g.finalize().unwrap();
let engine = FlowEngine::new();
let mut config = FlowConfig::default();
config.lock_patterns = Vec::new();
config.read_only_patterns = Vec::new();
let result = engine.simulate(&g, &[NodeId::new(0)], 1, &config).unwrap();
assert!(
result.summary.valve_count > 0,
"expected a valve at mutex_guard node"
);
}
// On a 5-node chain with max_depth=2, a particle can only reach the entry
// plus two hops (3 nodes total).
#[test]
fn max_depth_limits_propagation() {
let mut g = Graph::new();
for i in 0..5u32 {
g.add_node(
&format!("n{}", i),
&format!("node_{}", i),
NodeType::Function,
&[],
1.0,
0.5,
)
.unwrap();
}
for i in 0..4u32 {
g.add_edge(
NodeId::new(i),
NodeId::new(i + 1),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
}
g.finalize().unwrap();
let engine = FlowEngine::new();
let mut config = FlowConfig::default();
config.max_depth = 2; config.lock_patterns = Vec::new();
config.read_only_patterns = Vec::new();
let result = engine.simulate(&g, &[NodeId::new(0)], 1, &config).unwrap();
assert!(
result.summary.total_nodes_visited <= 3,
"expected at most 3 nodes visited with max_depth=2, got {}",
result.summary.total_nodes_visited
);
}
// A tiny step budget on a dense (complete) graph must terminate cleanly
// rather than error or hang. Note the engine floors the budget at 1000.
#[test]
fn max_steps_budget_stops_simulation() {
let mut g = Graph::new();
for i in 0..10u32 {
g.add_node(
&format!("n{}", i),
&format!("fn_{}", i),
NodeType::Function,
&[],
1.0,
0.5,
)
.unwrap();
}
for i in 0..10u32 {
for j in 0..10u32 {
if i != j {
let _ = g.add_edge(
NodeId::new(i),
NodeId::new(j),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
);
}
}
}
g.finalize().unwrap();
let engine = FlowEngine::new();
let mut config = FlowConfig::default();
config.max_total_steps = 5; config.lock_patterns = Vec::new();
config.read_only_patterns = Vec::new();
let result = engine.simulate(&g, &[NodeId::new(0)], 1, &config);
assert!(
result.is_ok(),
"simulation should succeed even with tiny budget"
);
}
// With scope_filter="alpha", propagation must stop at alpha_fn; beta_fn is
// outside scope and never entered.
#[test]
fn scope_filter_restricts_visited_nodes() {
let mut g = Graph::new();
g.add_node("e", "entry_point", NodeType::Function, &[], 1.0, 0.5)
.unwrap();
g.add_node("a", "alpha_fn", NodeType::Function, &[], 0.9, 0.4)
.unwrap();
g.add_node("b", "beta_fn", NodeType::Function, &[], 0.8, 0.3)
.unwrap();
g.add_edge(
NodeId::new(0),
NodeId::new(1),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
g.add_edge(
NodeId::new(1),
NodeId::new(2),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
g.finalize().unwrap();
let engine = FlowEngine::new();
let mut config = FlowConfig::default();
config.scope_filter = Some("alpha".to_string()); config.lock_patterns = Vec::new();
config.read_only_patterns = Vec::new();
let result = engine.simulate(&g, &[NodeId::new(0)], 1, &config).unwrap();
assert!(
result.summary.total_nodes_visited <= 2,
"scope filter should restrict to at most entry + alpha, got {}",
result.summary.total_nodes_visited
);
}
// A convergence node matching the "get_" read-only pattern gets its score
// multiplied by the 0.2 read factor, keeping it at or below 0.3.
#[test]
fn read_only_node_gets_reduced_turbulence() {
let mut g = Graph::new();
g.add_node("e1", "handle_alpha", NodeType::Function, &[], 1.0, 0.5)
.unwrap(); g.add_node("e2", "handle_beta", NodeType::Function, &[], 1.0, 0.5)
.unwrap(); g.add_node("ro", "get_state", NodeType::Function, &[], 0.9, 0.4)
.unwrap(); g.add_edge(
NodeId::new(0),
NodeId::new(2),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
g.add_edge(
NodeId::new(1),
NodeId::new(2),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
g.finalize().unwrap();
let engine = FlowEngine::new();
let mut config = FlowConfig::with_defaults();
config.turbulence_threshold = 0.0;
let result = engine
.simulate(&g, &[NodeId::new(0), NodeId::new(1)], 1, &config)
.unwrap();
for tp in &result.turbulence_points {
if tp.node_label.contains("get_state") {
assert!(
tp.turbulence_score <= 0.3,
"read-only node should have low turbulence score, got {}",
tp.turbulence_score
);
}
}
}
// discover_entry_points must pick exactly the two handle_* functions and
// skip the utility helper.
#[test]
fn auto_discover_finds_handle_functions() {
let mut g = Graph::new();
g.add_node("h1", "handle_request", NodeType::Function, &[], 1.0, 0.5)
.unwrap();
g.add_node("h2", "handle_event", NodeType::Function, &[], 0.9, 0.4)
.unwrap();
g.add_node("u", "utility_helper", NodeType::Function, &[], 0.5, 0.2)
.unwrap();
g.add_edge(
NodeId::new(0),
NodeId::new(2),
"calls",
FiniteF32::new(0.8),
EdgeDirection::Forward,
false,
FiniteF32::new(0.3),
)
.unwrap();
g.add_edge(
NodeId::new(1),
NodeId::new(2),
"calls",
FiniteF32::new(0.8),
EdgeDirection::Forward,
false,
FiniteF32::new(0.3),
)
.unwrap();
g.finalize().unwrap();
let engine = FlowEngine::new();
let entries = engine.discover_entry_points(&g, 10);
assert_eq!(
entries.len(),
2,
"expected 2 handle_ entry points, got {}",
entries.len()
);
}
// An advisory lock on the convergence node must (a) appear in
// protected_nodes and (b) scale the turbulence score by the 0.15
// advisory factor relative to the unlocked baseline.
#[test]
fn advisory_lock_suppresses_turbulence() {
let g = convergent_graph();
let engine = FlowEngine::new();
let mut config_no_lock = FlowConfig::default();
config_no_lock.turbulence_threshold = 0.0;
config_no_lock.lock_patterns = Vec::new();
config_no_lock.read_only_patterns = Vec::new();
let entry_nodes = vec![NodeId::new(0), NodeId::new(1)];
let result_no_lock = engine
.simulate(&g, &entry_nodes, 1, &config_no_lock)
.unwrap();
assert!(
result_no_lock.summary.turbulence_count > 0,
"baseline: expected turbulence without advisory lock"
);
assert!(result_no_lock.protected_nodes.is_empty());
assert_eq!(result_no_lock.summary.advisory_lock_protected_count, 0);
let mut config_locked = FlowConfig::default();
config_locked.turbulence_threshold = 0.0;
config_locked.lock_patterns = Vec::new();
config_locked.read_only_patterns = Vec::new();
config_locked.advisory_lock_protected_nodes.insert(
"shared_state".to_string(),
vec!["lock_jimi_001".to_string()],
);
let result_locked = engine
.simulate(&g, &entry_nodes, 1, &config_locked)
.unwrap();
assert_eq!(
result_locked.summary.advisory_lock_protected_count, 1,
"expected 1 protected node"
);
assert_eq!(result_locked.protected_nodes[0].node_label, "shared_state");
assert_eq!(
result_locked.protected_nodes[0].lock_ids,
vec!["lock_jimi_001"]
);
assert!(result_locked.protected_nodes[0].original_score.is_some());
let tp_no_lock = result_no_lock
.turbulence_points
.iter()
.find(|tp| tp.node_label == "shared_state");
let tp_locked = result_locked
.turbulence_points
.iter()
.find(|tp| tp.node_label == "shared_state");
// Only comparable when the node is reported in both runs.
if let (Some(no_lock), Some(locked)) = (tp_no_lock, tp_locked) {
assert!(
locked.turbulence_score < no_lock.turbulence_score,
"advisory lock should reduce turbulence score: {} should be < {}",
locked.turbulence_score,
no_lock.turbulence_score
);
let ratio = locked.turbulence_score / no_lock.turbulence_score;
assert!(
(ratio - 0.15).abs() < 0.01,
"expected ~0.15 ratio, got {}",
ratio
);
}
}
// With a 0.3 threshold, the 0.15 advisory factor must push shared_state
// below the threshold: it is flagged `suppressed` and excluded from
// turbulence_points.
#[test]
fn advisory_lock_suppresses_below_threshold() {
let g = convergent_graph();
let engine = FlowEngine::new();
let mut config = FlowConfig::default();
config.turbulence_threshold = 0.3;
config.lock_patterns = Vec::new();
config.read_only_patterns = Vec::new();
config.advisory_lock_protected_nodes.insert(
"shared_state".to_string(),
vec!["lock_test_001".to_string()],
);
let entry_nodes = vec![NodeId::new(0), NodeId::new(1)];
let result = engine.simulate(&g, &entry_nodes, 1, &config).unwrap();
let protected = result
.protected_nodes
.iter()
.find(|p| p.node_label == "shared_state");
assert!(
protected.is_some(),
"shared_state should appear in protected_nodes"
);
let protected = protected.unwrap();
assert!(
protected.suppressed,
"shared_state should be marked as suppressed (score dropped below threshold)"
);
let in_turbulence = result
.turbulence_points
.iter()
.any(|tp| tp.node_label == "shared_state");
assert!(
!in_turbulence,
"suppressed node should not appear in turbulence_points"
);
}
// An advisory lock naming a label not present in the graph must be a
// no-op: no protected nodes, turbulence unchanged.
#[test]
fn advisory_lock_does_not_affect_unprotected_nodes() {
let g = convergent_graph();
let engine = FlowEngine::new();
let mut config = FlowConfig::default();
config.turbulence_threshold = 0.0;
config.lock_patterns = Vec::new();
config.read_only_patterns = Vec::new();
config.advisory_lock_protected_nodes.insert(
"unrelated_node".to_string(),
vec!["lock_other_001".to_string()],
);
let entry_nodes = vec![NodeId::new(0), NodeId::new(1)];
let result = engine.simulate(&g, &entry_nodes, 1, &config).unwrap();
assert!(
result.protected_nodes.is_empty(),
"unrelated lock should not produce protected_nodes"
);
assert_eq!(result.summary.advisory_lock_protected_count, 0);
assert!(
result.summary.turbulence_count > 0,
"turbulence should still be detected for unprotected nodes"
);
}
// A node covered by several advisory locks must report all lock ids.
#[test]
fn advisory_lock_multiple_lock_ids() {
let g = convergent_graph();
let engine = FlowEngine::new();
let mut config = FlowConfig::default();
config.turbulence_threshold = 0.0;
config.lock_patterns = Vec::new();
config.read_only_patterns = Vec::new();
config.advisory_lock_protected_nodes.insert(
"shared_state".to_string(),
vec!["lock_a_001".to_string(), "lock_b_002".to_string()],
);
let entry_nodes = vec![NodeId::new(0), NodeId::new(1)];
let result = engine.simulate(&g, &entry_nodes, 1, &config).unwrap();
let protected = result
.protected_nodes
.iter()
.find(|p| p.node_label == "shared_state")
.expect("shared_state should be in protected_nodes");
assert_eq!(protected.lock_ids.len(), 2);
assert!(protected.lock_ids.contains(&"lock_a_001".to_string()));
assert!(protected.lock_ids.contains(&"lock_b_002".to_string()));
}
// Four entries converging on one node would normally qualify for Critical
// severity; an advisory lock must prevent that classification.
#[test]
fn advisory_lock_prevents_critical_severity() {
let mut g = Graph::new();
for i in 0..4u32 {
g.add_node(
&format!("e{}", i),
&format!("handle_ep{}", i),
NodeType::Function,
&[],
1.0,
0.5,
)
.unwrap();
}
g.add_node(
"shared",
"shared_critical",
NodeType::Function,
&[],
0.9,
0.4,
)
.unwrap(); for i in 0..4u32 {
g.add_edge(
NodeId::new(i),
NodeId::new(4),
"calls",
FiniteF32::new(0.9),
EdgeDirection::Forward,
false,
FiniteF32::new(0.5),
)
.unwrap();
}
g.finalize().unwrap();
let engine = FlowEngine::new();
let entries: Vec<NodeId> = (0..4).map(NodeId::new).collect();
let mut config_no_lock = FlowConfig::default();
config_no_lock.turbulence_threshold = 0.0;
config_no_lock.lock_patterns = Vec::new();
config_no_lock.read_only_patterns = Vec::new();
let result_no = engine.simulate(&g, &entries, 1, &config_no_lock).unwrap();
let tp_no = result_no
.turbulence_points
.iter()
.find(|tp| tp.node_label == "shared_critical");
assert!(tp_no.is_some(), "should find turbulence without lock");
let mut config_locked = FlowConfig::default();
config_locked.turbulence_threshold = 0.0;
config_locked.lock_patterns = Vec::new();
config_locked.read_only_patterns = Vec::new();
config_locked.advisory_lock_protected_nodes.insert(
"shared_critical".to_string(),
vec!["lock_crit_001".to_string()],
);
let result_locked = engine.simulate(&g, &entries, 1, &config_locked).unwrap();
let tp_locked = result_locked
.turbulence_points
.iter()
.find(|tp| tp.node_label == "shared_critical");
// The node may be suppressed entirely; when reported, it must not be Critical.
if let Some(tp) = tp_locked {
assert_ne!(
tp.severity,
TurbulenceSeverity::Critical,
"advisory-locked node should not be Critical severity"
);
}
}
// An explicitly-empty advisory-lock map must behave exactly like the
// pre-advisory-lock engine: turbulence reported, nothing protected.
#[test]
fn empty_advisory_lock_map_backward_compatible() {
let g = convergent_graph();
let engine = FlowEngine::new();
let config = FlowConfig {
turbulence_threshold: 0.0,
lock_patterns: Vec::new(),
read_only_patterns: Vec::new(),
advisory_lock_protected_nodes: BTreeMap::new(),
..FlowConfig::default()
};
let entry_nodes = vec![NodeId::new(0), NodeId::new(1)];
let result = engine.simulate(&g, &entry_nodes, 1, &config).unwrap();
assert!(result.protected_nodes.is_empty());
assert_eq!(result.summary.advisory_lock_protected_count, 0);
assert!(result.summary.turbulence_count > 0);
}
}