Skip to main content

hematite/agent/
swarm.rs

1use super::inference::InferenceEngine;
2use super::parser::{Hunk, WorkerTask};
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use tokio::sync::oneshot;
7use tokio::task::JoinSet;
8
9pub enum ReviewResponse {
10    Accept,
11    Reject,
12    Retry,
13}
14
15pub enum SwarmMessage {
16    Progress(String, u8),
17    ReviewRequest {
18        worker_id: String,
19        file_path: PathBuf,
20        before: String,
21        after: String,
22        tx: oneshot::Sender<ReviewResponse>,
23    },
24    Done,
25}
26
27/// The Core parallel orchestrator locking background models to strict 12GB KV cache boundaries.
28pub struct SwarmCoordinator {
29    pub engine: Arc<InferenceEngine>,
30    pub scratch_dir: PathBuf,
31    pub worker_model: Option<String>,
32    pub gpu_state: Arc<crate::ui::gpu_monitor::GpuState>,
33    #[allow(dead_code)]
34    pub professional: bool,
35}
36
37impl SwarmCoordinator {
38    pub fn new(
39        engine: Arc<InferenceEngine>,
40        gpu_state: Arc<crate::ui::gpu_monitor::GpuState>,
41        worker_model: Option<String>,
42        professional: bool,
43    ) -> Self {
44        let root = crate::tools::file_ops::workspace_root();
45        let hematite_dir = root.join(".hematite");
46        let scratch_dir = hematite_dir.join("scratch");
47
48        let gitignore_path = root.join(".gitignore");
49        if gitignore_path.exists() {
50            if let Ok(content) = std::fs::read_to_string(&gitignore_path) {
51                if !content.contains(".hematite") {
52                    let mut new_content = content;
53                    if !new_content.ends_with('\n') {
54                        new_content.push('\n');
55                    }
56                    new_content.push_str(".hematite/\n");
57                    let _ = std::fs::write(&gitignore_path, new_content);
58                }
59            }
60        }
61
62        if !hematite_dir.exists() {
63            let _ = std::fs::create_dir_all(&hematite_dir);
64        }
65        if !scratch_dir.exists() {
66            let _ = std::fs::create_dir_all(&scratch_dir);
67        }
68
69        Self {
70            engine,
71            scratch_dir,
72            worker_model,
73            gpu_state,
74            professional,
75        }
76    }
77
78    /// Spawns parallel execution green-threads while respecting the hardware-aware limit.
79    pub async fn dispatch_swarm(
80        &self,
81        tasks: Vec<WorkerTask>,
82        progression_tx: tokio::sync::mpsc::Sender<SwarmMessage>,
83        max_workers: usize,
84    ) -> Result<(), String> {
85        let mut join_set = JoinSet::new();
86
87        // ── VRAM-Aware Throttling ──
88        // If VRAM is > 85% used, we drop to Sequential Mode to prevent crashes.
89        let vram_usage = self.gpu_state.ratio();
90        let is_sequential = vram_usage > 0.85;
91
92        if is_sequential {
93            let _ = progression_tx
94                .send(SwarmMessage::Progress("CPU/GPU GUARD".to_string(), 0))
95                .await;
96            let _ = progression_tx
97                .send(SwarmMessage::Progress(
98                    "LOW VRAM: Switching to Sequential Mode".to_string(),
99                    1,
100                ))
101                .await;
102        }
103
104        for task in tasks.into_iter().take(max_workers) {
105            let engine_clone = self.engine.clone();
106            let tx_clone = progression_tx.clone();
107            let scratch_path = self.scratch_dir.join(format!("worker_{}.diff", task.id));
108            let worker_job = async move {
109                // 1) Research
110                let _ = tx_clone
111                    .send(SwarmMessage::Progress(task.id.clone(), 25))
112                    .await;
113
114                // 2) Native Synthesis Gen (Batch context evaluation)
115                let prompt = format!(
116                    "TARGET: {}\nDIRECTIVE: {}\n\n[HEMATITE SYNTHESIS BAN]\nYou are explicitly forbidden from lazy delegation (e.g. saying 'based on worker findings'). You MUST execute a Synthesis Pass dynamically: 1) Read the actual findings. 2) Specify the concrete integration logic yourself. 3) Output code directly targeting the exact bounds.", 
117                    task.target, task.instruction
118                );
119
120                // Use the generate_task_worker path which respects asymmetric model IDs
121                if let Ok(res) = engine_clone.generate_task_worker(&prompt, true).await {
122                    let _ = tx_clone
123                        .send(SwarmMessage::Progress(task.id.clone(), 75))
124                        .await;
125
126                    // 3) Push directly into Scratchpad isolating original File Locks
127                    let _ = std::fs::write(&scratch_path, res.clone());
128                    let _ = tx_clone
129                        .send(SwarmMessage::Progress(task.id.clone(), 100))
130                        .await;
131
132                    // 4) High-End Oversight: Trigger Human Review for EVERY successful generation
133                    let target_path = PathBuf::from(task.target.clone());
134                    let before = if target_path.is_file() {
135                        std::fs::read_to_string(&target_path)
136                            .unwrap_or_else(|_| "[Error reading context]".to_string())
137                    } else {
138                        format!("[SYNERGY: Exploring {}]", task.target)
139                    };
140
141                    let (res_tx, res_rx) = oneshot::channel();
142                    let _ = tx_clone
143                        .send(SwarmMessage::ReviewRequest {
144                            worker_id: task.id.clone(),
145                            file_path: target_path.clone(),
146                            before,
147                            after: res.clone(),
148                            tx: res_tx,
149                        })
150                        .await;
151
152                    // Sync 2-Way Lock completely halting execution until Architect signs off
153                    let _ = res_rx.await;
154                }
155            };
156
157            if is_sequential {
158                worker_job.await;
159            } else {
160                join_set.spawn(worker_job);
161            }
162        }
163
164        // Orchestrator patiently waits natively evaluating background executions
165        while let Some(_) = join_set.join_next().await {
166            // Evaluates patches passively
167        }
168
169        let _ = progression_tx.send(SwarmMessage::Done).await;
170        Ok(())
171    }
172
173    /// Evaluates compiled scratchpad chunks backwards utilizing Reverse sorting organically slicing VRAM limits natively!
174    #[allow(dead_code)]
175    pub async fn apply_patches_descending(
176        &self,
177        file_path: &Path,
178        mut hunks: Vec<Hunk>,
179        progression_tx: tokio::sync::mpsc::Sender<SwarmMessage>,
180    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
181        let mut lines: Vec<String> = fs::read_to_string(file_path)?
182            .lines()
183            .map(|s| s.to_string())
184            .collect();
185
186        // The Golden Rule: Sort Descending natively
187        hunks.sort_by_key(|h| h.sort_key());
188
189        let mut i = 0;
190        while i < hunks.len() {
191            let current = &hunks[i];
192
193            // Look ahead for overlaps (Conflicts)
194            if i + 1 < hunks.len() && hunks[i + 1].end_line >= current.start_line {
195                // CONFLICT DETECTED: Tier 1 Synthesis Merge Pass targeting isolated context ranges
196                let mut retry_count = 0u32;
197                const MAX_CONFLICT_RETRIES: u32 = 3;
198                loop {
199                    if retry_count >= MAX_CONFLICT_RETRIES {
200                        // Give up and skip both conflicting hunks.
201                        i += 2;
202                        break;
203                    }
204                    // Safety Net Context Expansion: Double the inference bounds on retry dynamically mapping logic
205                    let padding: usize = 10 + (retry_count as usize * 10);
206                    let conflict_start = hunks[i + 1].start_line.saturating_sub(padding);
207                    let conflict_end = (current.end_line + padding).min(lines.len());
208                    let context = lines[conflict_start..conflict_end].join("\n");
209
210                    let prompt = if retry_count == 0 {
211                        format!("CONFLICT in {}.\nContext:\n{}\n\nWorker {} wants: {}\nWorker {} wants: {}\nResolve these into one block.",
212                        file_path.display(), context, current.worker_id, current.content, hunks[i+1].worker_id, hunks[i+1].content)
213                    } else {
214                        format!("CRITICAL: Your previous synthesis for this conflict was REJECTED by the human architect.\nThe merge you proposed was logically unsound.\nDO NOT REPEAT MISTAKES.\n\nCONFLICT in {}.\nContext:\n{}\n\nWorker {} wants: {}\nWorker {} wants: {}\nResolve these into one robust logical block.",
215                        file_path.display(), context, current.worker_id, current.content, hunks[i+1].worker_id, hunks[i+1].content)
216                    };
217
218                    // Scale Chaos temperature natively explicitly to evade deterministic loops
219                    let temp = if retry_count > 0 { 0.7 } else { 0.1 };
220
221                    let resolved_block = self
222                        .engine
223                        .generate_task_with_temp(&prompt, temp, true)
224                        .await
225                        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
226                            Box::from(e)
227                        })?;
228
229                    // Cross the dimensional bound! Halt Background native evaluating physical Main interaction!
230                    let (response_tx, response_rx) = oneshot::channel();
231                    let _ = progression_tx
232                        .send(SwarmMessage::ReviewRequest {
233                            worker_id: current.worker_id.clone(),
234                            file_path: file_path.to_path_buf(),
235                            before: context.clone(),
236                            after: resolved_block.clone(),
237                            tx: response_tx,
238                        })
239                        .await;
240
241                    // Sync 2-Way Lock completely halting Orchestrator
242                    match response_rx.await.unwrap_or(ReviewResponse::Reject) {
243                        ReviewResponse::Accept => {
244                            lines.splice(conflict_start..conflict_end, vec![resolved_block]);
245                            i += 2;
246                            break;
247                        }
248                        ReviewResponse::Retry => {
249                            retry_count += 1;
250                            continue; // Organically loops back utilizing dynamically expanded Context and Chaos Temps
251                        }
252                        ReviewResponse::Reject => {
253                            i += 2; // Jump over merged hunk traces explicitly abandoning patch overlay
254                            break;
255                        }
256                    }
257                }
258            } else {
259                // Safe absolute hunk application preventing index drift identically
260                let start_idx = current.start_line.saturating_sub(1);
261                let end_idx = current.end_line.min(lines.len());
262                let range = start_idx..end_idx;
263                lines.splice(range, vec![current.content.clone()]);
264                i += 1;
265            }
266        }
267
268        fs::write(file_path, lines.join("\n"))?;
269        Ok(())
270    }
271}
272
273impl Drop for SwarmCoordinator {
274    fn drop(&mut self) {
275        // Emergency Cleanup: Wipe the scratchpad contents.
276        // This fires on normal exit, Ctrl+C (via tokio's signal handler), or panic unwind.
277        if self.scratch_dir.exists() {
278            if let Ok(entries) = std::fs::read_dir(&self.scratch_dir) {
279                for entry in entries.flatten() {
280                    let p = entry.path();
281                    if p.is_file() {
282                        let _ = std::fs::remove_file(p);
283                    }
284                }
285            }
286        }
287        eprintln!("[Hematite] Swarm shutdown complete. Scratchpad wiped.");
288    }
289}