Skip to main content

swarm_engine_core/agent/
worker_impl.rs

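//! Generic Worker - a general-purpose worker that executes shell commands.
//!
//! Directly executes the Action carried by the Guidance received from the Manager.
//! When an Environment is registered in Extensions, execution goes through it instead.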
5
6use std::fs;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9use std::process::Command;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::time::{Duration, Instant};
12
13use crate::environment::EnvironmentBox;
14use crate::state::SwarmState;
15use crate::types::{Action, ActionResult, WorkerId};
16
17use super::escalation::EscalationReason;
18use super::worker::{
19    Guidance, GuidanceContext, Issue, Priority, RelevantState, WorkResult, WorkerAgent,
20    WorkerStateDelta,
21};
22
23// ============================================================================
24// Shell Helpers(シンプルな実行関数)
25// ============================================================================
26
27/// シェルコマンドを実行
28pub fn run_bash(command: &str, working_dir: Option<&str>) -> ActionResult {
29    let start = Instant::now();
30
31    let mut cmd = Command::new("sh");
32    cmd.arg("-c").arg(command);
33
34    if let Some(dir) = working_dir {
35        cmd.current_dir(dir);
36    }
37
38    match cmd.output() {
39        Ok(output) => {
40            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
41            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
42
43            if output.status.success() {
44                ActionResult::success(stdout, start.elapsed())
45            } else {
46                ActionResult::failure(
47                    format!("Exit code: {:?}\nstderr: {}", output.status.code(), stderr),
48                    start.elapsed(),
49                )
50            }
51        }
52        Err(e) => ActionResult::failure(format!("Failed to execute: {}", e), start.elapsed()),
53    }
54}
55
56/// ファイルを読み込む
57pub fn run_read(path: &str) -> ActionResult {
58    let start = Instant::now();
59    match fs::read_to_string(path) {
60        Ok(content) => ActionResult::success(content, start.elapsed()),
61        Err(e) => ActionResult::failure(format!("Failed to read {}: {}", path, e), start.elapsed()),
62    }
63}
64
65/// ファイルに書き込む
66pub fn run_write(path: &str, content: &str) -> ActionResult {
67    let start = Instant::now();
68
69    // 親ディレクトリを作成
70    if let Some(parent) = Path::new(path).parent() {
71        if !parent.exists() {
72            if let Err(e) = fs::create_dir_all(parent) {
73                return ActionResult::failure(
74                    format!("Failed to create directory: {}", e),
75                    start.elapsed(),
76                );
77            }
78        }
79    }
80
81    match fs::write(path, content) {
82        Ok(()) => ActionResult::success(format!("Written to {}", path), start.elapsed()),
83        Err(e) => {
84            ActionResult::failure(format!("Failed to write {}: {}", path, e), start.elapsed())
85        }
86    }
87}
88
89/// パターン検索
90pub fn run_grep(pattern: &str, path: &str) -> ActionResult {
91    let start = Instant::now();
92
93    let file = match fs::File::open(path) {
94        Ok(f) => f,
95        Err(e) => {
96            return ActionResult::failure(
97                format!("Failed to open {}: {}", path, e),
98                start.elapsed(),
99            )
100        }
101    };
102
103    let reader = BufReader::new(file);
104    let mut matches = Vec::new();
105
106    for (line_num, line) in reader.lines().enumerate() {
107        if let Ok(line) = line {
108            if line.contains(pattern) {
109                matches.push(format!("{}:{}", line_num + 1, line));
110            }
111        }
112    }
113
114    ActionResult::success(matches.join("\n"), start.elapsed())
115}
116
117/// Action を実行(Bash/Read/Write/Grep をサポート)
118pub fn execute_action(action: &Action, working_dir: Option<&str>) -> ActionResult {
119    match action.name.as_str() {
120        "Bash" => {
121            let command = action.params.target.as_deref().unwrap_or("");
122            run_bash(command, working_dir)
123        }
124        "Read" => {
125            let path = action.params.target.as_deref().unwrap_or("");
126            run_read(path)
127        }
128        "Write" => {
129            let path = action.params.target.as_deref().unwrap_or("");
130            let content = action
131                .params
132                .args
133                .get("content")
134                .map(|s| s.as_str())
135                .unwrap_or("");
136            run_write(path, content)
137        }
138        "Grep" => {
139            let pattern = action
140                .params
141                .args
142                .get("pattern")
143                .map(|s| s.as_str())
144                .unwrap_or("");
145            let path = action.params.target.as_deref().unwrap_or(".");
146            run_grep(pattern, path)
147        }
148        _ => ActionResult::failure(
149            format!("Unsupported action: {}", action.name),
150            Duration::ZERO,
151        ),
152    }
153}
154
155// ============================================================================
156// GenericWorker
157// ============================================================================
158
159/// 汎用 Worker - Guidance の Action を直接実行
160///
161/// # サポートする Action
162/// - `Bash`: シェルコマンド実行
163/// - `Read`: ファイル読み込み
164/// - `Write`: ファイル書き込み
165/// - `Grep`: パターン検索
166///
167/// # 使用例
168///
169/// ```ignore
170/// let worker = GenericWorker::new(0)
171///     .with_working_dir("/path/to/project")
172///     .with_escalation_threshold(3);
173/// ```
174pub struct GenericWorker {
175    id: WorkerId,
176    name: String,
177    /// 作業ディレクトリ
178    working_dir: Option<String>,
179    /// Escalation 閾値(連続失敗回数、0=無効)
180    escalation_threshold: u32,
181    /// 連続失敗カウント
182    consecutive_failures: AtomicUsize,
183    /// Guidance 必須モード
184    require_guidance: bool,
185}
186
187impl GenericWorker {
188    pub fn new(id: usize) -> Self {
189        Self {
190            id: WorkerId(id),
191            name: format!("generic_{}", id),
192            working_dir: None,
193            escalation_threshold: 0,
194            consecutive_failures: AtomicUsize::new(0),
195            require_guidance: false,
196        }
197    }
198
199    pub fn with_name(mut self, name: impl Into<String>) -> Self {
200        self.name = name.into();
201        self
202    }
203
204    pub fn with_working_dir(mut self, dir: impl Into<String>) -> Self {
205        self.working_dir = Some(dir.into());
206        self
207    }
208
209    pub fn with_escalation_threshold(mut self, threshold: u32) -> Self {
210        self.escalation_threshold = threshold;
211        self
212    }
213
214    pub fn with_require_guidance(mut self, required: bool) -> Self {
215        self.require_guidance = required;
216        self
217    }
218
219    /// Environment 経由でアクションを実行
220    ///
221    /// Extensions に Environment があれば WorkResult を返す、なければ None
222    fn execute_with_environment(&self, state: &SwarmState, action: &Action) -> Option<WorkResult> {
223        state
224            .shared
225            .extensions
226            .get::<EnvironmentBox>()
227            .map(|env| env.step(self.id, action))
228    }
229}
230
231impl WorkerAgent for GenericWorker {
232    fn think_and_act(&self, state: &SwarmState, guidance: Option<&Guidance>) -> WorkResult {
233        // Guidance がない場合
234        let Some(guidance) = guidance else {
235            if self.require_guidance {
236                return WorkResult::NeedsGuidance {
237                    reason: "No guidance received".to_string(),
238                    context: GuidanceContext {
239                        issue: Issue {
240                            description: "Worker requires guidance to proceed".to_string(),
241                            severity: Priority::Normal,
242                        },
243                        options: vec![],
244                        relevant_state: RelevantState::default(),
245                    },
246                };
247            }
248            return WorkResult::Idle;
249        };
250
251        // Action がない場合
252        let Some(action) = guidance.actions.first() else {
253            return WorkResult::Idle;
254        };
255
256        // Environment 経由で実行を試みる
257        if let Some(work_result) = self.execute_with_environment(state, action) {
258            // 失敗時の Escalation チェック
259            let is_failure = match &work_result {
260                WorkResult::Acted { action_result, .. } => !action_result.success,
261                WorkResult::Done { success, .. } => !success,
262                _ => false,
263            };
264
265            if is_failure {
266                let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
267
268                if self.escalation_threshold > 0 && failures >= self.escalation_threshold as usize {
269                    self.consecutive_failures.store(0, Ordering::SeqCst);
270                    return WorkResult::Escalate {
271                        reason: EscalationReason::ConsecutiveFailures(failures as u32),
272                        context: Some(format!(
273                            "Action '{}' failed {} times",
274                            action.name, failures
275                        )),
276                    };
277                }
278            } else {
279                self.consecutive_failures.store(0, Ordering::SeqCst);
280            }
281
282            // Environment が返した WorkResult をそのまま返す
283            return work_result;
284        }
285
286        // Environment がない場合: 従来の execute_action にフォールバック
287        let result = execute_action(action, self.working_dir.as_deref());
288
289        // 失敗処理
290        if !result.success {
291            let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
292
293            if self.escalation_threshold > 0 && failures >= self.escalation_threshold as usize {
294                self.consecutive_failures.store(0, Ordering::SeqCst);
295                return WorkResult::Escalate {
296                    reason: EscalationReason::ConsecutiveFailures(failures as u32),
297                    context: Some(format!(
298                        "Action '{}' failed {} times",
299                        action.name, failures
300                    )),
301                };
302            }
303
304            return WorkResult::acted(result);
305        }
306
307        // 成功
308        self.consecutive_failures.store(0, Ordering::SeqCst);
309
310        let delta = WorkerStateDelta::new().with_cache(
311            format!("{}:last", self.name),
312            format!("tick:{},action:{}", state.shared.tick, action.name).into_bytes(),
313            100,
314        );
315
316        WorkResult::acted_with_delta(result, delta)
317    }
318
319    fn id(&self) -> WorkerId {
320        self.id
321    }
322
323    fn name(&self) -> &str {
324        &self.name
325    }
326}
327
328// ============================================================================
329// ExtensionAwareWorker
330// ============================================================================
331
332/// Extension を活用する Worker
333///
334/// SwarmState.shared.extensions から動的リソースを取得して使用。
335///
336/// # 使用例
337///
338/// ```ignore
339/// // Extension として設定された共有リソース
340/// struct SharedCounter(AtomicUsize);
341///
342/// let worker = ExtensionAwareWorker::new(0);
343///
344/// // Orchestrator に Extension を登録
345/// let orchestrator = OrchestratorBuilder::new()
346///     .add_worker(worker)
347///     .extension(SharedCounter(AtomicUsize::new(0)))
348///     .build(runtime);
349/// ```
350pub struct ExtensionAwareWorker {
351    id: WorkerId,
352    name: String,
353}
354
355impl ExtensionAwareWorker {
356    pub fn new(id: usize) -> Self {
357        Self {
358            id: WorkerId(id),
359            name: format!("extension_worker_{}", id),
360        }
361    }
362
363    pub fn with_name(mut self, name: impl Into<String>) -> Self {
364        self.name = name.into();
365        self
366    }
367}
368
369impl WorkerAgent for ExtensionAwareWorker {
370    fn think_and_act(&self, state: &SwarmState, guidance: Option<&Guidance>) -> WorkResult {
371        use crate::types::ActionResult;
372
373        // Extensions から共有カウンターを取得して操作
374        if let Some(counter) = state.shared.extensions.get::<AtomicUsize>() {
375            let old = counter.fetch_add(1, Ordering::SeqCst);
376
377            // SharedData に記録
378            let mut delta = WorkerStateDelta::new();
379            delta = delta.with_shared(
380                format!("extension_worker:{}:count", self.id.0),
381                format!("{}", old + 1).into_bytes(),
382            );
383
384            // Guidance の content を props として利用
385            if let Some(g) = guidance {
386                if let Some(content) = &g.content {
387                    delta = delta.with_shared(
388                        format!("extension_worker:{}:guidance", self.id.0),
389                        content.as_bytes().to_vec(),
390                    );
391                }
392            }
393
394            return WorkResult::acted_with_delta(
395                ActionResult::success(format!("counter: {}", old + 1), Duration::from_millis(1)),
396                delta,
397            );
398        }
399
400        // Extension がない場合は NeedsGuidance
401        WorkResult::NeedsGuidance {
402            reason: "No shared counter extension found".to_string(),
403            context: GuidanceContext {
404                issue: Issue {
405                    description: "Extension 'AtomicUsize' is not registered".to_string(),
406                    severity: Priority::High,
407                },
408                options: vec![super::worker::ProposedOption {
409                    description: "Register AtomicUsize extension".to_string(),
410                    pros: vec!["Enables shared counter functionality".to_string()],
411                    cons: vec![],
412                }],
413                relevant_state: RelevantState::default(),
414            },
415        }
416    }
417
418    fn id(&self) -> WorkerId {
419        self.id
420    }
421
422    fn name(&self) -> &str {
423        &self.name
424    }
425}
426
427// ============================================================================
428// ProgressWorker
429// ============================================================================
430
431/// 進捗を報告する Worker
432///
433/// Continuing を使って処理進捗を報告。
434/// 複数Tick にまたがる処理のシミュレーション。
435pub struct ProgressWorker {
436    id: WorkerId,
437    name: String,
438    /// 完了までのTick数
439    total_ticks: u32,
440    /// 現在の進捗(Atomic で &self から更新)
441    current_tick: AtomicUsize,
442}
443
444impl ProgressWorker {
445    pub fn new(id: usize, total_ticks: u32) -> Self {
446        Self {
447            id: WorkerId(id),
448            name: format!("progress_worker_{}", id),
449            total_ticks,
450            current_tick: AtomicUsize::new(0),
451        }
452    }
453
454    pub fn with_name(mut self, name: impl Into<String>) -> Self {
455        self.name = name.into();
456        self
457    }
458}
459
460impl WorkerAgent for ProgressWorker {
461    fn think_and_act(&self, _state: &SwarmState, _guidance: Option<&Guidance>) -> WorkResult {
462        use crate::types::ActionResult;
463
464        let current = self.current_tick.fetch_add(1, Ordering::SeqCst) + 1;
465        let progress = current as f32 / self.total_ticks as f32;
466
467        if current >= self.total_ticks as usize {
468            // 完了
469            let mut delta = WorkerStateDelta::new();
470            delta = delta.with_shared(
471                format!("progress_worker:{}:status", self.id.0),
472                b"completed".to_vec(),
473            );
474
475            WorkResult::acted_with_delta(
476                ActionResult::success("completed", Duration::from_millis(1)),
477                delta,
478            )
479        } else {
480            // 継続中
481            WorkResult::Continuing { progress }
482        }
483    }
484
485    fn id(&self) -> WorkerId {
486        self.id
487    }
488
489    fn name(&self) -> &str {
490        &self.name
491    }
492}
493
494// ============================================================================
495// Tests
496// ============================================================================
497
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the textual output of `result`, or an empty string when it has none.
    fn get_output_string(result: &ActionResult) -> String {
        match result.output.as_ref() {
            Some(output) => output.as_text(),
            None => String::default(),
        }
    }

    /// A successful command reports success and exposes its stdout.
    #[test]
    fn test_run_bash_echo() {
        let outcome = run_bash("echo hello", None);
        assert!(outcome.success);

        let text = get_output_string(&outcome);
        assert!(text.contains("hello"));
    }

    /// A non-zero exit status is reported as a failure.
    #[test]
    fn test_run_bash_failure() {
        let outcome = run_bash("exit 1", None);
        assert!(!outcome.success);
    }

    /// Unknown action names fail with an "Unsupported" error message.
    #[test]
    fn test_unsupported_action() {
        let unknown = Action {
            name: String::from("Unknown"),
            params: Default::default(),
        };

        let outcome = execute_action(&unknown, None);

        assert!(!outcome.success);
        assert!(matches!(&outcome.error, Some(message) if message.contains("Unsupported")));
    }
}
536}