Skip to main content

adk_runner/
runner.rs

1use crate::InvocationContext;
2use crate::cache::CacheManager;
3use adk_artifact::ArtifactService;
4use adk_core::{
5    Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
6    ReadonlyContext, Result, RunConfig, SessionId, UserId,
7};
8use adk_plugin::PluginManager;
9use adk_session::SessionService;
10use adk_skill::{SkillInjector, SkillInjectorConfig};
11use async_stream::stream;
12use std::sync::Arc;
13use tokio_util::sync::CancellationToken;
14use tracing::Instrument;
15
16pub struct RunnerConfig {
17    pub app_name: String,
18    pub agent: Arc<dyn Agent>,
19    pub session_service: Arc<dyn SessionService>,
20    pub artifact_service: Option<Arc<dyn ArtifactService>>,
21    pub memory_service: Option<Arc<dyn Memory>>,
22    pub plugin_manager: Option<Arc<PluginManager>>,
23    /// Optional run configuration (streaming mode, etc.)
24    /// If not provided, uses default (SSE streaming)
25    #[allow(dead_code)]
26    pub run_config: Option<RunConfig>,
27    /// Optional context compaction configuration.
28    /// When set, the runner will periodically summarize older events
29    /// to reduce context size sent to the LLM.
30    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
31    /// Optional context cache configuration for automatic prompt caching lifecycle.
32    /// When set alongside `cache_capable`, the runner will automatically create and
33    /// manage cached content resources for supported providers.
34    ///
35    /// When `cache_capable` is set but this field is `None`, the runner
36    /// automatically uses [`ContextCacheConfig::default()`] (4096 min tokens,
37    /// 600s TTL, refresh every 3 invocations).
38    pub context_cache_config: Option<ContextCacheConfig>,
39    /// Optional cache-capable model reference for automatic cache management.
40    /// Set this to the same model used by the agent if it supports caching.
41    pub cache_capable: Option<Arc<dyn CacheCapable>>,
42    /// Optional request context from the server's auth middleware bridge.
43    /// When set, the runner passes it to `InvocationContext` so that
44    /// `user_scopes()` and `user_id()` reflect the authenticated identity.
45    pub request_context: Option<adk_core::RequestContext>,
46    /// Optional cooperative cancellation token for externally managed runs.
47    pub cancellation_token: Option<CancellationToken>,
48}
49
50pub struct Runner {
51    app_name: String,
52    root_agent: Arc<dyn Agent>,
53    session_service: Arc<dyn SessionService>,
54    artifact_service: Option<Arc<dyn ArtifactService>>,
55    memory_service: Option<Arc<dyn Memory>>,
56    plugin_manager: Option<Arc<PluginManager>>,
57    skill_injector: Option<Arc<SkillInjector>>,
58    run_config: RunConfig,
59    compaction_config: Option<adk_core::EventsCompactionConfig>,
60    context_cache_config: Option<ContextCacheConfig>,
61    cache_capable: Option<Arc<dyn CacheCapable>>,
62    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
63    request_context: Option<adk_core::RequestContext>,
64    cancellation_token: Option<CancellationToken>,
65}
66
67impl Runner {
68    pub fn new(config: RunnerConfig) -> Result<Self> {
69        // When a cache-capable model is provided but no explicit cache config,
70        // use the default ContextCacheConfig to enable caching automatically.
71        let effective_cache_config = config
72            .context_cache_config
73            .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
74
75        let cache_manager = effective_cache_config
76            .as_ref()
77            .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
78        Ok(Self {
79            app_name: config.app_name,
80            root_agent: config.agent,
81            session_service: config.session_service,
82            artifact_service: config.artifact_service,
83            memory_service: config.memory_service,
84            plugin_manager: config.plugin_manager,
85            skill_injector: None,
86            run_config: config.run_config.unwrap_or_default(),
87            compaction_config: config.compaction_config,
88            context_cache_config: effective_cache_config,
89            cache_capable: config.cache_capable,
90            cache_manager,
91            request_context: config.request_context,
92            cancellation_token: config.cancellation_token,
93        })
94    }
95
96    /// Enable skill injection using a pre-built injector.
97    ///
98    /// Skill injection runs before plugin `on_user_message` callbacks.
99    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
100        self.skill_injector = Some(Arc::new(injector));
101        self
102    }
103
104    /// Enable skill injection by auto-loading `.skills/` from the given root path.
105    pub fn with_auto_skills(
106        mut self,
107        root: impl AsRef<std::path::Path>,
108        config: SkillInjectorConfig,
109    ) -> adk_skill::SkillResult<Self> {
110        let injector = SkillInjector::from_root(root, config)?;
111        self.skill_injector = Some(Arc::new(injector));
112        Ok(self)
113    }
114
115    pub async fn run(
116        &self,
117        user_id: UserId,
118        session_id: SessionId,
119        user_content: Content,
120    ) -> Result<EventStream> {
121        let app_name = self.app_name.clone();
122        let typed_app_name = AppName::try_from(app_name.clone())?;
123        let session_service = self.session_service.clone();
124        let root_agent = self.root_agent.clone();
125        let artifact_service = self.artifact_service.clone();
126        let memory_service = self.memory_service.clone();
127        let plugin_manager = self.plugin_manager.clone();
128        let skill_injector = self.skill_injector.clone();
129        let mut run_config = self.run_config.clone();
130        let compaction_config = self.compaction_config.clone();
131        let context_cache_config = self.context_cache_config.clone();
132        let cache_capable = self.cache_capable.clone();
133        let cache_manager_ref = self.cache_manager.clone();
134        let request_context = self.request_context.clone();
135        let cancellation_token = self.cancellation_token.clone();
136
137        let s = stream! {
138            // Get or create session
139            let session = match session_service
140                .get(adk_session::GetRequest {
141                    app_name: app_name.clone(),
142                    user_id: user_id.to_string(),
143                    session_id: session_id.to_string(),
144                    num_recent_events: None,
145                    after: None,
146                })
147                .await
148            {
149                Ok(s) => s,
150                Err(e) => {
151                    yield Err(e);
152                    return;
153                }
154            };
155
156            // Find which agent should handle this request
157            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
158
159            // Clone services for potential reuse in transfer
160            let artifact_service_clone = artifact_service.clone();
161            let memory_service_clone = memory_service.clone();
162
163            // Create invocation context with MutableSession
164            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
165            let mut effective_user_content = user_content.clone();
166            let mut selected_skill_name = String::new();
167            let mut selected_skill_id = String::new();
168
169            if let Some(injector) = skill_injector.as_ref() {
170                if let Some(matched) = adk_skill::apply_skill_injection(
171                    &mut effective_user_content,
172                    injector.index(),
173                    injector.policy(),
174                    injector.max_injected_chars(),
175                ) {
176                    selected_skill_name = matched.skill.name;
177                    selected_skill_id = matched.skill.id;
178                }
179            }
180
181            let mut invocation_ctx = match InvocationContext::new_typed(
182                invocation_id.clone(),
183                agent_to_run.clone(),
184                user_id.clone(),
185                typed_app_name.clone(),
186                session_id.clone(),
187                effective_user_content.clone(),
188                Arc::from(session),
189            ) {
190                Ok(ctx) => ctx,
191                Err(e) => {
192                    yield Err(e);
193                    return;
194                }
195            };
196
197            // Add optional services
198            if let Some(service) = artifact_service {
199                // Wrap service with ScopedArtifacts to bind session context
200                let scoped = adk_artifact::ScopedArtifacts::new(
201                    service,
202                    app_name.clone(),
203                    user_id.to_string(),
204                    session_id.to_string(),
205                );
206                invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
207            }
208            if let Some(memory) = memory_service {
209                invocation_ctx = invocation_ctx.with_memory(memory);
210            }
211
212            // Apply run config (streaming mode, etc.)
213            invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
214
215            // Apply request context from auth middleware bridge if present
216            if let Some(rc) = request_context.clone() {
217                invocation_ctx = invocation_ctx.with_request_context(rc);
218            }
219
220            let mut ctx = Arc::new(invocation_ctx);
221
222            if let Some(manager) = plugin_manager.as_ref() {
223                match manager
224                    .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
225                    .await
226                {
227                    Ok(Some(content)) => {
228                        let mut early_event = adk_core::Event::new(ctx.invocation_id());
229                        early_event.author = agent_to_run.name().to_string();
230                        early_event.llm_response.content = Some(content);
231
232                        ctx.mutable_session().append_event(early_event.clone());
233                        if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
234                            yield Err(e);
235                            return;
236                        }
237
238                        yield Ok(early_event);
239                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
240                        return;
241                    }
242                    Ok(None) => {}
243                    Err(e) => {
244                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
245                        yield Err(e);
246                        return;
247                    }
248                }
249
250                match manager
251                    .run_on_user_message(
252                        ctx.clone() as Arc<dyn adk_core::InvocationContext>,
253                        effective_user_content.clone(),
254                    )
255                    .await
256                {
257                    Ok(Some(modified)) => {
258                        effective_user_content = modified;
259
260                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
261                            ctx.invocation_id().to_string(),
262                            agent_to_run.clone(),
263                            ctx.user_id().to_string(),
264                            ctx.app_name().to_string(),
265                            ctx.session_id().to_string(),
266                            effective_user_content.clone(),
267                            ctx.mutable_session().clone(),
268                        ) {
269                            Ok(ctx) => ctx,
270                            Err(e) => {
271                                yield Err(e);
272                                return;
273                            }
274                        };
275
276                        if let Some(service) = artifact_service_clone.clone() {
277                            let scoped = adk_artifact::ScopedArtifacts::new(
278                                service,
279                                ctx.app_name().to_string(),
280                                ctx.user_id().to_string(),
281                                ctx.session_id().to_string(),
282                            );
283                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
284                        }
285                        if let Some(memory) = memory_service_clone.clone() {
286                            refreshed_ctx = refreshed_ctx.with_memory(memory);
287                        }
288                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
289                        if let Some(rc) = request_context.clone() {
290                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
291                        }
292                        ctx = Arc::new(refreshed_ctx);
293                    }
294                    Ok(None) => {}
295                    Err(e) => {
296                        if let Some(manager) = plugin_manager.as_ref() {
297                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
298                        }
299                        yield Err(e);
300                        return;
301                    }
302                }
303            }
304
305            // Append user message to session service (persistent storage)
306            let mut user_event = adk_core::Event::new(ctx.invocation_id());
307            user_event.author = "user".to_string();
308            user_event.llm_response.content = Some(effective_user_content.clone());
309
310            // Also add to mutable session for immediate visibility
311            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
312            ctx.mutable_session().append_event(user_event.clone());
313
314            if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
315                if let Some(manager) = plugin_manager.as_ref() {
316                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
317                }
318                yield Err(e);
319                return;
320            }
321
322            // ===== CONTEXT CACHE LIFECYCLE =====
323            // If context caching is configured and a cache-capable model is available,
324            // create or refresh the cached content before agent execution.
325            // Cache failures are non-fatal — log a warning and proceed without cache.
326            if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
327                let mut cm = cm_mutex.lock().await;
328                if cm.is_enabled() {
329                    if cm.active_cache_name().is_none() || cm.needs_refresh() {
330                        // Gather system instruction from the agent's description
331                        // (the full instruction is resolved inside the agent, but the
332                        // description provides a reasonable proxy for cache keying)
333                        let system_instruction = agent_to_run.description().to_string();
334                        let tools = std::collections::HashMap::new();
335                        let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
336
337                        match cache_model.create_cache(&system_instruction, &tools, ttl).await {
338                            Ok(name) => {
339                                // Delete old cache if refreshing
340                                if let Some(old) = cm.clear_active_cache() {
341                                    if let Err(e) = cache_model.delete_cache(&old).await {
342                                        tracing::warn!(
343                                            old_cache = %old,
344                                            error = %e,
345                                            "failed to delete old cache, proceeding with new cache"
346                                        );
347                                    }
348                                }
349                                cm.set_active_cache(name);
350                            }
351                            Err(e) => {
352                                tracing::warn!(
353                                    error = %e,
354                                    "cache creation failed, proceeding without cache"
355                                );
356                            }
357                        }
358                    }
359
360                    // Attach cache name to run config so agents can use it
361                    if let Some(cache_name) = cm.record_invocation() {
362                        run_config.cached_content = Some(cache_name.to_string());
363                        // Rebuild the invocation context with the updated run config
364                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
365                            ctx.invocation_id().to_string(),
366                            agent_to_run.clone(),
367                            ctx.user_id().to_string(),
368                            ctx.app_name().to_string(),
369                            ctx.session_id().to_string(),
370                            effective_user_content.clone(),
371                            ctx.mutable_session().clone(),
372                        ) {
373                            Ok(ctx) => ctx,
374                            Err(e) => {
375                                yield Err(e);
376                                return;
377                            }
378                        };
379                        if let Some(service) = artifact_service_clone.clone() {
380                            let scoped = adk_artifact::ScopedArtifacts::new(
381                                service,
382                                ctx.app_name().to_string(),
383                                ctx.user_id().to_string(),
384                                ctx.session_id().to_string(),
385                            );
386                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
387                        }
388                        if let Some(memory) = memory_service_clone.clone() {
389                            refreshed_ctx = refreshed_ctx.with_memory(memory);
390                        }
391                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
392                        if let Some(rc) = request_context.clone() {
393                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
394                        }
395                        ctx = Arc::new(refreshed_ctx);
396                    }
397                }
398            }
399
400            // Run the agent with instrumentation (ADK-Go style attributes)
401            let agent_span = tracing::info_span!(
402                "agent.execute",
403                "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
404                "gcp.vertex.agent.session_id" = ctx.session_id(),
405                "gcp.vertex.agent.event_id" = ctx.invocation_id(), // Use invocation_id as event_id for agent spans
406                "gen_ai.conversation.id" = ctx.session_id(),
407                "adk.app_name" = ctx.app_name(),
408                "adk.user_id" = ctx.user_id(),
409                "agent.name" = %agent_to_run.name(),
410                "adk.skills.selected_name" = %selected_skill_name,
411                "adk.skills.selected_id" = %selected_skill_id
412            );
413
414            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
415                Ok(s) => s,
416                Err(e) => {
417                    if let Some(manager) = plugin_manager.as_ref() {
418                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
419                    }
420                    yield Err(e);
421                    return;
422                }
423            };
424
425            // Stream events and check for transfers
426            use futures::StreamExt;
427            let mut transfer_target: Option<String> = None;
428
429            while let Some(result) = {
430                if let Some(token) = cancellation_token.as_ref() {
431                    if token.is_cancelled() {
432                        if let Some(manager) = plugin_manager.as_ref() {
433                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
434                        }
435                        return;
436                    }
437                }
438                agent_stream.next().await
439            } {
440                match result {
441                    Ok(event) => {
442                        let mut event = event;
443
444                        if let Some(manager) = plugin_manager.as_ref() {
445                            match manager
446                                .run_on_event(
447                                    ctx.clone() as Arc<dyn adk_core::InvocationContext>,
448                                    event.clone(),
449                                )
450                                .await
451                            {
452                                Ok(Some(modified)) => {
453                                    event = modified;
454                                }
455                                Ok(None) => {}
456                                Err(e) => {
457                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
458                                    yield Err(e);
459                                    return;
460                                }
461                            }
462                        }
463
464                        // Check for transfer action
465                        if let Some(target) = &event.actions.transfer_to_agent {
466                            transfer_target = Some(target.clone());
467                        }
468
469                        // CRITICAL: Apply state_delta to the mutable session immediately.
470                        // This is the key fix for state propagation between sequential agents.
471                        // When an agent sets output_key, it emits an event with state_delta.
472                        // We must apply this to the mutable session so downstream agents
473                        // can read the value via ctx.session().state().get().
474                        if !event.actions.state_delta.is_empty() {
475                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
476                        }
477
478                        // Also add the event to the mutable session's event list
479                        ctx.mutable_session().append_event(event.clone());
480
481                        // Append event to session service (persistent storage)
482                        // Skip partial streaming chunks — only persist the final
483                        // event. Streaming chunks share the same event ID, so
484                        // persisting each one would violate the primary key
485                        // constraint. The final chunk (partial=false) carries the
486                        // complete accumulated content.
487                        if !event.llm_response.partial {
488                            if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
489                                if let Some(manager) = plugin_manager.as_ref() {
490                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
491                                }
492                                yield Err(e);
493                                return;
494                            }
495                        }
496                        yield Ok(event);
497                    }
498                    Err(e) => {
499                        if let Some(manager) = plugin_manager.as_ref() {
500                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
501                        }
502                        yield Err(e);
503                        return;
504                    }
505                }
506            }
507
508            // ===== TRANSFER LOOP =====
509            // Support multi-hop transfers with a max-depth guard.
510            // When an agent emits transfer_to_agent, the runner resolves the
511            // target from the root agent tree, computes transfer_targets
512            // (parent + peers) for the new agent, and runs it. This repeats
513            // until no further transfer is requested or the depth limit is hit.
514            const MAX_TRANSFER_DEPTH: u32 = 10;
515            let mut transfer_depth: u32 = 0;
516            let mut current_transfer_target = transfer_target;
517
518            while let Some(target_name) = current_transfer_target.take() {
519                transfer_depth += 1;
520                if transfer_depth > MAX_TRANSFER_DEPTH {
521                    tracing::warn!(
522                        depth = transfer_depth,
523                        target = %target_name,
524                        "max transfer depth exceeded, stopping transfer chain"
525                    );
526                    break;
527                }
528
529                let target_agent = match Self::find_agent(&root_agent, &target_name) {
530                    Some(a) => a,
531                    None => {
532                        tracing::warn!(target = %target_name, "transfer target not found in agent tree");
533                        break;
534                    }
535                };
536
537                // Compute transfer_targets for the target agent:
538                // - parent: the agent that transferred to it (or root if applicable)
539                // - peers: siblings in the agent tree
540                // - children: handled by the agent itself via sub_agents()
541                let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
542
543                let mut transfer_run_config = run_config.clone();
544                let mut targets = Vec::new();
545                if let Some(ref parent) = parent_name {
546                    targets.push(parent.clone());
547                }
548                targets.extend(peer_names);
549                transfer_run_config.transfer_targets = targets;
550                transfer_run_config.parent_agent = parent_name;
551
552                // For transfers, we reuse the same mutable session to preserve state
553                let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
554                let mut transfer_ctx = match InvocationContext::with_mutable_session(
555                    transfer_invocation_id.clone(),
556                    target_agent.clone(),
557                    ctx.user_id().to_string(),
558                    ctx.app_name().to_string(),
559                    ctx.session_id().to_string(),
560                    effective_user_content.clone(),
561                    ctx.mutable_session().clone(),
562                ) {
563                    Ok(ctx) => ctx,
564                    Err(e) => {
565                        yield Err(e);
566                        return;
567                    }
568                };
569
570                if let Some(ref service) = artifact_service_clone {
571                    let scoped = adk_artifact::ScopedArtifacts::new(
572                        service.clone(),
573                        ctx.app_name().to_string(),
574                        ctx.user_id().to_string(),
575                        ctx.session_id().to_string(),
576                    );
577                    transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
578                }
579                if let Some(ref memory) = memory_service_clone {
580                    transfer_ctx = transfer_ctx.with_memory(memory.clone());
581                }
582                transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
583                if let Some(rc) = request_context.clone() {
584                    transfer_ctx = transfer_ctx.with_request_context(rc);
585                }
586
587                let transfer_ctx = Arc::new(transfer_ctx);
588
589                // Run the transferred agent
590                let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
591                    Ok(s) => s,
592                    Err(e) => {
593                        if let Some(manager) = plugin_manager.as_ref() {
594                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
595                        }
596                        yield Err(e);
597                        return;
598                    }
599                };
600
601                // Stream events from the transferred agent, capturing any further transfer
602                while let Some(result) = {
603                    if let Some(token) = cancellation_token.as_ref() {
604                        if token.is_cancelled() {
605                            if let Some(manager) = plugin_manager.as_ref() {
606                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
607                            }
608                            return;
609                        }
610                    }
611                    transfer_stream.next().await
612                } {
613                    match result {
614                        Ok(event) => {
615                            let mut event = event;
616                            if let Some(manager) = plugin_manager.as_ref() {
617                                match manager
618                                    .run_on_event(
619                                        transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
620                                        event.clone(),
621                                    )
622                                    .await
623                                {
624                                    Ok(Some(modified)) => {
625                                        event = modified;
626                                    }
627                                    Ok(None) => {}
628                                    Err(e) => {
629                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
630                                        yield Err(e);
631                                        return;
632                                    }
633                                }
634                            }
635
636                            // Capture further transfer requests
637                            if let Some(target) = &event.actions.transfer_to_agent {
638                                current_transfer_target = Some(target.clone());
639                            }
640
641                            // Apply state delta for transferred agent too
642                            if !event.actions.state_delta.is_empty() {
643                                transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
644                            }
645
646                            // Add to mutable session
647                            transfer_ctx.mutable_session().append_event(event.clone());
648
649                            if !event.llm_response.partial {
650                                if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
651                                    if let Some(manager) = plugin_manager.as_ref() {
652                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
653                                    }
654                                    yield Err(e);
655                                    return;
656                                }
657                            }
658                            yield Ok(event);
659                        }
660                        Err(e) => {
661                            if let Some(manager) = plugin_manager.as_ref() {
662                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
663                            }
664                            yield Err(e);
665                            return;
666                        }
667                    }
668                }
669            }
670
671            // ===== CONTEXT COMPACTION =====
672            // After all events have been processed, check if compaction should trigger.
673            // This runs in the background after the invocation completes.
674            if let Some(ref compaction_cfg) = compaction_config {
675                let event_count = ctx.mutable_session().as_ref().events_len();
676
677                if event_count > 0 {
678                    let all_events = ctx.mutable_session().as_ref().events_snapshot();
679                    let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
680                        as u32;
681
682                    if invocation_count > 0
683                        && invocation_count % compaction_cfg.compaction_interval == 0
684                    {
685                        // Determine the window of events to compact
686                        // We compact all events except the most recent overlap_size invocations
687                        let overlap = compaction_cfg.overlap_size as usize;
688
689                        // Find the boundary: keep the last `overlap` user messages and everything after
690                        let user_msg_indices: Vec<usize> = all_events.iter()
691                            .enumerate()
692                            .filter(|(_, e)| e.author == "user")
693                            .map(|(i, _)| i)
694                            .collect();
695
696                        // Keep the last `overlap` user messages intact.
697                        // When overlap is 0, compact everything.
698                        let compact_up_to = if overlap == 0 {
699                            all_events.len()
700                        } else if user_msg_indices.len() > overlap {
701                            // Compact up to (but not including) the overlap-th-from-last user message
702                            user_msg_indices[user_msg_indices.len() - overlap]
703                        } else {
704                            // Not enough user messages to satisfy overlap — skip compaction
705                            0
706                        };
707
708                        if compact_up_to > 0 {
709                            let events_to_compact = &all_events[..compact_up_to];
710
711                            match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
712                                Ok(Some(compaction_event)) => {
713                                    // Persist the compaction event
714                                    if let Err(e) = session_service.append_event(
715                                        ctx.session_id(),
716                                        compaction_event.clone(),
717                                    ).await {
718                                        tracing::warn!(error = %e, "Failed to persist compaction event");
719                                    } else {
720                                        tracing::info!(
721                                            compacted_events = compact_up_to,
722                                            "Context compaction completed"
723                                        );
724                                    }
725                                }
726                                Ok(None) => {
727                                    tracing::debug!("Compaction summarizer returned no result");
728                                }
729                                Err(e) => {
730                                    // Compaction failure is non-fatal — log and continue
731                                    tracing::warn!(error = %e, "Context compaction failed");
732                                }
733                            }
734                        }
735                    }
736                }
737            }
738
739            if let Some(manager) = plugin_manager.as_ref() {
740                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
741            }
742        };
743
744        Ok(Box::pin(s))
745    }
746
747    /// Find which agent should handle the request based on session history
748    pub fn find_agent_to_run(
749        root_agent: &Arc<dyn Agent>,
750        session: &dyn adk_session::Session,
751    ) -> Arc<dyn Agent> {
752        // Look at recent events to find last agent that responded
753        let events = session.events();
754        for i in (0..events.len()).rev() {
755            if let Some(event) = events.at(i) {
756                // Check for explicit transfer
757                if let Some(target_name) = &event.actions.transfer_to_agent {
758                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
759                        return agent;
760                    }
761                }
762
763                if event.author == "user" {
764                    continue;
765                }
766
767                // Try to find this agent in the tree
768                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
769                    // Check if agent allows transfer up the tree
770                    if Self::is_transferable(root_agent, &agent) {
771                        return agent;
772                    }
773                }
774            }
775        }
776
777        // Default to root agent
778        root_agent.clone()
779    }
780
781    /// Check if an agent found in session history can be resumed for the next
782    /// user message.
783    ///
784    /// This always returns `true` because the transfer-policy enforcement
785    /// (`disallow_transfer_to_parent` / `disallow_transfer_to_peers`) is
786    /// handled inside `LlmAgent::run()` when it builds the `transfer_to_agent`
787    /// tool's valid-target list. The runner does not need to duplicate that
788    /// check here — it only needs to know whether the agent is a valid
789    /// resumption target, which it always is if it exists in the tree.
790    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
791        true
792    }
793
794    /// Recursively search agent tree for agent with given name
795    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
796        if current.name() == target_name {
797            return Some(current.clone());
798        }
799
800        for sub_agent in current.sub_agents() {
801            if let Some(found) = Self::find_agent(sub_agent, target_name) {
802                return Some(found);
803            }
804        }
805
806        None
807    }
808
809    /// Compute the parent name and peer names for a given agent in the tree.
810    /// Returns `(parent_name, peer_names)`.
811    ///
812    /// Walks the agent tree to find the parent of `target_name`, then collects
813    /// the parent's name and the sibling agent names (excluding the target itself).
814    pub fn compute_transfer_context(
815        root: &Arc<dyn Agent>,
816        target_name: &str,
817    ) -> (Option<String>, Vec<String>) {
818        // If the target is the root itself, there's no parent or peers
819        if root.name() == target_name {
820            return (None, Vec::new());
821        }
822
823        // BFS/DFS to find the parent of target_name
824        fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
825            for sub in current.sub_agents() {
826                if sub.name() == target {
827                    return Some(current.clone());
828                }
829                if let Some(found) = find_parent(sub, target) {
830                    return Some(found);
831                }
832            }
833            None
834        }
835
836        match find_parent(root, target_name) {
837            Some(parent) => {
838                let parent_name = parent.name().to_string();
839                let peers: Vec<String> = parent
840                    .sub_agents()
841                    .iter()
842                    .filter(|a| a.name() != target_name)
843                    .map(|a| a.name().to_string())
844                    .collect();
845                (Some(parent_name), peers)
846            }
847            None => (None, Vec::new()),
848        }
849    }
850}