1use crate::InvocationContext;
2use crate::cache::CacheManager;
3#[cfg(feature = "artifacts")]
4use adk_artifact::ArtifactService;
5use adk_core::{
6 Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
7 ReadonlyContext, Result, RunConfig, SessionId, UserId,
8};
9#[cfg(feature = "plugins")]
10use adk_plugin::PluginManager;
11use adk_session::SessionService;
12#[cfg(feature = "skills")]
13use adk_skill::{SkillInjector, SkillInjectorConfig};
14use async_stream::stream;
15use std::sync::Arc;
16use tokio_util::sync::CancellationToken;
17use tracing::Instrument;
18
/// Everything needed to construct a [`Runner`] via [`Runner::new`].
///
/// Only `app_name`, `agent` and `session_service` are mandatory; every other
/// field is optional and switches the corresponding behaviour on when set.
#[non_exhaustive]
pub struct RunnerConfig {
    /// Application name. `Runner::run` converts it into an `AppName` and
    /// returns an error if that conversion fails.
    pub app_name: String,
    /// Root of the agent tree; stored by the runner as its root agent.
    pub agent: Arc<dyn Agent>,
    /// Service used to load the session and to persist events.
    pub session_service: Arc<dyn SessionService>,
    /// Optional artifact service; when set, each invocation context receives
    /// a `ScopedArtifacts` wrapper built from the app, user and session ids.
    #[cfg(feature = "artifacts")]
    pub artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service attached to each invocation context.
    pub memory_service: Option<Arc<dyn Memory>>,
    /// Optional plugin manager whose hooks are invoked around a run.
    #[cfg(feature = "plugins")]
    pub plugin_manager: Option<Arc<PluginManager>>,
    /// Run configuration; `Runner::new` falls back to `RunConfig::default()`
    /// when this is `None`.
    #[allow(dead_code)]
    pub run_config: Option<RunConfig>,
    /// Optional configuration for the event summarization step performed at
    /// the end of a run.
    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Context-cache settings. When `None` but `cache_capable` is set,
    /// `Runner::new` uses `ContextCacheConfig::default()`.
    pub context_cache_config: Option<ContextCacheConfig>,
    /// Model handle used to create and delete context caches.
    pub cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Optional request context attached to every invocation context.
    pub request_context: Option<adk_core::RequestContext>,
    /// Optional runner-wide cancellation token; cancelling it stops the
    /// event streams produced by `Runner::run`.
    pub cancellation_token: Option<CancellationToken>,
    /// Settings for the intra-invocation compactor. The compactor is only
    /// built when `intra_compaction_summarizer` is also set.
    pub intra_compaction_config: Option<adk_core::IntraCompactionConfig>,
    /// Summarizer for the intra-invocation compactor. The compactor is only
    /// built when `intra_compaction_config` is also set.
    pub intra_compaction_summarizer: Option<Arc<dyn adk_core::BaseEventsSummarizer>>,
    /// Optional budget-driven context compaction settings.
    #[cfg(feature = "context-compaction")]
    pub context_compaction: Option<crate::compaction::CompactionConfig>,
}
80
/// Drives agent invocations: loads the session, runs the selected agent,
/// persists the produced events and follows agent transfers.
///
/// Built from a [`RunnerConfig`] via [`Runner::new`].
pub struct Runner {
    /// Application name taken from the configuration.
    app_name: String,
    /// Root of the agent tree searched by `find_agent`.
    root_agent: Arc<dyn Agent>,
    /// Service used to load sessions and append events.
    session_service: Arc<dyn SessionService>,
    /// Optional artifact service, scoped per invocation.
    #[cfg(feature = "artifacts")]
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service attached to invocation contexts.
    memory_service: Option<Arc<dyn Memory>>,
    /// Optional plugin manager providing run hooks.
    #[cfg(feature = "plugins")]
    plugin_manager: Option<Arc<PluginManager>>,
    /// Optional skill injector; `None` until one of the `with_*skill*`
    /// methods installs it.
    #[cfg(feature = "skills")]
    skill_injector: Option<Arc<SkillInjector>>,
    /// Effective run configuration (the default one if none was supplied).
    run_config: RunConfig,
    /// Optional end-of-run event summarization settings.
    compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Effective context-cache settings (explicit, or the default when only
    /// `cache_capable` was supplied).
    context_cache_config: Option<ContextCacheConfig>,
    /// Model handle used to create and delete context caches.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Cache bookkeeping; present whenever `context_cache_config` is.
    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
    /// Optional request context attached to invocation contexts.
    request_context: Option<adk_core::RequestContext>,
    /// Optional runner-wide cancellation token.
    cancellation_token: Option<CancellationToken>,
    /// Compactor built when both intra-compaction settings were supplied.
    intra_compactor: Option<Arc<crate::intra_compaction::IntraInvocationCompactor>>,
    /// Optional budget-driven context compaction settings.
    #[cfg(feature = "context-compaction")]
    context_compaction: Option<Arc<crate::compaction::CompactionConfig>>,
    /// Per-session cancellation tokens of currently registered runs, keyed
    /// by session id; used by `interrupt` and `active_session_ids`.
    active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
}
112
113impl Runner {
114 pub fn builder() -> crate::builder::RunnerConfigBuilder<
130 crate::builder::NoAppName,
131 crate::builder::NoAgent,
132 crate::builder::NoSessionService,
133 > {
134 crate::builder::RunnerConfigBuilder::new()
135 }
136
137 pub fn new(config: RunnerConfig) -> Result<Self> {
141 let run_config = config.run_config.unwrap_or_default();
142
143 let effective_cache_config = config
146 .context_cache_config
147 .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
148
149 let cache_manager = effective_cache_config
150 .as_ref()
151 .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
152
153 let intra_compactor = config.intra_compaction_config.as_ref().and_then(|ic_config| {
154 config.intra_compaction_summarizer.as_ref().map(|summarizer| {
155 Arc::new(crate::intra_compaction::IntraInvocationCompactor::new(
156 ic_config.clone(),
157 summarizer.clone(),
158 ))
159 })
160 });
161
162 Ok(Self {
163 app_name: config.app_name,
164 root_agent: config.agent,
165 session_service: config.session_service,
166 #[cfg(feature = "artifacts")]
167 artifact_service: config.artifact_service,
168 memory_service: config.memory_service,
169 #[cfg(feature = "plugins")]
170 plugin_manager: config.plugin_manager,
171 #[cfg(feature = "skills")]
172 skill_injector: None,
173 run_config,
174 compaction_config: config.compaction_config,
175 context_cache_config: effective_cache_config,
176 cache_capable: config.cache_capable,
177 cache_manager,
178 request_context: config.request_context,
179 cancellation_token: config.cancellation_token,
180 intra_compactor,
181 #[cfg(feature = "context-compaction")]
182 context_compaction: config.context_compaction.map(Arc::new),
183 active_sessions: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
184 })
185 }
186
187 #[cfg(feature = "skills")]
191 pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
192 self.skill_injector = Some(Arc::new(injector));
193 self
194 }
195
196 #[cfg(feature = "skills")]
198 #[deprecated(note = "Use with_auto_skills_mut instead")]
199 pub fn with_auto_skills(
200 mut self,
201 root: impl AsRef<std::path::Path>,
202 config: SkillInjectorConfig,
203 ) -> adk_skill::SkillResult<Self> {
204 self.with_auto_skills_mut(root, config)?;
205 Ok(self)
206 }
207
208 #[cfg(feature = "skills")]
214 pub fn with_auto_skills_mut(
215 &mut self,
216 root: impl AsRef<std::path::Path>,
217 config: SkillInjectorConfig,
218 ) -> adk_skill::SkillResult<()> {
219 let injector = SkillInjector::from_root(root, config)?;
220 self.skill_injector = Some(Arc::new(injector));
221 Ok(())
222 }
223
224 pub async fn run(
229 &self,
230 user_id: UserId,
231 session_id: SessionId,
232 user_content: Content,
233 ) -> Result<EventStream> {
234 let app_name = self.app_name.clone();
235 let typed_app_name = AppName::try_from(app_name.clone())?;
236 let session_service = self.session_service.clone();
237 let root_agent = self.root_agent.clone();
238 #[cfg(feature = "artifacts")]
239 let artifact_service = self.artifact_service.clone();
240 let memory_service = self.memory_service.clone();
241 #[cfg(feature = "plugins")]
242 let plugin_manager = self.plugin_manager.clone();
243 #[cfg(feature = "skills")]
244 let skill_injector = self.skill_injector.clone();
245 let mut run_config = self.run_config.clone();
246 let compaction_config = self.compaction_config.clone();
247 let context_cache_config = self.context_cache_config.clone();
248 let cache_capable = self.cache_capable.clone();
249 let cache_manager_ref = self.cache_manager.clone();
250 let request_context = self.request_context.clone();
251 let cancellation_token = self.cancellation_token.clone();
252 let intra_compactor = self.intra_compactor.clone();
253 #[cfg(feature = "context-compaction")]
254 let context_compaction = self.context_compaction.clone();
255
256 let session_token = CancellationToken::new();
260 let session_id_str = session_id.as_str().to_string();
261 {
262 let mut sessions = self.active_sessions.lock().unwrap();
263 sessions.insert(session_id_str.clone(), session_token.clone());
264 }
265 let active_sessions = self.active_sessions.clone();
266
267 let effective_token = if let Some(ref global) = cancellation_token {
269 let combined = CancellationToken::new();
270 let combined_clone = combined.clone();
271 let global_clone = global.clone();
272 let session_clone = session_token.clone();
273 let combined_for_global = combined_clone.clone();
275 tokio::spawn(async move {
276 global_clone.cancelled().await;
277 combined_for_global.cancel();
278 });
279 let combined_for_session = combined_clone;
280 tokio::spawn(async move {
281 session_clone.cancelled().await;
282 combined_for_session.cancel();
283 });
284 Some(combined)
285 } else {
286 Some(session_token.clone())
287 };
288
289 let s = stream! {
290 struct SessionCleanup {
293 active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
294 session_id: String,
295 }
296 impl Drop for SessionCleanup {
297 fn drop(&mut self) {
298 let mut sessions = self.active_sessions.lock().unwrap();
299 sessions.remove(&self.session_id);
300 }
301 }
302 let _cleanup = SessionCleanup {
303 active_sessions: active_sessions.clone(),
304 session_id: session_id_str,
305 };
306
307 let cancellation_token = effective_token;
309 let session = match session_service
311 .get(adk_session::GetRequest {
312 app_name: app_name.clone(),
313 user_id: user_id.to_string(),
314 session_id: session_id.to_string(),
315 num_recent_events: run_config.history_max_events,
316 after: None,
317 })
318 .await
319 {
320 Ok(s) => s,
321 Err(e) => {
322 yield Err(e);
323 return;
324 }
325 };
326
327 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
329
330 #[cfg(feature = "artifacts")]
332 let artifact_service_clone = artifact_service.clone();
333 let memory_service_clone = memory_service.clone();
334
335 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
337 #[cfg(any(feature = "skills", feature = "plugins"))]
338 let mut effective_user_content = user_content.clone();
339 #[cfg(not(any(feature = "skills", feature = "plugins")))]
340 let effective_user_content = user_content.clone();
341 #[cfg(feature = "skills")]
342 let mut selected_skill_name = String::new();
343 #[cfg(not(feature = "skills"))]
344 let selected_skill_name = String::new();
345 #[cfg(feature = "skills")]
346 let mut selected_skill_id = String::new();
347 #[cfg(not(feature = "skills"))]
348 let selected_skill_id = String::new();
349
350 #[cfg(feature = "skills")]
351 if let Some(injector) = skill_injector.as_ref() {
352 if let Some(matched) = adk_skill::apply_skill_injection(
353 &mut effective_user_content,
354 injector.index(),
355 injector.policy(),
356 injector.max_injected_chars(),
357 ) {
358 selected_skill_name = matched.skill.name;
359 selected_skill_id = matched.skill.id;
360 }
361 }
362
363 let mut invocation_ctx = match InvocationContext::new_typed(
364 invocation_id.clone(),
365 agent_to_run.clone(),
366 user_id.clone(),
367 typed_app_name.clone(),
368 session_id.clone(),
369 effective_user_content.clone(),
370 Arc::from(session),
371 ) {
372 Ok(ctx) => ctx,
373 Err(e) => {
374 yield Err(e);
375 return;
376 }
377 };
378
379 #[cfg(feature = "artifacts")]
381 if let Some(service) = artifact_service {
382 let scoped = adk_artifact::ScopedArtifacts::new(
384 service,
385 app_name.clone(),
386 user_id.to_string(),
387 session_id.to_string(),
388 );
389 invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
390 }
391 if let Some(memory) = memory_service {
392 invocation_ctx = invocation_ctx.with_memory(memory);
393 }
394
395 invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
397
398 if let Some(rc) = request_context.clone() {
400 invocation_ctx = invocation_ctx.with_request_context(rc);
401 }
402
403 let mut ctx = Arc::new(invocation_ctx);
404
405 #[cfg(feature = "plugins")]
406 if let Some(manager) = plugin_manager.as_ref() {
407 match manager
408 .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
409 .await
410 {
411 Ok(Some(content)) => {
412 let mut early_event = adk_core::Event::new(ctx.invocation_id());
413 early_event.author = agent_to_run.name().to_string();
414 early_event.llm_response.content = Some(content);
415
416 ctx.mutable_session().append_event(early_event.clone());
417 if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
418 yield Err(e);
419 return;
420 }
421
422 yield Ok(early_event);
423 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
424 return;
425 }
426 Ok(None) => {}
427 Err(e) => {
428 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
429 yield Err(e);
430 return;
431 }
432 }
433
434 match manager
435 .run_on_user_message(
436 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
437 effective_user_content.clone(),
438 )
439 .await
440 {
441 Ok(Some(modified)) => {
442 effective_user_content = modified;
443
444 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
445 ctx.invocation_id().to_string(),
446 agent_to_run.clone(),
447 ctx.user_id().to_string(),
448 ctx.app_name().to_string(),
449 ctx.session_id().to_string(),
450 effective_user_content.clone(),
451 ctx.mutable_session().clone(),
452 ) {
453 Ok(ctx) => ctx,
454 Err(e) => {
455 yield Err(e);
456 return;
457 }
458 };
459
460 #[cfg(feature = "artifacts")]
461 if let Some(service) = artifact_service_clone.clone() {
462 let scoped = adk_artifact::ScopedArtifacts::new(
463 service,
464 ctx.app_name().to_string(),
465 ctx.user_id().to_string(),
466 ctx.session_id().to_string(),
467 );
468 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
469 }
470 if let Some(memory) = memory_service_clone.clone() {
471 refreshed_ctx = refreshed_ctx.with_memory(memory);
472 }
473 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
474 if let Some(rc) = request_context.clone() {
475 refreshed_ctx = refreshed_ctx.with_request_context(rc);
476 }
477 ctx = Arc::new(refreshed_ctx);
478 }
479 Ok(None) => {}
480 Err(e) => {
481 if let Some(manager) = plugin_manager.as_ref() {
482 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
483 }
484 yield Err(e);
485 return;
486 }
487 }
488 }
489
490 let mut user_event = adk_core::Event::new(ctx.invocation_id());
492 user_event.author = "user".to_string();
493 user_event.llm_response.content = Some(effective_user_content.clone());
494
495 ctx.mutable_session().append_event(user_event.clone());
498
499 if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
500 #[cfg(feature = "plugins")]
501 if let Some(manager) = plugin_manager.as_ref() {
502 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
503 }
504 yield Err(e);
505 return;
506 }
507
508 if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
513 let should_refresh_cache = {
514 let cm = cm_mutex.lock().await;
515 cm.is_enabled() && (cm.active_cache_name().is_none() || cm.needs_refresh())
516 };
517
518 if should_refresh_cache {
519 let system_instruction = agent_to_run.description().to_string();
523 let tools = std::collections::HashMap::new();
524 let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
525
526 match cache_model.create_cache(&system_instruction, &tools, ttl).await {
527 Ok(name) => {
528 let old_cache = {
529 let mut cm = cm_mutex.lock().await;
530 let old = cm.clear_active_cache();
531 cm.set_active_cache(name);
532 old
533 };
534
535 if let Some(old) = old_cache {
536 if let Err(e) = cache_model.delete_cache(&old).await {
537 tracing::warn!(
538 old_cache = %old,
539 error = %e,
540 "failed to delete old cache, proceeding with new cache"
541 );
542 }
543 }
544 }
545 Err(e) => {
546 tracing::warn!(
547 error = %e,
548 "cache creation failed, proceeding without cache"
549 );
550 }
551 }
552 }
553
554 let cache_name = {
556 let mut cm = cm_mutex.lock().await;
557 if cm.is_enabled() {
558 cm.record_invocation().map(str::to_string)
559 } else {
560 None
561 }
562 };
563
564 if let Some(cache_name) = cache_name {
565 run_config.cached_content = Some(cache_name);
566 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
568 ctx.invocation_id().to_string(),
569 agent_to_run.clone(),
570 ctx.user_id().to_string(),
571 ctx.app_name().to_string(),
572 ctx.session_id().to_string(),
573 effective_user_content.clone(),
574 ctx.mutable_session().clone(),
575 ) {
576 Ok(ctx) => ctx,
577 Err(e) => {
578 yield Err(e);
579 return;
580 }
581 };
582 #[cfg(feature = "artifacts")]
583 if let Some(service) = artifact_service_clone.clone() {
584 let scoped = adk_artifact::ScopedArtifacts::new(
585 service,
586 ctx.app_name().to_string(),
587 ctx.user_id().to_string(),
588 ctx.session_id().to_string(),
589 );
590 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
591 }
592 if let Some(memory) = memory_service_clone.clone() {
593 refreshed_ctx = refreshed_ctx.with_memory(memory);
594 }
595 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
596 if let Some(rc) = request_context.clone() {
597 refreshed_ctx = refreshed_ctx.with_request_context(rc);
598 }
599 ctx = Arc::new(refreshed_ctx);
600 }
601 }
602
603 if let Some(ref compactor) = intra_compactor {
607 compactor.reset_cycle();
608 let session_events = ctx.mutable_session().as_ref().events_snapshot();
609 match compactor.maybe_compact(&session_events).await {
610 Ok(Some(compacted_events)) => {
611 ctx.mutable_session().replace_events(compacted_events);
612 tracing::info!("intra-invocation compaction applied before agent execution");
613 }
614 Ok(None) => {} Err(e) => {
616 tracing::warn!(error = %e, "intra-invocation compaction check failed");
617 }
618 }
619 }
620
621 #[cfg(feature = "context-compaction")]
626 if let Some(ref cc_config) = context_compaction {
627 let session_events = ctx.mutable_session().events_snapshot();
628 let estimated = crate::compaction::estimate_event_tokens(&session_events);
629 if estimated > cc_config.context_budget {
630 tracing::info!(
631 estimated_tokens = estimated,
632 budget = cc_config.context_budget,
633 "context exceeds budget, applying proactive compaction"
634 );
635 match crate::compaction::apply_compaction_with_retry(cc_config, session_events).await {
636 Ok(compacted) => {
637 ctx.mutable_session().replace_events(compacted);
638 tracing::info!("proactive context compaction succeeded");
639 }
640 Err(e) => {
641 tracing::warn!(error = %e, "proactive context compaction failed, proceeding with full context");
644 }
645 }
646 }
647 }
648
649 let agent_span = tracing::info_span!(
651 "agent.execute",
652 "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
653 "gcp.vertex.agent.session_id" = ctx.session_id(),
654 "gcp.vertex.agent.event_id" = ctx.invocation_id(), "gen_ai.conversation.id" = ctx.session_id(),
656 "adk.app_name" = ctx.app_name(),
657 "adk.user_id" = ctx.user_id(),
658 "agent.name" = %agent_to_run.name(),
659 "adk.skills.selected_name" = %selected_skill_name,
660 "adk.skills.selected_id" = %selected_skill_id
661 );
662
663 let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span.clone()).await {
664 Ok(s) => s,
665 #[cfg(feature = "context-compaction")]
666 Err(e) if context_compaction.is_some() && crate::compaction::is_token_limit_error(&e) => {
667 let cc_config = context_compaction.as_ref().unwrap();
669 tracing::warn!(
670 error = %e,
671 "agent execution failed with token limit error, attempting compaction"
672 );
673 let session_events = ctx.mutable_session().events_snapshot();
674 match crate::compaction::apply_compaction_with_retry(cc_config, session_events).await {
675 Ok(compacted) => {
676 ctx.mutable_session().replace_events(compacted);
677 tracing::info!("context compaction succeeded after token limit error, retrying agent");
678 match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
680 Ok(s) => s,
681 Err(retry_err) => {
682 #[cfg(feature = "plugins")]
683 if let Some(manager) = plugin_manager.as_ref() {
684 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
685 }
686 yield Err(retry_err);
687 return;
688 }
689 }
690 }
691 Err(compaction_err) => {
692 #[cfg(feature = "plugins")]
693 if let Some(manager) = plugin_manager.as_ref() {
694 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
695 }
696 yield Err(compaction_err);
697 return;
698 }
699 }
700 }
701 Err(e) => {
702 #[cfg(feature = "plugins")]
703 if let Some(manager) = plugin_manager.as_ref() {
704 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
705 }
706 yield Err(e);
707 return;
708 }
709 };
710
711 use futures::StreamExt;
713 let mut transfer_target: Option<String> = None;
714
715 while let Some(result) = {
716 if let Some(token) = cancellation_token.as_ref() {
717 if token.is_cancelled() {
718 #[cfg(feature = "plugins")]
719 if let Some(manager) = plugin_manager.as_ref() {
720 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
721 }
722 return;
723 }
724 }
725 agent_stream.next().await
726 } {
727 match result {
728 Ok(event) => {
729 #[cfg(feature = "plugins")]
730 let mut event = event;
731
732 #[cfg(feature = "plugins")]
733 if let Some(manager) = plugin_manager.as_ref() {
734 match manager
735 .run_on_event(
736 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
737 event.clone(),
738 )
739 .await
740 {
741 Ok(Some(modified)) => {
742 event = modified;
743 }
744 Ok(None) => {}
745 Err(e) => {
746 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
747 yield Err(e);
748 return;
749 }
750 }
751 }
752
753 if let Some(target) = &event.actions.transfer_to_agent {
755 transfer_target = Some(target.clone());
756 }
757
758 if !event.actions.state_delta.is_empty() {
764 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
765 }
766
767 ctx.mutable_session().append_event(event.clone());
769
770 if !event.llm_response.partial {
777 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
778 #[cfg(feature = "plugins")]
779 if let Some(manager) = plugin_manager.as_ref() {
780 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
781 }
782 yield Err(e);
783 return;
784 }
785 }
786 yield Ok(event);
787 }
788 Err(e) => {
789 #[cfg(feature = "plugins")]
790 if let Some(manager) = plugin_manager.as_ref() {
791 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
792 }
793 yield Err(e);
794 return;
795 }
796 }
797 }
798
799 const MAX_TRANSFER_DEPTH: u32 = 10;
806 let mut transfer_depth: u32 = 0;
807 let mut current_transfer_target = transfer_target;
808
809 while let Some(target_name) = current_transfer_target.take() {
810 transfer_depth += 1;
811 if transfer_depth > MAX_TRANSFER_DEPTH {
812 tracing::warn!(
813 depth = transfer_depth,
814 target = %target_name,
815 "max transfer depth exceeded, stopping transfer chain"
816 );
817 break;
818 }
819
820 let target_agent = match Self::find_agent(&root_agent, &target_name) {
821 Some(a) => a,
822 None => {
823 tracing::warn!(target = %target_name, "transfer target not found in agent tree");
824 break;
825 }
826 };
827
828 let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
833
834 let mut transfer_run_config = run_config.clone();
835 let mut targets = Vec::new();
836 if let Some(ref parent) = parent_name {
837 targets.push(parent.clone());
838 }
839 targets.extend(peer_names);
840 transfer_run_config.transfer_targets = targets;
841 transfer_run_config.parent_agent = parent_name;
842
843 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
845 let mut transfer_ctx = match InvocationContext::with_mutable_session(
846 transfer_invocation_id.clone(),
847 target_agent.clone(),
848 ctx.user_id().to_string(),
849 ctx.app_name().to_string(),
850 ctx.session_id().to_string(),
851 effective_user_content.clone(),
852 ctx.mutable_session().clone(),
853 ) {
854 Ok(ctx) => ctx,
855 Err(e) => {
856 yield Err(e);
857 return;
858 }
859 };
860
861 #[cfg(feature = "artifacts")]
862 if let Some(ref service) = artifact_service_clone {
863 let scoped = adk_artifact::ScopedArtifacts::new(
864 service.clone(),
865 ctx.app_name().to_string(),
866 ctx.user_id().to_string(),
867 ctx.session_id().to_string(),
868 );
869 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
870 }
871 if let Some(ref memory) = memory_service_clone {
872 transfer_ctx = transfer_ctx.with_memory(memory.clone());
873 }
874 transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
875 if let Some(rc) = request_context.clone() {
876 transfer_ctx = transfer_ctx.with_request_context(rc);
877 }
878
879 let transfer_ctx = Arc::new(transfer_ctx);
880
881 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
883 Ok(s) => s,
884 Err(e) => {
885 #[cfg(feature = "plugins")]
886 if let Some(manager) = plugin_manager.as_ref() {
887 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
888 }
889 yield Err(e);
890 return;
891 }
892 };
893
894 while let Some(result) = {
896 if let Some(token) = cancellation_token.as_ref() {
897 if token.is_cancelled() {
898 #[cfg(feature = "plugins")]
899 if let Some(manager) = plugin_manager.as_ref() {
900 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
901 }
902 return;
903 }
904 }
905 transfer_stream.next().await
906 } {
907 match result {
908 Ok(event) => {
909 #[cfg(feature = "plugins")]
910 let mut event = event;
911 #[cfg(feature = "plugins")]
912 if let Some(manager) = plugin_manager.as_ref() {
913 match manager
914 .run_on_event(
915 transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
916 event.clone(),
917 )
918 .await
919 {
920 Ok(Some(modified)) => {
921 event = modified;
922 }
923 Ok(None) => {}
924 Err(e) => {
925 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
926 yield Err(e);
927 return;
928 }
929 }
930 }
931
932 if let Some(target) = &event.actions.transfer_to_agent {
934 current_transfer_target = Some(target.clone());
935 }
936
937 if !event.actions.state_delta.is_empty() {
939 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
940 }
941
942 transfer_ctx.mutable_session().append_event(event.clone());
944
945 if !event.llm_response.partial {
946 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
947 #[cfg(feature = "plugins")]
948 if let Some(manager) = plugin_manager.as_ref() {
949 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
950 }
951 yield Err(e);
952 return;
953 }
954 }
955 yield Ok(event);
956 }
957 Err(e) => {
958 #[cfg(feature = "plugins")]
959 if let Some(manager) = plugin_manager.as_ref() {
960 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
961 }
962 yield Err(e);
963 return;
964 }
965 }
966 }
967 }
968
969 if let Some(ref compaction_cfg) = compaction_config {
973 let event_count = ctx.mutable_session().as_ref().events_len();
974
975 if event_count > 0 {
976 let all_events = ctx.mutable_session().as_ref().events_snapshot();
977 let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
978 as u32;
979
980 if invocation_count > 0
981 && invocation_count % compaction_cfg.compaction_interval == 0
982 {
983 let overlap = compaction_cfg.overlap_size as usize;
986
987 let user_msg_indices: Vec<usize> = all_events.iter()
989 .enumerate()
990 .filter(|(_, e)| e.author == "user")
991 .map(|(i, _)| i)
992 .collect();
993
994 let compact_up_to = if overlap == 0 {
997 all_events.len()
998 } else if user_msg_indices.len() > overlap {
999 user_msg_indices[user_msg_indices.len() - overlap]
1001 } else {
1002 0
1004 };
1005
1006 if compact_up_to > 0 {
1007 let events_to_compact = &all_events[..compact_up_to];
1008
1009 match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
1010 Ok(Some(compaction_event)) => {
1011 if let Err(e) = session_service.append_event(
1013 ctx.session_id(),
1014 compaction_event.clone(),
1015 ).await {
1016 tracing::warn!(error = %e, "Failed to persist compaction event");
1017 } else {
1018 tracing::info!(
1019 compacted_events = compact_up_to,
1020 "Context compaction completed"
1021 );
1022 }
1023 }
1024 Ok(None) => {
1025 tracing::debug!("Compaction summarizer returned no result");
1026 }
1027 Err(e) => {
1028 tracing::warn!(error = %e, "Context compaction failed");
1030 }
1031 }
1032 }
1033 }
1034 }
1035 }
1036
1037 #[cfg(feature = "plugins")]
1038 if let Some(manager) = plugin_manager.as_ref() {
1039 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
1040 }
1041 };
1042
1043 Ok(Box::pin(s))
1044 }
1045
1046 pub async fn run_str(
1056 &self,
1057 user_id: &str,
1058 session_id: &str,
1059 user_content: Content,
1060 ) -> Result<EventStream> {
1061 let user_id = UserId::try_from(user_id)?;
1062 let session_id = SessionId::try_from(session_id)?;
1063 self.run(user_id, session_id, user_content).await
1064 }
1065
1066 pub fn interrupt(&self, session_id: &str) -> bool {
1091 let sessions = self.active_sessions.lock().unwrap();
1092 if let Some(token) = sessions.get(session_id) {
1093 tracing::info!(session.id = session_id, "interrupting running agent");
1094 token.cancel();
1095 true
1096 } else {
1097 tracing::debug!(session.id = session_id, "no active run to interrupt");
1098 false
1099 }
1100 }
1101
1102 pub fn active_session_ids(&self) -> Vec<String> {
1104 let sessions = self.active_sessions.lock().unwrap();
1105 sessions.keys().cloned().collect()
1106 }
1107
1108 #[cfg(feature = "context-compaction")]
1113 pub fn context_compaction(&self) -> Option<&crate::compaction::CompactionConfig> {
1114 self.context_compaction.as_deref()
1115 }
1116
1117 pub fn find_agent_to_run(
1119 root_agent: &Arc<dyn Agent>,
1120 session: &dyn adk_session::Session,
1121 ) -> Arc<dyn Agent> {
1122 let events = session.events();
1124 for i in (0..events.len()).rev() {
1125 if let Some(event) = events.at(i) {
1126 if let Some(target_name) = &event.actions.transfer_to_agent {
1128 if let Some(agent) = Self::find_agent(root_agent, target_name) {
1129 return agent;
1130 }
1131 }
1132
1133 if event.author == "user" {
1134 continue;
1135 }
1136
1137 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
1139 if Self::is_transferable(root_agent, &agent) {
1141 return agent;
1142 }
1143 }
1144 }
1145 }
1146
1147 root_agent.clone()
1149 }
1150
    /// Reports whether `_agent` may resume handling a session.
    ///
    /// Currently accepts every agent: both arguments are unused and the
    /// result is always `true`.
    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
        true
    }
