Skip to main content

adk_runner/
runner.rs

1use crate::InvocationContext;
2use crate::cache::CacheManager;
3use adk_artifact::ArtifactService;
4use adk_core::{
5    Agent, CacheCapable, Content, ContextCacheConfig, EventStream, Memory, ReadonlyContext, Result,
6    RunConfig,
7};
8use adk_plugin::PluginManager;
9use adk_session::SessionService;
10use adk_skill::{SkillInjector, SkillInjectorConfig};
11use async_stream::stream;
12use std::sync::Arc;
13use tokio_util::sync::CancellationToken;
14use tracing::Instrument;
15
16pub struct RunnerConfig {
17    pub app_name: String,
18    pub agent: Arc<dyn Agent>,
19    pub session_service: Arc<dyn SessionService>,
20    pub artifact_service: Option<Arc<dyn ArtifactService>>,
21    pub memory_service: Option<Arc<dyn Memory>>,
22    pub plugin_manager: Option<Arc<PluginManager>>,
23    /// Optional run configuration (streaming mode, etc.)
24    /// If not provided, uses default (SSE streaming)
25    #[allow(dead_code)]
26    pub run_config: Option<RunConfig>,
27    /// Optional context compaction configuration.
28    /// When set, the runner will periodically summarize older events
29    /// to reduce context size sent to the LLM.
30    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
31    /// Optional context cache configuration for automatic prompt caching lifecycle.
32    /// When set alongside `cache_capable`, the runner will automatically create and
33    /// manage cached content resources for supported providers.
34    pub context_cache_config: Option<ContextCacheConfig>,
35    /// Optional cache-capable model reference for automatic cache management.
36    /// Set this to the same model used by the agent if it supports caching.
37    pub cache_capable: Option<Arc<dyn CacheCapable>>,
38    /// Optional request context from the server's auth middleware bridge.
39    /// When set, the runner passes it to `InvocationContext` so that
40    /// `user_scopes()` and `user_id()` reflect the authenticated identity.
41    pub request_context: Option<adk_core::RequestContext>,
42    /// Optional cooperative cancellation token for externally managed runs.
43    pub cancellation_token: Option<CancellationToken>,
44}
45
/// Executes agent invocations for a single application.
///
/// Constructed from a [`RunnerConfig`] via [`Runner::new`]. Each call to `run`
/// clones these handles into the returned event stream, so the runner itself
/// is only borrowed for the duration of the call.
pub struct Runner {
    /// Application name used for session lookups and artifact scoping.
    app_name: String,
    /// Root of the agent tree (`RunnerConfig::agent`).
    root_agent: Arc<dyn Agent>,
    /// Session backend used to fetch the session and persist events.
    session_service: Arc<dyn SessionService>,
    /// Optional artifact backend; scoped per app/user/session at run time.
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory backend attached to invocation contexts.
    memory_service: Option<Arc<dyn Memory>>,
    /// Optional plugin manager providing lifecycle hooks.
    plugin_manager: Option<Arc<PluginManager>>,
    /// Optional skill injector. `None` after `new`; set by
    /// `with_skill_injector` or `with_auto_skills`.
    skill_injector: Option<Arc<SkillInjector>>,
    /// Resolved run configuration: the configured value, or its `Default`.
    run_config: RunConfig,
    /// Optional event-compaction settings.
    compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Optional context-cache settings (also the source of the cache TTL).
    context_cache_config: Option<ContextCacheConfig>,
    /// Optional model handle used to create and delete provider-side caches.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Cache bookkeeping; created in `new` only when `context_cache_config`
    /// is set, and guarded by an async mutex.
    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
    /// Optional authenticated request context forwarded to invocation contexts.
    request_context: Option<adk_core::RequestContext>,
    /// Optional token polled between events to stop a run cooperatively.
    cancellation_token: Option<CancellationToken>,
}
62
63impl Runner {
64    pub fn new(config: RunnerConfig) -> Result<Self> {
65        let cache_manager = config
66            .context_cache_config
67            .as_ref()
68            .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
69        Ok(Self {
70            app_name: config.app_name,
71            root_agent: config.agent,
72            session_service: config.session_service,
73            artifact_service: config.artifact_service,
74            memory_service: config.memory_service,
75            plugin_manager: config.plugin_manager,
76            skill_injector: None,
77            run_config: config.run_config.unwrap_or_default(),
78            compaction_config: config.compaction_config,
79            context_cache_config: config.context_cache_config,
80            cache_capable: config.cache_capable,
81            cache_manager,
82            request_context: config.request_context,
83            cancellation_token: config.cancellation_token,
84        })
85    }
86
87    /// Enable skill injection using a pre-built injector.
88    ///
89    /// Skill injection runs before plugin `on_user_message` callbacks.
90    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
91        self.skill_injector = Some(Arc::new(injector));
92        self
93    }
94
95    /// Enable skill injection by auto-loading `.skills/` from the given root path.
96    pub fn with_auto_skills(
97        mut self,
98        root: impl AsRef<std::path::Path>,
99        config: SkillInjectorConfig,
100    ) -> adk_skill::SkillResult<Self> {
101        let injector = SkillInjector::from_root(root, config)?;
102        self.skill_injector = Some(Arc::new(injector));
103        Ok(self)
104    }
105
106    pub async fn run(
107        &self,
108        user_id: String,
109        session_id: String,
110        user_content: Content,
111    ) -> Result<EventStream> {
112        let app_name = self.app_name.clone();
113        let session_service = self.session_service.clone();
114        let root_agent = self.root_agent.clone();
115        let artifact_service = self.artifact_service.clone();
116        let memory_service = self.memory_service.clone();
117        let plugin_manager = self.plugin_manager.clone();
118        let skill_injector = self.skill_injector.clone();
119        let mut run_config = self.run_config.clone();
120        let compaction_config = self.compaction_config.clone();
121        let context_cache_config = self.context_cache_config.clone();
122        let cache_capable = self.cache_capable.clone();
123        let cache_manager_ref = self.cache_manager.clone();
124        let request_context = self.request_context.clone();
125        let cancellation_token = self.cancellation_token.clone();
126
127        let s = stream! {
128            // Get or create session
129            let session = match session_service
130                .get(adk_session::GetRequest {
131                    app_name: app_name.clone(),
132                    user_id: user_id.clone(),
133                    session_id: session_id.clone(),
134                    num_recent_events: None,
135                    after: None,
136                })
137                .await
138            {
139                Ok(s) => s,
140                Err(e) => {
141                    yield Err(e);
142                    return;
143                }
144            };
145
146            // Find which agent should handle this request
147            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
148
149            // Clone services for potential reuse in transfer
150            let artifact_service_clone = artifact_service.clone();
151            let memory_service_clone = memory_service.clone();
152
153            // Create invocation context with MutableSession
154            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
155            let mut effective_user_content = user_content.clone();
156            let mut selected_skill_name = String::new();
157            let mut selected_skill_id = String::new();
158
159            if let Some(injector) = skill_injector.as_ref() {
160                if let Some(matched) = adk_skill::apply_skill_injection(
161                    &mut effective_user_content,
162                    injector.index(),
163                    injector.policy(),
164                    injector.max_injected_chars(),
165                ) {
166                    selected_skill_name = matched.skill.name;
167                    selected_skill_id = matched.skill.id;
168                }
169            }
170
171            let mut invocation_ctx = InvocationContext::new(
172                invocation_id.clone(),
173                agent_to_run.clone(),
174                user_id.clone(),
175                app_name.clone(),
176                session_id.clone(),
177                effective_user_content.clone(),
178                Arc::from(session),
179            );
180
181            // Add optional services
182            if let Some(service) = artifact_service {
183                // Wrap service with ScopedArtifacts to bind session context
184                let scoped = adk_artifact::ScopedArtifacts::new(
185                    service,
186                    app_name.clone(),
187                    user_id.clone(),
188                    session_id.clone(),
189                );
190                invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
191            }
192            if let Some(memory) = memory_service {
193                invocation_ctx = invocation_ctx.with_memory(memory);
194            }
195
196            // Apply run config (streaming mode, etc.)
197            invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
198
199            // Apply request context from auth middleware bridge if present
200            if let Some(rc) = request_context.clone() {
201                invocation_ctx = invocation_ctx.with_request_context(rc);
202            }
203
204            let mut ctx = Arc::new(invocation_ctx);
205
206            if let Some(manager) = plugin_manager.as_ref() {
207                match manager
208                    .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
209                    .await
210                {
211                    Ok(Some(content)) => {
212                        let mut early_event = adk_core::Event::new(ctx.invocation_id());
213                        early_event.author = agent_to_run.name().to_string();
214                        early_event.llm_response.content = Some(content);
215
216                        ctx.mutable_session().append_event(early_event.clone());
217                        if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
218                            yield Err(e);
219                            return;
220                        }
221
222                        yield Ok(early_event);
223                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
224                        return;
225                    }
226                    Ok(None) => {}
227                    Err(e) => {
228                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
229                        yield Err(e);
230                        return;
231                    }
232                }
233
234                match manager
235                    .run_on_user_message(
236                        ctx.clone() as Arc<dyn adk_core::InvocationContext>,
237                        effective_user_content.clone(),
238                    )
239                    .await
240                {
241                    Ok(Some(modified)) => {
242                        effective_user_content = modified;
243
244                        let mut refreshed_ctx = InvocationContext::with_mutable_session(
245                            ctx.invocation_id().to_string(),
246                            agent_to_run.clone(),
247                            ctx.user_id().to_string(),
248                            ctx.app_name().to_string(),
249                            ctx.session_id().to_string(),
250                            effective_user_content.clone(),
251                            ctx.mutable_session().clone(),
252                        );
253
254                        if let Some(service) = artifact_service_clone.clone() {
255                            let scoped = adk_artifact::ScopedArtifacts::new(
256                                service,
257                                ctx.app_name().to_string(),
258                                ctx.user_id().to_string(),
259                                ctx.session_id().to_string(),
260                            );
261                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
262                        }
263                        if let Some(memory) = memory_service_clone.clone() {
264                            refreshed_ctx = refreshed_ctx.with_memory(memory);
265                        }
266                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
267                        if let Some(rc) = request_context.clone() {
268                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
269                        }
270                        ctx = Arc::new(refreshed_ctx);
271                    }
272                    Ok(None) => {}
273                    Err(e) => {
274                        if let Some(manager) = plugin_manager.as_ref() {
275                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
276                        }
277                        yield Err(e);
278                        return;
279                    }
280                }
281            }
282
283            // Append user message to session service (persistent storage)
284            let mut user_event = adk_core::Event::new(ctx.invocation_id());
285            user_event.author = "user".to_string();
286            user_event.llm_response.content = Some(effective_user_content.clone());
287
288            // Also add to mutable session for immediate visibility
289            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
290            ctx.mutable_session().append_event(user_event.clone());
291
292            if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
293                if let Some(manager) = plugin_manager.as_ref() {
294                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
295                }
296                yield Err(e);
297                return;
298            }
299
300            // ===== CONTEXT CACHE LIFECYCLE =====
301            // If context caching is configured and a cache-capable model is available,
302            // create or refresh the cached content before agent execution.
303            // Cache failures are non-fatal — log a warning and proceed without cache.
304            if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
305                let mut cm = cm_mutex.lock().await;
306                if cm.is_enabled() {
307                    if cm.active_cache_name().is_none() || cm.needs_refresh() {
308                        // Gather system instruction from the agent's description
309                        // (the full instruction is resolved inside the agent, but the
310                        // description provides a reasonable proxy for cache keying)
311                        let system_instruction = agent_to_run.description().to_string();
312                        let tools = std::collections::HashMap::new();
313                        let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
314
315                        match cache_model.create_cache(&system_instruction, &tools, ttl).await {
316                            Ok(name) => {
317                                // Delete old cache if refreshing
318                                if let Some(old) = cm.clear_active_cache() {
319                                    if let Err(e) = cache_model.delete_cache(&old).await {
320                                        tracing::warn!(
321                                            old_cache = %old,
322                                            error = %e,
323                                            "failed to delete old cache, proceeding with new cache"
324                                        );
325                                    }
326                                }
327                                cm.set_active_cache(name);
328                            }
329                            Err(e) => {
330                                tracing::warn!(
331                                    error = %e,
332                                    "cache creation failed, proceeding without cache"
333                                );
334                            }
335                        }
336                    }
337
338                    // Attach cache name to run config so agents can use it
339                    if let Some(cache_name) = cm.record_invocation() {
340                        run_config.cached_content = Some(cache_name.to_string());
341                        // Rebuild the invocation context with the updated run config
342                        let mut refreshed_ctx = InvocationContext::with_mutable_session(
343                            ctx.invocation_id().to_string(),
344                            agent_to_run.clone(),
345                            ctx.user_id().to_string(),
346                            ctx.app_name().to_string(),
347                            ctx.session_id().to_string(),
348                            effective_user_content.clone(),
349                            ctx.mutable_session().clone(),
350                        );
351                        if let Some(service) = artifact_service_clone.clone() {
352                            let scoped = adk_artifact::ScopedArtifacts::new(
353                                service,
354                                ctx.app_name().to_string(),
355                                ctx.user_id().to_string(),
356                                ctx.session_id().to_string(),
357                            );
358                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
359                        }
360                        if let Some(memory) = memory_service_clone.clone() {
361                            refreshed_ctx = refreshed_ctx.with_memory(memory);
362                        }
363                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
364                        if let Some(rc) = request_context.clone() {
365                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
366                        }
367                        ctx = Arc::new(refreshed_ctx);
368                    }
369                }
370            }
371
372            // Run the agent with instrumentation (ADK-Go style attributes)
373            let agent_span = tracing::info_span!(
374                "agent.execute",
375                "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
376                "gcp.vertex.agent.session_id" = ctx.session_id(),
377                "gcp.vertex.agent.event_id" = ctx.invocation_id(), // Use invocation_id as event_id for agent spans
378                "gen_ai.conversation.id" = ctx.session_id(),
379                "adk.app_name" = ctx.app_name(),
380                "adk.user_id" = ctx.user_id(),
381                "agent.name" = %agent_to_run.name(),
382                "adk.skills.selected_name" = %selected_skill_name,
383                "adk.skills.selected_id" = %selected_skill_id
384            );
385
386            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
387                Ok(s) => s,
388                Err(e) => {
389                    if let Some(manager) = plugin_manager.as_ref() {
390                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
391                    }
392                    yield Err(e);
393                    return;
394                }
395            };
396
397            // Stream events and check for transfers
398            use futures::StreamExt;
399            let mut transfer_target: Option<String> = None;
400
401            while let Some(result) = {
402                if let Some(token) = cancellation_token.as_ref() {
403                    if token.is_cancelled() {
404                        if let Some(manager) = plugin_manager.as_ref() {
405                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
406                        }
407                        return;
408                    }
409                }
410                agent_stream.next().await
411            } {
412                match result {
413                    Ok(event) => {
414                        let mut event = event;
415
416                        if let Some(manager) = plugin_manager.as_ref() {
417                            match manager
418                                .run_on_event(
419                                    ctx.clone() as Arc<dyn adk_core::InvocationContext>,
420                                    event.clone(),
421                                )
422                                .await
423                            {
424                                Ok(Some(modified)) => {
425                                    event = modified;
426                                }
427                                Ok(None) => {}
428                                Err(e) => {
429                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
430                                    yield Err(e);
431                                    return;
432                                }
433                            }
434                        }
435
436                        // Check for transfer action
437                        if let Some(target) = &event.actions.transfer_to_agent {
438                            transfer_target = Some(target.clone());
439                        }
440
441                        // CRITICAL: Apply state_delta to the mutable session immediately.
442                        // This is the key fix for state propagation between sequential agents.
443                        // When an agent sets output_key, it emits an event with state_delta.
444                        // We must apply this to the mutable session so downstream agents
445                        // can read the value via ctx.session().state().get().
446                        if !event.actions.state_delta.is_empty() {
447                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
448                        }
449
450                        // Also add the event to the mutable session's event list
451                        ctx.mutable_session().append_event(event.clone());
452
453                        // Append event to session service (persistent storage)
454                        // Skip partial streaming chunks — only persist the final
455                        // event. Streaming chunks share the same event ID, so
456                        // persisting each one would violate the primary key
457                        // constraint. The final chunk (partial=false) carries the
458                        // complete accumulated content.
459                        if !event.llm_response.partial {
460                            if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
461                                if let Some(manager) = plugin_manager.as_ref() {
462                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
463                                }
464                                yield Err(e);
465                                return;
466                            }
467                        }
468                        yield Ok(event);
469                    }
470                    Err(e) => {
471                        if let Some(manager) = plugin_manager.as_ref() {
472                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
473                        }
474                        yield Err(e);
475                        return;
476                    }
477                }
478            }
479
480            // ===== TRANSFER LOOP =====
481            // Support multi-hop transfers with a max-depth guard.
482            // When an agent emits transfer_to_agent, the runner resolves the
483            // target from the root agent tree, computes transfer_targets
484            // (parent + peers) for the new agent, and runs it. This repeats
485            // until no further transfer is requested or the depth limit is hit.
486            const MAX_TRANSFER_DEPTH: u32 = 10;
487            let mut transfer_depth: u32 = 0;
488            let mut current_transfer_target = transfer_target;
489
490            while let Some(target_name) = current_transfer_target.take() {
491                transfer_depth += 1;
492                if transfer_depth > MAX_TRANSFER_DEPTH {
493                    tracing::warn!(
494                        depth = transfer_depth,
495                        target = %target_name,
496                        "max transfer depth exceeded, stopping transfer chain"
497                    );
498                    break;
499                }
500
501                let target_agent = match Self::find_agent(&root_agent, &target_name) {
502                    Some(a) => a,
503                    None => {
504                        tracing::warn!(target = %target_name, "transfer target not found in agent tree");
505                        break;
506                    }
507                };
508
509                // Compute transfer_targets for the target agent:
510                // - parent: the agent that transferred to it (or root if applicable)
511                // - peers: siblings in the agent tree
512                // - children: handled by the agent itself via sub_agents()
513                let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
514
515                let mut transfer_run_config = run_config.clone();
516                let mut targets = Vec::new();
517                if let Some(ref parent) = parent_name {
518                    targets.push(parent.clone());
519                }
520                targets.extend(peer_names);
521                transfer_run_config.transfer_targets = targets;
522                transfer_run_config.parent_agent = parent_name;
523
524                // For transfers, we reuse the same mutable session to preserve state
525                let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
526                let mut transfer_ctx = InvocationContext::with_mutable_session(
527                    transfer_invocation_id.clone(),
528                    target_agent.clone(),
529                    ctx.user_id().to_string(),
530                    ctx.app_name().to_string(),
531                    ctx.session_id().to_string(),
532                    effective_user_content.clone(),
533                    ctx.mutable_session().clone(),
534                );
535
536                if let Some(ref service) = artifact_service_clone {
537                    let scoped = adk_artifact::ScopedArtifacts::new(
538                        service.clone(),
539                        ctx.app_name().to_string(),
540                        ctx.user_id().to_string(),
541                        ctx.session_id().to_string(),
542                    );
543                    transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
544                }
545                if let Some(ref memory) = memory_service_clone {
546                    transfer_ctx = transfer_ctx.with_memory(memory.clone());
547                }
548                transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
549                if let Some(rc) = request_context.clone() {
550                    transfer_ctx = transfer_ctx.with_request_context(rc);
551                }
552
553                let transfer_ctx = Arc::new(transfer_ctx);
554
555                // Run the transferred agent
556                let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
557                    Ok(s) => s,
558                    Err(e) => {
559                        if let Some(manager) = plugin_manager.as_ref() {
560                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
561                        }
562                        yield Err(e);
563                        return;
564                    }
565                };
566
567                // Stream events from the transferred agent, capturing any further transfer
568                while let Some(result) = {
569                    if let Some(token) = cancellation_token.as_ref() {
570                        if token.is_cancelled() {
571                            if let Some(manager) = plugin_manager.as_ref() {
572                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
573                            }
574                            return;
575                        }
576                    }
577                    transfer_stream.next().await
578                } {
579                    match result {
580                        Ok(event) => {
581                            let mut event = event;
582                            if let Some(manager) = plugin_manager.as_ref() {
583                                match manager
584                                    .run_on_event(
585                                        transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
586                                        event.clone(),
587                                    )
588                                    .await
589                                {
590                                    Ok(Some(modified)) => {
591                                        event = modified;
592                                    }
593                                    Ok(None) => {}
594                                    Err(e) => {
595                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
596                                        yield Err(e);
597                                        return;
598                                    }
599                                }
600                            }
601
602                            // Capture further transfer requests
603                            if let Some(target) = &event.actions.transfer_to_agent {
604                                current_transfer_target = Some(target.clone());
605                            }
606
607                            // Apply state delta for transferred agent too
608                            if !event.actions.state_delta.is_empty() {
609                                transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
610                            }
611
612                            // Add to mutable session
613                            transfer_ctx.mutable_session().append_event(event.clone());
614
615                            if !event.llm_response.partial {
616                                if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
617                                    if let Some(manager) = plugin_manager.as_ref() {
618                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
619                                    }
620                                    yield Err(e);
621                                    return;
622                                }
623                            }
624                            yield Ok(event);
625                        }
626                        Err(e) => {
627                            if let Some(manager) = plugin_manager.as_ref() {
628                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
629                            }
630                            yield Err(e);
631                            return;
632                        }
633                    }
634                }
635            }
636
637            // ===== CONTEXT COMPACTION =====
638            // After all events have been processed, check if compaction should trigger.
639            // This runs in the background after the invocation completes.
640            if let Some(ref compaction_cfg) = compaction_config {
641                // Count invocations by counting user events in the session
642                let all_events = ctx.mutable_session().as_ref().events_snapshot();
643                let invocation_count = all_events.iter()
644                    .filter(|e| e.author == "user")
645                    .count() as u32;
646
647                if invocation_count > 0 && invocation_count % compaction_cfg.compaction_interval == 0 {
648                    // Determine the window of events to compact
649                    // We compact all events except the most recent overlap_size invocations
650                    let overlap = compaction_cfg.overlap_size as usize;
651
652                    // Find the boundary: keep the last `overlap` user messages and everything after
653                    let user_msg_indices: Vec<usize> = all_events.iter()
654                        .enumerate()
655                        .filter(|(_, e)| e.author == "user")
656                        .map(|(i, _)| i)
657                        .collect();
658
659                    // Keep the last `overlap` user messages intact.
660                    // When overlap is 0, compact everything.
661                    let compact_up_to = if overlap == 0 {
662                        all_events.len()
663                    } else if user_msg_indices.len() > overlap {
664                        // Compact up to (but not including) the overlap-th-from-last user message
665                        user_msg_indices[user_msg_indices.len() - overlap]
666                    } else {
667                        // Not enough user messages to satisfy overlap — skip compaction
668                        0
669                    };
670
671                    if compact_up_to > 0 {
672                        let events_to_compact = &all_events[..compact_up_to];
673
674                        match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
675                            Ok(Some(compaction_event)) => {
676                                // Persist the compaction event
677                                if let Err(e) = session_service.append_event(
678                                    ctx.session_id(),
679                                    compaction_event.clone(),
680                                ).await {
681                                    tracing::warn!(error = %e, "Failed to persist compaction event");
682                                } else {
683                                    tracing::info!(
684                                        compacted_events = compact_up_to,
685                                        "Context compaction completed"
686                                    );
687                                }
688                            }
689                            Ok(None) => {
690                                tracing::debug!("Compaction summarizer returned no result");
691                            }
692                            Err(e) => {
693                                // Compaction failure is non-fatal — log and continue
694                                tracing::warn!(error = %e, "Context compaction failed");
695                            }
696                        }
697                    }
698                }
699            }
700
701            if let Some(manager) = plugin_manager.as_ref() {
702                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
703            }
704        };
705
706        Ok(Box::pin(s))
707    }
708
709    /// Find which agent should handle the request based on session history
710    pub fn find_agent_to_run(
711        root_agent: &Arc<dyn Agent>,
712        session: &dyn adk_session::Session,
713    ) -> Arc<dyn Agent> {
714        // Look at recent events to find last agent that responded
715        let events = session.events();
716        for i in (0..events.len()).rev() {
717            if let Some(event) = events.at(i) {
718                // Check for explicit transfer
719                if let Some(target_name) = &event.actions.transfer_to_agent {
720                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
721                        return agent;
722                    }
723                }
724
725                if event.author == "user" {
726                    continue;
727                }
728
729                // Try to find this agent in the tree
730                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
731                    // Check if agent allows transfer up the tree
732                    if Self::is_transferable(root_agent, &agent) {
733                        return agent;
734                    }
735                }
736            }
737        }
738
739        // Default to root agent
740        root_agent.clone()
741    }
742
743    /// Check if agent and its parent chain allow transfer up the tree
744    fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
745        // For now, always allow transfer
746        // TODO: Check DisallowTransferToParent flag when LlmAgent supports it
747        let _ = (root_agent, agent);
748        true
749    }
750
751    /// Recursively search agent tree for agent with given name
752    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
753        if current.name() == target_name {
754            return Some(current.clone());
755        }
756
757        for sub_agent in current.sub_agents() {
758            if let Some(found) = Self::find_agent(sub_agent, target_name) {
759                return Some(found);
760            }
761        }
762
763        None
764    }
765
766    /// Compute the parent name and peer names for a given agent in the tree.
767    /// Returns `(parent_name, peer_names)`.
768    ///
769    /// Walks the agent tree to find the parent of `target_name`, then collects
770    /// the parent's name and the sibling agent names (excluding the target itself).
771    pub fn compute_transfer_context(
772        root: &Arc<dyn Agent>,
773        target_name: &str,
774    ) -> (Option<String>, Vec<String>) {
775        // If the target is the root itself, there's no parent or peers
776        if root.name() == target_name {
777            return (None, Vec::new());
778        }
779
780        // BFS/DFS to find the parent of target_name
781        fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
782            for sub in current.sub_agents() {
783                if sub.name() == target {
784                    return Some(current.clone());
785                }
786                if let Some(found) = find_parent(sub, target) {
787                    return Some(found);
788                }
789            }
790            None
791        }
792
793        match find_parent(root, target_name) {
794            Some(parent) => {
795                let parent_name = parent.name().to_string();
796                let peers: Vec<String> = parent
797                    .sub_agents()
798                    .iter()
799                    .filter(|a| a.name() != target_name)
800                    .map(|a| a.name().to_string())
801                    .collect();
802                (Some(parent_name), peers)
803            }
804            None => (None, Vec::new()),
805        }
806    }
807}