Skip to main content

ruvector_memopt/algorithms/
mincut.rs

//! MinCut-style process clustering
//!
//! Groups processes that share a (case-insensitive) process name or have
//! parent-child relationships, by taking connected components of that graph.
//! Optimizing related processes together improves cache coherence.
5
6use std::collections::{HashMap, HashSet, VecDeque};
7use sysinfo::{System, ProcessesToUpdate};
8
/// A weighted link between two processes in the process graph.
///
/// NOTE(review): nothing in the visible part of this file constructs an
/// `Edge`; the clusterer stores its edges as adjacency lists instead.
#[derive(Clone, Debug)]
struct Edge {
    /// PID at one end of the edge.
    from: u32,
    /// PID at the other end of the edge.
    to: u32,
    /// Strength of the relationship; larger means more closely related.
    weight: f64,
}
16
/// A group of related processes produced by [`MinCutClusterer::find_clusters`].
#[derive(Clone, Debug)]
pub struct ProcessCluster {
    /// Identifier of this cluster within a single `find_clusters` result.
    pub id: usize,
    /// PIDs of the processes that belong to the cluster.
    pub processes: Vec<u32>,
    /// Summed memory of the member processes, in mebibytes.
    pub total_memory_mb: f64,
    /// Fraction (0.0..=1.0) of the members' edge weight that stays inside the
    /// cluster.
    pub connectivity: f64,
}
25
/// Clusters processes by walking a weighted relationship graph.
pub struct MinCutClusterer {
    /// Undirected graph: every PID maps to its `(neighbor PID, edge weight)`
    /// pairs, and each edge is recorded under both of its endpoints.
    adjacency: HashMap<u32, Vec<(u32, f64)>>,
    /// Memory usage recorded for each PID when the graph was built.
    process_memory: HashMap<u32, u64>,
    /// Components with fewer members than this are not reported as clusters.
    min_cluster_size: usize,
}
32
33impl MinCutClusterer {
34    pub fn new() -> Self {
35        Self {
36            adjacency: HashMap::with_capacity(512),
37            process_memory: HashMap::with_capacity(512),
38            min_cluster_size: 2,
39        }
40    }
41
42    /// Build process graph from system state
43    pub fn build_graph(&mut self, system: &System) {
44        self.adjacency.clear();
45        self.process_memory.clear();
46
47        // Phase 1: Collect process info and build name -> PIDs index (O(n))
48        let mut name_groups: HashMap<String, Vec<u32>> = HashMap::new();
49
50        for (pid, process) in system.processes() {
51            let pid_u32 = pid.as_u32();
52            self.process_memory.insert(pid_u32, process.memory());
53
54            // Parent-child edge (strong connection)
55            if let Some(parent_pid) = process.parent() {
56                self.add_edge(pid_u32, parent_pid.as_u32(), 1.0);
57            }
58
59            // Index by name for O(n) same-name grouping
60            let name = process.name().to_string_lossy().to_lowercase();
61            name_groups.entry(name).or_default().push(pid_u32);
62        }
63
64        // Phase 2: Connect same-name processes (O(n) total across all groups)
65        for (_name, pids) in &name_groups {
66            if pids.len() < 2 { continue; }
67            // Connect each to the first (star topology) instead of all-pairs O(k^2)
68            let hub = pids[0];
69            for &pid in &pids[1..] {
70                self.add_edge(hub, pid, 0.5);
71            }
72        }
73    }
74
75    fn add_edge(&mut self, from: u32, to: u32, weight: f64) {
76        self.adjacency
77            .entry(from)
78            .or_insert_with(Vec::new)
79            .push((to, weight));
80        self.adjacency
81            .entry(to)
82            .or_insert_with(Vec::new)
83            .push((from, weight));
84    }
85
86    /// Find clusters using Karger's MinCut approximation
87    /// Returns groups of related processes
88    pub fn find_clusters(&self, target_clusters: usize) -> Vec<ProcessCluster> {
89        if self.adjacency.is_empty() {
90            return vec![];
91        }
92
93        // Use BFS-based clustering (faster than Karger's for our use case)
94        let mut visited: HashSet<u32> = HashSet::new();
95        let mut clusters = Vec::new();
96        let mut cluster_id = 0;
97
98        for &start_pid in self.adjacency.keys() {
99            if visited.contains(&start_pid) {
100                continue;
101            }
102
103            let cluster = self.bfs_cluster(start_pid, &mut visited);
104            if cluster.len() >= self.min_cluster_size {
105                let total_memory: u64 = cluster
106                    .iter()
107                    .filter_map(|pid| self.process_memory.get(pid))
108                    .sum();
109
110                let connectivity = self.calculate_connectivity(&cluster);
111
112                clusters.push(ProcessCluster {
113                    id: cluster_id,
114                    processes: cluster,
115                    total_memory_mb: total_memory as f64 / (1024.0 * 1024.0),
116                    connectivity,
117                });
118                cluster_id += 1;
119            }
120
121            if clusters.len() >= target_clusters {
122                break;
123            }
124        }
125
126        // Sort by total memory (optimize largest clusters first)
127        clusters.sort_by(|a, b| {
128            b.total_memory_mb
129                .partial_cmp(&a.total_memory_mb)
130                .unwrap_or(std::cmp::Ordering::Equal)
131        });
132
133        clusters
134    }
135
136    /// BFS to find connected component
137    fn bfs_cluster(&self, start: u32, visited: &mut HashSet<u32>) -> Vec<u32> {
138        let mut cluster = Vec::new();
139        let mut queue = VecDeque::new();
140        queue.push_back(start);
141
142        while let Some(pid) = queue.pop_front() {
143            if visited.contains(&pid) {
144                continue;
145            }
146            visited.insert(pid);
147            cluster.push(pid);
148
149            if let Some(neighbors) = self.adjacency.get(&pid) {
150                for &(neighbor, weight) in neighbors {
151                    if !visited.contains(&neighbor) && weight > 0.3 {
152                        queue.push_back(neighbor);
153                    }
154                }
155            }
156        }
157
158        cluster
159    }
160
161    /// Calculate internal connectivity of a cluster
162    fn calculate_connectivity(&self, cluster: &[u32]) -> f64 {
163        if cluster.len() < 2 {
164            return 0.0;
165        }
166
167        let cluster_set: HashSet<u32> = cluster.iter().copied().collect();
168        let mut internal_edges = 0.0;
169        let mut total_edges = 0.0;
170
171        for &pid in cluster {
172            if let Some(neighbors) = self.adjacency.get(&pid) {
173                for &(neighbor, weight) in neighbors {
174                    total_edges += weight;
175                    if cluster_set.contains(&neighbor) {
176                        internal_edges += weight;
177                    }
178                }
179            }
180        }
181
182        if total_edges > 0.0 {
183            internal_edges / total_edges
184        } else {
185            0.0
186        }
187    }
188
189    /// Get optimal trim order within a cluster
190    pub fn get_trim_order(&self, cluster: &ProcessCluster) -> Vec<u32> {
191        // Sort by memory usage (trim largest first within cluster)
192        let mut ordered = cluster.processes.clone();
193        ordered.sort_by(|a, b| {
194            let mem_a = self.process_memory.get(a).unwrap_or(&0);
195            let mem_b = self.process_memory.get(b).unwrap_or(&0);
196            mem_b.cmp(mem_a)
197        });
198        ordered
199    }
200
201    /// Statistics for benchmarking
202    pub fn stats(&self) -> MinCutStats {
203        MinCutStats {
204            total_processes: self.process_memory.len(),
205            total_edges: self.adjacency.values().map(|v| v.len()).sum::<usize>() / 2,
206            total_memory_mb: self.process_memory.values().sum::<u64>() as f64 / (1024.0 * 1024.0),
207        }
208    }
209}
210
211impl Default for MinCutClusterer {
212    fn default() -> Self {
213        Self::new()
214    }
215}
216
/// Summary figures describing the graph held by a `MinCutClusterer`.
#[derive(Clone, Debug)]
pub struct MinCutStats {
    /// Number of processes with a recorded memory figure.
    pub total_processes: usize,
    /// Number of undirected edges in the graph.
    pub total_edges: usize,
    /// Summed memory of all recorded processes, in mebibytes.
    pub total_memory_mb: f64,
}
223
#[cfg(test)]
mod tests {
    use super::*;

