1use crate::InvocationContext;
2use crate::cache::CacheManager;
3#[cfg(feature = "artifacts")]
4use adk_artifact::ArtifactService;
5use adk_core::{
6 Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
7 ReadonlyContext, Result, RunConfig, SessionId, UserId,
8};
9#[cfg(feature = "plugins")]
10use adk_plugin::PluginManager;
11use adk_session::SessionService;
12#[cfg(feature = "skills")]
13use adk_skill::{SkillInjector, SkillInjectorConfig};
14use async_stream::stream;
15use std::sync::Arc;
16use tokio_util::sync::CancellationToken;
17use tracing::Instrument;
18
/// Construction parameters for a [`Runner`].
///
/// Every field is moved into the runner by [`Runner::new`]; optional fields
/// that are `None` simply disable the corresponding feature.
pub struct RunnerConfig {
    /// Application name. `Runner::run` converts it into an `AppName` (failing
    /// the call if the conversion fails) and uses it when loading the session.
    pub app_name: String,
    /// Root of the agent tree. Agent lookups for resuming and transfers start here.
    pub agent: Arc<dyn Agent>,
    /// Service used to load the session and to persist events.
    pub session_service: Arc<dyn SessionService>,
    /// Optional artifact storage. When set, each invocation context receives a
    /// `ScopedArtifacts` wrapper built from the app name, user id and session id.
    #[cfg(feature = "artifacts")]
    pub artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service attached to each invocation context.
    pub memory_service: Option<Arc<dyn Memory>>,
    /// Optional plugin manager whose `run_before_run`, `run_on_user_message`,
    /// `run_on_event` and `run_after_run` hooks are invoked during a run.
    #[cfg(feature = "plugins")]
    pub plugin_manager: Option<Arc<PluginManager>>,
    /// Run configuration; `Runner::new` falls back to `RunConfig::default()`
    /// when this is `None`.
    #[allow(dead_code)]
    pub run_config: Option<RunConfig>,
    /// Optional settings for the event compaction pass performed at the end of a run.
    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Optional context-cache settings. When this is `None` but `cache_capable`
    /// is set, `Runner::new` uses `ContextCacheConfig::default()`.
    pub context_cache_config: Option<ContextCacheConfig>,
    /// Optional handle used to create and delete caches. Cache handling in
    /// `run` only happens when this is set.
    pub cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Optional request context attached to every invocation context.
    pub request_context: Option<adk_core::RequestContext>,
    /// Optional runner-wide cancellation token. Cancelling it stops every run
    /// started by the runner, in addition to per-session interruption.
    pub cancellation_token: Option<CancellationToken>,
    /// Settings for intra-invocation compaction. Only takes effect when
    /// `intra_compaction_summarizer` is also set.
    pub intra_compaction_config: Option<adk_core::IntraCompactionConfig>,
    /// Summarizer for intra-invocation compaction. Only takes effect when
    /// `intra_compaction_config` is also set.
    pub intra_compaction_summarizer: Option<Arc<dyn adk_core::BaseEventsSummarizer>>,
    /// Optional token-budget compaction settings. `run` compacts the session
    /// events before the agent executes when the estimated token count exceeds
    /// `context_budget`, and once more if the agent fails with a token-limit error.
    #[cfg(feature = "context-compaction")]
    pub context_compaction: Option<crate::compaction::CompactionConfig>,
}
79
/// Drives an agent tree against a session: loads the session, executes the
/// selected agent, follows agent transfers and persists the produced events.
///
/// Built with [`Runner::new`] from a [`RunnerConfig`], or through [`Runner::builder`].
pub struct Runner {
    /// Application name passed to the session service and invocation contexts.
    app_name: String,
    /// Root of the agent tree searched when choosing or transferring to an agent.
    root_agent: Arc<dyn Agent>,
    /// Service used to load sessions and persist events.
    session_service: Arc<dyn SessionService>,
    /// Optional artifact storage, wrapped per invocation in `ScopedArtifacts`.
    #[cfg(feature = "artifacts")]
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service attached to invocation contexts.
    memory_service: Option<Arc<dyn Memory>>,
    /// Optional plugin manager whose hooks are called during a run.
    #[cfg(feature = "plugins")]
    plugin_manager: Option<Arc<PluginManager>>,
    /// Optional skill injector; `None` until one of the `with_*skill*` methods is called.
    #[cfg(feature = "skills")]
    skill_injector: Option<Arc<SkillInjector>>,
    /// Base run configuration cloned for every run.
    run_config: RunConfig,
    /// Optional settings for the compaction pass at the end of a run.
    compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Effective context-cache settings (explicit, or defaulted when
    /// `cache_capable` was provided).
    context_cache_config: Option<ContextCacheConfig>,
    /// Optional handle used to create and delete caches.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Cache bookkeeping shared across runs; present whenever
    /// `context_cache_config` is present.
    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
    /// Optional request context attached to invocation contexts.
    request_context: Option<adk_core::RequestContext>,
    /// Optional runner-wide cancellation token.
    cancellation_token: Option<CancellationToken>,
    /// Intra-invocation compactor; present only when both its config and
    /// summarizer were supplied.
    intra_compactor: Option<Arc<crate::intra_compaction::IntraInvocationCompactor>>,
    /// Optional token-budget compaction settings.
    #[cfg(feature = "context-compaction")]
    context_compaction: Option<Arc<crate::compaction::CompactionConfig>>,
    /// Cancellation tokens of the runs currently in flight, keyed by session id.
    /// Filled by `run`, read by `interrupt` and `active_session_ids`.
    active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
}
111
112impl Runner {
    /// Returns a fresh `RunnerConfigBuilder`.
    ///
    /// The builder starts in the `NoAppName` / `NoAgent` / `NoSessionService`
    /// type state, i.e. none of those three values has been provided yet.
    pub fn builder() -> crate::builder::RunnerConfigBuilder<
        crate::builder::NoAppName,
        crate::builder::NoAgent,
        crate::builder::NoSessionService,
    > {
        crate::builder::RunnerConfigBuilder::new()
    }
135
136 pub fn new(config: RunnerConfig) -> Result<Self> {
140 let run_config = config.run_config.unwrap_or_default();
141
142 let effective_cache_config = config
145 .context_cache_config
146 .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
147
148 let cache_manager = effective_cache_config
149 .as_ref()
150 .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
151
152 let intra_compactor = config.intra_compaction_config.as_ref().and_then(|ic_config| {
153 config.intra_compaction_summarizer.as_ref().map(|summarizer| {
154 Arc::new(crate::intra_compaction::IntraInvocationCompactor::new(
155 ic_config.clone(),
156 summarizer.clone(),
157 ))
158 })
159 });
160
161 Ok(Self {
162 app_name: config.app_name,
163 root_agent: config.agent,
164 session_service: config.session_service,
165 #[cfg(feature = "artifacts")]
166 artifact_service: config.artifact_service,
167 memory_service: config.memory_service,
168 #[cfg(feature = "plugins")]
169 plugin_manager: config.plugin_manager,
170 #[cfg(feature = "skills")]
171 skill_injector: None,
172 run_config,
173 compaction_config: config.compaction_config,
174 context_cache_config: effective_cache_config,
175 cache_capable: config.cache_capable,
176 cache_manager,
177 request_context: config.request_context,
178 cancellation_token: config.cancellation_token,
179 intra_compactor,
180 #[cfg(feature = "context-compaction")]
181 context_compaction: config.context_compaction.map(Arc::new),
182 active_sessions: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
183 })
184 }
185
    /// Installs `injector` as this runner's skill injector, replacing any
    /// previously configured one, and returns the runner for chaining.
    #[cfg(feature = "skills")]
    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
        self.skill_injector = Some(Arc::new(injector));
        self
    }
194
    /// Consuming variant of [`Runner::with_auto_skills_mut`]: configures the
    /// skill injector from `root` and `config`, then returns the runner.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `with_auto_skills_mut`; the runner is
    /// dropped in that case.
    #[cfg(feature = "skills")]
    #[deprecated(note = "Use with_auto_skills_mut instead")]
    pub fn with_auto_skills(
        mut self,
        root: impl AsRef<std::path::Path>,
        config: SkillInjectorConfig,
    ) -> adk_skill::SkillResult<Self> {
        self.with_auto_skills_mut(root, config)?;
        Ok(self)
    }
206
207 #[cfg(feature = "skills")]
213 pub fn with_auto_skills_mut(
214 &mut self,
215 root: impl AsRef<std::path::Path>,
216 config: SkillInjectorConfig,
217 ) -> adk_skill::SkillResult<()> {
218 let injector = SkillInjector::from_root(root, config)?;
219 self.skill_injector = Some(Arc::new(injector));
220 Ok(())
221 }
222
223 pub async fn run(
228 &self,
229 user_id: UserId,
230 session_id: SessionId,
231 user_content: Content,
232 ) -> Result<EventStream> {
233 let app_name = self.app_name.clone();
234 let typed_app_name = AppName::try_from(app_name.clone())?;
235 let session_service = self.session_service.clone();
236 let root_agent = self.root_agent.clone();
237 #[cfg(feature = "artifacts")]
238 let artifact_service = self.artifact_service.clone();
239 let memory_service = self.memory_service.clone();
240 #[cfg(feature = "plugins")]
241 let plugin_manager = self.plugin_manager.clone();
242 #[cfg(feature = "skills")]
243 let skill_injector = self.skill_injector.clone();
244 let mut run_config = self.run_config.clone();
245 let compaction_config = self.compaction_config.clone();
246 let context_cache_config = self.context_cache_config.clone();
247 let cache_capable = self.cache_capable.clone();
248 let cache_manager_ref = self.cache_manager.clone();
249 let request_context = self.request_context.clone();
250 let cancellation_token = self.cancellation_token.clone();
251 let intra_compactor = self.intra_compactor.clone();
252 #[cfg(feature = "context-compaction")]
253 let context_compaction = self.context_compaction.clone();
254
255 let session_token = CancellationToken::new();
259 let session_id_str = session_id.as_str().to_string();
260 {
261 let mut sessions = self.active_sessions.lock().unwrap_or_else(|e| e.into_inner());
262 sessions.insert(session_id_str.clone(), session_token.clone());
263 }
264 let active_sessions = self.active_sessions.clone();
265
266 let effective_token = if let Some(ref global) = cancellation_token {
268 let combined = CancellationToken::new();
269 let combined_clone = combined.clone();
270 let global_clone = global.clone();
271 let session_clone = session_token.clone();
272 let combined_for_global = combined_clone.clone();
274 tokio::spawn(async move {
275 global_clone.cancelled().await;
276 combined_for_global.cancel();
277 });
278 let combined_for_session = combined_clone;
279 tokio::spawn(async move {
280 session_clone.cancelled().await;
281 combined_for_session.cancel();
282 });
283 Some(combined)
284 } else {
285 Some(session_token.clone())
286 };
287
288 let s = stream! {
289 struct SessionCleanup {
292 active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
293 session_id: String,
294 }
295 impl Drop for SessionCleanup {
296 fn drop(&mut self) {
297 let mut sessions = self.active_sessions.lock().unwrap_or_else(|e| e.into_inner());
298 sessions.remove(&self.session_id);
299 }
300 }
301 let _cleanup = SessionCleanup {
302 active_sessions: active_sessions.clone(),
303 session_id: session_id_str,
304 };
305
306 let cancellation_token = effective_token;
308 let session = match session_service
310 .get(adk_session::GetRequest {
311 app_name: app_name.clone(),
312 user_id: user_id.to_string(),
313 session_id: session_id.to_string(),
314 num_recent_events: run_config.history_max_events,
315 after: None,
316 })
317 .await
318 {
319 Ok(s) => s,
320 Err(e) => {
321 yield Err(e);
322 return;
323 }
324 };
325
326 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
328
329 #[cfg(feature = "artifacts")]
331 let artifact_service_clone = artifact_service.clone();
332 let memory_service_clone = memory_service.clone();
333
334 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
336 #[cfg(any(feature = "skills", feature = "plugins"))]
337 let mut effective_user_content = user_content.clone();
338 #[cfg(not(any(feature = "skills", feature = "plugins")))]
339 let effective_user_content = user_content.clone();
340 #[cfg(feature = "skills")]
341 let mut selected_skill_name = String::new();
342 #[cfg(not(feature = "skills"))]
343 let selected_skill_name = String::new();
344 #[cfg(feature = "skills")]
345 let mut selected_skill_id = String::new();
346 #[cfg(not(feature = "skills"))]
347 let selected_skill_id = String::new();
348
349 #[cfg(feature = "skills")]
350 if let Some(injector) = skill_injector.as_ref()
351 && let Some(matched) = adk_skill::apply_skill_injection(
352 &mut effective_user_content,
353 injector.index(),
354 injector.policy(),
355 injector.max_injected_chars(),
356 ) {
357 selected_skill_name = matched.skill.name;
358 selected_skill_id = matched.skill.id;
359 }
360
361 let mut invocation_ctx = match InvocationContext::new_typed(
362 invocation_id.clone(),
363 agent_to_run.clone(),
364 user_id.clone(),
365 typed_app_name.clone(),
366 session_id.clone(),
367 effective_user_content.clone(),
368 Arc::from(session),
369 ) {
370 Ok(ctx) => ctx,
371 Err(e) => {
372 yield Err(e);
373 return;
374 }
375 };
376
377 #[cfg(feature = "artifacts")]
379 if let Some(service) = artifact_service {
380 let scoped = adk_artifact::ScopedArtifacts::new(
382 service,
383 app_name.clone(),
384 user_id.to_string(),
385 session_id.to_string(),
386 );
387 invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
388 }
389 if let Some(memory) = memory_service {
390 invocation_ctx = invocation_ctx.with_memory(memory);
391 }
392
393 invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
395
396 if let Some(rc) = request_context.clone() {
398 invocation_ctx = invocation_ctx.with_request_context(rc);
399 }
400
401 let mut ctx = Arc::new(invocation_ctx);
402
403 #[cfg(feature = "plugins")]
404 if let Some(manager) = plugin_manager.as_ref() {
405 match manager
406 .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
407 .await
408 {
409 Ok(Some(content)) => {
410 let mut early_event = adk_core::Event::new(ctx.invocation_id());
411 early_event.author = agent_to_run.name().to_string();
412 early_event.llm_response.content = Some(content);
413
414 ctx.mutable_session().append_event(early_event.clone());
415 if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
416 yield Err(e);
417 return;
418 }
419
420 yield Ok(early_event);
421 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
422 return;
423 }
424 Ok(None) => {}
425 Err(e) => {
426 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
427 yield Err(e);
428 return;
429 }
430 }
431
432 match manager
433 .run_on_user_message(
434 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
435 effective_user_content.clone(),
436 )
437 .await
438 {
439 Ok(Some(modified)) => {
440 effective_user_content = modified;
441
442 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
443 ctx.invocation_id().to_string(),
444 agent_to_run.clone(),
445 ctx.user_id().to_string(),
446 ctx.app_name().to_string(),
447 ctx.session_id().to_string(),
448 effective_user_content.clone(),
449 ctx.mutable_session().clone(),
450 ) {
451 Ok(ctx) => ctx,
452 Err(e) => {
453 yield Err(e);
454 return;
455 }
456 };
457
458 #[cfg(feature = "artifacts")]
459 if let Some(service) = artifact_service_clone.clone() {
460 let scoped = adk_artifact::ScopedArtifacts::new(
461 service,
462 ctx.app_name().to_string(),
463 ctx.user_id().to_string(),
464 ctx.session_id().to_string(),
465 );
466 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
467 }
468 if let Some(memory) = memory_service_clone.clone() {
469 refreshed_ctx = refreshed_ctx.with_memory(memory);
470 }
471 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
472 if let Some(rc) = request_context.clone() {
473 refreshed_ctx = refreshed_ctx.with_request_context(rc);
474 }
475 ctx = Arc::new(refreshed_ctx);
476 }
477 Ok(None) => {}
478 Err(e) => {
479 if let Some(manager) = plugin_manager.as_ref() {
480 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
481 }
482 yield Err(e);
483 return;
484 }
485 }
486 }
487
488 let mut user_event = adk_core::Event::new(ctx.invocation_id());
490 user_event.author = "user".to_string();
491 user_event.llm_response.content = Some(effective_user_content.clone());
492
493 ctx.mutable_session().append_event(user_event.clone());
496
497 if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
498 #[cfg(feature = "plugins")]
499 if let Some(manager) = plugin_manager.as_ref() {
500 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
501 }
502 yield Err(e);
503 return;
504 }
505
506 if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
511 let should_refresh_cache = {
512 let cm = cm_mutex.lock().await;
513 cm.is_enabled() && (cm.active_cache_name().is_none() || cm.needs_refresh())
514 };
515
516 if should_refresh_cache {
517 let system_instruction = agent_to_run.description().to_string();
521 let tools = std::collections::HashMap::new();
522 let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
523
524 match cache_model.create_cache(&system_instruction, &tools, ttl).await {
525 Ok(name) => {
526 let old_cache = {
527 let mut cm = cm_mutex.lock().await;
528 let old = cm.clear_active_cache();
529 cm.set_active_cache(name);
530 old
531 };
532
533 if let Some(old) = old_cache
534 && let Err(e) = cache_model.delete_cache(&old).await {
535 tracing::warn!(
536 old_cache = %old,
537 error = %e,
538 "failed to delete old cache, proceeding with new cache"
539 );
540 }
541 }
542 Err(e) => {
543 tracing::warn!(
544 error = %e,
545 "cache creation failed, proceeding without cache"
546 );
547 }
548 }
549 }
550
551 let cache_name = {
553 let mut cm = cm_mutex.lock().await;
554 if cm.is_enabled() {
555 cm.record_invocation().map(str::to_string)
556 } else {
557 None
558 }
559 };
560
561 if let Some(cache_name) = cache_name {
562 run_config.cached_content = Some(cache_name);
563 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
565 ctx.invocation_id().to_string(),
566 agent_to_run.clone(),
567 ctx.user_id().to_string(),
568 ctx.app_name().to_string(),
569 ctx.session_id().to_string(),
570 effective_user_content.clone(),
571 ctx.mutable_session().clone(),
572 ) {
573 Ok(ctx) => ctx,
574 Err(e) => {
575 yield Err(e);
576 return;
577 }
578 };
579 #[cfg(feature = "artifacts")]
580 if let Some(service) = artifact_service_clone.clone() {
581 let scoped = adk_artifact::ScopedArtifacts::new(
582 service,
583 ctx.app_name().to_string(),
584 ctx.user_id().to_string(),
585 ctx.session_id().to_string(),
586 );
587 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
588 }
589 if let Some(memory) = memory_service_clone.clone() {
590 refreshed_ctx = refreshed_ctx.with_memory(memory);
591 }
592 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
593 if let Some(rc) = request_context.clone() {
594 refreshed_ctx = refreshed_ctx.with_request_context(rc);
595 }
596 ctx = Arc::new(refreshed_ctx);
597 }
598 }
599
600 if let Some(ref compactor) = intra_compactor {
604 compactor.reset_cycle();
605 let session_events = ctx.mutable_session().as_ref().events_snapshot();
606 match compactor.maybe_compact(&session_events).await {
607 Ok(Some(compacted_events)) => {
608 ctx.mutable_session().replace_events(compacted_events);
609 tracing::info!("intra-invocation compaction applied before agent execution");
610 }
611 Ok(None) => {} Err(e) => {
613 tracing::warn!(error = %e, "intra-invocation compaction check failed");
614 }
615 }
616 }
617
618 #[cfg(feature = "context-compaction")]
623 if let Some(ref cc_config) = context_compaction {
624 let session_events = ctx.mutable_session().events_snapshot();
625 let estimated = crate::compaction::estimate_event_tokens(&session_events);
626 if estimated > cc_config.context_budget {
627 tracing::info!(
628 estimated_tokens = estimated,
629 budget = cc_config.context_budget,
630 "context exceeds budget, applying proactive compaction"
631 );
632 match crate::compaction::apply_compaction_with_retry(cc_config, session_events).await {
633 Ok(compacted) => {
634 ctx.mutable_session().replace_events(compacted);
635 tracing::info!("proactive context compaction succeeded");
636 }
637 Err(e) => {
638 tracing::warn!(error = %e, "proactive context compaction failed, proceeding with full context");
641 }
642 }
643 }
644 }
645
646 let agent_span = tracing::info_span!(
648 "agent.execute",
649 "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
650 "gcp.vertex.agent.session_id" = ctx.session_id(),
651 "gcp.vertex.agent.event_id" = ctx.invocation_id(), "gen_ai.conversation.id" = ctx.session_id(),
653 "adk.app_name" = ctx.app_name(),
654 "adk.user_id" = ctx.user_id(),
655 "agent.name" = %agent_to_run.name(),
656 "adk.skills.selected_name" = %selected_skill_name,
657 "adk.skills.selected_id" = %selected_skill_id
658 );
659
660 let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span.clone()).await {
661 Ok(s) => s,
662 #[cfg(feature = "context-compaction")]
663 Err(e) if context_compaction.is_some() && crate::compaction::is_token_limit_error(&e) => {
664 let cc_config = context_compaction.as_ref().unwrap();
666 tracing::warn!(
667 error = %e,
668 "agent execution failed with token limit error, attempting compaction"
669 );
670 let session_events = ctx.mutable_session().events_snapshot();
671 match crate::compaction::apply_compaction_with_retry(cc_config, session_events).await {
672 Ok(compacted) => {
673 ctx.mutable_session().replace_events(compacted);
674 tracing::info!("context compaction succeeded after token limit error, retrying agent");
675 match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
677 Ok(s) => s,
678 Err(retry_err) => {
679 #[cfg(feature = "plugins")]
680 if let Some(manager) = plugin_manager.as_ref() {
681 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
682 }
683 yield Err(retry_err);
684 return;
685 }
686 }
687 }
688 Err(compaction_err) => {
689 #[cfg(feature = "plugins")]
690 if let Some(manager) = plugin_manager.as_ref() {
691 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
692 }
693 yield Err(compaction_err);
694 return;
695 }
696 }
697 }
698 Err(e) => {
699 #[cfg(feature = "plugins")]
700 if let Some(manager) = plugin_manager.as_ref() {
701 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
702 }
703 yield Err(e);
704 return;
705 }
706 };
707
708 use futures::StreamExt;
710 let mut transfer_target: Option<String> = None;
711
712 while let Some(result) = {
713 if let Some(token) = cancellation_token.as_ref()
714 && token.is_cancelled() {
715 #[cfg(feature = "plugins")]
716 if let Some(manager) = plugin_manager.as_ref() {
717 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
718 }
719 return;
720 }
721 agent_stream.next().await
722 } {
723 match result {
724 Ok(event) => {
725 #[cfg(feature = "plugins")]
726 let mut event = event;
727
728 #[cfg(feature = "plugins")]
729 if let Some(manager) = plugin_manager.as_ref() {
730 match manager
731 .run_on_event(
732 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
733 event.clone(),
734 )
735 .await
736 {
737 Ok(Some(modified)) => {
738 event = modified;
739 }
740 Ok(None) => {}
741 Err(e) => {
742 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
743 yield Err(e);
744 return;
745 }
746 }
747 }
748
749 if let Some(target) = &event.actions.transfer_to_agent {
751 transfer_target = Some(target.clone());
752 }
753
754 if !event.actions.state_delta.is_empty() {
760 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
761 }
762
763 ctx.mutable_session().append_event(event.clone());
765
766 if !event.llm_response.partial
773 && let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
774 #[cfg(feature = "plugins")]
775 if let Some(manager) = plugin_manager.as_ref() {
776 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
777 }
778 yield Err(e);
779 return;
780 }
781 yield Ok(event);
782 }
783 Err(e) => {
784 #[cfg(feature = "plugins")]
785 if let Some(manager) = plugin_manager.as_ref() {
786 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
787 }
788 yield Err(e);
789 return;
790 }
791 }
792 }
793
794 const DEFAULT_MAX_TRANSFER_DEPTH: u32 = 10;
801 let max_depth = run_config.max_transfer_depth.unwrap_or(DEFAULT_MAX_TRANSFER_DEPTH);
802 let mut transfer_depth: u32 = 0;
803 let mut current_transfer_target = transfer_target;
804
805 while let Some(target_name) = current_transfer_target.take() {
806 transfer_depth += 1;
807 if transfer_depth > max_depth {
808 tracing::warn!(
809 depth = transfer_depth,
810 target = %target_name,
811 "max transfer depth exceeded, stopping transfer chain"
812 );
813 break;
814 }
815
816 let target_agent = match Self::find_agent(&root_agent, &target_name) {
817 Some(a) => a,
818 None => {
819 tracing::warn!(target = %target_name, "transfer target not found in agent tree");
820 break;
821 }
822 };
823
824 let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
829
830 let mut transfer_run_config = run_config.clone();
831 let mut targets = Vec::new();
832 if let Some(ref parent) = parent_name {
833 targets.push(parent.clone());
834 }
835 targets.extend(peer_names);
836 transfer_run_config.transfer_targets = targets;
837 transfer_run_config.parent_agent = parent_name;
838
839 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
841 let mut transfer_ctx = match InvocationContext::with_mutable_session(
842 transfer_invocation_id.clone(),
843 target_agent.clone(),
844 ctx.user_id().to_string(),
845 ctx.app_name().to_string(),
846 ctx.session_id().to_string(),
847 effective_user_content.clone(),
848 ctx.mutable_session().clone(),
849 ) {
850 Ok(ctx) => ctx,
851 Err(e) => {
852 yield Err(e);
853 return;
854 }
855 };
856
857 #[cfg(feature = "artifacts")]
858 if let Some(ref service) = artifact_service_clone {
859 let scoped = adk_artifact::ScopedArtifacts::new(
860 service.clone(),
861 ctx.app_name().to_string(),
862 ctx.user_id().to_string(),
863 ctx.session_id().to_string(),
864 );
865 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
866 }
867 if let Some(ref memory) = memory_service_clone {
868 transfer_ctx = transfer_ctx.with_memory(memory.clone());
869 }
870 transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
871 if let Some(rc) = request_context.clone() {
872 transfer_ctx = transfer_ctx.with_request_context(rc);
873 }
874
875 let transfer_ctx = Arc::new(transfer_ctx);
876
877 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
879 Ok(s) => s,
880 Err(e) => {
881 #[cfg(feature = "plugins")]
882 if let Some(manager) = plugin_manager.as_ref() {
883 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
884 }
885 yield Err(e);
886 return;
887 }
888 };
889
890 while let Some(result) = {
892 if let Some(token) = cancellation_token.as_ref()
893 && token.is_cancelled() {
894 #[cfg(feature = "plugins")]
895 if let Some(manager) = plugin_manager.as_ref() {
896 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
897 }
898 return;
899 }
900 transfer_stream.next().await
901 } {
902 match result {
903 Ok(event) => {
904 #[cfg(feature = "plugins")]
905 let mut event = event;
906 #[cfg(feature = "plugins")]
907 if let Some(manager) = plugin_manager.as_ref() {
908 match manager
909 .run_on_event(
910 transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
911 event.clone(),
912 )
913 .await
914 {
915 Ok(Some(modified)) => {
916 event = modified;
917 }
918 Ok(None) => {}
919 Err(e) => {
920 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
921 yield Err(e);
922 return;
923 }
924 }
925 }
926
927 if let Some(target) = &event.actions.transfer_to_agent {
929 current_transfer_target = Some(target.clone());
930 }
931
932 if !event.actions.state_delta.is_empty() {
934 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
935 }
936
937 transfer_ctx.mutable_session().append_event(event.clone());
939
940 if !event.llm_response.partial
941 && let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
942 #[cfg(feature = "plugins")]
943 if let Some(manager) = plugin_manager.as_ref() {
944 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
945 }
946 yield Err(e);
947 return;
948 }
949 yield Ok(event);
950 }
951 Err(e) => {
952 #[cfg(feature = "plugins")]
953 if let Some(manager) = plugin_manager.as_ref() {
954 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
955 }
956 yield Err(e);
957 return;
958 }
959 }
960 }
961 }
962
963 if let Some(ref compaction_cfg) = compaction_config {
967 let event_count = ctx.mutable_session().as_ref().events_len();
968
969 if event_count > 0 {
970 let all_events = ctx.mutable_session().as_ref().events_snapshot();
971 let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
972 as u32;
973
974 if invocation_count > 0
975 && invocation_count.is_multiple_of(compaction_cfg.compaction_interval)
976 {
977 let overlap = compaction_cfg.overlap_size as usize;
980
981 let user_msg_indices: Vec<usize> = all_events.iter()
983 .enumerate()
984 .filter(|(_, e)| e.author == "user")
985 .map(|(i, _)| i)
986 .collect();
987
988 let compact_up_to = if overlap == 0 {
991 all_events.len()
992 } else if user_msg_indices.len() > overlap {
993 user_msg_indices[user_msg_indices.len() - overlap]
995 } else {
996 0
998 };
999
1000 if compact_up_to > 0 {
1001 let events_to_compact = &all_events[..compact_up_to];
1002
1003 match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
1004 Ok(Some(compaction_event)) => {
1005 if let Err(e) = session_service.append_event(
1007 ctx.session_id(),
1008 compaction_event.clone(),
1009 ).await {
1010 tracing::warn!(error = %e, "Failed to persist compaction event");
1011 } else {
1012 tracing::info!(
1013 compacted_events = compact_up_to,
1014 "Context compaction completed"
1015 );
1016 }
1017 }
1018 Ok(None) => {
1019 tracing::debug!("Compaction summarizer returned no result");
1020 }
1021 Err(e) => {
1022 tracing::warn!(error = %e, "Context compaction failed");
1024 }
1025 }
1026 }
1027 }
1028 }
1029 }
1030
1031 #[cfg(feature = "plugins")]
1032 if let Some(manager) = plugin_manager.as_ref() {
1033 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
1034 }
1035 };
1036
1037 Ok(Box::pin(s))
1038 }
1039
1040 pub async fn run_str(
1050 &self,
1051 user_id: &str,
1052 session_id: &str,
1053 user_content: Content,
1054 ) -> Result<EventStream> {
1055 let user_id = UserId::try_from(user_id)?;
1056 let session_id = SessionId::try_from(session_id)?;
1057 self.run(user_id, session_id, user_content).await
1058 }
1059
1060 pub fn interrupt(&self, session_id: &str) -> bool {
1085 let sessions = self.active_sessions.lock().unwrap_or_else(|e| e.into_inner());
1086 if let Some(token) = sessions.get(session_id) {
1087 tracing::info!(session.id = session_id, "interrupting running agent");
1088 token.cancel();
1089 true
1090 } else {
1091 tracing::debug!(session.id = session_id, "no active run to interrupt");
1092 false
1093 }
1094 }
1095
    /// Returns the ids of all sessions that currently have a registered run.
    ///
    /// The ids are taken from the registry's keys, so their order is unspecified.
    pub fn active_session_ids(&self) -> Vec<String> {
        // Recover the guard from a poisoned lock instead of panicking.
        let sessions = self.active_sessions.lock().unwrap_or_else(|e| e.into_inner());
        sessions.keys().cloned().collect()
    }
