Skip to main content

a3s_code_core/orchestrator/
handle.rs

1//! SubAgent 句柄
2
3use crate::error::Result;
4use crate::orchestrator::{
5    agent::SubAgentEventStream, ControlSignal, OrchestratorEvent, SubAgentActivity, SubAgentConfig,
6    SubAgentState,
7};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11/// SubAgent 句柄
12///
13/// 用于控制和查询 SubAgent 的状态。
14#[derive(Clone)]
15pub struct SubAgentHandle {
16    /// SubAgent ID
17    pub id: String,
18
19    /// SubAgent 配置
20    pub(crate) config: SubAgentConfig,
21
22    /// 创建时间(Unix 时间戳,毫秒)
23    pub(crate) created_at: u64,
24
25    /// 控制信号发送器
26    control_tx: tokio::sync::mpsc::Sender<ControlSignal>,
27
28    /// 事件广播发送器
29    event_tx: tokio::sync::broadcast::Sender<OrchestratorEvent>,
30
31    /// 状态
32    state: Arc<RwLock<SubAgentState>>,
33
34    /// 当前活动
35    pub(crate) activity: Arc<RwLock<SubAgentActivity>>,
36
37    /// 任务句柄(保持任务存活,不直接访问)
38    #[allow(dead_code)]
39    task_handle: Arc<tokio::task::JoinHandle<Result<String>>>,
40}
41
42impl SubAgentHandle {
43    /// 创建新的句柄
44    pub(crate) fn new(
45        id: String,
46        config: SubAgentConfig,
47        control_tx: tokio::sync::mpsc::Sender<ControlSignal>,
48        event_tx: tokio::sync::broadcast::Sender<OrchestratorEvent>,
49        state: Arc<RwLock<SubAgentState>>,
50        activity: Arc<RwLock<SubAgentActivity>>,
51        task_handle: tokio::task::JoinHandle<Result<String>>,
52    ) -> Self {
53        Self {
54            id,
55            config,
56            created_at: std::time::SystemTime::now()
57                .duration_since(std::time::UNIX_EPOCH)
58                .unwrap()
59                .as_millis() as u64,
60            control_tx,
61            event_tx,
62            state,
63            activity,
64            task_handle: Arc::new(task_handle),
65        }
66    }
67
68    /// 获取当前状态
69    ///
70    /// 注意:此方法使用非阻塞读取。如果状态锁当前被持有,返回 Initializing 状态。
71    /// 在异步上下文中,建议使用 `state_async()` 以获得准确的状态。
72    pub fn state(&self) -> SubAgentState {
73        self.state
74            .try_read()
75            .map(|guard| guard.clone())
76            .unwrap_or(SubAgentState::Initializing)
77    }
78
79    /// 异步获取当前状态
80    pub async fn state_async(&self) -> SubAgentState {
81        self.state.read().await.clone()
82    }
83
84    /// 获取当前活动
85    pub async fn activity(&self) -> SubAgentActivity {
86        self.activity.read().await.clone()
87    }
88
89    /// 获取配置
90    pub fn config(&self) -> &SubAgentConfig {
91        &self.config
92    }
93
94    /// 获取创建时间
95    pub fn created_at(&self) -> u64 {
96        self.created_at
97    }
98
99    /// 发送控制信号
100    pub async fn send_control(&self, signal: ControlSignal) -> Result<()> {
101        self.control_tx
102            .send(signal)
103            .await
104            .map_err(|_| anyhow::anyhow!("Failed to send control signal: channel closed"))?;
105        Ok(())
106    }
107
108    /// 暂停执行
109    pub async fn pause(&self) -> Result<()> {
110        self.send_control(ControlSignal::Pause).await
111    }
112
113    /// 恢复执行
114    pub async fn resume(&self) -> Result<()> {
115        self.send_control(ControlSignal::Resume).await
116    }
117
118    /// 取消执行
119    pub async fn cancel(&self) -> Result<()> {
120        self.send_control(ControlSignal::Cancel).await
121    }
122
123    /// 调整参数
124    pub async fn adjust_params(
125        &self,
126        max_steps: Option<usize>,
127        timeout_ms: Option<u64>,
128    ) -> Result<()> {
129        self.send_control(ControlSignal::AdjustParams {
130            max_steps,
131            timeout_ms,
132        })
133        .await
134    }
135
136    /// 注入新提示词
137    pub async fn inject_prompt(&self, prompt: impl Into<String>) -> Result<()> {
138        self.send_control(ControlSignal::InjectPrompt {
139            prompt: prompt.into(),
140        })
141        .await
142    }
143
144    /// 等待完成
145    pub async fn wait(&self) -> Result<String> {
146        // Note: 这里需要克隆 Arc 来避免移动所有权问题
147        // 实际使用中,应该只调用一次 wait()
148        loop {
149            let state = self.state_async().await;
150            if state.is_terminal() {
151                match state {
152                    SubAgentState::Completed { output, .. } => return Ok(output),
153                    SubAgentState::Cancelled => {
154                        return Err(anyhow::anyhow!("SubAgent was cancelled").into())
155                    }
156                    SubAgentState::Error { message } => {
157                        return Err(anyhow::anyhow!("SubAgent error: {}", message).into())
158                    }
159                    _ => unreachable!(),
160                }
161            }
162            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
163        }
164    }
165
166    /// 是否已完成
167    pub fn is_done(&self) -> bool {
168        self.state().is_terminal()
169    }
170
171    /// 是否正在运行
172    pub fn is_running(&self) -> bool {
173        self.state().is_running()
174    }
175
176    /// 是否已暂停
177    pub fn is_paused(&self) -> bool {
178        self.state().is_paused()
179    }
180
181    /// Subscribe to events for this SubAgent.
182    ///
183    /// Returns a filtered event stream that only includes events for this SubAgent.
184    pub fn events(&self) -> SubAgentEventStream {
185        let rx = self.event_tx.subscribe();
186        SubAgentEventStream {
187            rx,
188            filter_id: self.id.clone(),
189        }
190    }
191}
192
193impl std::fmt::Debug for SubAgentHandle {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        f.debug_struct("SubAgentHandle")
196            .field("id", &self.id)
197            .field("state", &self.state())
198            .finish()
199    }
200}