// a3s_code_core/orchestrator/wrapper.rs
use crate::error::Result;
use crate::orchestrator::{
    ControlSignal, OrchestratorEvent, SubAgentActivity, SubAgentConfig, SubAgentState,
};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{broadcast, mpsc, RwLock};

15pub struct SubAgentWrapper {
19 id: String,
21
22 config: SubAgentConfig,
24
25 event_tx: broadcast::Sender<OrchestratorEvent>,
27
28 control_rx: mpsc::Receiver<ControlSignal>,
30
31 state: Arc<RwLock<SubAgentState>>,
33
34 activity: Arc<RwLock<SubAgentActivity>>,
36}
37
38impl SubAgentWrapper {
39 pub fn new(
41 id: String,
42 config: SubAgentConfig,
43 event_tx: broadcast::Sender<OrchestratorEvent>,
44 control_rx: mpsc::Receiver<ControlSignal>,
45 state: Arc<RwLock<SubAgentState>>,
46 activity: Arc<RwLock<SubAgentActivity>>,
47 ) -> Self {
48 Self {
49 id,
50 config,
51 event_tx,
52 control_rx,
53 state,
54 activity,
55 }
56 }
57
58 pub async fn execute(mut self) -> Result<String> {
65 self.update_state(SubAgentState::Running).await;
67
68 let result = self.execute_placeholder().await;
71
72 match &result {
74 Ok(output) => {
75 self.update_state(SubAgentState::Completed {
76 success: true,
77 output: output.clone(),
78 })
79 .await;
80
81 let _ = self.event_tx.send(OrchestratorEvent::SubAgentCompleted {
83 id: self.id.clone(),
84 success: true,
85 output: output.clone(),
86 duration_ms: 0, token_usage: None,
88 });
89 }
90 Err(e) => {
91 let current_state = self.state.read().await.clone();
93 if !matches!(current_state, SubAgentState::Cancelled) {
94 self.update_state(SubAgentState::Error {
95 message: e.to_string(),
96 })
97 .await;
98 }
99
100 let _ = self.event_tx.send(OrchestratorEvent::SubAgentCompleted {
102 id: self.id.clone(),
103 success: false,
104 output: e.to_string(),
105 duration_ms: 0,
106 token_usage: None,
107 });
108 }
109 }
110
111 result
112 }
113
114 async fn execute_placeholder(&mut self) -> Result<String> {
116 for step in 1..=5 {
118 while let Ok(signal) = self.control_rx.try_recv() {
120 self.handle_control_signal(signal).await?;
121 }
122
123 if matches!(*self.state.read().await, SubAgentState::Cancelled) {
125 return Err(anyhow::anyhow!("Cancelled by orchestrator").into());
126 }
127
128 while matches!(*self.state.read().await, SubAgentState::Paused) {
130 *self.activity.write().await = SubAgentActivity::WaitingForControl {
131 reason: "Paused by orchestrator".to_string(),
132 };
133 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
134 while let Ok(signal) = self.control_rx.try_recv() {
136 self.handle_control_signal(signal).await?;
137 }
138 }
139
140 *self.activity.write().await = SubAgentActivity::CallingTool {
142 tool_name: "read".to_string(),
143 args: serde_json::json!({"path": "/tmp/file.txt"}),
144 };
145
146 let _ = self.event_tx.send(OrchestratorEvent::ToolExecutionStarted {
148 id: self.id.clone(),
149 tool_id: format!("tool-{}", step),
150 tool_name: "read".to_string(),
151 args: serde_json::json!({"path": "/tmp/file.txt"}),
152 });
153
154 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
155
156 *self.activity.write().await = SubAgentActivity::RequestingLlm { message_count: 3 };
158
159 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
160
161 *self.activity.write().await = SubAgentActivity::Idle;
163
164 let _ = self.event_tx.send(OrchestratorEvent::SubAgentProgress {
166 id: self.id.clone(),
167 step,
168 total_steps: 5,
169 message: format!("Step {}/5 completed", step),
170 });
171 }
172
173 Ok(format!(
174 "Placeholder result for SubAgent {} ({})",
175 self.id, self.config.agent_type
176 ))
177 }
178
179 async fn handle_control_signal(&mut self, signal: ControlSignal) -> Result<()> {
181 let _ = self
183 .event_tx
184 .send(OrchestratorEvent::ControlSignalReceived {
185 id: self.id.clone(),
186 signal: signal.clone(),
187 });
188
189 let result = match signal {
190 ControlSignal::Pause => {
191 self.update_state(SubAgentState::Paused).await;
192 Ok(())
193 }
194 ControlSignal::Resume => {
195 self.update_state(SubAgentState::Running).await;
196 Ok(())
197 }
198 ControlSignal::Cancel => {
199 self.update_state(SubAgentState::Cancelled).await;
200 Err(anyhow::anyhow!("Cancelled by orchestrator").into())
201 }
202 ControlSignal::AdjustParams { .. } => {
203 Ok(())
205 }
206 ControlSignal::InjectPrompt { .. } => {
207 Ok(())
209 }
210 };
211
212 let _ = self.event_tx.send(OrchestratorEvent::ControlSignalApplied {
214 id: self.id.clone(),
215 signal,
216 success: result.is_ok(),
217 error: result.as_ref().err().map(|e| format!("{}", e)),
218 });
219
220 result
221 }
222
223 async fn update_state(&self, new_state: SubAgentState) {
225 let old_state = {
226 let mut state = self.state.write().await;
227 let old = state.clone();
228 *state = new_state.clone();
229 old
230 };
231
232 let _ = self.event_tx.send(OrchestratorEvent::SubAgentStateChanged {
234 id: self.id.clone(),
235 old_state,
236 new_state,
237 });
238 }
239}