1use crate::InvocationContext;
2use adk_artifact::ArtifactService;
3use adk_core::{Agent, Content, EventStream, Memory, Result, RunConfig};
4use adk_plugin::PluginManager;
5use adk_session::SessionService;
6use adk_skill::{SkillInjector, SkillInjectorConfig};
7use async_stream::stream;
8use std::sync::Arc;
9use tracing::Instrument;
10
11pub struct RunnerConfig {
12 pub app_name: String,
13 pub agent: Arc<dyn Agent>,
14 pub session_service: Arc<dyn SessionService>,
15 pub artifact_service: Option<Arc<dyn ArtifactService>>,
16 pub memory_service: Option<Arc<dyn Memory>>,
17 pub plugin_manager: Option<Arc<PluginManager>>,
18 #[allow(dead_code)]
21 pub run_config: Option<RunConfig>,
22 pub compaction_config: Option<adk_core::EventsCompactionConfig>,
26}
27
28pub struct Runner {
29 app_name: String,
30 root_agent: Arc<dyn Agent>,
31 session_service: Arc<dyn SessionService>,
32 artifact_service: Option<Arc<dyn ArtifactService>>,
33 memory_service: Option<Arc<dyn Memory>>,
34 plugin_manager: Option<Arc<PluginManager>>,
35 skill_injector: Option<Arc<SkillInjector>>,
36 run_config: RunConfig,
37 compaction_config: Option<adk_core::EventsCompactionConfig>,
38}
39
40impl Runner {
41 pub fn new(config: RunnerConfig) -> Result<Self> {
42 Ok(Self {
43 app_name: config.app_name,
44 root_agent: config.agent,
45 session_service: config.session_service,
46 artifact_service: config.artifact_service,
47 memory_service: config.memory_service,
48 plugin_manager: config.plugin_manager,
49 skill_injector: None,
50 run_config: config.run_config.unwrap_or_default(),
51 compaction_config: config.compaction_config,
52 })
53 }
54
55 pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
59 self.skill_injector = Some(Arc::new(injector));
60 self
61 }
62
63 pub fn with_auto_skills(
65 mut self,
66 root: impl AsRef<std::path::Path>,
67 config: SkillInjectorConfig,
68 ) -> adk_skill::SkillResult<Self> {
69 let injector = SkillInjector::from_root(root, config)?;
70 self.skill_injector = Some(Arc::new(injector));
71 Ok(self)
72 }
73
74 pub async fn run(
75 &self,
76 user_id: String,
77 session_id: String,
78 user_content: Content,
79 ) -> Result<EventStream> {
80 let app_name = self.app_name.clone();
81 let session_service = self.session_service.clone();
82 let root_agent = self.root_agent.clone();
83 let artifact_service = self.artifact_service.clone();
84 let memory_service = self.memory_service.clone();
85 let plugin_manager = self.plugin_manager.clone();
86 let skill_injector = self.skill_injector.clone();
87 let run_config = self.run_config.clone();
88 let compaction_config = self.compaction_config.clone();
89
90 let s = stream! {
91 let session = match session_service
93 .get(adk_session::GetRequest {
94 app_name: app_name.clone(),
95 user_id: user_id.clone(),
96 session_id: session_id.clone(),
97 num_recent_events: None,
98 after: None,
99 })
100 .await
101 {
102 Ok(s) => s,
103 Err(e) => {
104 yield Err(e);
105 return;
106 }
107 };
108
109 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
111
112 let artifact_service_clone = artifact_service.clone();
114 let memory_service_clone = memory_service.clone();
115
116 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
118 let mut effective_user_content = user_content.clone();
119 let mut selected_skill_name = String::new();
120 let mut selected_skill_id = String::new();
121
122 if let Some(injector) = skill_injector.as_ref() {
123 if let Some(matched) = adk_skill::apply_skill_injection(
124 &mut effective_user_content,
125 injector.index(),
126 injector.policy(),
127 injector.max_injected_chars(),
128 ) {
129 selected_skill_name = matched.skill.name;
130 selected_skill_id = matched.skill.id;
131 }
132 }
133
134 let mut invocation_ctx = InvocationContext::new(
135 invocation_id.clone(),
136 agent_to_run.clone(),
137 user_id.clone(),
138 app_name.clone(),
139 session_id.clone(),
140 effective_user_content.clone(),
141 Arc::from(session),
142 );
143
144 if let Some(service) = artifact_service {
146 let scoped = adk_artifact::ScopedArtifacts::new(
148 service,
149 app_name.clone(),
150 user_id.clone(),
151 session_id.clone(),
152 );
153 invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
154 }
155 if let Some(memory) = memory_service {
156 invocation_ctx = invocation_ctx.with_memory(memory);
157 }
158
159 invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
161
162 let mut ctx = Arc::new(invocation_ctx);
163
164 if let Some(manager) = plugin_manager.as_ref() {
165 match manager
166 .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
167 .await
168 {
169 Ok(Some(content)) => {
170 let mut early_event = adk_core::Event::new(&invocation_id);
171 early_event.author = agent_to_run.name().to_string();
172 early_event.llm_response.content = Some(content);
173
174 ctx.mutable_session().append_event(early_event.clone());
175 if let Err(e) = session_service.append_event(&session_id, early_event.clone()).await {
176 yield Err(e);
177 return;
178 }
179
180 yield Ok(early_event);
181 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
182 return;
183 }
184 Ok(None) => {}
185 Err(e) => {
186 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
187 yield Err(e);
188 return;
189 }
190 }
191
192 match manager
193 .run_on_user_message(
194 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
195 effective_user_content.clone(),
196 )
197 .await
198 {
199 Ok(Some(modified)) => {
200 effective_user_content = modified;
201
202 let mut refreshed_ctx = InvocationContext::with_mutable_session(
203 invocation_id.clone(),
204 agent_to_run.clone(),
205 user_id.clone(),
206 app_name.clone(),
207 session_id.clone(),
208 effective_user_content.clone(),
209 ctx.mutable_session().clone(),
210 );
211
212 if let Some(service) = artifact_service_clone.clone() {
213 let scoped = adk_artifact::ScopedArtifacts::new(
214 service,
215 app_name.clone(),
216 user_id.clone(),
217 session_id.clone(),
218 );
219 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
220 }
221 if let Some(memory) = memory_service_clone.clone() {
222 refreshed_ctx = refreshed_ctx.with_memory(memory);
223 }
224 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
225 ctx = Arc::new(refreshed_ctx);
226 }
227 Ok(None) => {}
228 Err(e) => {
229 if let Some(manager) = plugin_manager.as_ref() {
230 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
231 }
232 yield Err(e);
233 return;
234 }
235 }
236 }
237
238 let mut user_event = adk_core::Event::new(&invocation_id);
240 user_event.author = "user".to_string();
241 user_event.llm_response.content = Some(effective_user_content.clone());
242
243 ctx.mutable_session().append_event(user_event.clone());
246
247 if let Err(e) = session_service.append_event(&session_id, user_event).await {
248 if let Some(manager) = plugin_manager.as_ref() {
249 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
250 }
251 yield Err(e);
252 return;
253 }
254
255 let agent_span = tracing::info_span!(
257 "agent.execute",
258 "gcp.vertex.agent.invocation_id" = %invocation_id,
259 "gcp.vertex.agent.session_id" = %session_id,
260 "gcp.vertex.agent.event_id" = %invocation_id, "gen_ai.conversation.id" = %session_id,
262 "agent.name" = %agent_to_run.name(),
263 "adk.skills.selected_name" = %selected_skill_name,
264 "adk.skills.selected_id" = %selected_skill_id
265 );
266
267 let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
268 Ok(s) => s,
269 Err(e) => {
270 if let Some(manager) = plugin_manager.as_ref() {
271 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
272 }
273 yield Err(e);
274 return;
275 }
276 };
277
278 use futures::StreamExt;
280 let mut transfer_target: Option<String> = None;
281
282 while let Some(result) = agent_stream.next().await {
283 match result {
284 Ok(event) => {
285 let mut event = event;
286
287 if let Some(manager) = plugin_manager.as_ref() {
288 match manager
289 .run_on_event(
290 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
291 event.clone(),
292 )
293 .await
294 {
295 Ok(Some(modified)) => {
296 event = modified;
297 }
298 Ok(None) => {}
299 Err(e) => {
300 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
301 yield Err(e);
302 return;
303 }
304 }
305 }
306
307 if let Some(target) = &event.actions.transfer_to_agent {
309 transfer_target = Some(target.clone());
310 }
311
312 if !event.actions.state_delta.is_empty() {
318 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
319 }
320
321 ctx.mutable_session().append_event(event.clone());
323
324 if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
326 if let Some(manager) = plugin_manager.as_ref() {
327 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
328 }
329 yield Err(e);
330 return;
331 }
332 yield Ok(event);
333 }
334 Err(e) => {
335 if let Some(manager) = plugin_manager.as_ref() {
336 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
337 }
338 yield Err(e);
339 return;
340 }
341 }
342 }
343
344 if let Some(target_name) = transfer_target {
346 if let Some(target_agent) = Self::find_agent(&root_agent, &target_name) {
347 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
349 let mut transfer_ctx = InvocationContext::with_mutable_session(
350 transfer_invocation_id.clone(),
351 target_agent.clone(),
352 user_id.clone(),
353 app_name.clone(),
354 session_id.clone(),
355 effective_user_content.clone(),
356 ctx.mutable_session().clone(),
357 );
358
359 if let Some(service) = artifact_service_clone {
360 let scoped = adk_artifact::ScopedArtifacts::new(
361 service,
362 app_name.clone(),
363 user_id.clone(),
364 session_id.clone(),
365 );
366 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
367 }
368 if let Some(memory) = memory_service_clone {
369 transfer_ctx = transfer_ctx.with_memory(memory);
370 }
371
372 let transfer_ctx = Arc::new(transfer_ctx);
373
374 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
376 Ok(s) => s,
377 Err(e) => {
378 if let Some(manager) = plugin_manager.as_ref() {
379 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
380 }
381 yield Err(e);
382 return;
383 }
384 };
385
386 while let Some(result) = transfer_stream.next().await {
388 match result {
389 Ok(event) => {
390 let mut event = event;
391 if let Some(manager) = plugin_manager.as_ref() {
392 match manager
393 .run_on_event(
394 transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
395 event.clone(),
396 )
397 .await
398 {
399 Ok(Some(modified)) => {
400 event = modified;
401 }
402 Ok(None) => {}
403 Err(e) => {
404 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
405 yield Err(e);
406 return;
407 }
408 }
409 }
410
411 if !event.actions.state_delta.is_empty() {
413 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
414 }
415
416 transfer_ctx.mutable_session().append_event(event.clone());
418
419 if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
420 if let Some(manager) = plugin_manager.as_ref() {
421 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
422 }
423 yield Err(e);
424 return;
425 }
426 yield Ok(event);
427 }
428 Err(e) => {
429 if let Some(manager) = plugin_manager.as_ref() {
430 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
431 }
432 yield Err(e);
433 return;
434 }
435 }
436 }
437 }
438 }
439
440 if let Some(ref compaction_cfg) = compaction_config {
444 let all_events = ctx.mutable_session().as_ref().events_snapshot();
446 let invocation_count = all_events.iter()
447 .filter(|e| e.author == "user")
448 .count() as u32;
449
450 if invocation_count > 0 && invocation_count % compaction_cfg.compaction_interval == 0 {
451 let overlap = compaction_cfg.overlap_size as usize;
454
455 let user_msg_indices: Vec<usize> = all_events.iter()
457 .enumerate()
458 .filter(|(_, e)| e.author == "user")
459 .map(|(i, _)| i)
460 .collect();
461
462 let compact_up_to = if overlap == 0 {
465 all_events.len()
466 } else if user_msg_indices.len() > overlap {
467 user_msg_indices[user_msg_indices.len() - overlap]
469 } else {
470 0
472 };
473
474 if compact_up_to > 0 {
475 let events_to_compact = &all_events[..compact_up_to];
476
477 match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
478 Ok(Some(compaction_event)) => {
479 if let Err(e) = session_service.append_event(
481 &session_id,
482 compaction_event.clone(),
483 ).await {
484 tracing::warn!(error = %e, "Failed to persist compaction event");
485 } else {
486 tracing::info!(
487 compacted_events = compact_up_to,
488 "Context compaction completed"
489 );
490 }
491 }
492 Ok(None) => {
493 tracing::debug!("Compaction summarizer returned no result");
494 }
495 Err(e) => {
496 tracing::warn!(error = %e, "Context compaction failed");
498 }
499 }
500 }
501 }
502 }
503
504 if let Some(manager) = plugin_manager.as_ref() {
505 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
506 }
507 };
508
509 Ok(Box::pin(s))
510 }
511
512 pub fn find_agent_to_run(
514 root_agent: &Arc<dyn Agent>,
515 session: &dyn adk_session::Session,
516 ) -> Arc<dyn Agent> {
517 let events = session.events();
519 for i in (0..events.len()).rev() {
520 if let Some(event) = events.at(i) {
521 if let Some(target_name) = &event.actions.transfer_to_agent {
523 if let Some(agent) = Self::find_agent(root_agent, target_name) {
524 return agent;
525 }
526 }
527
528 if event.author == "user" {
529 continue;
530 }
531
532 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
534 if Self::is_transferable(root_agent, &agent) {
536 return agent;
537 }
538 }
539 }
540 }
541
542 root_agent.clone()
544 }
545
546 fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
548 let _ = (root_agent, agent);
551 true
552 }
553
554 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
556 if current.name() == target_name {
557 return Some(current.clone());
558 }
559
560 for sub_agent in current.sub_agents() {
561 if let Some(found) = Self::find_agent(sub_agent, target_name) {
562 return Some(found);
563 }
564 }
565
566 None
567 }
568}