1use super::inference::InferenceEngine;
2use super::parser::{Hunk, WorkerTask};
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use tokio::sync::oneshot;
7use tokio::task::JoinSet;
8
9pub enum ReviewResponse {
10 Accept,
11 Reject,
12 Retry,
13}
14
15pub enum SwarmMessage {
16 Progress(String, u8),
17 ReviewRequest {
18 worker_id: String,
19 file_path: PathBuf,
20 before: String,
21 after: String,
22 tx: oneshot::Sender<ReviewResponse>,
23 },
24 Done,
25}
26
27pub struct SwarmCoordinator {
29 pub engine: Arc<InferenceEngine>,
30 pub scratch_dir: PathBuf,
31 pub worker_model: Option<String>,
32 pub gpu_state: Arc<crate::ui::gpu_monitor::GpuState>,
33 #[allow(dead_code)]
34 pub professional: bool,
35}
36
37impl SwarmCoordinator {
38 pub fn new(
39 engine: Arc<InferenceEngine>,
40 gpu_state: Arc<crate::ui::gpu_monitor::GpuState>,
41 worker_model: Option<String>,
42 professional: bool,
43 ) -> Self {
44 let root = crate::tools::file_ops::workspace_root();
45 let hematite_dir = crate::tools::file_ops::hematite_dir();
46 let scratch_dir = hematite_dir.join("scratch");
47
48 let gitignore_path = root.join(".gitignore");
49 if gitignore_path.exists() {
50 if let Ok(content) = std::fs::read_to_string(&gitignore_path) {
51 if !content.contains(".hematite") {
52 let mut new_content = content;
53 if !new_content.ends_with('\n') {
54 new_content.push('\n');
55 }
56 new_content.push_str(".hematite/\n");
57 let _ = std::fs::write(&gitignore_path, new_content);
58 }
59 }
60 }
61
62 if !hematite_dir.exists() {
63 let _ = std::fs::create_dir_all(&hematite_dir);
64 }
65 if !scratch_dir.exists() {
66 let _ = std::fs::create_dir_all(&scratch_dir);
67 }
68
69 Self {
70 engine,
71 scratch_dir,
72 worker_model,
73 gpu_state,
74 professional,
75 }
76 }
77
78 pub async fn dispatch_swarm(
80 &self,
81 tasks: Vec<WorkerTask>,
82 progression_tx: tokio::sync::mpsc::Sender<SwarmMessage>,
83 max_workers: usize,
84 ) -> Result<(), String> {
85 let mut join_set = JoinSet::new();
86
87 let vram_usage = self.gpu_state.ratio();
90 let is_sequential = vram_usage > 0.85;
91
92 if is_sequential {
93 let _ = progression_tx
94 .send(SwarmMessage::Progress("CPU/GPU GUARD".to_string(), 0))
95 .await;
96 let _ = progression_tx
97 .send(SwarmMessage::Progress(
98 "LOW VRAM: Switching to Sequential Mode".to_string(),
99 1,
100 ))
101 .await;
102 }
103
104 for task in tasks.into_iter().take(max_workers) {
105 let engine_clone = self.engine.clone();
106 let tx_clone = progression_tx.clone();
107 let scratch_path = self.scratch_dir.join(format!("worker_{}.diff", task.id));
108 let worker_job = async move {
109 let _ = tx_clone
111 .send(SwarmMessage::Progress(task.id.clone(), 25))
112 .await;
113
114 let prompt = format!(
116 "TARGET: {}\nDIRECTIVE: {}\n\n[HEMATITE SYNTHESIS BAN]\nYou are explicitly forbidden from lazy delegation (e.g. saying 'based on worker findings'). You MUST execute a Synthesis Pass dynamically: 1) Read the actual findings. 2) Specify the concrete integration logic yourself. 3) Output code directly targeting the exact bounds.",
117 task.target, task.instruction
118 );
119
120 if let Ok(res) = engine_clone.generate_task_worker(&prompt, true).await {
122 let _ = tx_clone
123 .send(SwarmMessage::Progress(task.id.clone(), 75))
124 .await;
125
126 let _ = std::fs::write(&scratch_path, res.clone());
128 let _ = tx_clone
129 .send(SwarmMessage::Progress(task.id.clone(), 100))
130 .await;
131
132 let target_path = PathBuf::from(task.target.clone());
134 let before = if target_path.is_file() {
135 std::fs::read_to_string(&target_path)
136 .unwrap_or_else(|_| "[Error reading context]".to_string())
137 } else {
138 format!("[SYNERGY: Exploring {}]", task.target)
139 };
140
141 let (res_tx, res_rx) = oneshot::channel();
142 let _ = tx_clone
143 .send(SwarmMessage::ReviewRequest {
144 worker_id: task.id.clone(),
145 file_path: target_path.clone(),
146 before,
147 after: res.clone(),
148 tx: res_tx,
149 })
150 .await;
151
152 let _ = res_rx.await;
154 }
155 };
156
157 if is_sequential {
158 worker_job.await;
159 } else {
160 join_set.spawn(worker_job);
161 }
162 }
163
164 while join_set.join_next().await.is_some() {
166 }
168
169 let _ = progression_tx.send(SwarmMessage::Done).await;
170 Ok(())
171 }
172
173 #[allow(dead_code)]
175 pub async fn apply_patches_descending(
176 &self,
177 file_path: &Path,
178 mut hunks: Vec<Hunk>,
179 progression_tx: tokio::sync::mpsc::Sender<SwarmMessage>,
180 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
181 let content = fs::read_to_string(file_path)?;
182 let mut lines: Vec<String> = {
183 let n = content.matches('\n').count() + 1;
184 let mut v = Vec::with_capacity(n);
185 v.extend(content.lines().map(|s| s.to_string()));
186 v
187 };
188
189 hunks.sort_by_key(|h| h.sort_key());
191
192 let mut i = 0;
193 while i < hunks.len() {
194 let current = &hunks[i];
195
196 if i + 1 < hunks.len() && hunks[i + 1].end_line >= current.start_line {
198 let mut retry_count = 0u32;
200 const MAX_CONFLICT_RETRIES: u32 = 3;
201 loop {
202 if retry_count >= MAX_CONFLICT_RETRIES {
203 i += 2;
205 break;
206 }
207 let padding: usize = 10 + (retry_count as usize * 10);
209 let conflict_start = hunks[i + 1].start_line.saturating_sub(padding + 1);
211 let conflict_end = (current.end_line + padding).min(lines.len());
212 let context = lines[conflict_start..conflict_end].join("\n");
213
214 let prompt = if retry_count == 0 {
215 format!("CONFLICT in {}.\nContext:\n{}\n\nWorker {} wants: {}\nWorker {} wants: {}\nResolve these into one block.",
216 file_path.display(), context, current.worker_id, current.content, hunks[i+1].worker_id, hunks[i+1].content)
217 } else {
218 format!("CRITICAL: Your previous synthesis for this conflict was REJECTED by the human architect.\nThe merge you proposed was logically unsound.\nDO NOT REPEAT MISTAKES.\n\nCONFLICT in {}.\nContext:\n{}\n\nWorker {} wants: {}\nWorker {} wants: {}\nResolve these into one robust logical block.",
219 file_path.display(), context, current.worker_id, current.content, hunks[i+1].worker_id, hunks[i+1].content)
220 };
221
222 let temp = if retry_count > 0 { 0.7 } else { 0.1 };
224
225 let resolved_block = self
226 .engine
227 .generate_task_with_temp(&prompt, temp, true)
228 .await
229 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
230 Box::from(e)
231 })?;
232
233 let (response_tx, response_rx) = oneshot::channel();
235 let _ = progression_tx
236 .send(SwarmMessage::ReviewRequest {
237 worker_id: current.worker_id.clone(),
238 file_path: file_path.to_path_buf(),
239 before: context.clone(),
240 after: resolved_block.clone(),
241 tx: response_tx,
242 })
243 .await;
244
245 match response_rx.await.unwrap_or(ReviewResponse::Reject) {
247 ReviewResponse::Accept => {
248 lines.splice(conflict_start..conflict_end, vec![resolved_block]);
249 i += 2;
250 break;
251 }
252 ReviewResponse::Retry => {
253 retry_count += 1;
254 continue; }
256 ReviewResponse::Reject => {
257 i += 2; break;
259 }
260 }
261 }
262 } else {
263 let start_idx = current.start_line.saturating_sub(1);
265 let end_idx = current.end_line.min(lines.len());
266 let range = start_idx..end_idx;
267 lines.splice(range, vec![current.content.clone()]);
268 i += 1;
269 }
270 }
271
272 fs::write(file_path, lines.join("\n"))?;
273 Ok(())
274 }
275}
276
277impl Drop for SwarmCoordinator {
278 fn drop(&mut self) {
279 if self.scratch_dir.exists() {
282 if let Ok(entries) = std::fs::read_dir(&self.scratch_dir) {
283 for entry in entries.flatten() {
284 let p = entry.path();
285 if p.is_file() {
286 let _ = std::fs::remove_file(p);
287 }
288 }
289 }
290 }
291 eprintln!("[Hematite] Swarm shutdown complete. Scratchpad wiped.");
292 }
293}