1use crate::InvocationContext;
2use crate::cache::CacheManager;
3use adk_artifact::ArtifactService;
4use adk_core::{
5 Agent, CacheCapable, Content, ContextCacheConfig, EventStream, Memory, ReadonlyContext, Result,
6 RunConfig,
7};
8use adk_plugin::PluginManager;
9use adk_session::SessionService;
10use adk_skill::{SkillInjector, SkillInjectorConfig};
11use async_stream::stream;
12use std::sync::Arc;
13use tokio_util::sync::CancellationToken;
14use tracing::Instrument;
15
16pub struct RunnerConfig {
17 pub app_name: String,
18 pub agent: Arc<dyn Agent>,
19 pub session_service: Arc<dyn SessionService>,
20 pub artifact_service: Option<Arc<dyn ArtifactService>>,
21 pub memory_service: Option<Arc<dyn Memory>>,
22 pub plugin_manager: Option<Arc<PluginManager>>,
23 #[allow(dead_code)]
26 pub run_config: Option<RunConfig>,
27 pub compaction_config: Option<adk_core::EventsCompactionConfig>,
31 pub context_cache_config: Option<ContextCacheConfig>,
35 pub cache_capable: Option<Arc<dyn CacheCapable>>,
38 pub request_context: Option<adk_core::RequestContext>,
42 pub cancellation_token: Option<CancellationToken>,
44}
45
46pub struct Runner {
47 app_name: String,
48 root_agent: Arc<dyn Agent>,
49 session_service: Arc<dyn SessionService>,
50 artifact_service: Option<Arc<dyn ArtifactService>>,
51 memory_service: Option<Arc<dyn Memory>>,
52 plugin_manager: Option<Arc<PluginManager>>,
53 skill_injector: Option<Arc<SkillInjector>>,
54 run_config: RunConfig,
55 compaction_config: Option<adk_core::EventsCompactionConfig>,
56 context_cache_config: Option<ContextCacheConfig>,
57 cache_capable: Option<Arc<dyn CacheCapable>>,
58 cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
59 request_context: Option<adk_core::RequestContext>,
60 cancellation_token: Option<CancellationToken>,
61}
62
63impl Runner {
64 pub fn new(config: RunnerConfig) -> Result<Self> {
65 let cache_manager = config
66 .context_cache_config
67 .as_ref()
68 .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
69 Ok(Self {
70 app_name: config.app_name,
71 root_agent: config.agent,
72 session_service: config.session_service,
73 artifact_service: config.artifact_service,
74 memory_service: config.memory_service,
75 plugin_manager: config.plugin_manager,
76 skill_injector: None,
77 run_config: config.run_config.unwrap_or_default(),
78 compaction_config: config.compaction_config,
79 context_cache_config: config.context_cache_config,
80 cache_capable: config.cache_capable,
81 cache_manager,
82 request_context: config.request_context,
83 cancellation_token: config.cancellation_token,
84 })
85 }
86
87 pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
91 self.skill_injector = Some(Arc::new(injector));
92 self
93 }
94
95 pub fn with_auto_skills(
97 mut self,
98 root: impl AsRef<std::path::Path>,
99 config: SkillInjectorConfig,
100 ) -> adk_skill::SkillResult<Self> {
101 let injector = SkillInjector::from_root(root, config)?;
102 self.skill_injector = Some(Arc::new(injector));
103 Ok(self)
104 }
105
106 pub async fn run(
107 &self,
108 user_id: String,
109 session_id: String,
110 user_content: Content,
111 ) -> Result<EventStream> {
112 let app_name = self.app_name.clone();
113 let session_service = self.session_service.clone();
114 let root_agent = self.root_agent.clone();
115 let artifact_service = self.artifact_service.clone();
116 let memory_service = self.memory_service.clone();
117 let plugin_manager = self.plugin_manager.clone();
118 let skill_injector = self.skill_injector.clone();
119 let mut run_config = self.run_config.clone();
120 let compaction_config = self.compaction_config.clone();
121 let context_cache_config = self.context_cache_config.clone();
122 let cache_capable = self.cache_capable.clone();
123 let cache_manager_ref = self.cache_manager.clone();
124 let request_context = self.request_context.clone();
125 let cancellation_token = self.cancellation_token.clone();
126
127 let s = stream! {
128 let session = match session_service
130 .get(adk_session::GetRequest {
131 app_name: app_name.clone(),
132 user_id: user_id.clone(),
133 session_id: session_id.clone(),
134 num_recent_events: None,
135 after: None,
136 })
137 .await
138 {
139 Ok(s) => s,
140 Err(e) => {
141 yield Err(e);
142 return;
143 }
144 };
145
146 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
148
149 let artifact_service_clone = artifact_service.clone();
151 let memory_service_clone = memory_service.clone();
152
153 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
155 let mut effective_user_content = user_content.clone();
156 let mut selected_skill_name = String::new();
157 let mut selected_skill_id = String::new();
158
159 if let Some(injector) = skill_injector.as_ref() {
160 if let Some(matched) = adk_skill::apply_skill_injection(
161 &mut effective_user_content,
162 injector.index(),
163 injector.policy(),
164 injector.max_injected_chars(),
165 ) {
166 selected_skill_name = matched.skill.name;
167 selected_skill_id = matched.skill.id;
168 }
169 }
170
171 let mut invocation_ctx = InvocationContext::new(
172 invocation_id.clone(),
173 agent_to_run.clone(),
174 user_id.clone(),
175 app_name.clone(),
176 session_id.clone(),
177 effective_user_content.clone(),
178 Arc::from(session),
179 );
180
181 if let Some(service) = artifact_service {
183 let scoped = adk_artifact::ScopedArtifacts::new(
185 service,
186 app_name.clone(),
187 user_id.clone(),
188 session_id.clone(),
189 );
190 invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
191 }
192 if let Some(memory) = memory_service {
193 invocation_ctx = invocation_ctx.with_memory(memory);
194 }
195
196 invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
198
199 if let Some(rc) = request_context.clone() {
201 invocation_ctx = invocation_ctx.with_request_context(rc);
202 }
203
204 let mut ctx = Arc::new(invocation_ctx);
205
206 if let Some(manager) = plugin_manager.as_ref() {
207 match manager
208 .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
209 .await
210 {
211 Ok(Some(content)) => {
212 let mut early_event = adk_core::Event::new(ctx.invocation_id());
213 early_event.author = agent_to_run.name().to_string();
214 early_event.llm_response.content = Some(content);
215
216 ctx.mutable_session().append_event(early_event.clone());
217 if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
218 yield Err(e);
219 return;
220 }
221
222 yield Ok(early_event);
223 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
224 return;
225 }
226 Ok(None) => {}
227 Err(e) => {
228 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
229 yield Err(e);
230 return;
231 }
232 }
233
234 match manager
235 .run_on_user_message(
236 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
237 effective_user_content.clone(),
238 )
239 .await
240 {
241 Ok(Some(modified)) => {
242 effective_user_content = modified;
243
244 let mut refreshed_ctx = InvocationContext::with_mutable_session(
245 ctx.invocation_id().to_string(),
246 agent_to_run.clone(),
247 ctx.user_id().to_string(),
248 ctx.app_name().to_string(),
249 ctx.session_id().to_string(),
250 effective_user_content.clone(),
251 ctx.mutable_session().clone(),
252 );
253
254 if let Some(service) = artifact_service_clone.clone() {
255 let scoped = adk_artifact::ScopedArtifacts::new(
256 service,
257 ctx.app_name().to_string(),
258 ctx.user_id().to_string(),
259 ctx.session_id().to_string(),
260 );
261 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
262 }
263 if let Some(memory) = memory_service_clone.clone() {
264 refreshed_ctx = refreshed_ctx.with_memory(memory);
265 }
266 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
267 if let Some(rc) = request_context.clone() {
268 refreshed_ctx = refreshed_ctx.with_request_context(rc);
269 }
270 ctx = Arc::new(refreshed_ctx);
271 }
272 Ok(None) => {}
273 Err(e) => {
274 if let Some(manager) = plugin_manager.as_ref() {
275 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
276 }
277 yield Err(e);
278 return;
279 }
280 }
281 }
282
283 let mut user_event = adk_core::Event::new(ctx.invocation_id());
285 user_event.author = "user".to_string();
286 user_event.llm_response.content = Some(effective_user_content.clone());
287
288 ctx.mutable_session().append_event(user_event.clone());
291
292 if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
293 if let Some(manager) = plugin_manager.as_ref() {
294 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
295 }
296 yield Err(e);
297 return;
298 }
299
300 if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
305 let mut cm = cm_mutex.lock().await;
306 if cm.is_enabled() {
307 if cm.active_cache_name().is_none() || cm.needs_refresh() {
308 let system_instruction = agent_to_run.description().to_string();
312 let tools = std::collections::HashMap::new();
313 let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
314
315 match cache_model.create_cache(&system_instruction, &tools, ttl).await {
316 Ok(name) => {
317 if let Some(old) = cm.clear_active_cache() {
319 if let Err(e) = cache_model.delete_cache(&old).await {
320 tracing::warn!(
321 old_cache = %old,
322 error = %e,
323 "failed to delete old cache, proceeding with new cache"
324 );
325 }
326 }
327 cm.set_active_cache(name);
328 }
329 Err(e) => {
330 tracing::warn!(
331 error = %e,
332 "cache creation failed, proceeding without cache"
333 );
334 }
335 }
336 }
337
338 if let Some(cache_name) = cm.record_invocation() {
340 run_config.cached_content = Some(cache_name.to_string());
341 let mut refreshed_ctx = InvocationContext::with_mutable_session(
343 ctx.invocation_id().to_string(),
344 agent_to_run.clone(),
345 ctx.user_id().to_string(),
346 ctx.app_name().to_string(),
347 ctx.session_id().to_string(),
348 effective_user_content.clone(),
349 ctx.mutable_session().clone(),
350 );
351 if let Some(service) = artifact_service_clone.clone() {
352 let scoped = adk_artifact::ScopedArtifacts::new(
353 service,
354 ctx.app_name().to_string(),
355 ctx.user_id().to_string(),
356 ctx.session_id().to_string(),
357 );
358 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
359 }
360 if let Some(memory) = memory_service_clone.clone() {
361 refreshed_ctx = refreshed_ctx.with_memory(memory);
362 }
363 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
364 if let Some(rc) = request_context.clone() {
365 refreshed_ctx = refreshed_ctx.with_request_context(rc);
366 }
367 ctx = Arc::new(refreshed_ctx);
368 }
369 }
370 }
371
372 let agent_span = tracing::info_span!(
374 "agent.execute",
375 "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
376 "gcp.vertex.agent.session_id" = ctx.session_id(),
377 "gcp.vertex.agent.event_id" = ctx.invocation_id(), "gen_ai.conversation.id" = ctx.session_id(),
379 "adk.app_name" = ctx.app_name(),
380 "adk.user_id" = ctx.user_id(),
381 "agent.name" = %agent_to_run.name(),
382 "adk.skills.selected_name" = %selected_skill_name,
383 "adk.skills.selected_id" = %selected_skill_id
384 );
385
386 let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
387 Ok(s) => s,
388 Err(e) => {
389 if let Some(manager) = plugin_manager.as_ref() {
390 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
391 }
392 yield Err(e);
393 return;
394 }
395 };
396
397 use futures::StreamExt;
399 let mut transfer_target: Option<String> = None;
400
401 while let Some(result) = {
402 if let Some(token) = cancellation_token.as_ref() {
403 if token.is_cancelled() {
404 if let Some(manager) = plugin_manager.as_ref() {
405 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
406 }
407 return;
408 }
409 }
410 agent_stream.next().await
411 } {
412 match result {
413 Ok(event) => {
414 let mut event = event;
415
416 if let Some(manager) = plugin_manager.as_ref() {
417 match manager
418 .run_on_event(
419 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
420 event.clone(),
421 )
422 .await
423 {
424 Ok(Some(modified)) => {
425 event = modified;
426 }
427 Ok(None) => {}
428 Err(e) => {
429 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
430 yield Err(e);
431 return;
432 }
433 }
434 }
435
436 if let Some(target) = &event.actions.transfer_to_agent {
438 transfer_target = Some(target.clone());
439 }
440
441 if !event.actions.state_delta.is_empty() {
447 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
448 }
449
450 ctx.mutable_session().append_event(event.clone());
452
453 if !event.llm_response.partial {
460 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
461 if let Some(manager) = plugin_manager.as_ref() {
462 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
463 }
464 yield Err(e);
465 return;
466 }
467 }
468 yield Ok(event);
469 }
470 Err(e) => {
471 if let Some(manager) = plugin_manager.as_ref() {
472 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
473 }
474 yield Err(e);
475 return;
476 }
477 }
478 }
479
480 const MAX_TRANSFER_DEPTH: u32 = 10;
487 let mut transfer_depth: u32 = 0;
488 let mut current_transfer_target = transfer_target;
489
490 while let Some(target_name) = current_transfer_target.take() {
491 transfer_depth += 1;
492 if transfer_depth > MAX_TRANSFER_DEPTH {
493 tracing::warn!(
494 depth = transfer_depth,
495 target = %target_name,
496 "max transfer depth exceeded, stopping transfer chain"
497 );
498 break;
499 }
500
501 let target_agent = match Self::find_agent(&root_agent, &target_name) {
502 Some(a) => a,
503 None => {
504 tracing::warn!(target = %target_name, "transfer target not found in agent tree");
505 break;
506 }
507 };
508
509 let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
514
515 let mut transfer_run_config = run_config.clone();
516 let mut targets = Vec::new();
517 if let Some(ref parent) = parent_name {
518 targets.push(parent.clone());
519 }
520 targets.extend(peer_names);
521 transfer_run_config.transfer_targets = targets;
522 transfer_run_config.parent_agent = parent_name;
523
524 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
526 let mut transfer_ctx = InvocationContext::with_mutable_session(
527 transfer_invocation_id.clone(),
528 target_agent.clone(),
529 ctx.user_id().to_string(),
530 ctx.app_name().to_string(),
531 ctx.session_id().to_string(),
532 effective_user_content.clone(),
533 ctx.mutable_session().clone(),
534 );
535
536 if let Some(ref service) = artifact_service_clone {
537 let scoped = adk_artifact::ScopedArtifacts::new(
538 service.clone(),
539 ctx.app_name().to_string(),
540 ctx.user_id().to_string(),
541 ctx.session_id().to_string(),
542 );
543 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
544 }
545 if let Some(ref memory) = memory_service_clone {
546 transfer_ctx = transfer_ctx.with_memory(memory.clone());
547 }
548 transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
549 if let Some(rc) = request_context.clone() {
550 transfer_ctx = transfer_ctx.with_request_context(rc);
551 }
552
553 let transfer_ctx = Arc::new(transfer_ctx);
554
555 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
557 Ok(s) => s,
558 Err(e) => {
559 if let Some(manager) = plugin_manager.as_ref() {
560 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
561 }
562 yield Err(e);
563 return;
564 }
565 };
566
567 while let Some(result) = {
569 if let Some(token) = cancellation_token.as_ref() {
570 if token.is_cancelled() {
571 if let Some(manager) = plugin_manager.as_ref() {
572 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
573 }
574 return;
575 }
576 }
577 transfer_stream.next().await
578 } {
579 match result {
580 Ok(event) => {
581 let mut event = event;
582 if let Some(manager) = plugin_manager.as_ref() {
583 match manager
584 .run_on_event(
585 transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
586 event.clone(),
587 )
588 .await
589 {
590 Ok(Some(modified)) => {
591 event = modified;
592 }
593 Ok(None) => {}
594 Err(e) => {
595 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
596 yield Err(e);
597 return;
598 }
599 }
600 }
601
602 if let Some(target) = &event.actions.transfer_to_agent {
604 current_transfer_target = Some(target.clone());
605 }
606
607 if !event.actions.state_delta.is_empty() {
609 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
610 }
611
612 transfer_ctx.mutable_session().append_event(event.clone());
614
615 if !event.llm_response.partial {
616 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
617 if let Some(manager) = plugin_manager.as_ref() {
618 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
619 }
620 yield Err(e);
621 return;
622 }
623 }
624 yield Ok(event);
625 }
626 Err(e) => {
627 if let Some(manager) = plugin_manager.as_ref() {
628 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
629 }
630 yield Err(e);
631 return;
632 }
633 }
634 }
635 }
636
637 if let Some(ref compaction_cfg) = compaction_config {
641 let all_events = ctx.mutable_session().as_ref().events_snapshot();
643 let invocation_count = all_events.iter()
644 .filter(|e| e.author == "user")
645 .count() as u32;
646
647 if invocation_count > 0 && invocation_count % compaction_cfg.compaction_interval == 0 {
648 let overlap = compaction_cfg.overlap_size as usize;
651
652 let user_msg_indices: Vec<usize> = all_events.iter()
654 .enumerate()
655 .filter(|(_, e)| e.author == "user")
656 .map(|(i, _)| i)
657 .collect();
658
659 let compact_up_to = if overlap == 0 {
662 all_events.len()
663 } else if user_msg_indices.len() > overlap {
664 user_msg_indices[user_msg_indices.len() - overlap]
666 } else {
667 0
669 };
670
671 if compact_up_to > 0 {
672 let events_to_compact = &all_events[..compact_up_to];
673
674 match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
675 Ok(Some(compaction_event)) => {
676 if let Err(e) = session_service.append_event(
678 ctx.session_id(),
679 compaction_event.clone(),
680 ).await {
681 tracing::warn!(error = %e, "Failed to persist compaction event");
682 } else {
683 tracing::info!(
684 compacted_events = compact_up_to,
685 "Context compaction completed"
686 );
687 }
688 }
689 Ok(None) => {
690 tracing::debug!("Compaction summarizer returned no result");
691 }
692 Err(e) => {
693 tracing::warn!(error = %e, "Context compaction failed");
695 }
696 }
697 }
698 }
699 }
700
701 if let Some(manager) = plugin_manager.as_ref() {
702 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
703 }
704 };
705
706 Ok(Box::pin(s))
707 }
708
709 pub fn find_agent_to_run(
711 root_agent: &Arc<dyn Agent>,
712 session: &dyn adk_session::Session,
713 ) -> Arc<dyn Agent> {
714 let events = session.events();
716 for i in (0..events.len()).rev() {
717 if let Some(event) = events.at(i) {
718 if let Some(target_name) = &event.actions.transfer_to_agent {
720 if let Some(agent) = Self::find_agent(root_agent, target_name) {
721 return agent;
722 }
723 }
724
725 if event.author == "user" {
726 continue;
727 }
728
729 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
731 if Self::is_transferable(root_agent, &agent) {
733 return agent;
734 }
735 }
736 }
737 }
738
739 root_agent.clone()
741 }
742
743 fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
745 let _ = (root_agent, agent);
748 true
749 }
750
751 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
753 if current.name() == target_name {
754 return Some(current.clone());
755 }
756
757 for sub_agent in current.sub_agents() {
758 if let Some(found) = Self::find_agent(sub_agent, target_name) {
759 return Some(found);
760 }
761 }
762
763 None
764 }
765
766 pub fn compute_transfer_context(
772 root: &Arc<dyn Agent>,
773 target_name: &str,
774 ) -> (Option<String>, Vec<String>) {
775 if root.name() == target_name {
777 return (None, Vec::new());
778 }
779
780 fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
782 for sub in current.sub_agents() {
783 if sub.name() == target {
784 return Some(current.clone());
785 }
786 if let Some(found) = find_parent(sub, target) {
787 return Some(found);
788 }
789 }
790 None
791 }
792
793 match find_parent(root, target_name) {
794 Some(parent) => {
795 let parent_name = parent.name().to_string();
796 let peers: Vec<String> = parent
797 .sub_agents()
798 .iter()
799 .filter(|a| a.name() != target_name)
800 .map(|a| a.name().to_string())
801 .collect();
802 (Some(parent_name), peers)
803 }
804 None => (None, Vec::new()),
805 }
806 }
807}