1use crate::InvocationContext;
2use crate::cache::CacheManager;
3use adk_artifact::ArtifactService;
4use adk_core::{
5 Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
6 ReadonlyContext, Result, RunConfig, SessionId, UserId,
7};
8use adk_plugin::PluginManager;
9use adk_session::SessionService;
10use adk_skill::{SkillInjector, SkillInjectorConfig};
11use async_stream::stream;
12use std::sync::Arc;
13use tokio_util::sync::CancellationToken;
14use tracing::Instrument;
15
16pub struct RunnerConfig {
17 pub app_name: String,
18 pub agent: Arc<dyn Agent>,
19 pub session_service: Arc<dyn SessionService>,
20 pub artifact_service: Option<Arc<dyn ArtifactService>>,
21 pub memory_service: Option<Arc<dyn Memory>>,
22 pub plugin_manager: Option<Arc<PluginManager>>,
23 #[allow(dead_code)]
26 pub run_config: Option<RunConfig>,
27 pub compaction_config: Option<adk_core::EventsCompactionConfig>,
31 pub context_cache_config: Option<ContextCacheConfig>,
39 pub cache_capable: Option<Arc<dyn CacheCapable>>,
42 pub request_context: Option<adk_core::RequestContext>,
46 pub cancellation_token: Option<CancellationToken>,
48}
49
50pub struct Runner {
51 app_name: String,
52 root_agent: Arc<dyn Agent>,
53 session_service: Arc<dyn SessionService>,
54 artifact_service: Option<Arc<dyn ArtifactService>>,
55 memory_service: Option<Arc<dyn Memory>>,
56 plugin_manager: Option<Arc<PluginManager>>,
57 skill_injector: Option<Arc<SkillInjector>>,
58 run_config: RunConfig,
59 compaction_config: Option<adk_core::EventsCompactionConfig>,
60 context_cache_config: Option<ContextCacheConfig>,
61 cache_capable: Option<Arc<dyn CacheCapable>>,
62 cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
63 request_context: Option<adk_core::RequestContext>,
64 cancellation_token: Option<CancellationToken>,
65}
66
67impl Runner {
68 pub fn new(config: RunnerConfig) -> Result<Self> {
69 let effective_cache_config = config
72 .context_cache_config
73 .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
74
75 let cache_manager = effective_cache_config
76 .as_ref()
77 .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
78 Ok(Self {
79 app_name: config.app_name,
80 root_agent: config.agent,
81 session_service: config.session_service,
82 artifact_service: config.artifact_service,
83 memory_service: config.memory_service,
84 plugin_manager: config.plugin_manager,
85 skill_injector: None,
86 run_config: config.run_config.unwrap_or_default(),
87 compaction_config: config.compaction_config,
88 context_cache_config: effective_cache_config,
89 cache_capable: config.cache_capable,
90 cache_manager,
91 request_context: config.request_context,
92 cancellation_token: config.cancellation_token,
93 })
94 }
95
96 pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
100 self.skill_injector = Some(Arc::new(injector));
101 self
102 }
103
104 pub fn with_auto_skills(
106 mut self,
107 root: impl AsRef<std::path::Path>,
108 config: SkillInjectorConfig,
109 ) -> adk_skill::SkillResult<Self> {
110 let injector = SkillInjector::from_root(root, config)?;
111 self.skill_injector = Some(Arc::new(injector));
112 Ok(self)
113 }
114
115 pub async fn run(
116 &self,
117 user_id: UserId,
118 session_id: SessionId,
119 user_content: Content,
120 ) -> Result<EventStream> {
121 let app_name = self.app_name.clone();
122 let typed_app_name = AppName::try_from(app_name.clone())?;
123 let session_service = self.session_service.clone();
124 let root_agent = self.root_agent.clone();
125 let artifact_service = self.artifact_service.clone();
126 let memory_service = self.memory_service.clone();
127 let plugin_manager = self.plugin_manager.clone();
128 let skill_injector = self.skill_injector.clone();
129 let mut run_config = self.run_config.clone();
130 let compaction_config = self.compaction_config.clone();
131 let context_cache_config = self.context_cache_config.clone();
132 let cache_capable = self.cache_capable.clone();
133 let cache_manager_ref = self.cache_manager.clone();
134 let request_context = self.request_context.clone();
135 let cancellation_token = self.cancellation_token.clone();
136
137 let s = stream! {
138 let session = match session_service
140 .get(adk_session::GetRequest {
141 app_name: app_name.clone(),
142 user_id: user_id.to_string(),
143 session_id: session_id.to_string(),
144 num_recent_events: None,
145 after: None,
146 })
147 .await
148 {
149 Ok(s) => s,
150 Err(e) => {
151 yield Err(e);
152 return;
153 }
154 };
155
156 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
158
159 let artifact_service_clone = artifact_service.clone();
161 let memory_service_clone = memory_service.clone();
162
163 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
165 let mut effective_user_content = user_content.clone();
166 let mut selected_skill_name = String::new();
167 let mut selected_skill_id = String::new();
168
169 if let Some(injector) = skill_injector.as_ref() {
170 if let Some(matched) = adk_skill::apply_skill_injection(
171 &mut effective_user_content,
172 injector.index(),
173 injector.policy(),
174 injector.max_injected_chars(),
175 ) {
176 selected_skill_name = matched.skill.name;
177 selected_skill_id = matched.skill.id;
178 }
179 }
180
181 let mut invocation_ctx = match InvocationContext::new_typed(
182 invocation_id.clone(),
183 agent_to_run.clone(),
184 user_id.clone(),
185 typed_app_name.clone(),
186 session_id.clone(),
187 effective_user_content.clone(),
188 Arc::from(session),
189 ) {
190 Ok(ctx) => ctx,
191 Err(e) => {
192 yield Err(e);
193 return;
194 }
195 };
196
197 if let Some(service) = artifact_service {
199 let scoped = adk_artifact::ScopedArtifacts::new(
201 service,
202 app_name.clone(),
203 user_id.to_string(),
204 session_id.to_string(),
205 );
206 invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
207 }
208 if let Some(memory) = memory_service {
209 invocation_ctx = invocation_ctx.with_memory(memory);
210 }
211
212 invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
214
215 if let Some(rc) = request_context.clone() {
217 invocation_ctx = invocation_ctx.with_request_context(rc);
218 }
219
220 let mut ctx = Arc::new(invocation_ctx);
221
222 if let Some(manager) = plugin_manager.as_ref() {
223 match manager
224 .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
225 .await
226 {
227 Ok(Some(content)) => {
228 let mut early_event = adk_core::Event::new(ctx.invocation_id());
229 early_event.author = agent_to_run.name().to_string();
230 early_event.llm_response.content = Some(content);
231
232 ctx.mutable_session().append_event(early_event.clone());
233 if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
234 yield Err(e);
235 return;
236 }
237
238 yield Ok(early_event);
239 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
240 return;
241 }
242 Ok(None) => {}
243 Err(e) => {
244 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
245 yield Err(e);
246 return;
247 }
248 }
249
250 match manager
251 .run_on_user_message(
252 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
253 effective_user_content.clone(),
254 )
255 .await
256 {
257 Ok(Some(modified)) => {
258 effective_user_content = modified;
259
260 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
261 ctx.invocation_id().to_string(),
262 agent_to_run.clone(),
263 ctx.user_id().to_string(),
264 ctx.app_name().to_string(),
265 ctx.session_id().to_string(),
266 effective_user_content.clone(),
267 ctx.mutable_session().clone(),
268 ) {
269 Ok(ctx) => ctx,
270 Err(e) => {
271 yield Err(e);
272 return;
273 }
274 };
275
276 if let Some(service) = artifact_service_clone.clone() {
277 let scoped = adk_artifact::ScopedArtifacts::new(
278 service,
279 ctx.app_name().to_string(),
280 ctx.user_id().to_string(),
281 ctx.session_id().to_string(),
282 );
283 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
284 }
285 if let Some(memory) = memory_service_clone.clone() {
286 refreshed_ctx = refreshed_ctx.with_memory(memory);
287 }
288 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
289 if let Some(rc) = request_context.clone() {
290 refreshed_ctx = refreshed_ctx.with_request_context(rc);
291 }
292 ctx = Arc::new(refreshed_ctx);
293 }
294 Ok(None) => {}
295 Err(e) => {
296 if let Some(manager) = plugin_manager.as_ref() {
297 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
298 }
299 yield Err(e);
300 return;
301 }
302 }
303 }
304
305 let mut user_event = adk_core::Event::new(ctx.invocation_id());
307 user_event.author = "user".to_string();
308 user_event.llm_response.content = Some(effective_user_content.clone());
309
310 ctx.mutable_session().append_event(user_event.clone());
313
314 if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
315 if let Some(manager) = plugin_manager.as_ref() {
316 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
317 }
318 yield Err(e);
319 return;
320 }
321
322 if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
327 let mut cm = cm_mutex.lock().await;
328 if cm.is_enabled() {
329 if cm.active_cache_name().is_none() || cm.needs_refresh() {
330 let system_instruction = agent_to_run.description().to_string();
334 let tools = std::collections::HashMap::new();
335 let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
336
337 match cache_model.create_cache(&system_instruction, &tools, ttl).await {
338 Ok(name) => {
339 if let Some(old) = cm.clear_active_cache() {
341 if let Err(e) = cache_model.delete_cache(&old).await {
342 tracing::warn!(
343 old_cache = %old,
344 error = %e,
345 "failed to delete old cache, proceeding with new cache"
346 );
347 }
348 }
349 cm.set_active_cache(name);
350 }
351 Err(e) => {
352 tracing::warn!(
353 error = %e,
354 "cache creation failed, proceeding without cache"
355 );
356 }
357 }
358 }
359
360 if let Some(cache_name) = cm.record_invocation() {
362 run_config.cached_content = Some(cache_name.to_string());
363 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
365 ctx.invocation_id().to_string(),
366 agent_to_run.clone(),
367 ctx.user_id().to_string(),
368 ctx.app_name().to_string(),
369 ctx.session_id().to_string(),
370 effective_user_content.clone(),
371 ctx.mutable_session().clone(),
372 ) {
373 Ok(ctx) => ctx,
374 Err(e) => {
375 yield Err(e);
376 return;
377 }
378 };
379 if let Some(service) = artifact_service_clone.clone() {
380 let scoped = adk_artifact::ScopedArtifacts::new(
381 service,
382 ctx.app_name().to_string(),
383 ctx.user_id().to_string(),
384 ctx.session_id().to_string(),
385 );
386 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
387 }
388 if let Some(memory) = memory_service_clone.clone() {
389 refreshed_ctx = refreshed_ctx.with_memory(memory);
390 }
391 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
392 if let Some(rc) = request_context.clone() {
393 refreshed_ctx = refreshed_ctx.with_request_context(rc);
394 }
395 ctx = Arc::new(refreshed_ctx);
396 }
397 }
398 }
399
400 let agent_span = tracing::info_span!(
402 "agent.execute",
403 "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
404 "gcp.vertex.agent.session_id" = ctx.session_id(),
405 "gcp.vertex.agent.event_id" = ctx.invocation_id(), "gen_ai.conversation.id" = ctx.session_id(),
407 "adk.app_name" = ctx.app_name(),
408 "adk.user_id" = ctx.user_id(),
409 "agent.name" = %agent_to_run.name(),
410 "adk.skills.selected_name" = %selected_skill_name,
411 "adk.skills.selected_id" = %selected_skill_id
412 );
413
414 let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
415 Ok(s) => s,
416 Err(e) => {
417 if let Some(manager) = plugin_manager.as_ref() {
418 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
419 }
420 yield Err(e);
421 return;
422 }
423 };
424
425 use futures::StreamExt;
427 let mut transfer_target: Option<String> = None;
428
429 while let Some(result) = {
430 if let Some(token) = cancellation_token.as_ref() {
431 if token.is_cancelled() {
432 if let Some(manager) = plugin_manager.as_ref() {
433 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
434 }
435 return;
436 }
437 }
438 agent_stream.next().await
439 } {
440 match result {
441 Ok(event) => {
442 let mut event = event;
443
444 if let Some(manager) = plugin_manager.as_ref() {
445 match manager
446 .run_on_event(
447 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
448 event.clone(),
449 )
450 .await
451 {
452 Ok(Some(modified)) => {
453 event = modified;
454 }
455 Ok(None) => {}
456 Err(e) => {
457 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
458 yield Err(e);
459 return;
460 }
461 }
462 }
463
464 if let Some(target) = &event.actions.transfer_to_agent {
466 transfer_target = Some(target.clone());
467 }
468
469 if !event.actions.state_delta.is_empty() {
475 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
476 }
477
478 ctx.mutable_session().append_event(event.clone());
480
481 if !event.llm_response.partial {
488 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
489 if let Some(manager) = plugin_manager.as_ref() {
490 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
491 }
492 yield Err(e);
493 return;
494 }
495 }
496 yield Ok(event);
497 }
498 Err(e) => {
499 if let Some(manager) = plugin_manager.as_ref() {
500 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
501 }
502 yield Err(e);
503 return;
504 }
505 }
506 }
507
508 const MAX_TRANSFER_DEPTH: u32 = 10;
515 let mut transfer_depth: u32 = 0;
516 let mut current_transfer_target = transfer_target;
517
518 while let Some(target_name) = current_transfer_target.take() {
519 transfer_depth += 1;
520 if transfer_depth > MAX_TRANSFER_DEPTH {
521 tracing::warn!(
522 depth = transfer_depth,
523 target = %target_name,
524 "max transfer depth exceeded, stopping transfer chain"
525 );
526 break;
527 }
528
529 let target_agent = match Self::find_agent(&root_agent, &target_name) {
530 Some(a) => a,
531 None => {
532 tracing::warn!(target = %target_name, "transfer target not found in agent tree");
533 break;
534 }
535 };
536
537 let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
542
543 let mut transfer_run_config = run_config.clone();
544 let mut targets = Vec::new();
545 if let Some(ref parent) = parent_name {
546 targets.push(parent.clone());
547 }
548 targets.extend(peer_names);
549 transfer_run_config.transfer_targets = targets;
550 transfer_run_config.parent_agent = parent_name;
551
552 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
554 let mut transfer_ctx = match InvocationContext::with_mutable_session(
555 transfer_invocation_id.clone(),
556 target_agent.clone(),
557 ctx.user_id().to_string(),
558 ctx.app_name().to_string(),
559 ctx.session_id().to_string(),
560 effective_user_content.clone(),
561 ctx.mutable_session().clone(),
562 ) {
563 Ok(ctx) => ctx,
564 Err(e) => {
565 yield Err(e);
566 return;
567 }
568 };
569
570 if let Some(ref service) = artifact_service_clone {
571 let scoped = adk_artifact::ScopedArtifacts::new(
572 service.clone(),
573 ctx.app_name().to_string(),
574 ctx.user_id().to_string(),
575 ctx.session_id().to_string(),
576 );
577 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
578 }
579 if let Some(ref memory) = memory_service_clone {
580 transfer_ctx = transfer_ctx.with_memory(memory.clone());
581 }
582 transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
583 if let Some(rc) = request_context.clone() {
584 transfer_ctx = transfer_ctx.with_request_context(rc);
585 }
586
587 let transfer_ctx = Arc::new(transfer_ctx);
588
589 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
591 Ok(s) => s,
592 Err(e) => {
593 if let Some(manager) = plugin_manager.as_ref() {
594 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
595 }
596 yield Err(e);
597 return;
598 }
599 };
600
601 while let Some(result) = {
603 if let Some(token) = cancellation_token.as_ref() {
604 if token.is_cancelled() {
605 if let Some(manager) = plugin_manager.as_ref() {
606 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
607 }
608 return;
609 }
610 }
611 transfer_stream.next().await
612 } {
613 match result {
614 Ok(event) => {
615 let mut event = event;
616 if let Some(manager) = plugin_manager.as_ref() {
617 match manager
618 .run_on_event(
619 transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
620 event.clone(),
621 )
622 .await
623 {
624 Ok(Some(modified)) => {
625 event = modified;
626 }
627 Ok(None) => {}
628 Err(e) => {
629 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
630 yield Err(e);
631 return;
632 }
633 }
634 }
635
636 if let Some(target) = &event.actions.transfer_to_agent {
638 current_transfer_target = Some(target.clone());
639 }
640
641 if !event.actions.state_delta.is_empty() {
643 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
644 }
645
646 transfer_ctx.mutable_session().append_event(event.clone());
648
649 if !event.llm_response.partial {
650 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
651 if let Some(manager) = plugin_manager.as_ref() {
652 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
653 }
654 yield Err(e);
655 return;
656 }
657 }
658 yield Ok(event);
659 }
660 Err(e) => {
661 if let Some(manager) = plugin_manager.as_ref() {
662 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
663 }
664 yield Err(e);
665 return;
666 }
667 }
668 }
669 }
670
671 if let Some(ref compaction_cfg) = compaction_config {
675 let event_count = ctx.mutable_session().as_ref().events_len();
676
677 if event_count > 0 {
678 let all_events = ctx.mutable_session().as_ref().events_snapshot();
679 let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
680 as u32;
681
682 if invocation_count > 0
683 && invocation_count % compaction_cfg.compaction_interval == 0
684 {
685 let overlap = compaction_cfg.overlap_size as usize;
688
689 let user_msg_indices: Vec<usize> = all_events.iter()
691 .enumerate()
692 .filter(|(_, e)| e.author == "user")
693 .map(|(i, _)| i)
694 .collect();
695
696 let compact_up_to = if overlap == 0 {
699 all_events.len()
700 } else if user_msg_indices.len() > overlap {
701 user_msg_indices[user_msg_indices.len() - overlap]
703 } else {
704 0
706 };
707
708 if compact_up_to > 0 {
709 let events_to_compact = &all_events[..compact_up_to];
710
711 match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
712 Ok(Some(compaction_event)) => {
713 if let Err(e) = session_service.append_event(
715 ctx.session_id(),
716 compaction_event.clone(),
717 ).await {
718 tracing::warn!(error = %e, "Failed to persist compaction event");
719 } else {
720 tracing::info!(
721 compacted_events = compact_up_to,
722 "Context compaction completed"
723 );
724 }
725 }
726 Ok(None) => {
727 tracing::debug!("Compaction summarizer returned no result");
728 }
729 Err(e) => {
730 tracing::warn!(error = %e, "Context compaction failed");
732 }
733 }
734 }
735 }
736 }
737 }
738
739 if let Some(manager) = plugin_manager.as_ref() {
740 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
741 }
742 };
743
744 Ok(Box::pin(s))
745 }
746
747 pub fn find_agent_to_run(
749 root_agent: &Arc<dyn Agent>,
750 session: &dyn adk_session::Session,
751 ) -> Arc<dyn Agent> {
752 let events = session.events();
754 for i in (0..events.len()).rev() {
755 if let Some(event) = events.at(i) {
756 if let Some(target_name) = &event.actions.transfer_to_agent {
758 if let Some(agent) = Self::find_agent(root_agent, target_name) {
759 return agent;
760 }
761 }
762
763 if event.author == "user" {
764 continue;
765 }
766
767 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
769 if Self::is_transferable(root_agent, &agent) {
771 return agent;
772 }
773 }
774 }
775 }
776
777 root_agent.clone()
779 }
780
781 fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
791 true
792 }
793
794 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
796 if current.name() == target_name {
797 return Some(current.clone());
798 }
799
800 for sub_agent in current.sub_agents() {
801 if let Some(found) = Self::find_agent(sub_agent, target_name) {
802 return Some(found);
803 }
804 }
805
806 None
807 }
808
809 pub fn compute_transfer_context(
815 root: &Arc<dyn Agent>,
816 target_name: &str,
817 ) -> (Option<String>, Vec<String>) {
818 if root.name() == target_name {
820 return (None, Vec::new());
821 }
822
823 fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
825 for sub in current.sub_agents() {
826 if sub.name() == target {
827 return Some(current.clone());
828 }
829 if let Some(found) = find_parent(sub, target) {
830 return Some(found);
831 }
832 }
833 None
834 }
835
836 match find_parent(root, target_name) {
837 Some(parent) => {
838 let parent_name = parent.name().to_string();
839 let peers: Vec<String> = parent
840 .sub_agents()
841 .iter()
842 .filter(|a| a.name() != target_name)
843 .map(|a| a.name().to_string())
844 .collect();
845 (Some(parent_name), peers)
846 }
847 None => (None, Vec::new()),
848 }
849 }
850}