1use super::inference::InferenceEngine;
2use super::parser::{Hunk, WorkerTask};
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use tokio::sync::oneshot;
7use tokio::task::JoinSet;
8
9pub enum ReviewResponse {
10 Accept,
11 Reject,
12 Retry,
13}
14
15pub enum SwarmMessage {
16 Progress(String, u8),
17 ReviewRequest {
18 worker_id: String,
19 file_path: PathBuf,
20 before: String,
21 after: String,
22 tx: oneshot::Sender<ReviewResponse>,
23 },
24 Done,
25}
26
27pub struct SwarmCoordinator {
29 pub engine: Arc<InferenceEngine>,
30 pub scratch_dir: PathBuf,
31 pub worker_model: Option<String>,
32 pub gpu_state: Arc<crate::ui::gpu_monitor::GpuState>,
33 #[allow(dead_code)]
34 pub professional: bool,
35}
36
37impl SwarmCoordinator {
38 pub fn new(
39 engine: Arc<InferenceEngine>,
40 gpu_state: Arc<crate::ui::gpu_monitor::GpuState>,
41 worker_model: Option<String>,
42 professional: bool,
43 ) -> Self {
44 let root = crate::tools::file_ops::workspace_root();
45 let hematite_dir = crate::tools::file_ops::hematite_dir();
46 let scratch_dir = hematite_dir.join("scratch");
47
48 let gitignore_path = root.join(".gitignore");
49 if gitignore_path.exists() {
50 if let Ok(content) = std::fs::read_to_string(&gitignore_path) {
51 if !content.contains(".hematite") {
52 let mut new_content = content;
53 if !new_content.ends_with('\n') {
54 new_content.push('\n');
55 }
56 new_content.push_str(".hematite/\n");
57 let _ = std::fs::write(&gitignore_path, new_content);
58 }
59 }
60 }
61
62 if !hematite_dir.exists() {
63 let _ = std::fs::create_dir_all(&hematite_dir);
64 }
65 if !scratch_dir.exists() {
66 let _ = std::fs::create_dir_all(&scratch_dir);
67 }
68
69 Self {
70 engine,
71 scratch_dir,
72 worker_model,
73 gpu_state,
74 professional,
75 }
76 }
77
78 pub async fn dispatch_swarm(
80 &self,
81 tasks: Vec<WorkerTask>,
82 progression_tx: tokio::sync::mpsc::Sender<SwarmMessage>,
83 max_workers: usize,
84 ) -> Result<(), String> {
85 let mut join_set = JoinSet::new();
86
87 let vram_usage = self.gpu_state.ratio();
90 let is_sequential = vram_usage > 0.85;
91
92 if is_sequential {
93 let _ = progression_tx
94 .send(SwarmMessage::Progress("CPU/GPU GUARD".to_string(), 0))
95 .await;
96 let _ = progression_tx
97 .send(SwarmMessage::Progress(
98 "LOW VRAM: Switching to Sequential Mode".to_string(),
99 1,
100 ))
101 .await;
102 }
103
104 for task in tasks.into_iter().take(max_workers) {
105 let engine_clone = self.engine.clone();
106 let tx_clone = progression_tx.clone();
107 let scratch_path = self.scratch_dir.join(format!("worker_{}.diff", task.id));
108 let worker_job = async move {
109 let _ = tx_clone
111 .send(SwarmMessage::Progress(task.id.clone(), 25))
112 .await;
113
114 let prompt = format!(
116 "TARGET: {}\nDIRECTIVE: {}\n\n[HEMATITE SYNTHESIS BAN]\nYou are explicitly forbidden from lazy delegation (e.g. saying 'based on worker findings'). You MUST execute a Synthesis Pass dynamically: 1) Read the actual findings. 2) Specify the concrete integration logic yourself. 3) Output code directly targeting the exact bounds.",
117 task.target, task.instruction
118 );
119
120 if let Ok(res) = engine_clone.generate_task_worker(&prompt, true).await {
122 let _ = tx_clone
123 .send(SwarmMessage::Progress(task.id.clone(), 75))
124 .await;
125
126 let _ = std::fs::write(&scratch_path, res.clone());
128 let _ = tx_clone
129 .send(SwarmMessage::Progress(task.id.clone(), 100))
130 .await;
131
132 let target_path = PathBuf::from(task.target.clone());
134 let before = if target_path.is_file() {
135 std::fs::read_to_string(&target_path)
136 .unwrap_or_else(|_| "[Error reading context]".to_string())
137 } else {
138 format!("[SYNERGY: Exploring {}]", task.target)
139 };
140
141 let (res_tx, res_rx) = oneshot::channel();
142 let _ = tx_clone
143 .send(SwarmMessage::ReviewRequest {
144 worker_id: task.id.clone(),
145 file_path: target_path.clone(),
146 before,
147 after: res.clone(),
148 tx: res_tx,
149 })
150 .await;
151
152 let _ = res_rx.await;
154 }
155 };
156
157 if is_sequential {
158 worker_job.await;
159 } else {
160 join_set.spawn(worker_job);
161 }
162 }
163
164 while let Some(_) = join_set.join_next().await {
166 }
168
169 let _ = progression_tx.send(SwarmMessage::Done).await;
170 Ok(())
171 }
172
173 #[allow(dead_code)]
175 pub async fn apply_patches_descending(
176 &self,
177 file_path: &Path,
178 mut hunks: Vec<Hunk>,
179 progression_tx: tokio::sync::mpsc::Sender<SwarmMessage>,
180 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
181 let mut lines: Vec<String> = fs::read_to_string(file_path)?
182 .lines()
183 .map(|s| s.to_string())
184 .collect();
185
186 hunks.sort_by_key(|h| h.sort_key());
188
189 let mut i = 0;
190 while i < hunks.len() {
191 let current = &hunks[i];
192
193 if i + 1 < hunks.len() && hunks[i + 1].end_line >= current.start_line {
195 let mut retry_count = 0u32;
197 const MAX_CONFLICT_RETRIES: u32 = 3;
198 loop {
199 if retry_count >= MAX_CONFLICT_RETRIES {
200 i += 2;
202 break;
203 }
204 let padding: usize = 10 + (retry_count as usize * 10);
206 let conflict_start = hunks[i + 1].start_line.saturating_sub(padding);
207 let conflict_end = (current.end_line + padding).min(lines.len());
208 let context = lines[conflict_start..conflict_end].join("\n");
209
210 let prompt = if retry_count == 0 {
211 format!("CONFLICT in {}.\nContext:\n{}\n\nWorker {} wants: {}\nWorker {} wants: {}\nResolve these into one block.",
212 file_path.display(), context, current.worker_id, current.content, hunks[i+1].worker_id, hunks[i+1].content)
213 } else {
214 format!("CRITICAL: Your previous synthesis for this conflict was REJECTED by the human architect.\nThe merge you proposed was logically unsound.\nDO NOT REPEAT MISTAKES.\n\nCONFLICT in {}.\nContext:\n{}\n\nWorker {} wants: {}\nWorker {} wants: {}\nResolve these into one robust logical block.",
215 file_path.display(), context, current.worker_id, current.content, hunks[i+1].worker_id, hunks[i+1].content)
216 };
217
218 let temp = if retry_count > 0 { 0.7 } else { 0.1 };
220
221 let resolved_block = self
222 .engine
223 .generate_task_with_temp(&prompt, temp, true)
224 .await
225 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
226 Box::from(e)
227 })?;
228
229 let (response_tx, response_rx) = oneshot::channel();
231 let _ = progression_tx
232 .send(SwarmMessage::ReviewRequest {
233 worker_id: current.worker_id.clone(),
234 file_path: file_path.to_path_buf(),
235 before: context.clone(),
236 after: resolved_block.clone(),
237 tx: response_tx,
238 })
239 .await;
240
241 match response_rx.await.unwrap_or(ReviewResponse::Reject) {
243 ReviewResponse::Accept => {
244 lines.splice(conflict_start..conflict_end, vec![resolved_block]);
245 i += 2;
246 break;
247 }
248 ReviewResponse::Retry => {
249 retry_count += 1;
250 continue; }
252 ReviewResponse::Reject => {
253 i += 2; break;
255 }
256 }
257 }
258 } else {
259 let start_idx = current.start_line.saturating_sub(1);
261 let end_idx = current.end_line.min(lines.len());
262 let range = start_idx..end_idx;
263 lines.splice(range, vec![current.content.clone()]);
264 i += 1;
265 }
266 }
267
268 fs::write(file_path, lines.join("\n"))?;
269 Ok(())
270 }
271}
272
273impl Drop for SwarmCoordinator {
274 fn drop(&mut self) {
275 if self.scratch_dir.exists() {
278 if let Ok(entries) = std::fs::read_dir(&self.scratch_dir) {
279 for entry in entries.flatten() {
280 let p = entry.path();
281 if p.is_file() {
282 let _ = std::fs::remove_file(p);
283 }
284 }
285 }
286 }
287 eprintln!("[Hematite] Swarm shutdown complete. Scratchpad wiped.");
288 }
289}