Skip to main content

a3s_code_core/orchestrator/
handle.rs

1//! SubAgent 句柄
2
3use crate::error::Result;
4use crate::orchestrator::{
5    agent::SubAgentEventStream, ControlSignal, OrchestratorEvent, SubAgentActivity, SubAgentConfig,
6    SubAgentState,
7};
8use std::sync::Arc;
9use tokio::sync::broadcast;
10use tokio::sync::RwLock;
11
12/// SubAgent 句柄
13///
14/// 用于控制和查询 SubAgent 的状态。
15#[derive(Clone)]
16pub struct SubAgentHandle {
17    /// SubAgent ID
18    pub id: String,
19
20    /// SubAgent 配置
21    pub(crate) config: SubAgentConfig,
22
23    /// 创建时间(Unix 时间戳,毫秒)
24    pub(crate) created_at: u64,
25
26    /// 控制信号发送器
27    control_tx: tokio::sync::mpsc::Sender<ControlSignal>,
28
29    /// Per-subagent event broadcaster used by `handle.events()`.
30    subagent_event_tx: broadcast::Sender<OrchestratorEvent>,
31
32    /// Backlog of already-emitted events for this specific subagent.
33    event_history: Arc<RwLock<std::collections::VecDeque<OrchestratorEvent>>>,
34
35    /// 状态
36    state: Arc<RwLock<SubAgentState>>,
37
38    /// 当前活动
39    pub(crate) activity: Arc<RwLock<SubAgentActivity>>,
40
41    /// 任务句柄(保持任务存活,不直接访问)
42    #[allow(dead_code)]
43    task_handle: Arc<tokio::task::JoinHandle<Result<String>>>,
44}
45
46pub(crate) struct SubAgentHandleParts {
47    pub id: String,
48    pub config: SubAgentConfig,
49    pub control_tx: tokio::sync::mpsc::Sender<ControlSignal>,
50    pub subagent_event_tx: tokio::sync::broadcast::Sender<OrchestratorEvent>,
51    pub event_history: Arc<RwLock<std::collections::VecDeque<OrchestratorEvent>>>,
52    pub state: Arc<RwLock<SubAgentState>>,
53    pub activity: Arc<RwLock<SubAgentActivity>>,
54    pub task_handle: tokio::task::JoinHandle<Result<String>>,
55}
56
57impl SubAgentHandle {
58    /// 创建新的句柄
59    pub(crate) fn new(parts: SubAgentHandleParts) -> Self {
60        Self {
61            id: parts.id,
62            config: parts.config,
63            created_at: std::time::SystemTime::now()
64                .duration_since(std::time::UNIX_EPOCH)
65                .unwrap()
66                .as_millis() as u64,
67            control_tx: parts.control_tx,
68            subagent_event_tx: parts.subagent_event_tx,
69            event_history: parts.event_history,
70            state: parts.state,
71            activity: parts.activity,
72            task_handle: Arc::new(parts.task_handle),
73        }
74    }
75
76    /// 获取当前状态
77    ///
78    /// 注意:此方法使用非阻塞读取。如果状态锁当前被持有,返回 Initializing 状态。
79    /// 在异步上下文中,建议使用 `state_async()` 以获得准确的状态。
80    pub fn state(&self) -> SubAgentState {
81        self.state
82            .try_read()
83            .map(|guard| guard.clone())
84            .unwrap_or(SubAgentState::Initializing)
85    }
86
87    /// 异步获取当前状态
88    pub async fn state_async(&self) -> SubAgentState {
89        self.state.read().await.clone()
90    }
91
92    /// 获取当前活动
93    pub async fn activity(&self) -> SubAgentActivity {
94        self.activity.read().await.clone()
95    }
96
97    /// 获取配置
98    pub fn config(&self) -> &SubAgentConfig {
99        &self.config
100    }
101
102    /// 获取创建时间
103    pub fn created_at(&self) -> u64 {
104        self.created_at
105    }
106
107    /// 发送控制信号
108    pub async fn send_control(&self, signal: ControlSignal) -> Result<()> {
109        self.control_tx
110            .send(signal)
111            .await
112            .map_err(|_| anyhow::anyhow!("Failed to send control signal: channel closed"))?;
113        Ok(())
114    }
115
116    /// 暂停执行
117    pub async fn pause(&self) -> Result<()> {
118        self.send_control(ControlSignal::Pause).await
119    }
120
121    /// 恢复执行
122    pub async fn resume(&self) -> Result<()> {
123        self.send_control(ControlSignal::Resume).await
124    }
125
126    /// 取消执行
127    pub async fn cancel(&self) -> Result<()> {
128        self.send_control(ControlSignal::Cancel).await
129    }
130
131    /// 调整参数
132    pub async fn adjust_params(
133        &self,
134        max_steps: Option<usize>,
135        timeout_ms: Option<u64>,
136    ) -> Result<()> {
137        self.send_control(ControlSignal::AdjustParams {
138            max_steps,
139            timeout_ms,
140        })
141        .await
142    }
143
144    /// 注入新提示词
145    pub async fn inject_prompt(&self, prompt: impl Into<String>) -> Result<()> {
146        self.send_control(ControlSignal::InjectPrompt {
147            prompt: prompt.into(),
148        })
149        .await
150    }
151
152    /// 等待完成
153    pub async fn wait(&self) -> Result<String> {
154        // Note: 这里需要克隆 Arc 来避免移动所有权问题
155        // 实际使用中,应该只调用一次 wait()
156        loop {
157            let state = self.state_async().await;
158            if state.is_terminal() {
159                match state {
160                    SubAgentState::Completed { output, .. } => return Ok(output),
161                    SubAgentState::Cancelled => {
162                        return Err(anyhow::anyhow!("SubAgent was cancelled").into())
163                    }
164                    SubAgentState::Error { message } => {
165                        return Err(anyhow::anyhow!("SubAgent error: {}", message).into())
166                    }
167                    _ => unreachable!(),
168                }
169            }
170            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
171        }
172    }
173
174    /// 是否已完成
175    pub fn is_done(&self) -> bool {
176        self.state().is_terminal()
177    }
178
179    /// 是否正在运行
180    pub fn is_running(&self) -> bool {
181        self.state().is_running()
182    }
183
184    /// 是否已暂停
185    pub fn is_paused(&self) -> bool {
186        self.state().is_paused()
187    }
188
189    /// Subscribe to events for this SubAgent.
190    ///
191    /// Returns a filtered event stream that only includes events for this SubAgent.
192    pub fn events(&self) -> SubAgentEventStream {
193        let rx = self.subagent_event_tx.subscribe();
194        let history = self
195            .event_history
196            .try_read()
197            .map(|events| events.clone())
198            .unwrap_or_default();
199        SubAgentEventStream {
200            history,
201            rx,
202            filter_id: self.id.clone(),
203        }
204    }
205}
206
207impl std::fmt::Debug for SubAgentHandle {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        f.debug_struct("SubAgentHandle")
210            .field("id", &self.id)
211            .field("state", &self.state())
212            .finish()
213    }
214}