    const MB: u64 = 1024 * 1024;

    /// Two components: {1, 2, 3} holding 450 MB and {4, 5} holding 125 MB.
    fn sample_clusterer() -> MinCutClusterer {
        let mut clusterer = MinCutClusterer::new();

        clusterer.add_edge(1, 2, 1.0);
        clusterer.add_edge(2, 3, 1.0);
        clusterer.add_edge(4, 5, 1.0);

        clusterer.process_memory.insert(1, 100 * MB);
        clusterer.process_memory.insert(2, 200 * MB);
        clusterer.process_memory.insert(3, 150 * MB);
        clusterer.process_memory.insert(4, 50 * MB);
        clusterer.process_memory.insert(5, 75 * MB);

        clusterer
    }

    /// Cluster members in ascending PID order, independent of traversal order.
    fn sorted_members(cluster: &ProcessCluster) -> Vec<u32> {
        let mut members = cluster.processes.clone();
        members.sort_unstable();
        members
    }

    #[test]
    fn test_mincut_clusterer() {
        let clusterer = sample_clusterer();

        let clusters = clusterer.find_clusters(10);
        assert_eq!(clusters.len(), 2);

        // Largest cluster comes first.
        assert_eq!(sorted_members(&clusters[0]), vec![1, 2, 3]);
        assert_eq!(clusters[0].total_memory_mb, 450.0);
        assert_eq!(sorted_members(&clusters[1]), vec![4, 5]);
        assert_eq!(clusters[1].total_memory_mb, 125.0);

        // A whole connected component has no edges leaving it.
        assert_eq!(clusters[0].connectivity, 1.0);
        assert_eq!(clusters[1].connectivity, 1.0);

        let stats = clusterer.stats();
        assert_eq!(stats.total_processes, 5);
        assert_eq!(stats.total_edges, 3);
        assert_eq!(stats.total_memory_mb, 575.0);
    }

    #[test]
    fn test_find_clusters_respects_target_count() {
        let clusterer = sample_clusterer();
        assert_eq!(clusterer.find_clusters(1).len(), 1);
    }

    #[test]
    fn test_find_clusters_on_empty_graph() {
        let clusterer = MinCutClusterer::new();
        assert!(clusterer.find_clusters(10).is_empty());
    }

    #[test]
    fn test_trim_order_is_largest_first() {
        let clusterer = sample_clusterer();
        let clusters = clusterer.find_clusters(10);
        assert_eq!(clusterer.get_trim_order(&clusters[0]), vec![2, 3, 1]);
        assert_eq!(clusterer.get_trim_order(&clusters[1]), vec![5, 4]);
    }
}