1101
    /// Returns the token-budget compaction settings, or `None` when none were configured.
    #[cfg(feature = "context-compaction")]
    pub fn context_compaction(&self) -> Option<&crate::compaction::CompactionConfig> {
        self.context_compaction.as_deref()
    }
1110
1111 pub fn find_agent_to_run(
1113 root_agent: &Arc<dyn Agent>,
1114 session: &dyn adk_session::Session,
1115 ) -> Arc<dyn Agent> {
1116 let events = session.events();
1118 for i in (0..events.len()).rev() {
1119 if let Some(event) = events.at(i) {
1120 if let Some(target_name) = &event.actions.transfer_to_agent
1122 && let Some(agent) = Self::find_agent(root_agent, target_name)
1123 {
1124 return agent;
1125 }
1126
1127 if event.author == "user" {
1128 continue;
1129 }
1130
1131 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
1133 if Self::is_transferable(root_agent, &agent) {
1135 return agent;
1136 }
1137 }
1138 }
1139 }
1140
1141 root_agent.clone()
1143 }
1144
    /// Reports whether a run may resume at `_agent`.
    ///
    /// Currently a stub: both arguments are ignored and the result is always `true`.
    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
        true
    }
1157
1158 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
1160 if current.name() == target_name {
1161 return Some(current.clone());
1162 }
1163
1164 for sub_agent in current.sub_agents() {
1165 if let Some(found) = Self::find_agent(sub_agent, target_name) {
1166 return Some(found);
1167 }
1168 }
1169
1170 None
1171 }
1172
1173 pub fn compute_transfer_context(
1179 root: &Arc<dyn Agent>,
1180 target_name: &str,
1181 ) -> (Option<String>, Vec<String>) {
1182 if root.name() == target_name {
1184 return (None, Vec::new());
1185 }
1186
1187 fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
1189 for sub in current.sub_agents() {
1190 if sub.name() == target {
1191 return Some(current.clone());
1192 }
1193 if let Some(found) = find_parent(sub, target) {
1194 return Some(found);
1195 }
1196 }
1197 None
1198 }
1199
1200 match find_parent(root, target_name) {
1201 Some(parent) => {
1202 let parent_name = parent.name().to_string();
1203 let peers: Vec<String> = parent
1204 .sub_agents()
1205 .iter()
1206 .filter(|a| a.name() != target_name)
1207 .map(|a| a.name().to_string())
1208 .collect();
1209 (Some(parent_name), peers)
1210 }
1211 None => (None, Vec::new()),
1212 }
1213 }
1214}