1use crate::InvocationContext;
2use adk_artifact::ArtifactService;
3use adk_core::{Agent, Content, EventStream, Memory, Result};
4use adk_session::SessionService;
5use async_stream::stream;
6use std::sync::Arc;
7
8pub struct RunnerConfig {
9 pub app_name: String,
10 pub agent: Arc<dyn Agent>,
11 pub session_service: Arc<dyn SessionService>,
12 pub artifact_service: Option<Arc<dyn ArtifactService>>,
13 pub memory_service: Option<Arc<dyn Memory>>,
14}
15
16pub struct Runner {
17 app_name: String,
18 root_agent: Arc<dyn Agent>,
19 session_service: Arc<dyn SessionService>,
20 artifact_service: Option<Arc<dyn ArtifactService>>,
21 memory_service: Option<Arc<dyn Memory>>,
22}
23
24impl Runner {
25 pub fn new(config: RunnerConfig) -> Result<Self> {
26 Ok(Self {
27 app_name: config.app_name,
28 root_agent: config.agent,
29 session_service: config.session_service,
30 artifact_service: config.artifact_service,
31 memory_service: config.memory_service,
32 })
33 }
34
35 pub async fn run(
36 &self,
37 user_id: String,
38 session_id: String,
39 user_content: Content,
40 ) -> Result<EventStream> {
41 let app_name = self.app_name.clone();
42 let session_service = self.session_service.clone();
43 let root_agent = self.root_agent.clone();
44 let artifact_service = self.artifact_service.clone();
45 let memory_service = self.memory_service.clone();
46
47 let s = stream! {
48 let session = match session_service
50 .get(adk_session::GetRequest {
51 app_name: app_name.clone(),
52 user_id: user_id.clone(),
53 session_id: session_id.clone(),
54 num_recent_events: None,
55 after: None,
56 })
57 .await
58 {
59 Ok(s) => s,
60 Err(e) => {
61 yield Err(e);
62 return;
63 }
64 };
65
66 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
68
69 let artifact_service_clone = artifact_service.clone();
71 let memory_service_clone = memory_service.clone();
72
73 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
75 let mut ctx = InvocationContext::new(
76 invocation_id.clone(),
77 agent_to_run.clone(),
78 user_id.clone(),
79 app_name.clone(),
80 session_id.clone(),
81 user_content.clone(),
82 Arc::from(session),
83 );
84
85 if let Some(service) = artifact_service {
87 let scoped = adk_artifact::ScopedArtifacts::new(
89 service,
90 app_name.clone(),
91 user_id.clone(),
92 session_id.clone(),
93 );
94 ctx = ctx.with_artifacts(Arc::new(scoped));
95 }
96 if let Some(memory) = memory_service {
97 ctx = ctx.with_memory(memory);
98 }
99
100 let ctx = Arc::new(ctx);
101
102 let mut user_event = adk_core::Event::new(&invocation_id);
104 user_event.author = "user".to_string();
105 user_event.llm_response.content = Some(user_content.clone());
106
107 if let Err(e) = session_service.append_event(&session_id, user_event).await {
108 yield Err(e);
109 return;
110 }
111
112 let mut agent_stream = match agent_to_run.run(ctx).await {
114 Ok(s) => s,
115 Err(e) => {
116 yield Err(e);
117 return;
118 }
119 };
120
121 use futures::StreamExt;
123 let mut transfer_target: Option<String> = None;
124
125 while let Some(result) = agent_stream.next().await {
126 match result {
127 Ok(event) => {
128 if let Some(target) = &event.actions.transfer_to_agent {
130 transfer_target = Some(target.clone());
131 }
132
133 if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
135 yield Err(e);
136 return;
137 }
138 yield Ok(event);
139 }
140 Err(e) => {
141 yield Err(e);
142 return;
143 }
144 }
145 }
146
147 if let Some(target_name) = transfer_target {
149 if let Some(target_agent) = Self::find_agent(&root_agent, &target_name) {
150 let transfer_session = match session_service
152 .get(adk_session::GetRequest {
153 app_name: app_name.clone(),
154 user_id: user_id.clone(),
155 session_id: session_id.clone(),
156 num_recent_events: None,
157 after: None,
158 })
159 .await
160 {
161 Ok(s) => s,
162 Err(e) => {
163 yield Err(e);
164 return;
165 }
166 };
167
168 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
170 let mut transfer_ctx = InvocationContext::new(
171 transfer_invocation_id.clone(),
172 target_agent.clone(),
173 user_id.clone(),
174 app_name.clone(),
175 session_id.clone(),
176 user_content.clone(),
177 Arc::from(transfer_session),
178 );
179
180 if let Some(service) = artifact_service_clone {
181 let scoped = adk_artifact::ScopedArtifacts::new(
182 service,
183 app_name.clone(),
184 user_id.clone(),
185 session_id.clone(),
186 );
187 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
188 }
189 if let Some(memory) = memory_service_clone {
190 transfer_ctx = transfer_ctx.with_memory(memory);
191 }
192
193 let transfer_ctx = Arc::new(transfer_ctx);
194
195 let mut transfer_stream = match target_agent.run(transfer_ctx).await {
197 Ok(s) => s,
198 Err(e) => {
199 yield Err(e);
200 return;
201 }
202 };
203
204 while let Some(result) = transfer_stream.next().await {
206 match result {
207 Ok(event) => {
208 if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
209 yield Err(e);
210 return;
211 }
212 yield Ok(event);
213 }
214 Err(e) => {
215 yield Err(e);
216 return;
217 }
218 }
219 }
220 }
221 }
222 };
223
224 Ok(Box::pin(s))
225 }
226
227 pub fn find_agent_to_run(
229 root_agent: &Arc<dyn Agent>,
230 session: &dyn adk_session::Session,
231 ) -> Arc<dyn Agent> {
232 let events = session.events();
234 for i in (0..events.len()).rev() {
235 if let Some(event) = events.at(i) {
236 if let Some(target_name) = &event.actions.transfer_to_agent {
238 if let Some(agent) = Self::find_agent(root_agent, target_name) {
239 return agent;
240 }
241 }
242
243 if event.author == "user" {
244 continue;
245 }
246
247 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
249 if Self::is_transferable(root_agent, &agent) {
251 return agent;
252 }
253 }
254 }
255 }
256
257 root_agent.clone()
259 }
260
261 fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
263 let _ = (root_agent, agent);
266 true
267 }
268
269 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
271 if current.name() == target_name {
272 return Some(current.clone());
273 }
274
275 for sub_agent in current.sub_agents() {
276 if let Some(found) = Self::find_agent(sub_agent, target_name) {
277 return Some(found);
278 }
279 }
280
281 None
282 }
283}
284
285