1use crate::InvocationContext;
2use crate::cache::CacheManager;
3use adk_artifact::ArtifactService;
4use adk_core::{
5 Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
6 ReadonlyContext, Result, RunConfig, SessionId, UserId,
7};
8use adk_plugin::PluginManager;
9use adk_session::SessionService;
10use adk_skill::{SkillInjector, SkillInjectorConfig};
11use async_stream::stream;
12use std::sync::Arc;
13use tokio_util::sync::CancellationToken;
14use tracing::Instrument;
15
/// Everything needed to construct a [`Runner`] via [`Runner::new`].
///
/// The three non-optional fields are always required; every `Option` field
/// switches an optional feature on when it is `Some`.
pub struct RunnerConfig {
    /// Application name. `Runner::run` converts it with `AppName::try_from`
    /// on every call and fails if the conversion is rejected.
    pub app_name: String,
    /// Root of the agent tree; transfers are resolved by searching below it.
    pub agent: Arc<dyn Agent>,
    /// Service used to load the session and to persist events appended to it.
    pub session_service: Arc<dyn SessionService>,
    /// When set, each invocation context receives artifacts scoped to the
    /// current app, user, and session.
    pub artifact_service: Option<Arc<dyn ArtifactService>>,
    /// When set, attached to each invocation context as its memory backend.
    pub memory_service: Option<Arc<dyn Memory>>,
    /// When set, its before-run, on-user-message, on-event, and after-run
    /// hooks are called during `Runner::run`.
    pub plugin_manager: Option<Arc<PluginManager>>,
    // NOTE(review): `Runner::new` reads this field, so the `dead_code` allow
    // looks unnecessary — confirm before removing.
    #[allow(dead_code)]
    /// Run configuration; `Runner::new` falls back to `RunConfig::default()`
    /// when this is `None`.
    pub run_config: Option<RunConfig>,
    /// When set, `Runner::run` checks after the agent finishes whether older
    /// events should be summarized.
    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Context-cache settings. If `None` while `cache_capable` is `Some`,
    /// `Runner::new` substitutes `ContextCacheConfig::default()`.
    pub context_cache_config: Option<ContextCacheConfig>,
    /// Model handle used to create and delete context caches.
    pub cache_capable: Option<Arc<dyn CacheCapable>>,
    /// When set, a clone is attached to every invocation context.
    pub request_context: Option<adk_core::RequestContext>,
    /// Runner-wide cancellation token; once cancelled, running streams stop
    /// at their next cancellation check.
    pub cancellation_token: Option<CancellationToken>,
    /// Intra-invocation compaction settings. Only takes effect when
    /// `intra_compaction_summarizer` is also provided.
    pub intra_compaction_config: Option<adk_core::IntraCompactionConfig>,
    /// Summarizer used by intra-invocation compaction. Only takes effect when
    /// `intra_compaction_config` is also provided.
    pub intra_compaction_summarizer: Option<Arc<dyn adk_core::BaseEventsSummarizer>>,
}
56
/// Drives agent invocations against a session and streams the resulting events.
///
/// Built from a [`RunnerConfig`] by [`Runner::new`]; all fields are private
/// and cloned into each stream returned by [`Runner::run`].
pub struct Runner {
    /// Application name supplied in the config.
    app_name: String,
    /// Root of the agent tree.
    root_agent: Arc<dyn Agent>,
    /// Loads sessions and persists events.
    session_service: Arc<dyn SessionService>,
    /// Optional artifact backend, scoped per app/user/session on use.
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory backend attached to invocation contexts.
    memory_service: Option<Arc<dyn Memory>>,
    /// Optional plugin hooks.
    plugin_manager: Option<Arc<PluginManager>>,
    /// Optional skill injector; `None` after `new`, set by the `with_*skill*`
    /// methods.
    skill_injector: Option<Arc<SkillInjector>>,
    /// Effective run configuration (config value or the default).
    run_config: RunConfig,
    /// Optional post-run event compaction settings.
    compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Effective context-cache settings (explicit, or the default when only a
    /// cache-capable model was supplied).
    context_cache_config: Option<ContextCacheConfig>,
    /// Optional model handle used to create and delete caches.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Cache bookkeeping; present whenever `context_cache_config` is.
    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
    /// Optional request context attached to invocation contexts.
    request_context: Option<adk_core::RequestContext>,
    /// Optional runner-wide cancellation token.
    cancellation_token: Option<CancellationToken>,
    /// Intra-invocation compactor; present only when both its config and its
    /// summarizer were supplied.
    intra_compactor: Option<Arc<crate::intra_compaction::IntraInvocationCompactor>>,
    /// Cancellation tokens of runs in progress, keyed by session id string.
    active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
}
77
78impl Runner {
79 pub fn builder() -> crate::builder::RunnerConfigBuilder<
95 crate::builder::NoAppName,
96 crate::builder::NoAgent,
97 crate::builder::NoSessionService,
98 > {
99 crate::builder::RunnerConfigBuilder::new()
100 }
101
102 pub fn new(config: RunnerConfig) -> Result<Self> {
103 let run_config = config.run_config.unwrap_or_default();
104
105 let effective_cache_config = config
108 .context_cache_config
109 .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
110
111 let cache_manager = effective_cache_config
112 .as_ref()
113 .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
114
115 let intra_compactor = config.intra_compaction_config.as_ref().and_then(|ic_config| {
116 config.intra_compaction_summarizer.as_ref().map(|summarizer| {
117 Arc::new(crate::intra_compaction::IntraInvocationCompactor::new(
118 ic_config.clone(),
119 summarizer.clone(),
120 ))
121 })
122 });
123
124 Ok(Self {
125 app_name: config.app_name,
126 root_agent: config.agent,
127 session_service: config.session_service,
128 artifact_service: config.artifact_service,
129 memory_service: config.memory_service,
130 plugin_manager: config.plugin_manager,
131 skill_injector: None,
132 run_config,
133 compaction_config: config.compaction_config,
134 context_cache_config: effective_cache_config,
135 cache_capable: config.cache_capable,
136 cache_manager,
137 request_context: config.request_context,
138 cancellation_token: config.cancellation_token,
139 intra_compactor,
140 active_sessions: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
141 })
142 }
143
144 pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
148 self.skill_injector = Some(Arc::new(injector));
149 self
150 }
151
152 #[deprecated(note = "Use with_auto_skills_mut instead")]
154 pub fn with_auto_skills(
155 mut self,
156 root: impl AsRef<std::path::Path>,
157 config: SkillInjectorConfig,
158 ) -> adk_skill::SkillResult<Self> {
159 self.with_auto_skills_mut(root, config)?;
160 Ok(self)
161 }
162
163 pub fn with_auto_skills_mut(
169 &mut self,
170 root: impl AsRef<std::path::Path>,
171 config: SkillInjectorConfig,
172 ) -> adk_skill::SkillResult<()> {
173 let injector = SkillInjector::from_root(root, config)?;
174 self.skill_injector = Some(Arc::new(injector));
175 Ok(())
176 }
177
178 pub async fn run(
179 &self,
180 user_id: UserId,
181 session_id: SessionId,
182 user_content: Content,
183 ) -> Result<EventStream> {
184 let app_name = self.app_name.clone();
185 let typed_app_name = AppName::try_from(app_name.clone())?;
186 let session_service = self.session_service.clone();
187 let root_agent = self.root_agent.clone();
188 let artifact_service = self.artifact_service.clone();
189 let memory_service = self.memory_service.clone();
190 let plugin_manager = self.plugin_manager.clone();
191 let skill_injector = self.skill_injector.clone();
192 let mut run_config = self.run_config.clone();
193 let compaction_config = self.compaction_config.clone();
194 let context_cache_config = self.context_cache_config.clone();
195 let cache_capable = self.cache_capable.clone();
196 let cache_manager_ref = self.cache_manager.clone();
197 let request_context = self.request_context.clone();
198 let cancellation_token = self.cancellation_token.clone();
199 let intra_compactor = self.intra_compactor.clone();
200
201 let session_token = CancellationToken::new();
205 let session_id_str = session_id.as_str().to_string();
206 {
207 let mut sessions = self.active_sessions.lock().unwrap();
208 sessions.insert(session_id_str.clone(), session_token.clone());
209 }
210 let active_sessions = self.active_sessions.clone();
211
212 let effective_token = if let Some(ref global) = cancellation_token {
214 let combined = CancellationToken::new();
215 let combined_clone = combined.clone();
216 let global_clone = global.clone();
217 let session_clone = session_token.clone();
218 let combined_for_global = combined_clone.clone();
220 tokio::spawn(async move {
221 global_clone.cancelled().await;
222 combined_for_global.cancel();
223 });
224 let combined_for_session = combined_clone;
225 tokio::spawn(async move {
226 session_clone.cancelled().await;
227 combined_for_session.cancel();
228 });
229 Some(combined)
230 } else {
231 Some(session_token.clone())
232 };
233
234 let s = stream! {
235 struct SessionCleanup {
238 active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
239 session_id: String,
240 }
241 impl Drop for SessionCleanup {
242 fn drop(&mut self) {
243 let mut sessions = self.active_sessions.lock().unwrap();
244 sessions.remove(&self.session_id);
245 }
246 }
247 let _cleanup = SessionCleanup {
248 active_sessions: active_sessions.clone(),
249 session_id: session_id_str,
250 };
251
252 let cancellation_token = effective_token;
254 let session = match session_service
256 .get(adk_session::GetRequest {
257 app_name: app_name.clone(),
258 user_id: user_id.to_string(),
259 session_id: session_id.to_string(),
260 num_recent_events: None,
261 after: None,
262 })
263 .await
264 {
265 Ok(s) => s,
266 Err(e) => {
267 yield Err(e);
268 return;
269 }
270 };
271
272 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
274
275 let artifact_service_clone = artifact_service.clone();
277 let memory_service_clone = memory_service.clone();
278
279 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
281 let mut effective_user_content = user_content.clone();
282 let mut selected_skill_name = String::new();
283 let mut selected_skill_id = String::new();
284
285 if let Some(injector) = skill_injector.as_ref() {
286 if let Some(matched) = adk_skill::apply_skill_injection(
287 &mut effective_user_content,
288 injector.index(),
289 injector.policy(),
290 injector.max_injected_chars(),
291 ) {
292 selected_skill_name = matched.skill.name;
293 selected_skill_id = matched.skill.id;
294 }
295 }
296
297 let mut invocation_ctx = match InvocationContext::new_typed(
298 invocation_id.clone(),
299 agent_to_run.clone(),
300 user_id.clone(),
301 typed_app_name.clone(),
302 session_id.clone(),
303 effective_user_content.clone(),
304 Arc::from(session),
305 ) {
306 Ok(ctx) => ctx,
307 Err(e) => {
308 yield Err(e);
309 return;
310 }
311 };
312
313 if let Some(service) = artifact_service {
315 let scoped = adk_artifact::ScopedArtifacts::new(
317 service,
318 app_name.clone(),
319 user_id.to_string(),
320 session_id.to_string(),
321 );
322 invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
323 }
324 if let Some(memory) = memory_service {
325 invocation_ctx = invocation_ctx.with_memory(memory);
326 }
327
328 invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
330
331 if let Some(rc) = request_context.clone() {
333 invocation_ctx = invocation_ctx.with_request_context(rc);
334 }
335
336 let mut ctx = Arc::new(invocation_ctx);
337
338 if let Some(manager) = plugin_manager.as_ref() {
339 match manager
340 .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
341 .await
342 {
343 Ok(Some(content)) => {
344 let mut early_event = adk_core::Event::new(ctx.invocation_id());
345 early_event.author = agent_to_run.name().to_string();
346 early_event.llm_response.content = Some(content);
347
348 ctx.mutable_session().append_event(early_event.clone());
349 if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
350 yield Err(e);
351 return;
352 }
353
354 yield Ok(early_event);
355 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
356 return;
357 }
358 Ok(None) => {}
359 Err(e) => {
360 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
361 yield Err(e);
362 return;
363 }
364 }
365
366 match manager
367 .run_on_user_message(
368 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
369 effective_user_content.clone(),
370 )
371 .await
372 {
373 Ok(Some(modified)) => {
374 effective_user_content = modified;
375
376 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
377 ctx.invocation_id().to_string(),
378 agent_to_run.clone(),
379 ctx.user_id().to_string(),
380 ctx.app_name().to_string(),
381 ctx.session_id().to_string(),
382 effective_user_content.clone(),
383 ctx.mutable_session().clone(),
384 ) {
385 Ok(ctx) => ctx,
386 Err(e) => {
387 yield Err(e);
388 return;
389 }
390 };
391
392 if let Some(service) = artifact_service_clone.clone() {
393 let scoped = adk_artifact::ScopedArtifacts::new(
394 service,
395 ctx.app_name().to_string(),
396 ctx.user_id().to_string(),
397 ctx.session_id().to_string(),
398 );
399 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
400 }
401 if let Some(memory) = memory_service_clone.clone() {
402 refreshed_ctx = refreshed_ctx.with_memory(memory);
403 }
404 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
405 if let Some(rc) = request_context.clone() {
406 refreshed_ctx = refreshed_ctx.with_request_context(rc);
407 }
408 ctx = Arc::new(refreshed_ctx);
409 }
410 Ok(None) => {}
411 Err(e) => {
412 if let Some(manager) = plugin_manager.as_ref() {
413 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
414 }
415 yield Err(e);
416 return;
417 }
418 }
419 }
420
421 let mut user_event = adk_core::Event::new(ctx.invocation_id());
423 user_event.author = "user".to_string();
424 user_event.llm_response.content = Some(effective_user_content.clone());
425
426 ctx.mutable_session().append_event(user_event.clone());
429
430 if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
431 if let Some(manager) = plugin_manager.as_ref() {
432 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
433 }
434 yield Err(e);
435 return;
436 }
437
438 if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
443 let mut cm = cm_mutex.lock().await;
444 if cm.is_enabled() {
445 if cm.active_cache_name().is_none() || cm.needs_refresh() {
446 let system_instruction = agent_to_run.description().to_string();
450 let tools = std::collections::HashMap::new();
451 let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
452
453 match cache_model.create_cache(&system_instruction, &tools, ttl).await {
454 Ok(name) => {
455 if let Some(old) = cm.clear_active_cache() {
457 if let Err(e) = cache_model.delete_cache(&old).await {
458 tracing::warn!(
459 old_cache = %old,
460 error = %e,
461 "failed to delete old cache, proceeding with new cache"
462 );
463 }
464 }
465 cm.set_active_cache(name);
466 }
467 Err(e) => {
468 tracing::warn!(
469 error = %e,
470 "cache creation failed, proceeding without cache"
471 );
472 }
473 }
474 }
475
476 if let Some(cache_name) = cm.record_invocation() {
478 run_config.cached_content = Some(cache_name.to_string());
479 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
481 ctx.invocation_id().to_string(),
482 agent_to_run.clone(),
483 ctx.user_id().to_string(),
484 ctx.app_name().to_string(),
485 ctx.session_id().to_string(),
486 effective_user_content.clone(),
487 ctx.mutable_session().clone(),
488 ) {
489 Ok(ctx) => ctx,
490 Err(e) => {
491 yield Err(e);
492 return;
493 }
494 };
495 if let Some(service) = artifact_service_clone.clone() {
496 let scoped = adk_artifact::ScopedArtifacts::new(
497 service,
498 ctx.app_name().to_string(),
499 ctx.user_id().to_string(),
500 ctx.session_id().to_string(),
501 );
502 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
503 }
504 if let Some(memory) = memory_service_clone.clone() {
505 refreshed_ctx = refreshed_ctx.with_memory(memory);
506 }
507 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
508 if let Some(rc) = request_context.clone() {
509 refreshed_ctx = refreshed_ctx.with_request_context(rc);
510 }
511 ctx = Arc::new(refreshed_ctx);
512 }
513 }
514 }
515
516 if let Some(ref compactor) = intra_compactor {
520 compactor.reset_cycle();
521 let session_events = ctx.mutable_session().as_ref().events_snapshot();
522 match compactor.maybe_compact(&session_events).await {
523 Ok(Some(compacted_events)) => {
524 ctx.mutable_session().replace_events(compacted_events);
525 tracing::info!("intra-invocation compaction applied before agent execution");
526 }
527 Ok(None) => {} Err(e) => {
529 tracing::warn!(error = %e, "intra-invocation compaction check failed");
530 }
531 }
532 }
533
534 let agent_span = tracing::info_span!(
536 "agent.execute",
537 "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
538 "gcp.vertex.agent.session_id" = ctx.session_id(),
539 "gcp.vertex.agent.event_id" = ctx.invocation_id(), "gen_ai.conversation.id" = ctx.session_id(),
541 "adk.app_name" = ctx.app_name(),
542 "adk.user_id" = ctx.user_id(),
543 "agent.name" = %agent_to_run.name(),
544 "adk.skills.selected_name" = %selected_skill_name,
545 "adk.skills.selected_id" = %selected_skill_id
546 );
547
548 let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
549 Ok(s) => s,
550 Err(e) => {
551 if let Some(manager) = plugin_manager.as_ref() {
552 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
553 }
554 yield Err(e);
555 return;
556 }
557 };
558
559 use futures::StreamExt;
561 let mut transfer_target: Option<String> = None;
562
563 while let Some(result) = {
564 if let Some(token) = cancellation_token.as_ref() {
565 if token.is_cancelled() {
566 if let Some(manager) = plugin_manager.as_ref() {
567 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
568 }
569 return;
570 }
571 }
572 agent_stream.next().await
573 } {
574 match result {
575 Ok(event) => {
576 let mut event = event;
577
578 if let Some(manager) = plugin_manager.as_ref() {
579 match manager
580 .run_on_event(
581 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
582 event.clone(),
583 )
584 .await
585 {
586 Ok(Some(modified)) => {
587 event = modified;
588 }
589 Ok(None) => {}
590 Err(e) => {
591 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
592 yield Err(e);
593 return;
594 }
595 }
596 }
597
598 if let Some(target) = &event.actions.transfer_to_agent {
600 transfer_target = Some(target.clone());
601 }
602
603 if !event.actions.state_delta.is_empty() {
609 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
610 }
611
612 ctx.mutable_session().append_event(event.clone());
614
615 if !event.llm_response.partial {
622 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
623 if let Some(manager) = plugin_manager.as_ref() {
624 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
625 }
626 yield Err(e);
627 return;
628 }
629 }
630 yield Ok(event);
631 }
632 Err(e) => {
633 if let Some(manager) = plugin_manager.as_ref() {
634 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
635 }
636 yield Err(e);
637 return;
638 }
639 }
640 }
641
642 const MAX_TRANSFER_DEPTH: u32 = 10;
649 let mut transfer_depth: u32 = 0;
650 let mut current_transfer_target = transfer_target;
651
652 while let Some(target_name) = current_transfer_target.take() {
653 transfer_depth += 1;
654 if transfer_depth > MAX_TRANSFER_DEPTH {
655 tracing::warn!(
656 depth = transfer_depth,
657 target = %target_name,
658 "max transfer depth exceeded, stopping transfer chain"
659 );
660 break;
661 }
662
663 let target_agent = match Self::find_agent(&root_agent, &target_name) {
664 Some(a) => a,
665 None => {
666 tracing::warn!(target = %target_name, "transfer target not found in agent tree");
667 break;
668 }
669 };
670
671 let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
676
677 let mut transfer_run_config = run_config.clone();
678 let mut targets = Vec::new();
679 if let Some(ref parent) = parent_name {
680 targets.push(parent.clone());
681 }
682 targets.extend(peer_names);
683 transfer_run_config.transfer_targets = targets;
684 transfer_run_config.parent_agent = parent_name;
685
686 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
688 let mut transfer_ctx = match InvocationContext::with_mutable_session(
689 transfer_invocation_id.clone(),
690 target_agent.clone(),
691 ctx.user_id().to_string(),
692 ctx.app_name().to_string(),
693 ctx.session_id().to_string(),
694 effective_user_content.clone(),
695 ctx.mutable_session().clone(),
696 ) {
697 Ok(ctx) => ctx,
698 Err(e) => {
699 yield Err(e);
700 return;
701 }
702 };
703
704 if let Some(ref service) = artifact_service_clone {
705 let scoped = adk_artifact::ScopedArtifacts::new(
706 service.clone(),
707 ctx.app_name().to_string(),
708 ctx.user_id().to_string(),
709 ctx.session_id().to_string(),
710 );
711 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
712 }
713 if let Some(ref memory) = memory_service_clone {
714 transfer_ctx = transfer_ctx.with_memory(memory.clone());
715 }
716 transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
717 if let Some(rc) = request_context.clone() {
718 transfer_ctx = transfer_ctx.with_request_context(rc);
719 }
720
721 let transfer_ctx = Arc::new(transfer_ctx);
722
723 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
725 Ok(s) => s,
726 Err(e) => {
727 if let Some(manager) = plugin_manager.as_ref() {
728 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
729 }
730 yield Err(e);
731 return;
732 }
733 };
734
735 while let Some(result) = {
737 if let Some(token) = cancellation_token.as_ref() {
738 if token.is_cancelled() {
739 if let Some(manager) = plugin_manager.as_ref() {
740 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
741 }
742 return;
743 }
744 }
745 transfer_stream.next().await
746 } {
747 match result {
748 Ok(event) => {
749 let mut event = event;
750 if let Some(manager) = plugin_manager.as_ref() {
751 match manager
752 .run_on_event(
753 transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
754 event.clone(),
755 )
756 .await
757 {
758 Ok(Some(modified)) => {
759 event = modified;
760 }
761 Ok(None) => {}
762 Err(e) => {
763 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
764 yield Err(e);
765 return;
766 }
767 }
768 }
769
770 if let Some(target) = &event.actions.transfer_to_agent {
772 current_transfer_target = Some(target.clone());
773 }
774
775 if !event.actions.state_delta.is_empty() {
777 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
778 }
779
780 transfer_ctx.mutable_session().append_event(event.clone());
782
783 if !event.llm_response.partial {
784 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
785 if let Some(manager) = plugin_manager.as_ref() {
786 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
787 }
788 yield Err(e);
789 return;
790 }
791 }
792 yield Ok(event);
793 }
794 Err(e) => {
795 if let Some(manager) = plugin_manager.as_ref() {
796 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
797 }
798 yield Err(e);
799 return;
800 }
801 }
802 }
803 }
804
805 if let Some(ref compaction_cfg) = compaction_config {
809 let event_count = ctx.mutable_session().as_ref().events_len();
810
811 if event_count > 0 {
812 let all_events = ctx.mutable_session().as_ref().events_snapshot();
813 let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
814 as u32;
815
816 if invocation_count > 0
817 && invocation_count % compaction_cfg.compaction_interval == 0
818 {
819 let overlap = compaction_cfg.overlap_size as usize;
822
823 let user_msg_indices: Vec<usize> = all_events.iter()
825 .enumerate()
826 .filter(|(_, e)| e.author == "user")
827 .map(|(i, _)| i)
828 .collect();
829
830 let compact_up_to = if overlap == 0 {
833 all_events.len()
834 } else if user_msg_indices.len() > overlap {
835 user_msg_indices[user_msg_indices.len() - overlap]
837 } else {
838 0
840 };
841
842 if compact_up_to > 0 {
843 let events_to_compact = &all_events[..compact_up_to];
844
845 match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
846 Ok(Some(compaction_event)) => {
847 if let Err(e) = session_service.append_event(
849 ctx.session_id(),
850 compaction_event.clone(),
851 ).await {
852 tracing::warn!(error = %e, "Failed to persist compaction event");
853 } else {
854 tracing::info!(
855 compacted_events = compact_up_to,
856 "Context compaction completed"
857 );
858 }
859 }
860 Ok(None) => {
861 tracing::debug!("Compaction summarizer returned no result");
862 }
863 Err(e) => {
864 tracing::warn!(error = %e, "Context compaction failed");
866 }
867 }
868 }
869 }
870 }
871 }
872
873 if let Some(manager) = plugin_manager.as_ref() {
874 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
875 }
876 };
877
878 Ok(Box::pin(s))
879 }
880
881 pub async fn run_str(
891 &self,
892 user_id: &str,
893 session_id: &str,
894 user_content: Content,
895 ) -> Result<EventStream> {
896 let user_id = UserId::try_from(user_id)?;
897 let session_id = SessionId::try_from(session_id)?;
898 self.run(user_id, session_id, user_content).await
899 }
900
901 pub fn interrupt(&self, session_id: &str) -> bool {
926 let sessions = self.active_sessions.lock().unwrap();
927 if let Some(token) = sessions.get(session_id) {
928 tracing::info!(session.id = session_id, "interrupting running agent");
929 token.cancel();
930 true
931 } else {
932 tracing::debug!(session.id = session_id, "no active run to interrupt");
933 false
934 }
935 }
936
937 pub fn active_session_ids(&self) -> Vec<String> {
939 let sessions = self.active_sessions.lock().unwrap();
940 sessions.keys().cloned().collect()
941 }
942
943 pub fn find_agent_to_run(
945 root_agent: &Arc<dyn Agent>,
946 session: &dyn adk_session::Session,
947 ) -> Arc<dyn Agent> {
948 let events = session.events();
950 for i in (0..events.len()).rev() {
951 if let Some(event) = events.at(i) {
952 if let Some(target_name) = &event.actions.transfer_to_agent {
954 if let Some(agent) = Self::find_agent(root_agent, target_name) {
955 return agent;
956 }
957 }
958
959 if event.author == "user" {
960 continue;
961 }
962
963 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
965 if Self::is_transferable(root_agent, &agent) {
967 return agent;
968 }
969 }
970 }
971 }
972
973 root_agent.clone()
975 }
976
    /// Reports whether `_agent` may resume handling a session.
    ///
    /// Currently a stub: both arguments are ignored and the answer is always
    /// `true`, so every agent found in the tree is considered eligible.
    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
        true
    }
