1use crate::InvocationContext;
2use adk_artifact::ArtifactService;
3use adk_core::{Agent, Content, EventStream, Memory, Result, RunConfig};
4use adk_session::SessionService;
5use async_stream::stream;
6use std::sync::Arc;
7use tracing::Instrument;
8
9pub struct RunnerConfig {
10 pub app_name: String,
11 pub agent: Arc<dyn Agent>,
12 pub session_service: Arc<dyn SessionService>,
13 pub artifact_service: Option<Arc<dyn ArtifactService>>,
14 pub memory_service: Option<Arc<dyn Memory>>,
15 #[allow(dead_code)]
18 pub run_config: Option<RunConfig>,
19}
20
21pub struct Runner {
22 app_name: String,
23 root_agent: Arc<dyn Agent>,
24 session_service: Arc<dyn SessionService>,
25 artifact_service: Option<Arc<dyn ArtifactService>>,
26 memory_service: Option<Arc<dyn Memory>>,
27 run_config: RunConfig,
28}
29
30impl Runner {
31 pub fn new(config: RunnerConfig) -> Result<Self> {
32 Ok(Self {
33 app_name: config.app_name,
34 root_agent: config.agent,
35 session_service: config.session_service,
36 artifact_service: config.artifact_service,
37 memory_service: config.memory_service,
38 run_config: config.run_config.unwrap_or_default(),
39 })
40 }
41
42 pub async fn run(
43 &self,
44 user_id: String,
45 session_id: String,
46 user_content: Content,
47 ) -> Result<EventStream> {
48 let app_name = self.app_name.clone();
49 let session_service = self.session_service.clone();
50 let root_agent = self.root_agent.clone();
51 let artifact_service = self.artifact_service.clone();
52 let memory_service = self.memory_service.clone();
53 let run_config = self.run_config.clone();
54
55 let s = stream! {
56 let session = match session_service
58 .get(adk_session::GetRequest {
59 app_name: app_name.clone(),
60 user_id: user_id.clone(),
61 session_id: session_id.clone(),
62 num_recent_events: None,
63 after: None,
64 })
65 .await
66 {
67 Ok(s) => s,
68 Err(e) => {
69 yield Err(e);
70 return;
71 }
72 };
73
74 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
76
77 let artifact_service_clone = artifact_service.clone();
79 let memory_service_clone = memory_service.clone();
80
81 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
83 let mut ctx = InvocationContext::new(
84 invocation_id.clone(),
85 agent_to_run.clone(),
86 user_id.clone(),
87 app_name.clone(),
88 session_id.clone(),
89 user_content.clone(),
90 Arc::from(session),
91 );
92
93 if let Some(service) = artifact_service {
95 let scoped = adk_artifact::ScopedArtifacts::new(
97 service,
98 app_name.clone(),
99 user_id.clone(),
100 session_id.clone(),
101 );
102 ctx = ctx.with_artifacts(Arc::new(scoped));
103 }
104 if let Some(memory) = memory_service {
105 ctx = ctx.with_memory(memory);
106 }
107
108 ctx = ctx.with_run_config(run_config.clone());
110
111 let ctx = Arc::new(ctx);
112
113 let mut user_event = adk_core::Event::new(&invocation_id);
115 user_event.author = "user".to_string();
116 user_event.llm_response.content = Some(user_content.clone());
117
118 ctx.mutable_session().append_event(user_event.clone());
121
122 if let Err(e) = session_service.append_event(&session_id, user_event).await {
123 yield Err(e);
124 return;
125 }
126
127 let agent_span = tracing::info_span!(
129 "agent.execute",
130 "gcp.vertex.agent.invocation_id" = %invocation_id,
131 "gcp.vertex.agent.session_id" = %session_id,
132 "gcp.vertex.agent.event_id" = %invocation_id, "agent.name" = %agent_to_run.name()
134 );
135
136 let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
137 Ok(s) => s,
138 Err(e) => {
139 yield Err(e);
140 return;
141 }
142 };
143
144 use futures::StreamExt;
146 let mut transfer_target: Option<String> = None;
147
148 while let Some(result) = agent_stream.next().await {
149 match result {
150 Ok(event) => {
151 if let Some(target) = &event.actions.transfer_to_agent {
153 transfer_target = Some(target.clone());
154 }
155
156 if !event.actions.state_delta.is_empty() {
162 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
163 }
164
165 ctx.mutable_session().append_event(event.clone());
167
168 if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
170 yield Err(e);
171 return;
172 }
173 yield Ok(event);
174 }
175 Err(e) => {
176 yield Err(e);
177 return;
178 }
179 }
180 }
181
182 if let Some(target_name) = transfer_target {
184 if let Some(target_agent) = Self::find_agent(&root_agent, &target_name) {
185 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
187 let mut transfer_ctx = InvocationContext::with_mutable_session(
188 transfer_invocation_id.clone(),
189 target_agent.clone(),
190 user_id.clone(),
191 app_name.clone(),
192 session_id.clone(),
193 user_content.clone(),
194 ctx.mutable_session().clone(),
195 );
196
197 if let Some(service) = artifact_service_clone {
198 let scoped = adk_artifact::ScopedArtifacts::new(
199 service,
200 app_name.clone(),
201 user_id.clone(),
202 session_id.clone(),
203 );
204 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
205 }
206 if let Some(memory) = memory_service_clone {
207 transfer_ctx = transfer_ctx.with_memory(memory);
208 }
209
210 let transfer_ctx = Arc::new(transfer_ctx);
211
212 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
214 Ok(s) => s,
215 Err(e) => {
216 yield Err(e);
217 return;
218 }
219 };
220
221 while let Some(result) = transfer_stream.next().await {
223 match result {
224 Ok(event) => {
225 if !event.actions.state_delta.is_empty() {
227 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
228 }
229
230 transfer_ctx.mutable_session().append_event(event.clone());
232
233 if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
234 yield Err(e);
235 return;
236 }
237 yield Ok(event);
238 }
239 Err(e) => {
240 yield Err(e);
241 return;
242 }
243 }
244 }
245 }
246 }
247 };
248
249 Ok(Box::pin(s))
250 }
251
252 pub fn find_agent_to_run(
254 root_agent: &Arc<dyn Agent>,
255 session: &dyn adk_session::Session,
256 ) -> Arc<dyn Agent> {
257 let events = session.events();
259 for i in (0..events.len()).rev() {
260 if let Some(event) = events.at(i) {
261 if let Some(target_name) = &event.actions.transfer_to_agent {
263 if let Some(agent) = Self::find_agent(root_agent, target_name) {
264 return agent;
265 }
266 }
267
268 if event.author == "user" {
269 continue;
270 }
271
272 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
274 if Self::is_transferable(root_agent, &agent) {
276 return agent;
277 }
278 }
279 }
280 }
281
282 root_agent.clone()
284 }
285
286 fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
288 let _ = (root_agent, agent);
291 true
292 }
293
294 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
296 if current.name() == target_name {
297 return Some(current.clone());
298 }
299
300 for sub_agent in current.sub_agents() {
301 if let Some(found) = Self::find_agent(sub_agent, target_name) {
302 return Some(found);
303 }
304 }
305
306 None
307 }
308}