Skip to main content

m1nd_core/
flow.rs

1// === m1nd-core/src/flow.rs ===
2//
3// Concurrent flow simulation for race condition detection.
4// Particles traverse the graph; collisions on shared mutable state
5// without synchronization are flagged as turbulence points.
6
7use crate::error::{M1ndError, M1ndResult};
8use crate::graph::Graph;
9use crate::types::*;
10use serde::Serialize;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::time::Instant;
13
14// ── Constants ──
15
/// Default maximum traversal depth per particle.
pub const DEFAULT_MAX_DEPTH: u8 = 15;
/// Default number of particles spawned per entry point.
pub const DEFAULT_NUM_PARTICLES: u32 = 2;
/// Default for `FlowConfig::max_particles`: the upper bound on particles spawned *per entry
/// point*. `FlowEngine::simulate` clamps the requested per-entry count to the configured
/// value; it does not bound the total number of particles across all entry points.
pub const MAX_PARTICLES: u32 = 100;
/// Hard cap on particle states enqueued while propagating a single spawned particle, to
/// bound queue memory on highly branching graphs. The counter restarts for every spawned
/// particle and counts enqueued states (it is not decremented when a state is dequeued).
pub const MAX_ACTIVE_PARTICLES: usize = 10_000;
/// Default minimum edge weight — edges below this are skipped as noise.
pub const DEFAULT_MIN_EDGE_WEIGHT: f32 = 0.1;
/// Default turbulence score threshold — points below this are suppressed.
pub const DEFAULT_TURBULENCE_THRESHOLD: f32 = 0.5;
28
/// Lock / synchronization patterns used by default for valve detection.
///
/// Entries are written in regex-like notation, but they are matched as case-insensitive
/// substrings once regex escape characters have been removed.
pub const DEFAULT_LOCK_PATTERNS: &[&str] = &[
    r"asyncio\.Lock",
    r"threading\.Lock",
    "Mutex",
    "RwLock",
    "Semaphore",
    r"asyncio\.Semaphore",
    r"Lock\(\)",
    r"\.acquire\(",
    r"\.lock\(",
];

/// Read-only access patterns used by default. A node matching one of these (and with no
/// writer-looking successor) is classified as read-only, which damps its turbulence score.
pub const DEFAULT_READ_ONLY_PATTERNS: &[&str] = &[
    "get_",
    "read_",
    "fetch_",
    "list_",
    "is_",
    "has_",
    "check_",
    "count_",
    r"len\(",
    r"\bGET\b",
    "select ",
    "SELECT ",
];