1163
1164 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
1166 if current.name() == target_name {
1167 return Some(current.clone());
1168 }
1169
1170 for sub_agent in current.sub_agents() {
1171 if let Some(found) = Self::find_agent(sub_agent, target_name) {
1172 return Some(found);
1173 }
1174 }
1175
1176 None
1177 }
1178
1179 pub fn compute_transfer_context(
1185 root: &Arc<dyn Agent>,
1186 target_name: &str,
1187 ) -> (Option<String>, Vec<String>) {
1188 if root.name() == target_name {
1190 return (None, Vec::new());
1191 }
1192
1193 fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
1195 for sub in current.sub_agents() {
1196 if sub.name() == target {
1197 return Some(current.clone());
1198 }
1199 if let Some(found) = find_parent(sub, target) {
1200 return Some(found);
1201 }
1202 }
1203 None
1204 }
1205
1206 match find_parent(root, target_name) {
1207 Some(parent) => {
1208 let parent_name = parent.name().to_string();
1209 let peers: Vec<String> = parent
1210 .sub_agents()
1211 .iter()
1212 .filter(|a| a.name() != target_name)
1213 .map(|a| a.name().to_string())
1214 .collect();
1215 (Some(parent_name), peers)
1216 }
1217 None => (None, Vec::new()),
1218 }
1219 }
1220}