1use std::fs;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9use std::process::Command;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::time::{Duration, Instant};
12
13use crate::environment::EnvironmentBox;
14use crate::state::SwarmState;
15use crate::types::{Action, ActionResult, WorkerId};
16
17use super::escalation::EscalationReason;
18use super::worker::{
19 Guidance, GuidanceContext, Issue, Priority, RelevantState, WorkResult, WorkerAgent,
20 WorkerStateDelta,
21};
22
23pub fn run_bash(command: &str, working_dir: Option<&str>) -> ActionResult {
29 let start = Instant::now();
30
31 let mut cmd = Command::new("sh");
32 cmd.arg("-c").arg(command);
33
34 if let Some(dir) = working_dir {
35 cmd.current_dir(dir);
36 }
37
38 match cmd.output() {
39 Ok(output) => {
40 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
41 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
42
43 if output.status.success() {
44 ActionResult::success(stdout, start.elapsed())
45 } else {
46 ActionResult::failure(
47 format!("Exit code: {:?}\nstderr: {}", output.status.code(), stderr),
48 start.elapsed(),
49 )
50 }
51 }
52 Err(e) => ActionResult::failure(format!("Failed to execute: {}", e), start.elapsed()),
53 }
54}
55
56pub fn run_read(path: &str) -> ActionResult {
58 let start = Instant::now();
59 match fs::read_to_string(path) {
60 Ok(content) => ActionResult::success(content, start.elapsed()),
61 Err(e) => ActionResult::failure(format!("Failed to read {}: {}", path, e), start.elapsed()),
62 }
63}
64
65pub fn run_write(path: &str, content: &str) -> ActionResult {
67 let start = Instant::now();
68
69 if let Some(parent) = Path::new(path).parent() {
71 if !parent.exists() {
72 if let Err(e) = fs::create_dir_all(parent) {
73 return ActionResult::failure(
74 format!("Failed to create directory: {}", e),
75 start.elapsed(),
76 );
77 }
78 }
79 }
80
81 match fs::write(path, content) {
82 Ok(()) => ActionResult::success(format!("Written to {}", path), start.elapsed()),
83 Err(e) => {
84 ActionResult::failure(format!("Failed to write {}: {}", path, e), start.elapsed())
85 }
86 }
87}
88
89pub fn run_grep(pattern: &str, path: &str) -> ActionResult {
91 let start = Instant::now();
92
93 let file = match fs::File::open(path) {
94 Ok(f) => f,
95 Err(e) => {
96 return ActionResult::failure(
97 format!("Failed to open {}: {}", path, e),
98 start.elapsed(),
99 )
100 }
101 };
102
103 let reader = BufReader::new(file);
104 let mut matches = Vec::new();
105
106 for (line_num, line) in reader.lines().enumerate() {
107 if let Ok(line) = line {
108 if line.contains(pattern) {
109 matches.push(format!("{}:{}", line_num + 1, line));
110 }
111 }
112 }
113
114 ActionResult::success(matches.join("\n"), start.elapsed())
115}
116
117pub fn execute_action(action: &Action, working_dir: Option<&str>) -> ActionResult {
119 match action.name.as_str() {
120 "Bash" => {
121 let command = action.params.target.as_deref().unwrap_or("");
122 run_bash(command, working_dir)
123 }
124 "Read" => {
125 let path = action.params.target.as_deref().unwrap_or("");
126 run_read(path)
127 }
128 "Write" => {
129 let path = action.params.target.as_deref().unwrap_or("");
130 let content = action
131 .params
132 .args
133 .get("content")
134 .map(|s| s.as_str())
135 .unwrap_or("");
136 run_write(path, content)
137 }
138 "Grep" => {
139 let pattern = action
140 .params
141 .args
142 .get("pattern")
143 .map(|s| s.as_str())
144 .unwrap_or("");
145 let path = action.params.target.as_deref().unwrap_or(".");
146 run_grep(pattern, path)
147 }
148 _ => ActionResult::failure(
149 format!("Unsupported action: {}", action.name),
150 Duration::ZERO,
151 ),
152 }
153}
154
155pub struct GenericWorker {
175 id: WorkerId,
176 name: String,
177 working_dir: Option<String>,
179 escalation_threshold: u32,
181 consecutive_failures: AtomicUsize,
183 require_guidance: bool,
185}
186
187impl GenericWorker {
188 pub fn new(id: usize) -> Self {
189 Self {
190 id: WorkerId(id),
191 name: format!("generic_{}", id),
192 working_dir: None,
193 escalation_threshold: 0,
194 consecutive_failures: AtomicUsize::new(0),
195 require_guidance: false,
196 }
197 }
198
199 pub fn with_name(mut self, name: impl Into<String>) -> Self {
200 self.name = name.into();
201 self
202 }
203
204 pub fn with_working_dir(mut self, dir: impl Into<String>) -> Self {
205 self.working_dir = Some(dir.into());
206 self
207 }
208
209 pub fn with_escalation_threshold(mut self, threshold: u32) -> Self {
210 self.escalation_threshold = threshold;
211 self
212 }
213
214 pub fn with_require_guidance(mut self, required: bool) -> Self {
215 self.require_guidance = required;
216 self
217 }
218
219 fn execute_with_environment(&self, state: &SwarmState, action: &Action) -> Option<WorkResult> {
223 state
224 .shared
225 .extensions
226 .get::<EnvironmentBox>()
227 .map(|env| env.step(self.id, action))
228 }
229}
230
231impl WorkerAgent for GenericWorker {
232 fn think_and_act(&self, state: &SwarmState, guidance: Option<&Guidance>) -> WorkResult {
233 let Some(guidance) = guidance else {
235 if self.require_guidance {
236 return WorkResult::NeedsGuidance {
237 reason: "No guidance received".to_string(),
238 context: GuidanceContext {
239 issue: Issue {
240 description: "Worker requires guidance to proceed".to_string(),
241 severity: Priority::Normal,
242 },
243 options: vec![],
244 relevant_state: RelevantState::default(),
245 },
246 };
247 }
248 return WorkResult::Idle;
249 };
250
251 let Some(action) = guidance.actions.first() else {
253 return WorkResult::Idle;
254 };
255
256 if let Some(work_result) = self.execute_with_environment(state, action) {
258 let is_failure = match &work_result {
260 WorkResult::Acted { action_result, .. } => !action_result.success,
261 WorkResult::Done { success, .. } => !success,
262 _ => false,
263 };
264
265 if is_failure {
266 let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
267
268 if self.escalation_threshold > 0 && failures >= self.escalation_threshold as usize {
269 self.consecutive_failures.store(0, Ordering::SeqCst);
270 return WorkResult::Escalate {
271 reason: EscalationReason::ConsecutiveFailures(failures as u32),
272 context: Some(format!(
273 "Action '{}' failed {} times",
274 action.name, failures
275 )),
276 };
277 }
278 } else {
279 self.consecutive_failures.store(0, Ordering::SeqCst);
280 }
281
282 return work_result;
284 }
285
286 let result = execute_action(action, self.working_dir.as_deref());
288
289 if !result.success {
291 let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
292
293 if self.escalation_threshold > 0 && failures >= self.escalation_threshold as usize {
294 self.consecutive_failures.store(0, Ordering::SeqCst);
295 return WorkResult::Escalate {
296 reason: EscalationReason::ConsecutiveFailures(failures as u32),
297 context: Some(format!(
298 "Action '{}' failed {} times",
299 action.name, failures
300 )),
301 };
302 }
303
304 return WorkResult::acted(result);
305 }
306
307 self.consecutive_failures.store(0, Ordering::SeqCst);
309
310 let delta = WorkerStateDelta::new().with_cache(
311 format!("{}:last", self.name),
312 format!("tick:{},action:{}", state.shared.tick, action.name).into_bytes(),
313 100,
314 );
315
316 WorkResult::acted_with_delta(result, delta)
317 }
318
319 fn id(&self) -> WorkerId {
320 self.id
321 }
322
323 fn name(&self) -> &str {
324 &self.name
325 }
326}
327
328pub struct ExtensionAwareWorker {
351 id: WorkerId,
352 name: String,
353}
354
355impl ExtensionAwareWorker {
356 pub fn new(id: usize) -> Self {
357 Self {
358 id: WorkerId(id),
359 name: format!("extension_worker_{}", id),
360 }
361 }
362
363 pub fn with_name(mut self, name: impl Into<String>) -> Self {
364 self.name = name.into();
365 self
366 }
367}
368
369impl WorkerAgent for ExtensionAwareWorker {
370 fn think_and_act(&self, state: &SwarmState, guidance: Option<&Guidance>) -> WorkResult {
371 use crate::types::ActionResult;
372
373 if let Some(counter) = state.shared.extensions.get::<AtomicUsize>() {
375 let old = counter.fetch_add(1, Ordering::SeqCst);
376
377 let mut delta = WorkerStateDelta::new();
379 delta = delta.with_shared(
380 format!("extension_worker:{}:count", self.id.0),
381 format!("{}", old + 1).into_bytes(),
382 );
383
384 if let Some(g) = guidance {
386 if let Some(content) = &g.content {
387 delta = delta.with_shared(
388 format!("extension_worker:{}:guidance", self.id.0),
389 content.as_bytes().to_vec(),
390 );
391 }
392 }
393
394 return WorkResult::acted_with_delta(
395 ActionResult::success(format!("counter: {}", old + 1), Duration::from_millis(1)),
396 delta,
397 );
398 }
399
400 WorkResult::NeedsGuidance {
402 reason: "No shared counter extension found".to_string(),
403 context: GuidanceContext {
404 issue: Issue {
405 description: "Extension 'AtomicUsize' is not registered".to_string(),
406 severity: Priority::High,
407 },
408 options: vec![super::worker::ProposedOption {
409 description: "Register AtomicUsize extension".to_string(),
410 pros: vec!["Enables shared counter functionality".to_string()],
411 cons: vec![],
412 }],
413 relevant_state: RelevantState::default(),
414 },
415 }
416 }
417
418 fn id(&self) -> WorkerId {
419 self.id
420 }
421
422 fn name(&self) -> &str {
423 &self.name
424 }
425}
426
427pub struct ProgressWorker {
436 id: WorkerId,
437 name: String,
438 total_ticks: u32,
440 current_tick: AtomicUsize,
442}
443
444impl ProgressWorker {
445 pub fn new(id: usize, total_ticks: u32) -> Self {
446 Self {
447 id: WorkerId(id),
448 name: format!("progress_worker_{}", id),
449 total_ticks,
450 current_tick: AtomicUsize::new(0),
451 }
452 }
453
454 pub fn with_name(mut self, name: impl Into<String>) -> Self {
455 self.name = name.into();
456 self
457 }
458}
459
460impl WorkerAgent for ProgressWorker {
461 fn think_and_act(&self, _state: &SwarmState, _guidance: Option<&Guidance>) -> WorkResult {
462 use crate::types::ActionResult;
463
464 let current = self.current_tick.fetch_add(1, Ordering::SeqCst) + 1;
465 let progress = current as f32 / self.total_ticks as f32;
466
467 if current >= self.total_ticks as usize {
468 let mut delta = WorkerStateDelta::new();
470 delta = delta.with_shared(
471 format!("progress_worker:{}:status", self.id.0),
472 b"completed".to_vec(),
473 );
474
475 WorkResult::acted_with_delta(
476 ActionResult::success("completed", Duration::from_millis(1)),
477 delta,
478 )
479 } else {
480 WorkResult::Continuing { progress }
482 }
483 }
484
485 fn id(&self) -> WorkerId {
486 self.id
487 }
488
489 fn name(&self) -> &str {
490 &self.name
491 }
492}
493
494#[cfg(test)]
499mod tests {
500 use super::*;
501
502 fn get_output_string(result: &ActionResult) -> String {
503 result
504 .output
505 .as_ref()
506 .map(|o| o.as_text())
507 .unwrap_or_default()
508 }
509
510 #[test]
511 fn test_run_bash_echo() {
512 let result = run_bash("echo hello", None);
513 assert!(result.success);
514 assert!(get_output_string(&result).contains("hello"));
515 }
516
517 #[test]
518 fn test_run_bash_failure() {
519 let result = run_bash("exit 1", None);
520 assert!(!result.success);
521 }
522
523 #[test]
524 fn test_unsupported_action() {
525 let action = Action {
526 name: "Unknown".to_string(),
527 params: Default::default(),
528 };
529 let result = execute_action(&action, None);
530 assert!(!result.success);
531 assert!(result
532 .error
533 .as_ref()
534 .is_some_and(|e| e.contains("Unsupported")));
535 }
536}