/// Label prefixes that suggest a node is an entry point, for auto-discovery
/// (matched case-insensitively against node labels).
const ENTRY_POINT_PATTERNS: &[&str] = &[
    "handle_", "route_", "api_", "endpoint_", "on_", "cmd_", "tick_", "daemon_",
];
59
60// ── Core Types ──
61
62/// Severity of a detected turbulence point.
63#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
64pub enum TurbulenceSeverity {
65    /// Score >= 0.8, >=3 unsynchronized particles, no lock protection.
66    Critical,
67    /// Score >= 0.6.
68    High,
69    /// Score >= 0.3.
70    Medium,
71    /// Score above threshold but below 0.3.
72    Low,
73}
74
75/// A node reached by particles from more than one concurrent entry point.
76///
77/// Represents a potential race condition on shared mutable state.
78#[derive(Clone, Debug, Serialize)]
79pub struct TurbulencePoint {
80    /// Graph node ID of the turbulence point.
81    pub node: NodeId,
82    /// Human-readable label of the node.
83    pub node_label: String,
84    /// Number of distinct unsynchronized concurrent paths reaching this node.
85    pub particle_count: u32,
86    /// Whether this node itself is a lock/synchronization point.
87    pub has_lock: bool,
88    /// Whether this node was classified as read-only access.
89    pub is_read_only: bool,
90    /// Composite turbulence score in [0.0, 1.0].
91    pub turbulence_score: f32,
92    /// Severity classification.
93    pub severity: TurbulenceSeverity,
94    /// Pairs of entry point labels that both reached this node.
95    pub entry_pairs: Vec<(String, String)>,
96    /// Label of the nearest upstream lock in any particle's path, if any.
97    pub nearest_upstream_lock: Option<String>,
98    /// Paths taken by each particle to reach this node (if `include_paths`).
99    pub paths: Vec<Vec<String>>,
100}
101
102/// A lock/synchronization node that serializes concurrent particle paths.
103#[derive(Clone, Debug, Serialize)]
104pub struct ValvePoint {
105    /// Graph node ID of the valve.
106    pub node: NodeId,
107    /// Human-readable label of the valve node.
108    pub node_label: String,
109    /// Matched lock pattern string (e.g. "heuristic:mutex").
110    pub lock_type: String,
111    /// Number of particle arrivals serialized by this valve.
112    pub particles_serialized: u32,
113    /// BFS count of nodes downstream of the valve (depth-limited).
114    pub downstream_protected: u32,
115}
116
117/// Aggregate statistics for a flow simulation run.
118#[derive(Clone, Debug, Serialize)]
119pub struct FlowSummary {
120    /// Number of distinct entry points used.
121    pub total_entry_points: u32,
122    /// Total particles spawned across all entry points.
123    pub total_particles: u32,
124    /// Total distinct graph nodes visited.
125    pub total_nodes_visited: u32,
126    /// Number of turbulence points above threshold.
127    pub turbulence_count: u32,
128    /// Number of valve (lock) points detected.
129    pub valve_count: u32,
130    /// Fraction of graph nodes visited in [0.0, 1.0].
131    pub coverage_pct: f32,
132    /// Wall-clock time in milliseconds.
133    pub elapsed_ms: f64,
134}
135
136/// Complete result of a flow simulation.
137#[derive(Clone, Debug, Serialize)]
138pub struct FlowSimulationResult {
139    /// Turbulence points sorted by score descending.
140    pub turbulence_points: Vec<TurbulencePoint>,
141    /// Valve points sorted by node ID ascending.
142    pub valve_points: Vec<ValvePoint>,
143    /// Aggregate statistics.
144    pub summary: FlowSummary,
145}
146
/// Configuration for the flow simulation engine.
///
/// `FlowConfig::default()` leaves both pattern lists empty (only the built-in lock keyword
/// heuristics then apply); `FlowConfig::with_defaults()` fills them from
/// `DEFAULT_LOCK_PATTERNS` / `DEFAULT_READ_ONLY_PATTERNS`.
#[derive(Clone, Debug)]
pub struct FlowConfig {
    /// Patterns identifying lock/synchronization (valve) nodes. Matched as case-insensitive
    /// substrings of a node's label plus excerpt (and of its direct successors' text) after
    /// regex escape characters are removed. Built-in keyword heuristics apply in addition.
    pub lock_patterns: Vec<String>,
    /// Patterns identifying read-only access nodes; matching nodes receive a reduced
    /// turbulence score unless a direct successor looks like a writer.
    pub read_only_patterns: Vec<String>,
    /// Maximum traversal depth (hops from the entry point) per particle. Also bounds the
    /// downstream count reported for each valve.
    pub max_depth: u8,
    /// Minimum turbulence score for a point to be included in results.
    pub turbulence_threshold: f32,
    /// Whether to record full path traces per particle. Nearest-upstream-lock detection
    /// reads these recorded paths, so with this disabled that lookup generally finds nothing.
    pub include_paths: bool,
    /// Upper bound on particles spawned per entry point: `simulate` uses
    /// `min(num_particles, max_particles)`. Defaults to `MAX_PARTICLES`, but larger values
    /// are honoured as-is (they are not clamped to `MAX_PARTICLES`).
    pub max_particles: u32,
    /// Minimum edge weight to traverse — edges below this are skipped.
    pub min_edge_weight: f32,
    /// Global step budget (out-edge inspections) shared by all particles; default 50 000.
    /// On dense graphs (more than 10 edges per node) `simulate` scales it by `10 / density`
    /// (at most a 10x reduction), and the effective budget is never below 1 000 steps.
    pub max_total_steps: usize,
    /// Optional case-insensitive substring filter on node labels: particles never move into
    /// a node whose label does not contain it. Entry nodes themselves are not filtered.
    pub scope_filter: Option<String>,
}
169
170impl Default for FlowConfig {
171    fn default() -> Self {
172        Self {
173            lock_patterns: Vec::new(),
174            read_only_patterns: Vec::new(),
175            max_depth: DEFAULT_MAX_DEPTH,
176            turbulence_threshold: DEFAULT_TURBULENCE_THRESHOLD,
177            include_paths: true,
178            max_particles: MAX_PARTICLES,
179            min_edge_weight: DEFAULT_MIN_EDGE_WEIGHT,
180            max_total_steps: 50_000,
181            scope_filter: None,
182        }
183    }
184}
185
186impl FlowConfig {
187    /// Create config with default lock and read-only pattern strings.
188    pub fn with_defaults() -> Self {
189        Self {
190            lock_patterns: DEFAULT_LOCK_PATTERNS
191                .iter()
192                .map(|s| s.to_string())
193                .collect(),
194            read_only_patterns: DEFAULT_READ_ONLY_PATTERNS
195                .iter()
196                .map(|s| s.to_string())
197                .collect(),
198            ..Default::default()
199        }
200    }
201
202    /// Create config with user-provided pattern strings.
203    pub fn with_patterns(lock_patterns: &[String], read_only_patterns: &[String]) -> Self {
204        Self {
205            lock_patterns: lock_patterns.to_vec(),
206            read_only_patterns: read_only_patterns.to_vec(),
207            ..Default::default()
208        }
209    }
210}
211
212// ── Internal particle state ──
213
214/// A single particle flowing through the graph during simulation.
215#[derive(Clone)]
216struct Particle {
217    /// Unique particle ID within this simulation.
218    id: u32,
219    /// Entry point where this particle was spawned.
220    origin: NodeId,
221    /// Path taken so far (ordered node sequence).
222    path: Vec<NodeId>,
223    /// Current position in the graph.
224    position: NodeId,
225    /// Current depth from entry point.
226    depth: u8,
227    /// If serialized by a valve, which node.
228    serialized_by: Option<NodeId>,
229    /// Visited nodes (cycle detection). Vec<bool> for O(1) lookup.
230    visited: Vec<bool>,
231}
232
233/// Per-node arrival record: which particles from which origins arrived.
234struct NodeAccumulator {
235    /// Indexed by node id. Each entry: Vec of (origin NodeId, particle_id, serialized_by, path).
236    arrivals: Vec<Vec<ParticleArrival>>,
237}
238
239#[derive(Clone)]
240struct ParticleArrival {
241    origin: NodeId,
242    particle_id: u32,
243    serialized_by: Option<NodeId>,
244    path: Vec<NodeId>,
245}
246
247impl NodeAccumulator {
248    fn new(num_nodes: usize) -> Self {
249        Self {
250            arrivals: vec![Vec::new(); num_nodes],
251        }
252    }
253
254    #[inline]
255    fn record(&mut self, node: NodeId, arrival: ParticleArrival) {
256        let idx = node.as_usize();
257        if idx < self.arrivals.len() {
258            self.arrivals[idx].push(arrival);
259        }
260    }
261
262    /// Returns nodes with arrivals from >1 distinct origin.
263    fn flow_turbulent_nodes(&self) -> Vec<(NodeId, &Vec<ParticleArrival>)> {
264        self.arrivals
265            .iter()
266            .enumerate()
267            .filter_map(|(i, arrivals)| {
268                if arrivals.is_empty() {
269                    return None;
270                }
271                let mut origins = BTreeSet::new();
272                for a in arrivals {
273                    origins.insert(a.origin.0);
274                }
275                if origins.len() > 1 {
276                    Some((NodeId::new(i as u32), arrivals))
277                } else {
278                    None
279                }
280            })
281            .collect()
282    }
283}
284
285// ── Valve tracking ──
286
287struct ValveTracker {
288    /// node_id -> (matched pattern string, particles serialized count)
289    valves: BTreeMap<u32, (String, u32)>,
290}
291
292impl ValveTracker {
293    fn new() -> Self {
294        Self {
295            valves: BTreeMap::new(),
296        }
297    }
298
299    fn record_serialization(&mut self, node: NodeId, lock_type: &str) {
300        let entry = self
301            .valves
302            .entry(node.0)
303            .or_insert_with(|| (lock_type.to_string(), 0));
304        entry.1 += 1;
305    }
306}
307
308// ── Helper functions ──
309
310/// Check if a node label or excerpt matches any pattern (case-insensitive substring match).
311/// Patterns use simple substring matching -- regex metacharacters are stripped for matching.
312fn flow_matches_any_pattern(text: &str, patterns: &[String]) -> Option<String> {
313    let text_lower = text.to_lowercase();
314    for pat in patterns {
315        // Strip common regex metacharacters for substring matching since we don't have regex crate.
316        let clean = flow_clean_pattern(pat);
317        if text_lower.contains(&clean) {
318            return Some(pat.clone());
319        }
320    }
321    None
322}
323
/// Normalises a pattern for plain substring matching: lower-cases it, then strips the regex
/// syntax used by the default patterns — word-boundary assertions (`\b`; after
/// lower-casing this also covers `\B`), escape backslashes, and the `^` / `$` anchors.
///
/// `\b` must be removed *before* individual backslashes. Otherwise the backslash is stripped
/// first and a stray `b` is left behind (`\bGET\b` would become `bgetb` and never match).
fn flow_clean_pattern(pat: &str) -> String {
    pat.to_lowercase()
        .replace("\\b", "")
        .replace(['\\', '^', '$'], "")
}
331
332/// Resolve a node label to a string.
333fn flow_node_label(graph: &Graph, node: NodeId) -> String {
334    let idx = node.as_usize();
335    if idx < graph.nodes.count as usize {
336        graph.strings.resolve(graph.nodes.label[idx]).to_string()
337    } else {
338        format!("node_{}", node.0)
339    }
340}
341
342/// Get the node label + excerpt combined text for pattern matching.
343fn flow_node_text(graph: &Graph, node: NodeId) -> String {
344    let idx = node.as_usize();
345    if idx >= graph.nodes.count as usize {
346        return String::new();
347    }
348    let label = graph.strings.resolve(graph.nodes.label[idx]);
349    let excerpt = graph.nodes.provenance[idx]
350        .excerpt
351        .map(|e| graph.strings.resolve(e))
352        .unwrap_or("");
353    format!("{} {}", label, excerpt)
354}
355
356/// Check if a node is a valve (lock/synchronization point).
357/// Returns the matched pattern string if it is.
358fn flow_is_valve(graph: &Graph, node: NodeId, config: &FlowConfig) -> Option<String> {
359    let text = flow_node_text(graph, node);
360    if let Some(pat) = flow_matches_any_pattern(&text, &config.lock_patterns) {
361        return Some(pat);
362    }
363
364    // Also check: does node have an outgoing edge to a node matching lock patterns?
365    let idx = node.as_usize();
366    if idx < graph.nodes.count as usize {
367        let range = graph.csr.out_range(node);
368        for j in range {
369            let target = graph.csr.targets[j];
370            let target_text = flow_node_text(graph, target);
371            if let Some(pat) = flow_matches_any_pattern(&target_text, &config.lock_patterns) {
372                return Some(pat);
373            }
374        }
375    }
376
377    // Heuristic fallback: case-insensitive keyword check
378    let text_lower = text.to_lowercase();
379    let heuristic_keywords = [
380        "lock",
381        "mutex",
382        "guard",
383        "semaphore",
384        "synchronize",
385        "serialize",
386    ];
387    for kw in &heuristic_keywords {
388        if text_lower.contains(kw) {
389            return Some(format!("heuristic:{}", kw));
390        }
391    }
392
393    None
394}
395
396/// Check if a node represents read-only access.
397fn flow_is_read_only(graph: &Graph, node: NodeId, config: &FlowConfig) -> bool {
398    let text = flow_node_text(graph, node);
399    if flow_matches_any_pattern(&text, &config.read_only_patterns).is_none() {
400        return false;
401    }
402
403    // Override: if downstream nodes look like writers, not read-only.
404    let idx = node.as_usize();
405    if idx < graph.nodes.count as usize {
406        let range = graph.csr.out_range(node);
407        for j in range {
408            let target = graph.csr.targets[j];
409            let target_text = flow_node_text(graph, target).to_lowercase();
410            // Check for write-like downstream nodes
411            if target_text.contains("set_")
412                || target_text.contains("write_")
413                || target_text.contains("update_")
414                || target_text.contains("delete_")
415                || target_text.contains("insert_")
416                || target_text.contains("put_")
417                || target_text.contains("remove_")
418                || target_text.contains("mutate")
419            {
420                return false;
421            }
422        }
423    }
424
425    true
426}
427
428/// Count downstream reachable nodes from a given node via BFS.
429fn flow_count_downstream(graph: &Graph, node: NodeId, max_depth: u8) -> u32 {
430    let n = graph.num_nodes() as usize;
431    let mut visited = vec![false; n];
432    let mut queue = VecDeque::new();
433    let idx = node.as_usize();
434    if idx >= n {
435        return 0;
436    }
437    visited[idx] = true;
438    queue.push_back((node, 0u8));
439    let mut count = 0u32;
440
441    while let Some((current, depth)) = queue.pop_front() {
442        if depth >= max_depth {
443            continue;
444        }
445        let range = graph.csr.out_range(current);
446        for j in range {
447            let target = graph.csr.targets[j];
448            let tidx = target.as_usize();
449            if tidx < n && !visited[tidx] {
450                visited[tidx] = true;
451                count += 1;
452                queue.push_back((target, depth + 1));
453            }
454        }
455    }
456
457    count
458}
459
460/// Find nearest upstream lock in a particle's path.
461fn flow_find_nearest_upstream_lock(
462    graph: &Graph,
463    path: &[NodeId],
464    config: &FlowConfig,
465) -> Option<NodeId> {
466    path.iter()
467        .rev()
468        .find(|&&node| flow_is_valve(graph, node, config).is_some())
469        .copied()
470}
471
472/// Compute in-degree for a node (used as centrality proxy per D-12).
473fn flow_in_degree(graph: &Graph, node: NodeId) -> u32 {
474    let idx = node.as_usize();
475    if idx >= graph.nodes.count as usize {
476        return 0;
477    }
478    let range = graph.csr.in_range(node);
479    (range.end - range.start) as u32
480}
481
482/// Max in-degree across all nodes (for normalization).
483fn flow_max_in_degree(graph: &Graph) -> u32 {
484    let n = graph.num_nodes();
485    let mut max_deg = 1u32;
486    for i in 0..n {
487        let node = NodeId::new(i);
488        let deg = flow_in_degree(graph, node);
489        if deg > max_deg {
490            max_deg = deg;
491        }
492    }
493    max_deg
494}
495
496// ── Engine ──
497
498/// Concurrent flow simulation engine for race condition detection.
499///
500/// Spawns particles at entry points and propagates them through the graph.
501/// Nodes reached by particles from more than one distinct entry point are
502/// flagged as turbulence points — potential race conditions on shared mutable state.
503pub struct FlowEngine;
504
505impl Default for FlowEngine {
506    fn default() -> Self {
507        Self::new()
508    }
509}
510
511impl FlowEngine {
512    /// Create a new `FlowEngine`.
513    pub fn new() -> Self {
514        Self
515    }
516
517    /// Run flow simulation. Main entry point for the MCP tool.
518    ///
519    /// # Parameters
520    /// - `graph`: finalized graph to simulate on
521    /// - `entry_nodes`: starting positions for particles (at least one required)
522    /// - `num_particles`: particles spawned per entry point (capped by `config.max_particles`)
523    /// - `config`: simulation parameters
524    ///
525    /// # Errors
526    /// Returns `M1ndError::NoEntryPoints` if `entry_nodes` is empty or graph has no nodes.
527    pub fn simulate(
528        &self,
529        graph: &Graph,
530        entry_nodes: &[NodeId],
531        num_particles: u32,
532        config: &FlowConfig,
533    ) -> M1ndResult<FlowSimulationResult> {
534        let start = Instant::now();
535        let n = graph.num_nodes() as usize;
536
537        // EC-8: empty graph or no entry points
538        if n == 0 || entry_nodes.is_empty() {
539            return Err(M1ndError::NoEntryPoints);
540        }
541
542        let num_particles = num_particles.min(config.max_particles);
543        let max_depth = config.max_depth;
544
545        // Auto-scale step budget for dense graphs to prevent latency explosion.
546        // Dense graph = high edges/nodes ratio. Scale budget down proportionally.
547        let edges = graph.num_edges() as f64;
548        let nodes = n as f64;
549        let density = if nodes > 0.0 { edges / nodes } else { 1.0 };
550        let budget_scale = if density > 10.0 {
551            // Very dense: reduce budget to prevent >100ms queries
552            (10.0 / density).max(0.1)
553        } else {
554            1.0
555        };
556        let effective_steps = ((config.max_total_steps as f64) * budget_scale) as usize;
557        let max_total_steps = effective_steps.max(1000); // floor at 1000
558
559        // Accumulator: track particle arrivals per node
560        let mut accumulator = NodeAccumulator::new(n);
561        // Valve tracker
562        let mut valve_tracker = ValveTracker::new();
563        // Global visited set for coverage tracking
564        let mut global_visited = vec![false; n];
565        // Total particle counter
566        let mut total_particles_spawned = 0u32;
567
568        // Pre-compute scope filter: which nodes are in scope
569        let scope_allowed: Option<Vec<bool>> = config.scope_filter.as_ref().map(|filter| {
570            let filter_lower = filter.to_lowercase();
571            (0..n)
572                .map(|i| {
573                    let label = graph.strings.resolve(graph.nodes.label[i]);
574                    label.to_lowercase().contains(&filter_lower)
575                })
576                .collect()
577        });
578
579        let mut global_steps: usize = 0;
580
581        // 1. Spawn particles at entry points and propagate via BFS
582        for &entry in entry_nodes {
583            let entry_idx = entry.as_usize();
584            if entry_idx >= n {
585                continue;
586            }
587
588            for p_idx in 0..num_particles {
589                total_particles_spawned += 1;
590                let pid = total_particles_spawned;
591
592                let mut visited = vec![false; n];
593                visited[entry_idx] = true;
594                global_visited[entry_idx] = true;
595
596                // BFS queue: (particle state snapshot for each active front)
597                let mut queue: VecDeque<Particle> = VecDeque::new();
598                let initial = Particle {
599                    id: pid,
600                    origin: entry,
601                    path: vec![entry],
602                    position: entry,
603                    depth: 0,
604                    serialized_by: None,
605                    visited,
606                };
607
608                // Record arrival at entry
609                accumulator.record(
610                    entry,
611                    ParticleArrival {
612                        origin: entry,
613                        particle_id: pid,
614                        serialized_by: None,
615                        path: vec![entry],
616                    },
617                );
618
619                queue.push_back(initial);
620
621                let mut active_count = 1usize;
622
623                while let Some(particle) = queue.pop_front() {
624                    if particle.depth >= max_depth {
625                        continue;
626                    }
627
628                    let pos = particle.position;
629                    let pos_idx = pos.as_usize();
630                    if pos_idx >= n {
631                        continue;
632                    }
633
634                    // Check if this node is a valve
635                    let mut serialized_by = particle.serialized_by;
636                    if let Some(lock_type) = flow_is_valve(graph, pos, config) {
637                        serialized_by = Some(pos);
638                        valve_tracker.record_serialization(pos, &lock_type);
639                    }
640
641                    // Check global step budget
642                    if global_steps >= max_total_steps {
643                        break;
644                    }
645
646                    // Propagate along outgoing causal edges
647                    let range = graph.csr.out_range(pos);
648                    for j in range {
649                        global_steps += 1;
650                        if global_steps >= max_total_steps {
651                            break;
652                        }
653
654                        // Skip inhibitory edges
655                        if graph.csr.inhibitory[j] {
656                            continue;
657                        }
658
659                        // Skip low-weight edges (noise)
660                        let weight = graph.csr.read_weight(EdgeIdx::new(j as u32)).get();
661                        if weight < config.min_edge_weight {
662                            continue;
663                        }
664
665                        let target = graph.csr.targets[j];
666                        let tidx = target.as_usize();
667                        if tidx >= n {
668                            continue;
669                        }
670
671                        // Scope filter: skip nodes not matching the scope
672                        if let Some(ref allowed) = scope_allowed {
673                            if !allowed[tidx] {
674                                continue;
675                            }
676                        }
677
678                        // EC-1: cycle detection per particle
679                        if particle.visited[tidx] {
680                            // Record arrival but don't propagate further
681                            accumulator.record(
682                                target,
683                                ParticleArrival {
684                                    origin: entry,
685                                    particle_id: pid,
686                                    serialized_by,
687                                    path: if config.include_paths {
688                                        let mut p = particle.path.clone();
689                                        p.push(target);
690                                        p
691                                    } else {
692                                        Vec::new()
693                                    },
694                                },
695                            );
696                            global_visited[tidx] = true;
697                            continue;
698                        }
699
700                        // FM-FLOW-011: cap active particles
701                        if active_count >= MAX_ACTIVE_PARTICLES {
702                            break;
703                        }
704
705                        let mut new_path = if config.include_paths {
706                            let mut p = particle.path.clone();
707                            p.push(target);
708                            p
709                        } else {
710                            Vec::new()
711                        };
712
713                        // Record arrival
714                        accumulator.record(
715                            target,
716                            ParticleArrival {
717                                origin: entry,
718                                particle_id: pid,
719                                serialized_by,
720                                path: new_path.clone(),
721                            },
722                        );
723
724                        global_visited[tidx] = true;
725
726                        // Create child particle
727                        let mut child_visited = particle.visited.clone();
728                        child_visited[tidx] = true;
729
730                        let child = Particle {
731                            id: pid,
732                            origin: entry,
733                            path: if config.include_paths {
734                                new_path
735                            } else {
736                                Vec::new()
737                            },
738                            position: target,
739                            depth: particle.depth + 1,
740                            serialized_by,
741                            visited: child_visited,
742                        };
743
744                        queue.push_back(child);
745                        active_count += 1;
746                    }
747
748                    // FM-FLOW-011: if too many active, stop spawning
749                    if active_count >= MAX_ACTIVE_PARTICLES {
750                        break;
751                    }
752                    // Global step budget exceeded
753                    if global_steps >= max_total_steps {
754                        break;
755                    }
756                }
757            }
758        }
759
760        // 2. Compute max in-degree for centrality normalization
761        let max_in_deg = flow_max_in_degree(graph);
762
763        // 3. Identify turbulence points (nodes with arrivals from >1 distinct origin)
764        let turbulent = accumulator.flow_turbulent_nodes();
765        let mut turbulence_points = Vec::new();
766
767        for (node, arrivals) in &turbulent {
768            let node_label = flow_node_label(graph, *node);
769            let has_lock = flow_is_valve(graph, *node, config).is_some();
770            let is_read_only = flow_is_read_only(graph, *node, config);
771
772            // Count unserialized particles from distinct origins
773            let mut origins_unserialized: BTreeSet<u32> = BTreeSet::new();
774            let mut all_origins: BTreeSet<u32> = BTreeSet::new();
775            for a in *arrivals {
776                all_origins.insert(a.origin.0);
777                if a.serialized_by.is_none() {
778                    origins_unserialized.insert(a.origin.0);
779                }
780            }
781
782            // Effective particle count: only unserialized particles from distinct origins
783            let particle_count = if origins_unserialized.len() > 1 {
784                origins_unserialized.len() as u32
785            } else if all_origins.len() > 1 {
786                all_origins.len() as u32
787            } else {
788                continue;
789            };
790
791            // Turbulence scoring per PRD F5
792            let base_score = (particle_count as f32) / (num_particles as f32).max(1.0);
793            let base_score = base_score.min(1.0);
794
795            // Find nearest upstream lock from any particle's path
796            let nearest_lock = arrivals
797                .iter()
798                .find_map(|a| flow_find_nearest_upstream_lock(graph, &a.path, config));
799
800            let lock_factor = if has_lock {
801                0.0
802            } else if nearest_lock.is_some() {
803                0.3 // partially protected
804            } else {
805                1.0 // no protection
806            };
807
808            let read_factor = if is_read_only { 0.2 } else { 1.0 };
809
810            // Centrality factor: use in-degree as proxy (D-12)
811            let in_deg = flow_in_degree(graph, *node);
812            let centrality_normalized = (in_deg as f32) / (max_in_deg as f32).max(1.0);
813            let centrality_factor = 0.5 + 0.5 * centrality_normalized;
814
815            // EC-10: high fan-out utility nodes get score reduction
816            let utility_factor = if is_read_only && centrality_normalized > 0.9 {
817                0.1
818            } else {
819                1.0
820            };
821
822            let turbulence_score =
823                base_score * lock_factor * read_factor * centrality_factor * utility_factor;
824
825            if turbulence_score < config.turbulence_threshold {
826                continue;
827            }
828
829            // Severity mapping per PRD F5
830            let severity = if turbulence_score >= 0.8
831                && particle_count >= 3
832                && nearest_lock.is_none()
833                && !has_lock
834            {
835                TurbulenceSeverity::Critical
836            } else if turbulence_score >= 0.6 {
837                TurbulenceSeverity::High
838            } else if turbulence_score >= 0.3 {
839                TurbulenceSeverity::Medium
840            } else {
841                TurbulenceSeverity::Low
842            };
843
844            // Entry point pair attribution (F6)
845            let origin_list: Vec<u32> = all_origins.iter().copied().collect();
846            let mut entry_pairs = Vec::new();
847            for i in 0..origin_list.len() {
848                for j in (i + 1)..origin_list.len() {
849                    let a_label = flow_node_label(graph, NodeId::new(origin_list[i]));
850                    let b_label = flow_node_label(graph, NodeId::new(origin_list[j]));
851                    entry_pairs.push((a_label, b_label));
852                }
853            }
854
855            // Collect paths if requested
856            let paths = if config.include_paths {
857                arrivals
858                    .iter()
859                    .filter(|a| !a.path.is_empty())
860                    .map(|a| a.path.iter().map(|n| flow_node_label(graph, *n)).collect())
861                    .collect()
862            } else {
863                Vec::new()
864            };
865
866            let nearest_upstream_lock_label = nearest_lock.map(|n| flow_node_label(graph, n));
867
868            turbulence_points.push(TurbulencePoint {
869                node: *node,
870                node_label,
871                particle_count,
872                has_lock,
873                is_read_only,
874                turbulence_score,
875                severity,
876                entry_pairs,
877                nearest_upstream_lock: nearest_upstream_lock_label,
878                paths,
879            });
880        }
881
882        // Sort by turbulence score descending (deterministic: FM-FLOW-012)
883        turbulence_points.sort_by(|a, b| {
884            b.turbulence_score
885                .partial_cmp(&a.turbulence_score)
886                .unwrap_or(std::cmp::Ordering::Equal)
887                .then_with(|| a.node.0.cmp(&b.node.0))
888        });
889
890        // 4. Build valve points (F7)
891        let mut valve_points: Vec<ValvePoint> = valve_tracker
892            .valves
893            .iter()
894            .map(|(&node_id, (lock_type, serialized))| {
895                let node = NodeId::new(node_id);
896                let downstream = flow_count_downstream(graph, node, config.max_depth);
897                ValvePoint {
898                    node,
899                    node_label: flow_node_label(graph, node),
900                    lock_type: lock_type.clone(),
901                    particles_serialized: *serialized,
902                    downstream_protected: downstream,
903                }
904            })
905            .collect();
906
907        // Sort valves deterministically by node id
908        valve_points.sort_by_key(|v| v.node.0);
909
910        // 5. Compute coverage
911        let visited_count = global_visited.iter().filter(|&&v| v).count() as u32;
912        let coverage_pct = if n > 0 {
913            (visited_count as f32) / (n as f32)
914        } else {
915            0.0
916        };
917
918        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
919
920        Ok(FlowSimulationResult {
921            summary: FlowSummary {
922                total_entry_points: entry_nodes.len() as u32,
923                total_particles: total_particles_spawned,
924                total_nodes_visited: visited_count,
925                turbulence_count: turbulence_points.len() as u32,
926                valve_count: valve_points.len() as u32,
927                coverage_pct,
928                elapsed_ms,
929            },
930            turbulence_points,
931            valve_points,
932        })
933    }
934
935    /// Auto-discover entry points from graph topology and naming patterns.
936    ///
937    /// Prefers `Function` nodes matching `ENTRY_POINT_PATTERNS`. Falls back to
938    /// `File` nodes with entry-like names if no functions match.
939    /// Results are sorted by PageRank (or in-degree) descending, capped at 100.
940    ///
941    /// # Parameters
942    /// - `graph`: finalized graph to search
943    /// - `max_entries`: maximum number of entry points to return (capped at 100)
944    pub fn discover_entry_points(&self, graph: &Graph, max_entries: usize) -> Vec<NodeId> {
945        let n = graph.num_nodes();
946        if n == 0 {
947            return Vec::new();
948        }
949
950        let mut candidates: Vec<(NodeId, f32)> = Vec::new();
951
952        for i in 0..n {
953            let node = NodeId::new(i);
954            let idx = i as usize;
955
956            // Filter by node_type: Function preferred, File as fallback
957            let nt = graph.nodes.node_type[idx];
958            let is_function = matches!(nt, NodeType::Function);
959
960            let label = graph.strings.resolve(graph.nodes.label[idx]).to_lowercase();
961
962            // Check against entry point patterns
963            let matches_pattern = ENTRY_POINT_PATTERNS.iter().any(|p| label.contains(p));
964
965            if matches_pattern && is_function {
966                // Use pagerank if available, otherwise in-degree as priority
967                let priority = if graph.pagerank_computed {
968                    graph.nodes.pagerank[idx].get()
969                } else {
970                    let range = graph.csr.in_range(node);
971                    (range.end - range.start) as f32
972                };
973                candidates.push((node, priority));
974            }
975        }
976
977        // If no function-level entries found, try file-level with entry patterns
978        if candidates.is_empty() {
979            for i in 0..n {
980                let node = NodeId::new(i);
981                let idx = i as usize;
982                let nt = graph.nodes.node_type[idx];
983                if !matches!(nt, NodeType::File) {
984                    continue;
985                }
986                let label = graph.strings.resolve(graph.nodes.label[idx]).to_lowercase();
987                let matches_pattern = ENTRY_POINT_PATTERNS.iter().any(|p| label.contains(p));
988                // Also match files with "main", "app", "server" in name
989                let is_entry_file = label.contains("main")
990                    || label.contains("app.")
991                    || label.contains("server")
992                    || label.contains("__init__");
993
994                if matches_pattern || is_entry_file {
995                    let priority = if graph.pagerank_computed {
996                        graph.nodes.pagerank[idx].get()
997                    } else {
998                        let range = graph.csr.in_range(node);
999                        (range.end - range.start) as f32
1000                    };
1001                    candidates.push((node, priority));
1002                }
1003            }
1004        }
1005
1006        // Sort by priority descending (deterministic: break ties by node id)
1007        candidates.sort_by(|a, b| {
1008            b.1.partial_cmp(&a.1)
1009                .unwrap_or(std::cmp::Ordering::Equal)
1010                .then_with(|| a.0 .0.cmp(&b.0 .0))
1011        });
1012
1013        // Cap at max_entries (AC-6: cap at 100 per PRD F8)
1014        let cap = max_entries.min(100);
1015        candidates.truncate(cap);
1016
1017        candidates.into_iter().map(|(node, _)| node).collect()
1018    }
1019}
1020
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::Graph;
    use crate::types::*;

    // ── Helpers ──

    /// Add a forward edge `src -> dst` labelled `relation` to `g`, unwrapping the result.
    ///
    /// `weight` and `aux` are wrapped in `FiniteF32`. They are passed as the first
    /// and second float arguments of `Graph::add_edge`. The boolean flag is always `false`.
    fn connect(g: &mut Graph, src: u32, dst: u32, relation: &str, weight: f32, aux: f32) {
        g.add_edge(
            NodeId::new(src),
            NodeId::new(dst),
            relation,
            FiniteF32::new(weight),
            EdgeDirection::Forward,
            false,
            FiniteF32::new(aux),
        )
        .unwrap();
    }

    /// `FlowConfig::default()` with its lock and read-only pattern lists emptied.
    fn config_without_patterns() -> FlowConfig {
        let mut cfg = FlowConfig::default();
        cfg.lock_patterns = Vec::new();
        cfg.read_only_patterns = Vec::new();
        cfg
    }

    /// Build a minimal 2-node finalized graph: A → B
    fn two_node_graph(label_a: &str, label_b: &str, relation: &str) -> Graph {
        let mut graph = Graph::new();
        graph
            .add_node("a", label_a, NodeType::Function, &[], 1.0, 0.5)
            .unwrap();
        graph
            .add_node("b", label_b, NodeType::Function, &[], 0.8, 0.3)
            .unwrap();
        connect(&mut graph, 0, 1, relation, 0.9, 0.5);
        graph.finalize().unwrap();
        graph
    }

    /// Build a 4-node graph with two entry points converging on a shared node:
    ///   entry1 → shared
    ///   entry2 → shared
    ///   shared → sink
    fn convergent_graph() -> Graph {
        let mut graph = Graph::new();
        graph
            .add_node("entry1", "handle_alpha", NodeType::Function, &[], 1.0, 0.5)
            .unwrap(); // 0
        graph
            .add_node("entry2", "handle_beta", NodeType::Function, &[], 1.0, 0.5)
            .unwrap(); // 1
        graph
            .add_node("shared", "shared_state", NodeType::Function, &[], 0.9, 0.4)
            .unwrap(); // 2
        graph
            .add_node("sink", "output", NodeType::Function, &[], 0.5, 0.2)
            .unwrap(); // 3
        let edges = [(0, 2, 0.9, 0.5), (1, 2, 0.9, 0.5), (2, 3, 0.8, 0.3)];
        for (src, dst, weight, aux) in edges {
            connect(&mut graph, src, dst, "calls", weight, aux);
        }
        graph.finalize().unwrap();
        graph
    }

    // ── Test 1: empty graph returns Err(NoEntryPoints) ──
    #[test]
    fn empty_graph_returns_no_entry_points_error() {
        let mut graph = Graph::new();
        graph.finalize().unwrap();
        let flow = FlowEngine::new();
        let cfg = FlowConfig::default();

        let outcome = flow.simulate(&graph, &[], 2, &cfg);

        assert!(matches!(
            outcome,
            Err(crate::error::M1ndError::NoEntryPoints)
        ));
    }

    // ── Test 2: turbulence_detection — two entry points converging produce a turbulence point ──
    #[test]
    fn turbulence_detected_on_convergent_graph() {
        let graph = convergent_graph();
        let flow = FlowEngine::new();
        let mut cfg = config_without_patterns();
        cfg.turbulence_threshold = 0.0; // capture everything

        let entries = [NodeId::new(0), NodeId::new(1)];
        let report = flow.simulate(&graph, &entries, 1, &cfg).unwrap();

        // shared_state (node 2) receives particles from two distinct origins
        let found = report.summary.turbulence_count;
        assert!(found > 0, "expected turbulence at convergence node, got 0");
    }

    // ── Test 3: valve_detection — node with lock label becomes a valve ──
    #[test]
    fn valve_detected_on_lock_node() {
        let mut graph = Graph::new();
        graph
            .add_node("ep", "handle_req", NodeType::Function, &[], 1.0, 0.5)
            .unwrap();
        graph
            .add_node("lk", "mutex_guard", NodeType::Function, &[], 0.9, 0.4)
            .unwrap();
        graph
            .add_node("wr", "write_state", NodeType::Function, &[], 0.8, 0.3)
            .unwrap();
        connect(&mut graph, 0, 1, "calls", 0.9, 0.5);
        connect(&mut graph, 1, 2, "calls", 0.9, 0.5);
        graph.finalize().unwrap();

        let flow = FlowEngine::new();
        // mutex is in the heuristic keyword list, so explicit patterns are not needed
        let cfg = config_without_patterns();

        let report = flow.simulate(&graph, &[NodeId::new(0)], 1, &cfg).unwrap();

        // the mutex_guard heuristic should register a valve
        let valves = report.summary.valve_count;
        assert!(valves > 0, "expected a valve at mutex_guard node");
    }

    // ── Test 4: max_depth — particles don't travel beyond max_depth ──
    #[test]
    fn max_depth_limits_propagation() {
        // Chain: 0 → 1 → 2 → 3 → 4 (5 nodes)
        let mut graph = Graph::new();
        for idx in 0..5u32 {
            let id = format!("n{}", idx);
            let label = format!("node_{}", idx);
            graph
                .add_node(&id, &label, NodeType::Function, &[], 1.0, 0.5)
                .unwrap();
        }
        for idx in 0..4u32 {
            connect(&mut graph, idx, idx + 1, "calls", 0.9, 0.5);
        }
        graph.finalize().unwrap();

        let flow = FlowEngine::new();
        let mut cfg = config_without_patterns();
        cfg.max_depth = 2; // only 2 hops from entry

        let report = flow.simulate(&graph, &[NodeId::new(0)], 1, &cfg).unwrap();

        // nodes 0, 1, 2 visited (depth 0, 1, 2); nodes 3, 4 NOT visited
        let visited = report.summary.total_nodes_visited;
        assert!(
            visited <= 3,
            "expected at most 3 nodes visited with max_depth=2, got {}",
            visited
        );
    }

    // ── Test 5: max_steps budget — simulation stops at budget ──
    #[test]
    fn max_steps_budget_stops_simulation() {
        // Dense graph: 10 nodes fully connected
        let mut graph = Graph::new();
        for idx in 0..10u32 {
            let id = format!("n{}", idx);
            let label = format!("fn_{}", idx);
            graph
                .add_node(&id, &label, NodeType::Function, &[], 1.0, 0.5)
                .unwrap();
        }
        let ordered_pairs = (0..10u32).flat_map(|src| (0..10u32).map(move |dst| (src, dst)));
        for (src, dst) in ordered_pairs.filter(|(src, dst)| src != dst) {
            // The insertion result is intentionally ignored here.
            let _ = graph.add_edge(
                NodeId::new(src),
                NodeId::new(dst),
                "calls",
                FiniteF32::new(0.9),
                EdgeDirection::Forward,
                false,
                FiniteF32::new(0.5),
            );
        }
        graph.finalize().unwrap();

        let flow = FlowEngine::new();
        let mut cfg = config_without_patterns();
        cfg.max_total_steps = 5; // tiny budget

        // Should complete without panic or hang
        let outcome = flow.simulate(&graph, &[NodeId::new(0)], 1, &cfg);
        assert!(
            outcome.is_ok(),
            "simulation should succeed even with tiny budget"
        );
    }

    // ── Test 6: scope_filter — only nodes matching filter are visited ──
    #[test]
    fn scope_filter_restricts_visited_nodes() {
        // 3-node graph: entry → alpha_fn → beta_fn
        let mut graph = Graph::new();
        graph
            .add_node("e", "entry_point", NodeType::Function, &[], 1.0, 0.5)
            .unwrap();
        graph
            .add_node("a", "alpha_fn", NodeType::Function, &[], 0.9, 0.4)
            .unwrap();
        graph
            .add_node("b", "beta_fn", NodeType::Function, &[], 0.8, 0.3)
            .unwrap();
        connect(&mut graph, 0, 1, "calls", 0.9, 0.5);
        connect(&mut graph, 1, 2, "calls", 0.9, 0.5);
        graph.finalize().unwrap();

        let flow = FlowEngine::new();
        let mut cfg = config_without_patterns();
        cfg.scope_filter = Some("alpha".to_string()); // only alpha_fn passes

        let report = flow.simulate(&graph, &[NodeId::new(0)], 1, &cfg).unwrap();

        // The entry is always visited (the scope filter only restricts propagation
        // targets). alpha_fn matches the scope and is visited; beta_fn does not match
        // and is not visited.
        let visited = report.summary.total_nodes_visited;
        assert!(
            visited <= 2,
            "scope filter should restrict to at most entry + alpha, got {}",
            visited
        );
    }

    // ── Test 7: read_only — read-only node reduces turbulence score ──
    #[test]
    fn read_only_node_gets_reduced_turbulence() {
        // Two entries both converging on a "get_" prefixed node
        let mut graph = Graph::new();
        graph
            .add_node("e1", "handle_alpha", NodeType::Function, &[], 1.0, 0.5)
            .unwrap(); // 0
        graph
            .add_node("e2", "handle_beta", NodeType::Function, &[], 1.0, 0.5)
            .unwrap(); // 1
        graph
            .add_node("ro", "get_state", NodeType::Function, &[], 0.9, 0.4)
            .unwrap(); // 2
        for src in [0, 1] {
            connect(&mut graph, src, 2, "calls", 0.9, 0.5);
        }
        graph.finalize().unwrap();

        let flow = FlowEngine::new();
        let mut cfg = FlowConfig::with_defaults();
        cfg.turbulence_threshold = 0.0;

        let report = flow
            .simulate(&graph, &[NodeId::new(0), NodeId::new(1)], 1, &cfg)
            .unwrap();

        // get_state is read-only, so its score is multiplied by a 0.2 factor.
        // Either it produces no turbulence point, or its score is low.
        let read_only_hits = report
            .turbulence_points
            .iter()
            .filter(|tp| tp.node_label.contains("get_state"));
        for tp in read_only_hits {
            assert!(
                tp.turbulence_score <= 0.3,
                "read-only node should have low turbulence score, got {}",
                tp.turbulence_score
            );
        }
    }

    // ── Test 8: auto_discover — entry point discovery finds handle_ functions ──
    #[test]
    fn auto_discover_finds_handle_functions() {
        let mut graph = Graph::new();
        graph
            .add_node("h1", "handle_request", NodeType::Function, &[], 1.0, 0.5)
            .unwrap();
        graph
            .add_node("h2", "handle_event", NodeType::Function, &[], 0.9, 0.4)
            .unwrap();
        graph
            .add_node("u", "utility_helper", NodeType::Function, &[], 0.5, 0.2)
            .unwrap();
        for src in [0, 1] {
            connect(&mut graph, src, 2, "calls", 0.8, 0.3);
        }
        graph.finalize().unwrap();

        let flow = FlowEngine::new();
        let discovered = flow.discover_entry_points(&graph, 10);

        // Both handle_ functions should be discovered
        assert_eq!(
            discovered.len(),
            2,
            "expected 2 handle_ entry points, got {}",
            discovered.len()
        );
    }
}