1use crate::InvocationContext;
2use adk_artifact::ArtifactService;
3use adk_core::{Agent, Content, EventStream, Memory, Result};
4use adk_session::SessionService;
5use async_stream::stream;
6use std::sync::Arc;
7
8pub struct RunnerConfig {
9 pub app_name: String,
10 pub agent: Arc<dyn Agent>,
11 pub session_service: Arc<dyn SessionService>,
12 pub artifact_service: Option<Arc<dyn ArtifactService>>,
13 pub memory_service: Option<Arc<dyn Memory>>,
14}
15
16pub struct Runner {
17 app_name: String,
18 root_agent: Arc<dyn Agent>,
19 session_service: Arc<dyn SessionService>,
20 artifact_service: Option<Arc<dyn ArtifactService>>,
21 memory_service: Option<Arc<dyn Memory>>,
22}
23
24impl Runner {
25 pub fn new(config: RunnerConfig) -> Result<Self> {
26 Ok(Self {
27 app_name: config.app_name,
28 root_agent: config.agent,
29 session_service: config.session_service,
30 artifact_service: config.artifact_service,
31 memory_service: config.memory_service,
32 })
33 }
34
35 pub async fn run(
36 &self,
37 user_id: String,
38 session_id: String,
39 user_content: Content,
40 ) -> Result<EventStream> {
41 let app_name = self.app_name.clone();
42 let session_service = self.session_service.clone();
43 let root_agent = self.root_agent.clone();
44 let artifact_service = self.artifact_service.clone();
45 let memory_service = self.memory_service.clone();
46
47 let s = stream! {
48 let session = match session_service
50 .get(adk_session::GetRequest {
51 app_name: app_name.clone(),
52 user_id: user_id.clone(),
53 session_id: session_id.clone(),
54 num_recent_events: None,
55 after: None,
56 })
57 .await
58 {
59 Ok(s) => s,
60 Err(e) => {
61 yield Err(e);
62 return;
63 }
64 };
65
66 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
68
69 let artifact_service_clone = artifact_service.clone();
71 let memory_service_clone = memory_service.clone();
72
73 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
75 let mut ctx = InvocationContext::new(
76 invocation_id.clone(),
77 agent_to_run.clone(),
78 user_id.clone(),
79 app_name.clone(),
80 session_id.clone(),
81 user_content.clone(),
82 Arc::from(session),
83 );
84
85 if let Some(service) = artifact_service {
87 let scoped = adk_artifact::ScopedArtifacts::new(
89 service,
90 app_name.clone(),
91 user_id.clone(),
92 session_id.clone(),
93 );
94 ctx = ctx.with_artifacts(Arc::new(scoped));
95 }
96 if let Some(memory) = memory_service {
97 ctx = ctx.with_memory(memory);
98 }
99
100 let ctx = Arc::new(ctx);
101
102 let mut user_event = adk_core::Event::new(&invocation_id);
104 user_event.author = "user".to_string();
105 user_event.llm_response.content = Some(user_content.clone());
106
107 ctx.mutable_session().append_event(user_event.clone());
110
111 if let Err(e) = session_service.append_event(&session_id, user_event).await {
112 yield Err(e);
113 return;
114 }
115
116 let mut agent_stream = match agent_to_run.run(ctx.clone()).await {
118 Ok(s) => s,
119 Err(e) => {
120 yield Err(e);
121 return;
122 }
123 };
124
125 use futures::StreamExt;
127 let mut transfer_target: Option<String> = None;
128
129 while let Some(result) = agent_stream.next().await {
130 match result {
131 Ok(event) => {
132 if let Some(target) = &event.actions.transfer_to_agent {
134 transfer_target = Some(target.clone());
135 }
136
137 if !event.actions.state_delta.is_empty() {
143 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
144 }
145
146 ctx.mutable_session().append_event(event.clone());
148
149 if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
151 yield Err(e);
152 return;
153 }
154 yield Ok(event);
155 }
156 Err(e) => {
157 yield Err(e);
158 return;
159 }
160 }
161 }
162
163 if let Some(target_name) = transfer_target {
165 if let Some(target_agent) = Self::find_agent(&root_agent, &target_name) {
166 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
168 let mut transfer_ctx = InvocationContext::with_mutable_session(
169 transfer_invocation_id.clone(),
170 target_agent.clone(),
171 user_id.clone(),
172 app_name.clone(),
173 session_id.clone(),
174 user_content.clone(),
175 ctx.mutable_session().clone(),
176 );
177
178 if let Some(service) = artifact_service_clone {
179 let scoped = adk_artifact::ScopedArtifacts::new(
180 service,
181 app_name.clone(),
182 user_id.clone(),
183 session_id.clone(),
184 );
185 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
186 }
187 if let Some(memory) = memory_service_clone {
188 transfer_ctx = transfer_ctx.with_memory(memory);
189 }
190
191 let transfer_ctx = Arc::new(transfer_ctx);
192
193 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
195 Ok(s) => s,
196 Err(e) => {
197 yield Err(e);
198 return;
199 }
200 };
201
202 while let Some(result) = transfer_stream.next().await {
204 match result {
205 Ok(event) => {
206 if !event.actions.state_delta.is_empty() {
208 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
209 }
210
211 transfer_ctx.mutable_session().append_event(event.clone());
213
214 if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
215 yield Err(e);
216 return;
217 }
218 yield Ok(event);
219 }
220 Err(e) => {
221 yield Err(e);
222 return;
223 }
224 }
225 }
226 }
227 }
228 };
229
230 Ok(Box::pin(s))
231 }
232
233 pub fn find_agent_to_run(
235 root_agent: &Arc<dyn Agent>,
236 session: &dyn adk_session::Session,
237 ) -> Arc<dyn Agent> {
238 let events = session.events();
240 for i in (0..events.len()).rev() {
241 if let Some(event) = events.at(i) {
242 if let Some(target_name) = &event.actions.transfer_to_agent {
244 if let Some(agent) = Self::find_agent(root_agent, target_name) {
245 return agent;
246 }
247 }
248
249 if event.author == "user" {
250 continue;
251 }
252
253 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
255 if Self::is_transferable(root_agent, &agent) {
257 return agent;
258 }
259 }
260 }
261 }
262
263 root_agent.clone()
265 }
266
267 fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
269 let _ = (root_agent, agent);
272 true
273 }
274
275 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
277 if current.name() == target_name {
278 return Some(current.clone());
279 }
280
281 for sub_agent in current.sub_agents() {
282 if let Some(found) = Self::find_agent(sub_agent, target_name) {
283 return Some(found);
284 }
285 }
286
287 None
288 }
289}