1use crate::InvocationContext;
2use crate::cache::CacheManager;
3#[cfg(feature = "artifacts")]
4use adk_artifact::ArtifactService;
5use adk_core::{
6 Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
7 ReadonlyContext, Result, RunConfig, SessionId, UserId,
8};
9#[cfg(feature = "plugins")]
10use adk_plugin::PluginManager;
11use adk_session::SessionService;
12#[cfg(feature = "skills")]
13use adk_skill::{SkillInjector, SkillInjectorConfig};
14use async_stream::stream;
15use std::sync::Arc;
16use tokio_util::sync::CancellationToken;
17use tracing::Instrument;
18
19pub struct RunnerConfig {
20 pub app_name: String,
21 pub agent: Arc<dyn Agent>,
22 pub session_service: Arc<dyn SessionService>,
23 #[cfg(feature = "artifacts")]
24 pub artifact_service: Option<Arc<dyn ArtifactService>>,
25 pub memory_service: Option<Arc<dyn Memory>>,
26 #[cfg(feature = "plugins")]
27 pub plugin_manager: Option<Arc<PluginManager>>,
28 #[allow(dead_code)]
31 pub run_config: Option<RunConfig>,
32 pub compaction_config: Option<adk_core::EventsCompactionConfig>,
36 pub context_cache_config: Option<ContextCacheConfig>,
44 pub cache_capable: Option<Arc<dyn CacheCapable>>,
47 pub request_context: Option<adk_core::RequestContext>,
51 pub cancellation_token: Option<CancellationToken>,
53 pub intra_compaction_config: Option<adk_core::IntraCompactionConfig>,
57 pub intra_compaction_summarizer: Option<Arc<dyn adk_core::BaseEventsSummarizer>>,
60}
61
62pub struct Runner {
63 app_name: String,
64 root_agent: Arc<dyn Agent>,
65 session_service: Arc<dyn SessionService>,
66 #[cfg(feature = "artifacts")]
67 artifact_service: Option<Arc<dyn ArtifactService>>,
68 memory_service: Option<Arc<dyn Memory>>,
69 #[cfg(feature = "plugins")]
70 plugin_manager: Option<Arc<PluginManager>>,
71 #[cfg(feature = "skills")]
72 skill_injector: Option<Arc<SkillInjector>>,
73 run_config: RunConfig,
74 compaction_config: Option<adk_core::EventsCompactionConfig>,
75 context_cache_config: Option<ContextCacheConfig>,
76 cache_capable: Option<Arc<dyn CacheCapable>>,
77 cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
78 request_context: Option<adk_core::RequestContext>,
79 cancellation_token: Option<CancellationToken>,
80 intra_compactor: Option<Arc<crate::intra_compaction::IntraInvocationCompactor>>,
81 active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
84}
85
86impl Runner {
87 pub fn builder() -> crate::builder::RunnerConfigBuilder<
103 crate::builder::NoAppName,
104 crate::builder::NoAgent,
105 crate::builder::NoSessionService,
106 > {
107 crate::builder::RunnerConfigBuilder::new()
108 }
109
110 pub fn new(config: RunnerConfig) -> Result<Self> {
111 let run_config = config.run_config.unwrap_or_default();
112
113 let effective_cache_config = config
116 .context_cache_config
117 .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
118
119 let cache_manager = effective_cache_config
120 .as_ref()
121 .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
122
123 let intra_compactor = config.intra_compaction_config.as_ref().and_then(|ic_config| {
124 config.intra_compaction_summarizer.as_ref().map(|summarizer| {
125 Arc::new(crate::intra_compaction::IntraInvocationCompactor::new(
126 ic_config.clone(),
127 summarizer.clone(),
128 ))
129 })
130 });
131
132 Ok(Self {
133 app_name: config.app_name,
134 root_agent: config.agent,
135 session_service: config.session_service,
136 #[cfg(feature = "artifacts")]
137 artifact_service: config.artifact_service,
138 memory_service: config.memory_service,
139 #[cfg(feature = "plugins")]
140 plugin_manager: config.plugin_manager,
141 #[cfg(feature = "skills")]
142 skill_injector: None,
143 run_config,
144 compaction_config: config.compaction_config,
145 context_cache_config: effective_cache_config,
146 cache_capable: config.cache_capable,
147 cache_manager,
148 request_context: config.request_context,
149 cancellation_token: config.cancellation_token,
150 intra_compactor,
151 active_sessions: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
152 })
153 }
154
155 #[cfg(feature = "skills")]
159 pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
160 self.skill_injector = Some(Arc::new(injector));
161 self
162 }
163
164 #[cfg(feature = "skills")]
166 #[deprecated(note = "Use with_auto_skills_mut instead")]
167 pub fn with_auto_skills(
168 mut self,
169 root: impl AsRef<std::path::Path>,
170 config: SkillInjectorConfig,
171 ) -> adk_skill::SkillResult<Self> {
172 self.with_auto_skills_mut(root, config)?;
173 Ok(self)
174 }
175
176 #[cfg(feature = "skills")]
182 pub fn with_auto_skills_mut(
183 &mut self,
184 root: impl AsRef<std::path::Path>,
185 config: SkillInjectorConfig,
186 ) -> adk_skill::SkillResult<()> {
187 let injector = SkillInjector::from_root(root, config)?;
188 self.skill_injector = Some(Arc::new(injector));
189 Ok(())
190 }
191
192 pub async fn run(
193 &self,
194 user_id: UserId,
195 session_id: SessionId,
196 user_content: Content,
197 ) -> Result<EventStream> {
198 let app_name = self.app_name.clone();
199 let typed_app_name = AppName::try_from(app_name.clone())?;
200 let session_service = self.session_service.clone();
201 let root_agent = self.root_agent.clone();
202 #[cfg(feature = "artifacts")]
203 let artifact_service = self.artifact_service.clone();
204 let memory_service = self.memory_service.clone();
205 #[cfg(feature = "plugins")]
206 let plugin_manager = self.plugin_manager.clone();
207 #[cfg(feature = "skills")]
208 let skill_injector = self.skill_injector.clone();
209 let mut run_config = self.run_config.clone();
210 let compaction_config = self.compaction_config.clone();
211 let context_cache_config = self.context_cache_config.clone();
212 let cache_capable = self.cache_capable.clone();
213 let cache_manager_ref = self.cache_manager.clone();
214 let request_context = self.request_context.clone();
215 let cancellation_token = self.cancellation_token.clone();
216 let intra_compactor = self.intra_compactor.clone();
217
218 let session_token = CancellationToken::new();
222 let session_id_str = session_id.as_str().to_string();
223 {
224 let mut sessions = self.active_sessions.lock().unwrap();
225 sessions.insert(session_id_str.clone(), session_token.clone());
226 }
227 let active_sessions = self.active_sessions.clone();
228
229 let effective_token = if let Some(ref global) = cancellation_token {
231 let combined = CancellationToken::new();
232 let combined_clone = combined.clone();
233 let global_clone = global.clone();
234 let session_clone = session_token.clone();
235 let combined_for_global = combined_clone.clone();
237 tokio::spawn(async move {
238 global_clone.cancelled().await;
239 combined_for_global.cancel();
240 });
241 let combined_for_session = combined_clone;
242 tokio::spawn(async move {
243 session_clone.cancelled().await;
244 combined_for_session.cancel();
245 });
246 Some(combined)
247 } else {
248 Some(session_token.clone())
249 };
250
251 let s = stream! {
252 struct SessionCleanup {
255 active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
256 session_id: String,
257 }
258 impl Drop for SessionCleanup {
259 fn drop(&mut self) {
260 let mut sessions = self.active_sessions.lock().unwrap();
261 sessions.remove(&self.session_id);
262 }
263 }
264 let _cleanup = SessionCleanup {
265 active_sessions: active_sessions.clone(),
266 session_id: session_id_str,
267 };
268
269 let cancellation_token = effective_token;
271 let session = match session_service
273 .get(adk_session::GetRequest {
274 app_name: app_name.clone(),
275 user_id: user_id.to_string(),
276 session_id: session_id.to_string(),
277 num_recent_events: run_config.history_max_events,
278 after: None,
279 })
280 .await
281 {
282 Ok(s) => s,
283 Err(e) => {
284 yield Err(e);
285 return;
286 }
287 };
288
289 let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
291
292 #[cfg(feature = "artifacts")]
294 let artifact_service_clone = artifact_service.clone();
295 let memory_service_clone = memory_service.clone();
296
297 let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
299 #[cfg(any(feature = "skills", feature = "plugins"))]
300 let mut effective_user_content = user_content.clone();
301 #[cfg(not(any(feature = "skills", feature = "plugins")))]
302 let effective_user_content = user_content.clone();
303 #[cfg(feature = "skills")]
304 let mut selected_skill_name = String::new();
305 #[cfg(not(feature = "skills"))]
306 let selected_skill_name = String::new();
307 #[cfg(feature = "skills")]
308 let mut selected_skill_id = String::new();
309 #[cfg(not(feature = "skills"))]
310 let selected_skill_id = String::new();
311
312 #[cfg(feature = "skills")]
313 if let Some(injector) = skill_injector.as_ref() {
314 if let Some(matched) = adk_skill::apply_skill_injection(
315 &mut effective_user_content,
316 injector.index(),
317 injector.policy(),
318 injector.max_injected_chars(),
319 ) {
320 selected_skill_name = matched.skill.name;
321 selected_skill_id = matched.skill.id;
322 }
323 }
324
325 let mut invocation_ctx = match InvocationContext::new_typed(
326 invocation_id.clone(),
327 agent_to_run.clone(),
328 user_id.clone(),
329 typed_app_name.clone(),
330 session_id.clone(),
331 effective_user_content.clone(),
332 Arc::from(session),
333 ) {
334 Ok(ctx) => ctx,
335 Err(e) => {
336 yield Err(e);
337 return;
338 }
339 };
340
341 #[cfg(feature = "artifacts")]
343 if let Some(service) = artifact_service {
344 let scoped = adk_artifact::ScopedArtifacts::new(
346 service,
347 app_name.clone(),
348 user_id.to_string(),
349 session_id.to_string(),
350 );
351 invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
352 }
353 if let Some(memory) = memory_service {
354 invocation_ctx = invocation_ctx.with_memory(memory);
355 }
356
357 invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
359
360 if let Some(rc) = request_context.clone() {
362 invocation_ctx = invocation_ctx.with_request_context(rc);
363 }
364
365 let mut ctx = Arc::new(invocation_ctx);
366
367 #[cfg(feature = "plugins")]
368 if let Some(manager) = plugin_manager.as_ref() {
369 match manager
370 .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
371 .await
372 {
373 Ok(Some(content)) => {
374 let mut early_event = adk_core::Event::new(ctx.invocation_id());
375 early_event.author = agent_to_run.name().to_string();
376 early_event.llm_response.content = Some(content);
377
378 ctx.mutable_session().append_event(early_event.clone());
379 if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
380 yield Err(e);
381 return;
382 }
383
384 yield Ok(early_event);
385 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
386 return;
387 }
388 Ok(None) => {}
389 Err(e) => {
390 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
391 yield Err(e);
392 return;
393 }
394 }
395
396 match manager
397 .run_on_user_message(
398 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
399 effective_user_content.clone(),
400 )
401 .await
402 {
403 Ok(Some(modified)) => {
404 effective_user_content = modified;
405
406 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
407 ctx.invocation_id().to_string(),
408 agent_to_run.clone(),
409 ctx.user_id().to_string(),
410 ctx.app_name().to_string(),
411 ctx.session_id().to_string(),
412 effective_user_content.clone(),
413 ctx.mutable_session().clone(),
414 ) {
415 Ok(ctx) => ctx,
416 Err(e) => {
417 yield Err(e);
418 return;
419 }
420 };
421
422 #[cfg(feature = "artifacts")]
423 if let Some(service) = artifact_service_clone.clone() {
424 let scoped = adk_artifact::ScopedArtifacts::new(
425 service,
426 ctx.app_name().to_string(),
427 ctx.user_id().to_string(),
428 ctx.session_id().to_string(),
429 );
430 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
431 }
432 if let Some(memory) = memory_service_clone.clone() {
433 refreshed_ctx = refreshed_ctx.with_memory(memory);
434 }
435 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
436 if let Some(rc) = request_context.clone() {
437 refreshed_ctx = refreshed_ctx.with_request_context(rc);
438 }
439 ctx = Arc::new(refreshed_ctx);
440 }
441 Ok(None) => {}
442 Err(e) => {
443 if let Some(manager) = plugin_manager.as_ref() {
444 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
445 }
446 yield Err(e);
447 return;
448 }
449 }
450 }
451
452 let mut user_event = adk_core::Event::new(ctx.invocation_id());
454 user_event.author = "user".to_string();
455 user_event.llm_response.content = Some(effective_user_content.clone());
456
457 ctx.mutable_session().append_event(user_event.clone());
460
461 if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
462 #[cfg(feature = "plugins")]
463 if let Some(manager) = plugin_manager.as_ref() {
464 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
465 }
466 yield Err(e);
467 return;
468 }
469
470 if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
475 let should_refresh_cache = {
476 let cm = cm_mutex.lock().await;
477 cm.is_enabled() && (cm.active_cache_name().is_none() || cm.needs_refresh())
478 };
479
480 if should_refresh_cache {
481 let system_instruction = agent_to_run.description().to_string();
485 let tools = std::collections::HashMap::new();
486 let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
487
488 match cache_model.create_cache(&system_instruction, &tools, ttl).await {
489 Ok(name) => {
490 let old_cache = {
491 let mut cm = cm_mutex.lock().await;
492 let old = cm.clear_active_cache();
493 cm.set_active_cache(name);
494 old
495 };
496
497 if let Some(old) = old_cache {
498 if let Err(e) = cache_model.delete_cache(&old).await {
499 tracing::warn!(
500 old_cache = %old,
501 error = %e,
502 "failed to delete old cache, proceeding with new cache"
503 );
504 }
505 }
506 }
507 Err(e) => {
508 tracing::warn!(
509 error = %e,
510 "cache creation failed, proceeding without cache"
511 );
512 }
513 }
514 }
515
516 let cache_name = {
518 let mut cm = cm_mutex.lock().await;
519 if cm.is_enabled() {
520 cm.record_invocation().map(str::to_string)
521 } else {
522 None
523 }
524 };
525
526 if let Some(cache_name) = cache_name {
527 run_config.cached_content = Some(cache_name);
528 let mut refreshed_ctx = match InvocationContext::with_mutable_session(
530 ctx.invocation_id().to_string(),
531 agent_to_run.clone(),
532 ctx.user_id().to_string(),
533 ctx.app_name().to_string(),
534 ctx.session_id().to_string(),
535 effective_user_content.clone(),
536 ctx.mutable_session().clone(),
537 ) {
538 Ok(ctx) => ctx,
539 Err(e) => {
540 yield Err(e);
541 return;
542 }
543 };
544 #[cfg(feature = "artifacts")]
545 if let Some(service) = artifact_service_clone.clone() {
546 let scoped = adk_artifact::ScopedArtifacts::new(
547 service,
548 ctx.app_name().to_string(),
549 ctx.user_id().to_string(),
550 ctx.session_id().to_string(),
551 );
552 refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
553 }
554 if let Some(memory) = memory_service_clone.clone() {
555 refreshed_ctx = refreshed_ctx.with_memory(memory);
556 }
557 refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
558 if let Some(rc) = request_context.clone() {
559 refreshed_ctx = refreshed_ctx.with_request_context(rc);
560 }
561 ctx = Arc::new(refreshed_ctx);
562 }
563 }
564
565 if let Some(ref compactor) = intra_compactor {
569 compactor.reset_cycle();
570 let session_events = ctx.mutable_session().as_ref().events_snapshot();
571 match compactor.maybe_compact(&session_events).await {
572 Ok(Some(compacted_events)) => {
573 ctx.mutable_session().replace_events(compacted_events);
574 tracing::info!("intra-invocation compaction applied before agent execution");
575 }
576 Ok(None) => {} Err(e) => {
578 tracing::warn!(error = %e, "intra-invocation compaction check failed");
579 }
580 }
581 }
582
583 let agent_span = tracing::info_span!(
585 "agent.execute",
586 "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
587 "gcp.vertex.agent.session_id" = ctx.session_id(),
588 "gcp.vertex.agent.event_id" = ctx.invocation_id(), "gen_ai.conversation.id" = ctx.session_id(),
590 "adk.app_name" = ctx.app_name(),
591 "adk.user_id" = ctx.user_id(),
592 "agent.name" = %agent_to_run.name(),
593 "adk.skills.selected_name" = %selected_skill_name,
594 "adk.skills.selected_id" = %selected_skill_id
595 );
596
597 let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
598 Ok(s) => s,
599 Err(e) => {
600 #[cfg(feature = "plugins")]
601 if let Some(manager) = plugin_manager.as_ref() {
602 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
603 }
604 yield Err(e);
605 return;
606 }
607 };
608
609 use futures::StreamExt;
611 let mut transfer_target: Option<String> = None;
612
613 while let Some(result) = {
614 if let Some(token) = cancellation_token.as_ref() {
615 if token.is_cancelled() {
616 #[cfg(feature = "plugins")]
617 if let Some(manager) = plugin_manager.as_ref() {
618 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
619 }
620 return;
621 }
622 }
623 agent_stream.next().await
624 } {
625 match result {
626 Ok(event) => {
627 #[cfg(feature = "plugins")]
628 let mut event = event;
629
630 #[cfg(feature = "plugins")]
631 if let Some(manager) = plugin_manager.as_ref() {
632 match manager
633 .run_on_event(
634 ctx.clone() as Arc<dyn adk_core::InvocationContext>,
635 event.clone(),
636 )
637 .await
638 {
639 Ok(Some(modified)) => {
640 event = modified;
641 }
642 Ok(None) => {}
643 Err(e) => {
644 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
645 yield Err(e);
646 return;
647 }
648 }
649 }
650
651 if let Some(target) = &event.actions.transfer_to_agent {
653 transfer_target = Some(target.clone());
654 }
655
656 if !event.actions.state_delta.is_empty() {
662 ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
663 }
664
665 ctx.mutable_session().append_event(event.clone());
667
668 if !event.llm_response.partial {
675 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
676 #[cfg(feature = "plugins")]
677 if let Some(manager) = plugin_manager.as_ref() {
678 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
679 }
680 yield Err(e);
681 return;
682 }
683 }
684 yield Ok(event);
685 }
686 Err(e) => {
687 #[cfg(feature = "plugins")]
688 if let Some(manager) = plugin_manager.as_ref() {
689 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
690 }
691 yield Err(e);
692 return;
693 }
694 }
695 }
696
697 const MAX_TRANSFER_DEPTH: u32 = 10;
704 let mut transfer_depth: u32 = 0;
705 let mut current_transfer_target = transfer_target;
706
707 while let Some(target_name) = current_transfer_target.take() {
708 transfer_depth += 1;
709 if transfer_depth > MAX_TRANSFER_DEPTH {
710 tracing::warn!(
711 depth = transfer_depth,
712 target = %target_name,
713 "max transfer depth exceeded, stopping transfer chain"
714 );
715 break;
716 }
717
718 let target_agent = match Self::find_agent(&root_agent, &target_name) {
719 Some(a) => a,
720 None => {
721 tracing::warn!(target = %target_name, "transfer target not found in agent tree");
722 break;
723 }
724 };
725
726 let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
731
732 let mut transfer_run_config = run_config.clone();
733 let mut targets = Vec::new();
734 if let Some(ref parent) = parent_name {
735 targets.push(parent.clone());
736 }
737 targets.extend(peer_names);
738 transfer_run_config.transfer_targets = targets;
739 transfer_run_config.parent_agent = parent_name;
740
741 let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
743 let mut transfer_ctx = match InvocationContext::with_mutable_session(
744 transfer_invocation_id.clone(),
745 target_agent.clone(),
746 ctx.user_id().to_string(),
747 ctx.app_name().to_string(),
748 ctx.session_id().to_string(),
749 effective_user_content.clone(),
750 ctx.mutable_session().clone(),
751 ) {
752 Ok(ctx) => ctx,
753 Err(e) => {
754 yield Err(e);
755 return;
756 }
757 };
758
759 #[cfg(feature = "artifacts")]
760 if let Some(ref service) = artifact_service_clone {
761 let scoped = adk_artifact::ScopedArtifacts::new(
762 service.clone(),
763 ctx.app_name().to_string(),
764 ctx.user_id().to_string(),
765 ctx.session_id().to_string(),
766 );
767 transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
768 }
769 if let Some(ref memory) = memory_service_clone {
770 transfer_ctx = transfer_ctx.with_memory(memory.clone());
771 }
772 transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
773 if let Some(rc) = request_context.clone() {
774 transfer_ctx = transfer_ctx.with_request_context(rc);
775 }
776
777 let transfer_ctx = Arc::new(transfer_ctx);
778
779 let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
781 Ok(s) => s,
782 Err(e) => {
783 #[cfg(feature = "plugins")]
784 if let Some(manager) = plugin_manager.as_ref() {
785 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
786 }
787 yield Err(e);
788 return;
789 }
790 };
791
792 while let Some(result) = {
794 if let Some(token) = cancellation_token.as_ref() {
795 if token.is_cancelled() {
796 #[cfg(feature = "plugins")]
797 if let Some(manager) = plugin_manager.as_ref() {
798 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
799 }
800 return;
801 }
802 }
803 transfer_stream.next().await
804 } {
805 match result {
806 Ok(event) => {
807 #[cfg(feature = "plugins")]
808 let mut event = event;
809 #[cfg(feature = "plugins")]
810 if let Some(manager) = plugin_manager.as_ref() {
811 match manager
812 .run_on_event(
813 transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
814 event.clone(),
815 )
816 .await
817 {
818 Ok(Some(modified)) => {
819 event = modified;
820 }
821 Ok(None) => {}
822 Err(e) => {
823 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
824 yield Err(e);
825 return;
826 }
827 }
828 }
829
830 if let Some(target) = &event.actions.transfer_to_agent {
832 current_transfer_target = Some(target.clone());
833 }
834
835 if !event.actions.state_delta.is_empty() {
837 transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
838 }
839
840 transfer_ctx.mutable_session().append_event(event.clone());
842
843 if !event.llm_response.partial {
844 if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
845 #[cfg(feature = "plugins")]
846 if let Some(manager) = plugin_manager.as_ref() {
847 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
848 }
849 yield Err(e);
850 return;
851 }
852 }
853 yield Ok(event);
854 }
855 Err(e) => {
856 #[cfg(feature = "plugins")]
857 if let Some(manager) = plugin_manager.as_ref() {
858 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
859 }
860 yield Err(e);
861 return;
862 }
863 }
864 }
865 }
866
867 if let Some(ref compaction_cfg) = compaction_config {
871 let event_count = ctx.mutable_session().as_ref().events_len();
872
873 if event_count > 0 {
874 let all_events = ctx.mutable_session().as_ref().events_snapshot();
875 let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
876 as u32;
877
878 if invocation_count > 0
879 && invocation_count % compaction_cfg.compaction_interval == 0
880 {
881 let overlap = compaction_cfg.overlap_size as usize;
884
885 let user_msg_indices: Vec<usize> = all_events.iter()
887 .enumerate()
888 .filter(|(_, e)| e.author == "user")
889 .map(|(i, _)| i)
890 .collect();
891
892 let compact_up_to = if overlap == 0 {
895 all_events.len()
896 } else if user_msg_indices.len() > overlap {
897 user_msg_indices[user_msg_indices.len() - overlap]
899 } else {
900 0
902 };
903
904 if compact_up_to > 0 {
905 let events_to_compact = &all_events[..compact_up_to];
906
907 match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
908 Ok(Some(compaction_event)) => {
909 if let Err(e) = session_service.append_event(
911 ctx.session_id(),
912 compaction_event.clone(),
913 ).await {
914 tracing::warn!(error = %e, "Failed to persist compaction event");
915 } else {
916 tracing::info!(
917 compacted_events = compact_up_to,
918 "Context compaction completed"
919 );
920 }
921 }
922 Ok(None) => {
923 tracing::debug!("Compaction summarizer returned no result");
924 }
925 Err(e) => {
926 tracing::warn!(error = %e, "Context compaction failed");
928 }
929 }
930 }
931 }
932 }
933 }
934
935 #[cfg(feature = "plugins")]
936 if let Some(manager) = plugin_manager.as_ref() {
937 manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
938 }
939 };
940
941 Ok(Box::pin(s))
942 }
943
944 pub async fn run_str(
954 &self,
955 user_id: &str,
956 session_id: &str,
957 user_content: Content,
958 ) -> Result<EventStream> {
959 let user_id = UserId::try_from(user_id)?;
960 let session_id = SessionId::try_from(session_id)?;
961 self.run(user_id, session_id, user_content).await
962 }
963
964 pub fn interrupt(&self, session_id: &str) -> bool {
989 let sessions = self.active_sessions.lock().unwrap();
990 if let Some(token) = sessions.get(session_id) {
991 tracing::info!(session.id = session_id, "interrupting running agent");
992 token.cancel();
993 true
994 } else {
995 tracing::debug!(session.id = session_id, "no active run to interrupt");
996 false
997 }
998 }
999
1000 pub fn active_session_ids(&self) -> Vec<String> {
1002 let sessions = self.active_sessions.lock().unwrap();
1003 sessions.keys().cloned().collect()
1004 }
1005
1006 pub fn find_agent_to_run(
1008 root_agent: &Arc<dyn Agent>,
1009 session: &dyn adk_session::Session,
1010 ) -> Arc<dyn Agent> {
1011 let events = session.events();
1013 for i in (0..events.len()).rev() {
1014 if let Some(event) = events.at(i) {
1015 if let Some(target_name) = &event.actions.transfer_to_agent {
1017 if let Some(agent) = Self::find_agent(root_agent, target_name) {
1018 return agent;
1019 }
1020 }
1021
1022 if event.author == "user" {
1023 continue;
1024 }
1025
1026 if let Some(agent) = Self::find_agent(root_agent, &event.author) {
1028 if Self::is_transferable(root_agent, &agent) {
1030 return agent;
1031 }
1032 }
1033 }
1034 }
1035
1036 root_agent.clone()
1038 }
1039
1040 fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
1050 true
1051 }
1052
1053 pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
1055 if current.name() == target_name {
1056 return Some(current.clone());
1057 }
1058
1059 for sub_agent in current.sub_agents() {
1060 if let Some(found) = Self::find_agent(sub_agent, target_name) {
1061 return Some(found);
1062 }
1063 }
1064
1065 None
1066 }
1067
1068 pub fn compute_transfer_context(
1074 root: &Arc<dyn Agent>,
1075 target_name: &str,
1076 ) -> (Option<String>, Vec<String>) {
1077 if root.name() == target_name {
1079 return (None, Vec::new());
1080 }
1081
1082 fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
1084 for sub in current.sub_agents() {
1085 if sub.name() == target {
1086 return Some(current.clone());
1087 }
1088 if let Some(found) = find_parent(sub, target) {
1089 return Some(found);
1090 }
1091 }
1092 None
1093 }
1094
1095 match find_parent(root, target_name) {
1096 Some(parent) => {
1097 let parent_name = parent.name().to_string();
1098 let peers: Vec<String> = parent
1099 .sub_agents()
1100 .iter()
1101 .filter(|a| a.name() != target_name)
1102 .map(|a| a.name().to_string())
1103 .collect();
1104 (Some(parent_name), peers)
1105 }
1106 None => (None, Vec::new()),
1107 }
1108 }
1109}