a3s_code_core/orchestrator/
handle.rs1use crate::error::Result;
4use crate::orchestrator::{
5 agent::SubAgentEventStream, ControlSignal, OrchestratorEvent, SubAgentActivity, SubAgentConfig,
6 SubAgentState,
7};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11#[derive(Clone)]
15pub struct SubAgentHandle {
16 pub id: String,
18
19 pub(crate) config: SubAgentConfig,
21
22 pub(crate) created_at: u64,
24
25 control_tx: tokio::sync::mpsc::Sender<ControlSignal>,
27
28 event_tx: tokio::sync::broadcast::Sender<OrchestratorEvent>,
30
31 state: Arc<RwLock<SubAgentState>>,
33
34 pub(crate) activity: Arc<RwLock<SubAgentActivity>>,
36
37 #[allow(dead_code)]
39 task_handle: Arc<tokio::task::JoinHandle<Result<String>>>,
40}
41
42impl SubAgentHandle {
43 pub(crate) fn new(
45 id: String,
46 config: SubAgentConfig,
47 control_tx: tokio::sync::mpsc::Sender<ControlSignal>,
48 event_tx: tokio::sync::broadcast::Sender<OrchestratorEvent>,
49 state: Arc<RwLock<SubAgentState>>,
50 activity: Arc<RwLock<SubAgentActivity>>,
51 task_handle: tokio::task::JoinHandle<Result<String>>,
52 ) -> Self {
53 Self {
54 id,
55 config,
56 created_at: std::time::SystemTime::now()
57 .duration_since(std::time::UNIX_EPOCH)
58 .unwrap()
59 .as_millis() as u64,
60 control_tx,
61 event_tx,
62 state,
63 activity,
64 task_handle: Arc::new(task_handle),
65 }
66 }
67
68 pub fn state(&self) -> SubAgentState {
73 self.state
74 .try_read()
75 .map(|guard| guard.clone())
76 .unwrap_or(SubAgentState::Initializing)
77 }
78
79 pub async fn state_async(&self) -> SubAgentState {
81 self.state.read().await.clone()
82 }
83
84 pub async fn activity(&self) -> SubAgentActivity {
86 self.activity.read().await.clone()
87 }
88
89 pub fn config(&self) -> &SubAgentConfig {
91 &self.config
92 }
93
94 pub fn created_at(&self) -> u64 {
96 self.created_at
97 }
98
99 pub async fn send_control(&self, signal: ControlSignal) -> Result<()> {
101 self.control_tx
102 .send(signal)
103 .await
104 .map_err(|_| anyhow::anyhow!("Failed to send control signal: channel closed"))?;
105 Ok(())
106 }
107
108 pub async fn pause(&self) -> Result<()> {
110 self.send_control(ControlSignal::Pause).await
111 }
112
113 pub async fn resume(&self) -> Result<()> {
115 self.send_control(ControlSignal::Resume).await
116 }
117
118 pub async fn cancel(&self) -> Result<()> {
120 self.send_control(ControlSignal::Cancel).await
121 }
122
123 pub async fn adjust_params(
125 &self,
126 max_steps: Option<usize>,
127 timeout_ms: Option<u64>,
128 ) -> Result<()> {
129 self.send_control(ControlSignal::AdjustParams {
130 max_steps,
131 timeout_ms,
132 })
133 .await
134 }
135
136 pub async fn inject_prompt(&self, prompt: impl Into<String>) -> Result<()> {
138 self.send_control(ControlSignal::InjectPrompt {
139 prompt: prompt.into(),
140 })
141 .await
142 }
143
144 pub async fn wait(&self) -> Result<String> {
146 loop {
149 let state = self.state_async().await;
150 if state.is_terminal() {
151 match state {
152 SubAgentState::Completed { output, .. } => return Ok(output),
153 SubAgentState::Cancelled => {
154 return Err(anyhow::anyhow!("SubAgent was cancelled").into())
155 }
156 SubAgentState::Error { message } => {
157 return Err(anyhow::anyhow!("SubAgent error: {}", message).into())
158 }
159 _ => unreachable!(),
160 }
161 }
162 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
163 }
164 }
165
166 pub fn is_done(&self) -> bool {
168 self.state().is_terminal()
169 }
170
171 pub fn is_running(&self) -> bool {
173 self.state().is_running()
174 }
175
176 pub fn is_paused(&self) -> bool {
178 self.state().is_paused()
179 }
180
181 pub fn events(&self) -> SubAgentEventStream {
185 let rx = self.event_tx.subscribe();
186 SubAgentEventStream {
187 rx,
188 filter_id: self.id.clone(),
189 }
190 }
191}
192
193impl std::fmt::Debug for SubAgentHandle {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct("SubAgentHandle")
196 .field("id", &self.id)
197 .field("state", &self.state())
198 .finish()
199 }
200}