// a3s_code_core/orchestrator/handle.rs
use crate::error::Result;
use crate::orchestrator::{
    agent::SubAgentEventStream, ControlSignal, OrchestratorEvent, SubAgentActivity, SubAgentConfig,
    SubAgentState,
};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::RwLock;
11
12#[derive(Clone)]
16pub struct SubAgentHandle {
17 pub id: String,
19
20 pub(crate) config: SubAgentConfig,
22
23 pub(crate) created_at: u64,
25
26 control_tx: tokio::sync::mpsc::Sender<ControlSignal>,
28
29 subagent_event_tx: broadcast::Sender<OrchestratorEvent>,
31
32 event_history: Arc<RwLock<std::collections::VecDeque<OrchestratorEvent>>>,
34
35 state: Arc<RwLock<SubAgentState>>,
37
38 pub(crate) activity: Arc<RwLock<SubAgentActivity>>,
40
41 #[allow(dead_code)]
43 task_handle: Arc<tokio::task::JoinHandle<Result<String>>>,
44}
45
46pub(crate) struct SubAgentHandleParts {
47 pub id: String,
48 pub config: SubAgentConfig,
49 pub control_tx: tokio::sync::mpsc::Sender<ControlSignal>,
50 pub subagent_event_tx: tokio::sync::broadcast::Sender<OrchestratorEvent>,
51 pub event_history: Arc<RwLock<std::collections::VecDeque<OrchestratorEvent>>>,
52 pub state: Arc<RwLock<SubAgentState>>,
53 pub activity: Arc<RwLock<SubAgentActivity>>,
54 pub task_handle: tokio::task::JoinHandle<Result<String>>,
55}
56
57impl SubAgentHandle {
58 pub(crate) fn new(parts: SubAgentHandleParts) -> Self {
60 Self {
61 id: parts.id,
62 config: parts.config,
63 created_at: std::time::SystemTime::now()
64 .duration_since(std::time::UNIX_EPOCH)
65 .unwrap()
66 .as_millis() as u64,
67 control_tx: parts.control_tx,
68 subagent_event_tx: parts.subagent_event_tx,
69 event_history: parts.event_history,
70 state: parts.state,
71 activity: parts.activity,
72 task_handle: Arc::new(parts.task_handle),
73 }
74 }
75
76 pub fn state(&self) -> SubAgentState {
81 self.state
82 .try_read()
83 .map(|guard| guard.clone())
84 .unwrap_or(SubAgentState::Initializing)
85 }
86
87 pub async fn state_async(&self) -> SubAgentState {
89 self.state.read().await.clone()
90 }
91
92 pub async fn activity(&self) -> SubAgentActivity {
94 self.activity.read().await.clone()
95 }
96
97 pub fn config(&self) -> &SubAgentConfig {
99 &self.config
100 }
101
102 pub fn created_at(&self) -> u64 {
104 self.created_at
105 }
106
107 pub async fn send_control(&self, signal: ControlSignal) -> Result<()> {
109 self.control_tx
110 .send(signal)
111 .await
112 .map_err(|_| anyhow::anyhow!("Failed to send control signal: channel closed"))?;
113 Ok(())
114 }
115
116 pub async fn pause(&self) -> Result<()> {
118 self.send_control(ControlSignal::Pause).await
119 }
120
121 pub async fn resume(&self) -> Result<()> {
123 self.send_control(ControlSignal::Resume).await
124 }
125
126 pub async fn cancel(&self) -> Result<()> {
128 self.send_control(ControlSignal::Cancel).await
129 }
130
131 pub async fn adjust_params(
133 &self,
134 max_steps: Option<usize>,
135 timeout_ms: Option<u64>,
136 ) -> Result<()> {
137 self.send_control(ControlSignal::AdjustParams {
138 max_steps,
139 timeout_ms,
140 })
141 .await
142 }
143
144 pub async fn inject_prompt(&self, prompt: impl Into<String>) -> Result<()> {
146 self.send_control(ControlSignal::InjectPrompt {
147 prompt: prompt.into(),
148 })
149 .await
150 }
151
152 pub async fn wait(&self) -> Result<String> {
154 loop {
157 let state = self.state_async().await;
158 if state.is_terminal() {
159 match state {
160 SubAgentState::Completed { output, .. } => return Ok(output),
161 SubAgentState::Cancelled => {
162 return Err(anyhow::anyhow!("SubAgent was cancelled").into())
163 }
164 SubAgentState::Error { message } => {
165 return Err(anyhow::anyhow!("SubAgent error: {}", message).into())
166 }
167 _ => unreachable!(),
168 }
169 }
170 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
171 }
172 }
173
174 pub fn is_done(&self) -> bool {
176 self.state().is_terminal()
177 }
178
179 pub fn is_running(&self) -> bool {
181 self.state().is_running()
182 }
183
184 pub fn is_paused(&self) -> bool {
186 self.state().is_paused()
187 }
188
189 pub fn events(&self) -> SubAgentEventStream {
193 let rx = self.subagent_event_tx.subscribe();
194 let history = self
195 .event_history
196 .try_read()
197 .map(|events| events.clone())
198 .unwrap_or_default();
199 SubAgentEventStream {
200 history,
201 rx,
202 filter_id: self.id.clone(),
203 }
204 }
205}
206
207impl std::fmt::Debug for SubAgentHandle {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 f.debug_struct("SubAgentHandle")
210 .field("id", &self.id)
211 .field("state", &self.state())
212 .finish()
213 }
214}