Skip to main content

a3s_code_core/orchestrator/
wrapper.rs

1//! SubAgent 包装器
2//!
3//! 包装 AgentLoop 执行,提供:
4//! - 事件转发到 Orchestrator
5//! - 控制信号处理
6//! - 状态管理
7
8use crate::error::Result;
9use crate::orchestrator::{
10    ControlSignal, OrchestratorEvent, SubAgentActivity, SubAgentConfig, SubAgentState,
11};
12use std::sync::Arc;
13use tokio::sync::{broadcast, mpsc, RwLock};
14
15/// SubAgent 包装器
16///
17/// TODO: 完整实现需要与 AgentLoop 集成
18pub struct SubAgentWrapper {
19    /// SubAgent ID
20    id: String,
21
22    /// 配置
23    config: SubAgentConfig,
24
25    /// 事件发送器
26    event_tx: broadcast::Sender<OrchestratorEvent>,
27
28    /// 控制信号接收器
29    control_rx: mpsc::Receiver<ControlSignal>,
30
31    /// 状态
32    state: Arc<RwLock<SubAgentState>>,
33
34    /// 当前活动
35    activity: Arc<RwLock<SubAgentActivity>>,
36}
37
38impl SubAgentWrapper {
39    /// 创建新的 wrapper
40    pub fn new(
41        id: String,
42        config: SubAgentConfig,
43        event_tx: broadcast::Sender<OrchestratorEvent>,
44        control_rx: mpsc::Receiver<ControlSignal>,
45        state: Arc<RwLock<SubAgentState>>,
46        activity: Arc<RwLock<SubAgentActivity>>,
47    ) -> Self {
48        Self {
49            id,
50            config,
51            event_tx,
52            control_rx,
53            state,
54            activity,
55        }
56    }
57
58    /// 执行 SubAgent
59    ///
60    /// TODO: 完整实现需要:
61    /// 1. 创建 AgentLoop
62    /// 2. 包装执行,转发事件
63    /// 3. 处理控制信号
64    pub async fn execute(mut self) -> Result<String> {
65        // 更新状态为运行中
66        self.update_state(SubAgentState::Running).await;
67
68        // TODO: 实际的 AgentLoop 执行
69        // 这里是占位符实现
70        let result = self.execute_placeholder().await;
71
72        // 更新状态为完成
73        match &result {
74            Ok(output) => {
75                self.update_state(SubAgentState::Completed {
76                    success: true,
77                    output: output.clone(),
78                })
79                .await;
80
81                // 发布完成事件
82                let _ = self.event_tx.send(OrchestratorEvent::SubAgentCompleted {
83                    id: self.id.clone(),
84                    success: true,
85                    output: output.clone(),
86                    duration_ms: 0, // TODO: 实际计时
87                    token_usage: None,
88                });
89            }
90            Err(e) => {
91                // 如果已经是 Cancelled 状态,不要覆盖
92                let current_state = self.state.read().await.clone();
93                if !matches!(current_state, SubAgentState::Cancelled) {
94                    self.update_state(SubAgentState::Error {
95                        message: e.to_string(),
96                    })
97                    .await;
98                }
99
100                // 发布完成事件
101                let _ = self.event_tx.send(OrchestratorEvent::SubAgentCompleted {
102                    id: self.id.clone(),
103                    success: false,
104                    output: e.to_string(),
105                    duration_ms: 0,
106                    token_usage: None,
107                });
108            }
109        }
110
111        result
112    }
113
114    /// 占位符执行(TODO: 替换为实际的 AgentLoop 集成)
115    async fn execute_placeholder(&mut self) -> Result<String> {
116        // 模拟多步执行
117        for step in 1..=5 {
118            // 检查控制信号(非阻塞)
119            while let Ok(signal) = self.control_rx.try_recv() {
120                self.handle_control_signal(signal).await?;
121            }
122
123            // 如果被取消,立即返回
124            if matches!(*self.state.read().await, SubAgentState::Cancelled) {
125                return Err(anyhow::anyhow!("Cancelled by orchestrator").into());
126            }
127
128            // 如果被暂停,等待恢复
129            while matches!(*self.state.read().await, SubAgentState::Paused) {
130                *self.activity.write().await = SubAgentActivity::WaitingForControl {
131                    reason: "Paused by orchestrator".to_string(),
132                };
133                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
134                // 继续检查控制信号
135                while let Ok(signal) = self.control_rx.try_recv() {
136                    self.handle_control_signal(signal).await?;
137                }
138            }
139
140            // 模拟工具调用
141            *self.activity.write().await = SubAgentActivity::CallingTool {
142                tool_name: "read".to_string(),
143                args: serde_json::json!({"path": "/tmp/file.txt"}),
144            };
145
146            // 发布工具调用事件
147            let _ = self.event_tx.send(OrchestratorEvent::ToolExecutionStarted {
148                id: self.id.clone(),
149                tool_id: format!("tool-{}", step),
150                tool_name: "read".to_string(),
151                args: serde_json::json!({"path": "/tmp/file.txt"}),
152            });
153
154            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
155
156            // 模拟 LLM 请求
157            *self.activity.write().await = SubAgentActivity::RequestingLlm { message_count: 3 };
158
159            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
160
161            // 回到空闲状态
162            *self.activity.write().await = SubAgentActivity::Idle;
163
164            // 发布进度事件
165            let _ = self.event_tx.send(OrchestratorEvent::SubAgentProgress {
166                id: self.id.clone(),
167                step,
168                total_steps: 5,
169                message: format!("Step {}/5 completed", step),
170            });
171        }
172
173        Ok(format!(
174            "Placeholder result for SubAgent {} ({})",
175            self.id, self.config.agent_type
176        ))
177    }
178
179    /// 处理控制信号
180    async fn handle_control_signal(&mut self, signal: ControlSignal) -> Result<()> {
181        // 发布控制信号接收事件
182        let _ = self
183            .event_tx
184            .send(OrchestratorEvent::ControlSignalReceived {
185                id: self.id.clone(),
186                signal: signal.clone(),
187            });
188
189        let result = match signal {
190            ControlSignal::Pause => {
191                self.update_state(SubAgentState::Paused).await;
192                Ok(())
193            }
194            ControlSignal::Resume => {
195                self.update_state(SubAgentState::Running).await;
196                Ok(())
197            }
198            ControlSignal::Cancel => {
199                self.update_state(SubAgentState::Cancelled).await;
200                Err(anyhow::anyhow!("Cancelled by orchestrator").into())
201            }
202            ControlSignal::AdjustParams { .. } => {
203                // TODO: 实际参数调整逻辑
204                Ok(())
205            }
206            ControlSignal::InjectPrompt { .. } => {
207                // TODO: 实际提示词注入逻辑
208                Ok(())
209            }
210        };
211
212        // 发布控制信号应用事件
213        let _ = self.event_tx.send(OrchestratorEvent::ControlSignalApplied {
214            id: self.id.clone(),
215            signal,
216            success: result.is_ok(),
217            error: result.as_ref().err().map(|e| format!("{}", e)),
218        });
219
220        result
221    }
222
223    /// 更新状态
224    async fn update_state(&self, new_state: SubAgentState) {
225        let old_state = {
226            let mut state = self.state.write().await;
227            let old = state.clone();
228            *state = new_state.clone();
229            old
230        };
231
232        // 发布状态变更事件
233        let _ = self.event_tx.send(OrchestratorEvent::SubAgentStateChanged {
234            id: self.id.clone(),
235            old_state,
236            new_state,
237        });
238    }
239}