Skip to main content

adk_runner/
runner.rs

1use crate::InvocationContext;
2use crate::cache::CacheManager;
3use adk_artifact::ArtifactService;
4use adk_core::{
5    Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
6    ReadonlyContext, Result, RunConfig, SessionId, UserId,
7};
8use adk_plugin::PluginManager;
9use adk_session::SessionService;
10use adk_skill::{SkillInjector, SkillInjectorConfig};
11use async_stream::stream;
12use std::sync::Arc;
13use tokio_util::sync::CancellationToken;
14use tracing::Instrument;
15
16pub struct RunnerConfig {
17    pub app_name: String,
18    pub agent: Arc<dyn Agent>,
19    pub session_service: Arc<dyn SessionService>,
20    pub artifact_service: Option<Arc<dyn ArtifactService>>,
21    pub memory_service: Option<Arc<dyn Memory>>,
22    pub plugin_manager: Option<Arc<PluginManager>>,
23    /// Optional run configuration (streaming mode, etc.)
24    /// If not provided, uses default (SSE streaming)
25    #[allow(dead_code)]
26    pub run_config: Option<RunConfig>,
27    /// Optional context compaction configuration.
28    /// When set, the runner will periodically summarize older events
29    /// to reduce context size sent to the LLM.
30    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
31    /// Optional context cache configuration for automatic prompt caching lifecycle.
32    /// When set alongside `cache_capable`, the runner will automatically create and
33    /// manage cached content resources for supported providers.
34    ///
35    /// When `cache_capable` is set but this field is `None`, the runner
36    /// automatically uses [`ContextCacheConfig::default()`] (4096 min tokens,
37    /// 600s TTL, refresh every 3 invocations).
38    pub context_cache_config: Option<ContextCacheConfig>,
39    /// Optional cache-capable model reference for automatic cache management.
40    /// Set this to the same model used by the agent if it supports caching.
41    pub cache_capable: Option<Arc<dyn CacheCapable>>,
42    /// Optional request context from the server's auth middleware bridge.
43    /// When set, the runner passes it to `InvocationContext` so that
44    /// `user_scopes()` and `user_id()` reflect the authenticated identity.
45    pub request_context: Option<adk_core::RequestContext>,
46    /// Optional cooperative cancellation token for externally managed runs.
47    pub cancellation_token: Option<CancellationToken>,
48}
49
50pub struct Runner {
51    app_name: String,
52    root_agent: Arc<dyn Agent>,
53    session_service: Arc<dyn SessionService>,
54    artifact_service: Option<Arc<dyn ArtifactService>>,
55    memory_service: Option<Arc<dyn Memory>>,
56    plugin_manager: Option<Arc<PluginManager>>,
57    skill_injector: Option<Arc<SkillInjector>>,
58    run_config: RunConfig,
59    compaction_config: Option<adk_core::EventsCompactionConfig>,
60    context_cache_config: Option<ContextCacheConfig>,
61    cache_capable: Option<Arc<dyn CacheCapable>>,
62    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
63    request_context: Option<adk_core::RequestContext>,
64    cancellation_token: Option<CancellationToken>,
65}
66
67impl Runner {
68    /// Create a typestate builder for constructing a `Runner`.
69    ///
70    /// The builder enforces at compile time that the three required fields
71    /// (`app_name`, `agent`, `session_service`) are set before `build()` is
72    /// callable.
73    ///
74    /// # Example
75    ///
76    /// ```rust,ignore
77    /// let runner = Runner::builder()
78    ///     .app_name("my-app")
79    ///     .agent(agent)
80    ///     .session_service(session_service)
81    ///     .build()?;
82    /// ```
83    pub fn builder() -> crate::builder::RunnerConfigBuilder<
84        crate::builder::NoAppName,
85        crate::builder::NoAgent,
86        crate::builder::NoSessionService,
87    > {
88        crate::builder::RunnerConfigBuilder::new()
89    }
90
91    pub fn new(config: RunnerConfig) -> Result<Self> {
92        let run_config = config.run_config.unwrap_or_default();
93
94        // When a cache-capable model is provided but no explicit cache config,
95        // use the default ContextCacheConfig to enable caching automatically.
96        let effective_cache_config = config
97            .context_cache_config
98            .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
99
100        let cache_manager = effective_cache_config
101            .as_ref()
102            .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
103        Ok(Self {
104            app_name: config.app_name,
105            root_agent: config.agent,
106            session_service: config.session_service,
107            artifact_service: config.artifact_service,
108            memory_service: config.memory_service,
109            plugin_manager: config.plugin_manager,
110            skill_injector: None,
111            run_config,
112            compaction_config: config.compaction_config,
113            context_cache_config: effective_cache_config,
114            cache_capable: config.cache_capable,
115            cache_manager,
116            request_context: config.request_context,
117            cancellation_token: config.cancellation_token,
118        })
119    }
120
121    /// Enable skill injection using a pre-built injector.
122    ///
123    /// Skill injection runs before plugin `on_user_message` callbacks.
124    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
125        self.skill_injector = Some(Arc::new(injector));
126        self
127    }
128
129    /// Enable skill injection by auto-loading `.skills/` from the given root path.
130    #[deprecated(note = "Use with_auto_skills_mut instead")]
131    pub fn with_auto_skills(
132        mut self,
133        root: impl AsRef<std::path::Path>,
134        config: SkillInjectorConfig,
135    ) -> adk_skill::SkillResult<Self> {
136        self.with_auto_skills_mut(root, config)?;
137        Ok(self)
138    }
139
140    /// Enable skill injection by auto-loading `.skills/` from the given root path.
141    ///
142    /// Unlike [`with_auto_skills`](Self::with_auto_skills), this method borrows
143    /// the Runner mutably instead of consuming it. On error, the Runner remains
144    /// valid with no skill injector configured.
145    pub fn with_auto_skills_mut(
146        &mut self,
147        root: impl AsRef<std::path::Path>,
148        config: SkillInjectorConfig,
149    ) -> adk_skill::SkillResult<()> {
150        let injector = SkillInjector::from_root(root, config)?;
151        self.skill_injector = Some(Arc::new(injector));
152        Ok(())
153    }
154
155    pub async fn run(
156        &self,
157        user_id: UserId,
158        session_id: SessionId,
159        user_content: Content,
160    ) -> Result<EventStream> {
161        let app_name = self.app_name.clone();
162        let typed_app_name = AppName::try_from(app_name.clone())?;
163        let session_service = self.session_service.clone();
164        let root_agent = self.root_agent.clone();
165        let artifact_service = self.artifact_service.clone();
166        let memory_service = self.memory_service.clone();
167        let plugin_manager = self.plugin_manager.clone();
168        let skill_injector = self.skill_injector.clone();
169        let mut run_config = self.run_config.clone();
170        let compaction_config = self.compaction_config.clone();
171        let context_cache_config = self.context_cache_config.clone();
172        let cache_capable = self.cache_capable.clone();
173        let cache_manager_ref = self.cache_manager.clone();
174        let request_context = self.request_context.clone();
175        let cancellation_token = self.cancellation_token.clone();
176
177        let s = stream! {
178            // Get or create session
179            let session = match session_service
180                .get(adk_session::GetRequest {
181                    app_name: app_name.clone(),
182                    user_id: user_id.to_string(),
183                    session_id: session_id.to_string(),
184                    num_recent_events: None,
185                    after: None,
186                })
187                .await
188            {
189                Ok(s) => s,
190                Err(e) => {
191                    yield Err(e);
192                    return;
193                }
194            };
195
196            // Find which agent should handle this request
197            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
198
199            // Clone services for potential reuse in transfer
200            let artifact_service_clone = artifact_service.clone();
201            let memory_service_clone = memory_service.clone();
202
203            // Create invocation context with MutableSession
204            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
205            let mut effective_user_content = user_content.clone();
206            let mut selected_skill_name = String::new();
207            let mut selected_skill_id = String::new();
208
209            if let Some(injector) = skill_injector.as_ref() {
210                if let Some(matched) = adk_skill::apply_skill_injection(
211                    &mut effective_user_content,
212                    injector.index(),
213                    injector.policy(),
214                    injector.max_injected_chars(),
215                ) {
216                    selected_skill_name = matched.skill.name;
217                    selected_skill_id = matched.skill.id;
218                }
219            }
220
221            let mut invocation_ctx = match InvocationContext::new_typed(
222                invocation_id.clone(),
223                agent_to_run.clone(),
224                user_id.clone(),
225                typed_app_name.clone(),
226                session_id.clone(),
227                effective_user_content.clone(),
228                Arc::from(session),
229            ) {
230                Ok(ctx) => ctx,
231                Err(e) => {
232                    yield Err(e);
233                    return;
234                }
235            };
236
237            // Add optional services
238            if let Some(service) = artifact_service {
239                // Wrap service with ScopedArtifacts to bind session context
240                let scoped = adk_artifact::ScopedArtifacts::new(
241                    service,
242                    app_name.clone(),
243                    user_id.to_string(),
244                    session_id.to_string(),
245                );
246                invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
247            }
248            if let Some(memory) = memory_service {
249                invocation_ctx = invocation_ctx.with_memory(memory);
250            }
251
252            // Apply run config (streaming mode, etc.)
253            invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
254
255            // Apply request context from auth middleware bridge if present
256            if let Some(rc) = request_context.clone() {
257                invocation_ctx = invocation_ctx.with_request_context(rc);
258            }
259
260            let mut ctx = Arc::new(invocation_ctx);
261
262            if let Some(manager) = plugin_manager.as_ref() {
263                match manager
264                    .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
265                    .await
266                {
267                    Ok(Some(content)) => {
268                        let mut early_event = adk_core::Event::new(ctx.invocation_id());
269                        early_event.author = agent_to_run.name().to_string();
270                        early_event.llm_response.content = Some(content);
271
272                        ctx.mutable_session().append_event(early_event.clone());
273                        if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
274                            yield Err(e);
275                            return;
276                        }
277
278                        yield Ok(early_event);
279                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
280                        return;
281                    }
282                    Ok(None) => {}
283                    Err(e) => {
284                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
285                        yield Err(e);
286                        return;
287                    }
288                }
289
290                match manager
291                    .run_on_user_message(
292                        ctx.clone() as Arc<dyn adk_core::InvocationContext>,
293                        effective_user_content.clone(),
294                    )
295                    .await
296                {
297                    Ok(Some(modified)) => {
298                        effective_user_content = modified;
299
300                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
301                            ctx.invocation_id().to_string(),
302                            agent_to_run.clone(),
303                            ctx.user_id().to_string(),
304                            ctx.app_name().to_string(),
305                            ctx.session_id().to_string(),
306                            effective_user_content.clone(),
307                            ctx.mutable_session().clone(),
308                        ) {
309                            Ok(ctx) => ctx,
310                            Err(e) => {
311                                yield Err(e);
312                                return;
313                            }
314                        };
315
316                        if let Some(service) = artifact_service_clone.clone() {
317                            let scoped = adk_artifact::ScopedArtifacts::new(
318                                service,
319                                ctx.app_name().to_string(),
320                                ctx.user_id().to_string(),
321                                ctx.session_id().to_string(),
322                            );
323                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
324                        }
325                        if let Some(memory) = memory_service_clone.clone() {
326                            refreshed_ctx = refreshed_ctx.with_memory(memory);
327                        }
328                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
329                        if let Some(rc) = request_context.clone() {
330                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
331                        }
332                        ctx = Arc::new(refreshed_ctx);
333                    }
334                    Ok(None) => {}
335                    Err(e) => {
336                        if let Some(manager) = plugin_manager.as_ref() {
337                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
338                        }
339                        yield Err(e);
340                        return;
341                    }
342                }
343            }
344
345            // Append user message to session service (persistent storage)
346            let mut user_event = adk_core::Event::new(ctx.invocation_id());
347            user_event.author = "user".to_string();
348            user_event.llm_response.content = Some(effective_user_content.clone());
349
350            // Also add to mutable session for immediate visibility
351            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
352            ctx.mutable_session().append_event(user_event.clone());
353
354            if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
355                if let Some(manager) = plugin_manager.as_ref() {
356                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
357                }
358                yield Err(e);
359                return;
360            }
361
362            // ===== CONTEXT CACHE LIFECYCLE =====
363            // If context caching is configured and a cache-capable model is available,
364            // create or refresh the cached content before agent execution.
365            // Cache failures are non-fatal — log a warning and proceed without cache.
366            if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
367                let mut cm = cm_mutex.lock().await;
368                if cm.is_enabled() {
369                    if cm.active_cache_name().is_none() || cm.needs_refresh() {
370                        // Gather system instruction from the agent's description
371                        // (the full instruction is resolved inside the agent, but the
372                        // description provides a reasonable proxy for cache keying)
373                        let system_instruction = agent_to_run.description().to_string();
374                        let tools = std::collections::HashMap::new();
375                        let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
376
377                        match cache_model.create_cache(&system_instruction, &tools, ttl).await {
378                            Ok(name) => {
379                                // Delete old cache if refreshing
380                                if let Some(old) = cm.clear_active_cache() {
381                                    if let Err(e) = cache_model.delete_cache(&old).await {
382                                        tracing::warn!(
383                                            old_cache = %old,
384                                            error = %e,
385                                            "failed to delete old cache, proceeding with new cache"
386                                        );
387                                    }
388                                }
389                                cm.set_active_cache(name);
390                            }
391                            Err(e) => {
392                                tracing::warn!(
393                                    error = %e,
394                                    "cache creation failed, proceeding without cache"
395                                );
396                            }
397                        }
398                    }
399
400                    // Attach cache name to run config so agents can use it
401                    if let Some(cache_name) = cm.record_invocation() {
402                        run_config.cached_content = Some(cache_name.to_string());
403                        // Rebuild the invocation context with the updated run config
404                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
405                            ctx.invocation_id().to_string(),
406                            agent_to_run.clone(),
407                            ctx.user_id().to_string(),
408                            ctx.app_name().to_string(),
409                            ctx.session_id().to_string(),
410                            effective_user_content.clone(),
411                            ctx.mutable_session().clone(),
412                        ) {
413                            Ok(ctx) => ctx,
414                            Err(e) => {
415                                yield Err(e);
416                                return;
417                            }
418                        };
419                        if let Some(service) = artifact_service_clone.clone() {
420                            let scoped = adk_artifact::ScopedArtifacts::new(
421                                service,
422                                ctx.app_name().to_string(),
423                                ctx.user_id().to_string(),
424                                ctx.session_id().to_string(),
425                            );
426                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
427                        }
428                        if let Some(memory) = memory_service_clone.clone() {
429                            refreshed_ctx = refreshed_ctx.with_memory(memory);
430                        }
431                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
432                        if let Some(rc) = request_context.clone() {
433                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
434                        }
435                        ctx = Arc::new(refreshed_ctx);
436                    }
437                }
438            }
439
440            // Run the agent with instrumentation (ADK-Go style attributes)
441            let agent_span = tracing::info_span!(
442                "agent.execute",
443                "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
444                "gcp.vertex.agent.session_id" = ctx.session_id(),
445                "gcp.vertex.agent.event_id" = ctx.invocation_id(), // Use invocation_id as event_id for agent spans
446                "gen_ai.conversation.id" = ctx.session_id(),
447                "adk.app_name" = ctx.app_name(),
448                "adk.user_id" = ctx.user_id(),
449                "agent.name" = %agent_to_run.name(),
450                "adk.skills.selected_name" = %selected_skill_name,
451                "adk.skills.selected_id" = %selected_skill_id
452            );
453
454            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
455                Ok(s) => s,
456                Err(e) => {
457                    if let Some(manager) = plugin_manager.as_ref() {
458                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
459                    }
460                    yield Err(e);
461                    return;
462                }
463            };
464
465            // Stream events and check for transfers
466            use futures::StreamExt;
467            let mut transfer_target: Option<String> = None;
468
469            while let Some(result) = {
470                if let Some(token) = cancellation_token.as_ref() {
471                    if token.is_cancelled() {
472                        if let Some(manager) = plugin_manager.as_ref() {
473                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
474                        }
475                        return;
476                    }
477                }
478                agent_stream.next().await
479            } {
480                match result {
481                    Ok(event) => {
482                        let mut event = event;
483
484                        if let Some(manager) = plugin_manager.as_ref() {
485                            match manager
486                                .run_on_event(
487                                    ctx.clone() as Arc<dyn adk_core::InvocationContext>,
488                                    event.clone(),
489                                )
490                                .await
491                            {
492                                Ok(Some(modified)) => {
493                                    event = modified;
494                                }
495                                Ok(None) => {}
496                                Err(e) => {
497                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
498                                    yield Err(e);
499                                    return;
500                                }
501                            }
502                        }
503
504                        // Check for transfer action
505                        if let Some(target) = &event.actions.transfer_to_agent {
506                            transfer_target = Some(target.clone());
507                        }
508
509                        // CRITICAL: Apply state_delta to the mutable session immediately.
510                        // This is the key fix for state propagation between sequential agents.
511                        // When an agent sets output_key, it emits an event with state_delta.
512                        // We must apply this to the mutable session so downstream agents
513                        // can read the value via ctx.session().state().get().
514                        if !event.actions.state_delta.is_empty() {
515                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
516                        }
517
518                        // Also add the event to the mutable session's event list
519                        ctx.mutable_session().append_event(event.clone());
520
521                        // Append event to session service (persistent storage)
522                        // Skip partial streaming chunks — only persist the final
523                        // event. Streaming chunks share the same event ID, so
524                        // persisting each one would violate the primary key
525                        // constraint. The final chunk (partial=false) carries the
526                        // complete accumulated content.
527                        if !event.llm_response.partial {
528                            if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
529                                if let Some(manager) = plugin_manager.as_ref() {
530                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
531                                }
532                                yield Err(e);
533                                return;
534                            }
535                        }
536                        yield Ok(event);
537                    }
538                    Err(e) => {
539                        if let Some(manager) = plugin_manager.as_ref() {
540                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
541                        }
542                        yield Err(e);
543                        return;
544                    }
545                }
546            }
547
548            // ===== TRANSFER LOOP =====
549            // Support multi-hop transfers with a max-depth guard.
550            // When an agent emits transfer_to_agent, the runner resolves the
551            // target from the root agent tree, computes transfer_targets
552            // (parent + peers) for the new agent, and runs it. This repeats
553            // until no further transfer is requested or the depth limit is hit.
554            const MAX_TRANSFER_DEPTH: u32 = 10;
555            let mut transfer_depth: u32 = 0;
556            let mut current_transfer_target = transfer_target;
557
558            while let Some(target_name) = current_transfer_target.take() {
559                transfer_depth += 1;
560                if transfer_depth > MAX_TRANSFER_DEPTH {
561                    tracing::warn!(
562                        depth = transfer_depth,
563                        target = %target_name,
564                        "max transfer depth exceeded, stopping transfer chain"
565                    );
566                    break;
567                }
568
569                let target_agent = match Self::find_agent(&root_agent, &target_name) {
570                    Some(a) => a,
571                    None => {
572                        tracing::warn!(target = %target_name, "transfer target not found in agent tree");
573                        break;
574                    }
575                };
576
577                // Compute transfer_targets for the target agent:
578                // - parent: the agent that transferred to it (or root if applicable)
579                // - peers: siblings in the agent tree
580                // - children: handled by the agent itself via sub_agents()
581                let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
582
583                let mut transfer_run_config = run_config.clone();
584                let mut targets = Vec::new();
585                if let Some(ref parent) = parent_name {
586                    targets.push(parent.clone());
587                }
588                targets.extend(peer_names);
589                transfer_run_config.transfer_targets = targets;
590                transfer_run_config.parent_agent = parent_name;
591
592                // For transfers, we reuse the same mutable session to preserve state
593                let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
594                let mut transfer_ctx = match InvocationContext::with_mutable_session(
595                    transfer_invocation_id.clone(),
596                    target_agent.clone(),
597                    ctx.user_id().to_string(),
598                    ctx.app_name().to_string(),
599                    ctx.session_id().to_string(),
600                    effective_user_content.clone(),
601                    ctx.mutable_session().clone(),
602                ) {
603                    Ok(ctx) => ctx,
604                    Err(e) => {
605                        yield Err(e);
606                        return;
607                    }
608                };
609
610                if let Some(ref service) = artifact_service_clone {
611                    let scoped = adk_artifact::ScopedArtifacts::new(
612                        service.clone(),
613                        ctx.app_name().to_string(),
614                        ctx.user_id().to_string(),
615                        ctx.session_id().to_string(),
616                    );
617                    transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
618                }
619                if let Some(ref memory) = memory_service_clone {
620                    transfer_ctx = transfer_ctx.with_memory(memory.clone());
621                }
622                transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
623                if let Some(rc) = request_context.clone() {
624                    transfer_ctx = transfer_ctx.with_request_context(rc);
625                }
626
627                let transfer_ctx = Arc::new(transfer_ctx);
628
629                // Run the transferred agent
630                let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
631                    Ok(s) => s,
632                    Err(e) => {
633                        if let Some(manager) = plugin_manager.as_ref() {
634                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
635                        }
636                        yield Err(e);
637                        return;
638                    }
639                };
640
641                // Stream events from the transferred agent, capturing any further transfer
642                while let Some(result) = {
643                    if let Some(token) = cancellation_token.as_ref() {
644                        if token.is_cancelled() {
645                            if let Some(manager) = plugin_manager.as_ref() {
646                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
647                            }
648                            return;
649                        }
650                    }
651                    transfer_stream.next().await
652                } {
653                    match result {
654                        Ok(event) => {
655                            let mut event = event;
656                            if let Some(manager) = plugin_manager.as_ref() {
657                                match manager
658                                    .run_on_event(
659                                        transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
660                                        event.clone(),
661                                    )
662                                    .await
663                                {
664                                    Ok(Some(modified)) => {
665                                        event = modified;
666                                    }
667                                    Ok(None) => {}
668                                    Err(e) => {
669                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
670                                        yield Err(e);
671                                        return;
672                                    }
673                                }
674                            }
675
676                            // Capture further transfer requests
677                            if let Some(target) = &event.actions.transfer_to_agent {
678                                current_transfer_target = Some(target.clone());
679                            }
680
681                            // Apply state delta for transferred agent too
682                            if !event.actions.state_delta.is_empty() {
683                                transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
684                            }
685
686                            // Add to mutable session
687                            transfer_ctx.mutable_session().append_event(event.clone());
688
689                            if !event.llm_response.partial {
690                                if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
691                                    if let Some(manager) = plugin_manager.as_ref() {
692                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
693                                    }
694                                    yield Err(e);
695                                    return;
696                                }
697                            }
698                            yield Ok(event);
699                        }
700                        Err(e) => {
701                            if let Some(manager) = plugin_manager.as_ref() {
702                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
703                            }
704                            yield Err(e);
705                            return;
706                        }
707                    }
708                }
709            }
710
711            // ===== CONTEXT COMPACTION =====
712            // After all events have been processed, check if compaction should trigger.
713            // This runs in the background after the invocation completes.
714            if let Some(ref compaction_cfg) = compaction_config {
715                let event_count = ctx.mutable_session().as_ref().events_len();
716
717                if event_count > 0 {
718                    let all_events = ctx.mutable_session().as_ref().events_snapshot();
719                    let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
720                        as u32;
721
722                    if invocation_count > 0
723                        && invocation_count % compaction_cfg.compaction_interval == 0
724                    {
725                        // Determine the window of events to compact
726                        // We compact all events except the most recent overlap_size invocations
727                        let overlap = compaction_cfg.overlap_size as usize;
728
729                        // Find the boundary: keep the last `overlap` user messages and everything after
730                        let user_msg_indices: Vec<usize> = all_events.iter()
731                            .enumerate()
732                            .filter(|(_, e)| e.author == "user")
733                            .map(|(i, _)| i)
734                            .collect();
735
736                        // Keep the last `overlap` user messages intact.
737                        // When overlap is 0, compact everything.
738                        let compact_up_to = if overlap == 0 {
739                            all_events.len()
740                        } else if user_msg_indices.len() > overlap {
741                            // Compact up to (but not including) the overlap-th-from-last user message
742                            user_msg_indices[user_msg_indices.len() - overlap]
743                        } else {
744                            // Not enough user messages to satisfy overlap — skip compaction
745                            0
746                        };
747
748                        if compact_up_to > 0 {
749                            let events_to_compact = &all_events[..compact_up_to];
750
751                            match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
752                                Ok(Some(compaction_event)) => {
753                                    // Persist the compaction event
754                                    if let Err(e) = session_service.append_event(
755                                        ctx.session_id(),
756                                        compaction_event.clone(),
757                                    ).await {
758                                        tracing::warn!(error = %e, "Failed to persist compaction event");
759                                    } else {
760                                        tracing::info!(
761                                            compacted_events = compact_up_to,
762                                            "Context compaction completed"
763                                        );
764                                    }
765                                }
766                                Ok(None) => {
767                                    tracing::debug!("Compaction summarizer returned no result");
768                                }
769                                Err(e) => {
770                                    // Compaction failure is non-fatal — log and continue
771                                    tracing::warn!(error = %e, "Context compaction failed");
772                                }
773                            }
774                        }
775                    }
776                }
777            }
778
779            if let Some(manager) = plugin_manager.as_ref() {
780                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
781            }
782        };
783
784        Ok(Box::pin(s))
785    }
786
787    /// Convenience method that accepts string arguments.
788    ///
789    /// Converts `user_id` and `session_id` to their typed equivalents
790    /// and delegates to [`run()`](Self::run).
791    ///
792    /// # Errors
793    ///
794    /// Returns an error if either string fails identity validation
795    /// (empty, contains null bytes, or exceeds length limit).
796    pub async fn run_str(
797        &self,
798        user_id: &str,
799        session_id: &str,
800        user_content: Content,
801    ) -> Result<EventStream> {
802        let user_id = UserId::try_from(user_id)?;
803        let session_id = SessionId::try_from(session_id)?;
804        self.run(user_id, session_id, user_content).await
805    }
806
807    /// Find which agent should handle the request based on session history
808    pub fn find_agent_to_run(
809        root_agent: &Arc<dyn Agent>,
810        session: &dyn adk_session::Session,
811    ) -> Arc<dyn Agent> {
812        // Look at recent events to find last agent that responded
813        let events = session.events();
814        for i in (0..events.len()).rev() {
815            if let Some(event) = events.at(i) {
816                // Check for explicit transfer
817                if let Some(target_name) = &event.actions.transfer_to_agent {
818                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
819                        return agent;
820                    }
821                }
822
823                if event.author == "user" {
824                    continue;
825                }
826
827                // Try to find this agent in the tree
828                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
829                    // Check if agent allows transfer up the tree
830                    if Self::is_transferable(root_agent, &agent) {
831                        return agent;
832                    }
833                }
834            }
835        }
836
837        // Default to root agent
838        root_agent.clone()
839    }
840
841    /// Check if an agent found in session history can be resumed for the next
842    /// user message.
843    ///
844    /// This always returns `true` because the transfer-policy enforcement
845    /// (`disallow_transfer_to_parent` / `disallow_transfer_to_peers`) is
846    /// handled inside `LlmAgent::run()` when it builds the `transfer_to_agent`
847    /// tool's valid-target list. The runner does not need to duplicate that
848    /// check here — it only needs to know whether the agent is a valid
849    /// resumption target, which it always is if it exists in the tree.
850    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
851        true
852    }
853
854    /// Recursively search agent tree for agent with given name
855    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
856        if current.name() == target_name {
857            return Some(current.clone());
858        }
859
860        for sub_agent in current.sub_agents() {
861            if let Some(found) = Self::find_agent(sub_agent, target_name) {
862                return Some(found);
863            }
864        }
865
866        None
867    }
868
869    /// Compute the parent name and peer names for a given agent in the tree.
870    /// Returns `(parent_name, peer_names)`.
871    ///
872    /// Walks the agent tree to find the parent of `target_name`, then collects
873    /// the parent's name and the sibling agent names (excluding the target itself).
874    pub fn compute_transfer_context(
875        root: &Arc<dyn Agent>,
876        target_name: &str,
877    ) -> (Option<String>, Vec<String>) {
878        // If the target is the root itself, there's no parent or peers
879        if root.name() == target_name {
880            return (None, Vec::new());
881        }
882
883        // BFS/DFS to find the parent of target_name
884        fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
885            for sub in current.sub_agents() {
886                if sub.name() == target {
887                    return Some(current.clone());
888                }
889                if let Some(found) = find_parent(sub, target) {
890                    return Some(found);
891                }
892            }
893            None
894        }
895
896        match find_parent(root, target_name) {
897            Some(parent) => {
898                let parent_name = parent.name().to_string();
899                let peers: Vec<String> = parent
900                    .sub_agents()
901                    .iter()
902                    .filter(|a| a.name() != target_name)
903                    .map(|a| a.name().to_string())
904                    .collect();
905                (Some(parent_name), peers)
906            }
907            None => (None, Vec::new()),
908        }
909    }
910}