989
990 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
992 if current.name() == target_name {
993 return Some(current.clone());
994 }
995
996 for sub_agent in current.sub_agents() {
997 if let Some(found) = Self::find_agent(sub_agent, target_name) {
998 return Some(found);
999 }
1000 }
1001
1002 None
1003 }
1004
1005 pub fn compute_transfer_context(
1011 root: &Arc<dyn Agent>,
1012 target_name: &str,
1013 ) -> (Option<String>, Vec<String>) {
1014 if root.name() == target_name {
1016 return (None, Vec::new());
1017 }
1018
1019 fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
1021 for sub in current.sub_agents() {
1022 if sub.name() == target {
1023 return Some(current.clone());
1024 }
1025 if let Some(found) = find_parent(sub, target) {
1026 return Some(found);
1027 }
1028 }
1029 None
1030 }
1031
1032 match find_parent(root, target_name) {
1033 Some(parent) => {
1034 let parent_name = parent.name().to_string();
1035 let peers: Vec<String> = parent
1036 .sub_agents()
1037 .iter()
1038 .filter(|a| a.name() != target_name)
1039 .map(|a| a.name().to_string())
1040 .collect();
1041 (Some(parent_name), peers)
1042 }
1043 None => (None, Vec::new()),
1044 }
1045 }
1046}