use std::fs;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::process::Command;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use crate::environment::EnvironmentBox;
use crate::state::SwarmState;
use crate::types::{Action, ActionResult, WorkerId};
use super::escalation::EscalationReason;
use super::worker::{
Guidance, GuidanceContext, Issue, Priority, RelevantState, WorkResult, WorkerAgent,
WorkerStateDelta,
};
pub fn run_bash(command: &str, working_dir: Option<&str>) -> ActionResult {
let start = Instant::now();
let mut cmd = Command::new("sh");
cmd.arg("-c").arg(command);
if let Some(dir) = working_dir {
cmd.current_dir(dir);
}
match cmd.output() {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if output.status.success() {
ActionResult::success(stdout, start.elapsed())
} else {
ActionResult::failure(
format!("Exit code: {:?}\nstderr: {}", output.status.code(), stderr),
start.elapsed(),
)
}
}
Err(e) => ActionResult::failure(format!("Failed to execute: {}", e), start.elapsed()),
}
}
pub fn run_read(path: &str) -> ActionResult {
let start = Instant::now();
match fs::read_to_string(path) {
Ok(content) => ActionResult::success(content, start.elapsed()),
Err(e) => ActionResult::failure(format!("Failed to read {}: {}", path, e), start.elapsed()),
}
}
pub fn run_write(path: &str, content: &str) -> ActionResult {
let start = Instant::now();
if let Some(parent) = Path::new(path).parent() {
if !parent.exists() {
if let Err(e) = fs::create_dir_all(parent) {
return ActionResult::failure(
format!("Failed to create directory: {}", e),
start.elapsed(),
);
}
}
}
match fs::write(path, content) {
Ok(()) => ActionResult::success(format!("Written to {}", path), start.elapsed()),
Err(e) => {
ActionResult::failure(format!("Failed to write {}: {}", path, e), start.elapsed())
}
}
}
pub fn run_grep(pattern: &str, path: &str) -> ActionResult {
let start = Instant::now();
let file = match fs::File::open(path) {
Ok(f) => f,
Err(e) => {
return ActionResult::failure(
format!("Failed to open {}: {}", path, e),
start.elapsed(),
)
}
};
let reader = BufReader::new(file);
let mut matches = Vec::new();
for (line_num, line) in reader.lines().enumerate() {
if let Ok(line) = line {
if line.contains(pattern) {
matches.push(format!("{}:{}", line_num + 1, line));
}
}
}
ActionResult::success(matches.join("\n"), start.elapsed())
}
pub fn execute_action(action: &Action, working_dir: Option<&str>) -> ActionResult {
match action.name.as_str() {
"Bash" => {
let command = action.params.target.as_deref().unwrap_or("");
run_bash(command, working_dir)
}
"Read" => {
let path = action.params.target.as_deref().unwrap_or("");
run_read(path)
}
"Write" => {
let path = action.params.target.as_deref().unwrap_or("");
let content = action
.params
.args
.get("content")
.map(|s| s.as_str())
.unwrap_or("");
run_write(path, content)
}
"Grep" => {
let pattern = action
.params
.args
.get("pattern")
.map(|s| s.as_str())
.unwrap_or("");
let path = action.params.target.as_deref().unwrap_or(".");
run_grep(pattern, path)
}
_ => ActionResult::failure(
format!("Unsupported action: {}", action.name),
Duration::ZERO,
),
}
}
pub struct GenericWorker {
id: WorkerId,
name: String,
working_dir: Option<String>,
escalation_threshold: u32,
consecutive_failures: AtomicUsize,
require_guidance: bool,
}
impl GenericWorker {
pub fn new(id: usize) -> Self {
Self {
id: WorkerId(id),
name: format!("generic_{}", id),
working_dir: None,
escalation_threshold: 0,
consecutive_failures: AtomicUsize::new(0),
require_guidance: false,
}
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
pub fn with_working_dir(mut self, dir: impl Into<String>) -> Self {
self.working_dir = Some(dir.into());
self
}
pub fn with_escalation_threshold(mut self, threshold: u32) -> Self {
self.escalation_threshold = threshold;
self
}
pub fn with_require_guidance(mut self, required: bool) -> Self {
self.require_guidance = required;
self
}
fn execute_with_environment(&self, state: &SwarmState, action: &Action) -> Option<WorkResult> {
state
.shared
.extensions
.get::<EnvironmentBox>()
.map(|env| env.step(self.id, action))
}
}
impl WorkerAgent for GenericWorker {
fn think_and_act(&self, state: &SwarmState, guidance: Option<&Guidance>) -> WorkResult {
let Some(guidance) = guidance else {
if self.require_guidance {
return WorkResult::NeedsGuidance {
reason: "No guidance received".to_string(),
context: GuidanceContext {
issue: Issue {
description: "Worker requires guidance to proceed".to_string(),
severity: Priority::Normal,
},
options: vec![],
relevant_state: RelevantState::default(),
},
};
}
return WorkResult::Idle;
};
let Some(action) = guidance.actions.first() else {
return WorkResult::Idle;
};
if let Some(work_result) = self.execute_with_environment(state, action) {
let is_failure = match &work_result {
WorkResult::Acted { action_result, .. } => !action_result.success,
WorkResult::Done { success, .. } => !success,
_ => false,
};
if is_failure {
let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
if self.escalation_threshold > 0 && failures >= self.escalation_threshold as usize {
self.consecutive_failures.store(0, Ordering::SeqCst);
return WorkResult::Escalate {
reason: EscalationReason::ConsecutiveFailures(failures as u32),
context: Some(format!(
"Action '{}' failed {} times",
action.name, failures
)),
};
}
} else {
self.consecutive_failures.store(0, Ordering::SeqCst);
}
return work_result;
}
let result = execute_action(action, self.working_dir.as_deref());
if !result.success {
let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
if self.escalation_threshold > 0 && failures >= self.escalation_threshold as usize {
self.consecutive_failures.store(0, Ordering::SeqCst);
return WorkResult::Escalate {
reason: EscalationReason::ConsecutiveFailures(failures as u32),
context: Some(format!(
"Action '{}' failed {} times",
action.name, failures
)),
};
}
return WorkResult::acted(result);
}
self.consecutive_failures.store(0, Ordering::SeqCst);
let delta = WorkerStateDelta::new().with_cache(
format!("{}:last", self.name),
format!("tick:{},action:{}", state.shared.tick, action.name).into_bytes(),
100,
);
WorkResult::acted_with_delta(result, delta)
}
fn id(&self) -> WorkerId {
self.id
}
fn name(&self) -> &str {
&self.name
}
}
pub struct ExtensionAwareWorker {
id: WorkerId,
name: String,
}
impl ExtensionAwareWorker {
pub fn new(id: usize) -> Self {
Self {
id: WorkerId(id),
name: format!("extension_worker_{}", id),
}
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
}
impl WorkerAgent for ExtensionAwareWorker {
fn think_and_act(&self, state: &SwarmState, guidance: Option<&Guidance>) -> WorkResult {
use crate::types::ActionResult;
if let Some(counter) = state.shared.extensions.get::<AtomicUsize>() {
let old = counter.fetch_add(1, Ordering::SeqCst);
let mut delta = WorkerStateDelta::new();
delta = delta.with_shared(
format!("extension_worker:{}:count", self.id.0),
format!("{}", old + 1).into_bytes(),
);
if let Some(g) = guidance {
if let Some(content) = &g.content {
delta = delta.with_shared(
format!("extension_worker:{}:guidance", self.id.0),
content.as_bytes().to_vec(),
);
}
}
return WorkResult::acted_with_delta(
ActionResult::success(format!("counter: {}", old + 1), Duration::from_millis(1)),
delta,
);
}
WorkResult::NeedsGuidance {
reason: "No shared counter extension found".to_string(),
context: GuidanceContext {
issue: Issue {
description: "Extension 'AtomicUsize' is not registered".to_string(),
severity: Priority::High,
},
options: vec![super::worker::ProposedOption {
description: "Register AtomicUsize extension".to_string(),
pros: vec!["Enables shared counter functionality".to_string()],
cons: vec![],
}],
relevant_state: RelevantState::default(),
},
}
}
fn id(&self) -> WorkerId {
self.id
}
fn name(&self) -> &str {
&self.name
}
}
pub struct ProgressWorker {
id: WorkerId,
name: String,
total_ticks: u32,
current_tick: AtomicUsize,
}
impl ProgressWorker {
pub fn new(id: usize, total_ticks: u32) -> Self {
Self {
id: WorkerId(id),
name: format!("progress_worker_{}", id),
total_ticks,
current_tick: AtomicUsize::new(0),
}
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
}
impl WorkerAgent for ProgressWorker {
fn think_and_act(&self, _state: &SwarmState, _guidance: Option<&Guidance>) -> WorkResult {
use crate::types::ActionResult;
let current = self.current_tick.fetch_add(1, Ordering::SeqCst) + 1;
let progress = current as f32 / self.total_ticks as f32;
if current >= self.total_ticks as usize {
let mut delta = WorkerStateDelta::new();
delta = delta.with_shared(
format!("progress_worker:{}:status", self.id.0),
b"completed".to_vec(),
);
WorkResult::acted_with_delta(
ActionResult::success("completed", Duration::from_millis(1)),
delta,
)
} else {
WorkResult::Continuing { progress }
}
}
fn id(&self) -> WorkerId {
self.id
}
fn name(&self) -> &str {
&self.name
}
}
#[cfg(test)]
mod tests {
use super::*;
fn get_output_string(result: &ActionResult) -> String {
result
.output
.as_ref()
.map(|o| o.as_text())
.unwrap_or_default()
}
#[test]
fn test_run_bash_echo() {
let result = run_bash("echo hello", None);
assert!(result.success);
assert!(get_output_string(&result).contains("hello"));
}
#[test]
fn test_run_bash_failure() {
let result = run_bash("exit 1", None);
assert!(!result.success);
}
#[test]
fn test_unsupported_action() {
let action = Action {
name: "Unknown".to_string(),
params: Default::default(),
};
let result = execute_action(&action, None);
assert!(!result.success);
assert!(result
.error
.as_ref()
.is_some_and(|e| e.contains("Unsupported")));
}
}