Skip to main content

adk_runner/
runner.rs

1use crate::InvocationContext;
2use crate::cache::CacheManager;
3#[cfg(feature = "artifacts")]
4use adk_artifact::ArtifactService;
5use adk_core::{
6    Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
7    ReadonlyContext, Result, RunConfig, SessionId, UserId,
8};
9#[cfg(feature = "plugins")]
10use adk_plugin::PluginManager;
11use adk_session::SessionService;
12#[cfg(feature = "skills")]
13use adk_skill::{SkillInjector, SkillInjectorConfig};
14use async_stream::stream;
15use std::sync::Arc;
16use tokio_util::sync::CancellationToken;
17use tracing::Instrument;
18
19/// Configuration for constructing a [`Runner`].
20///
21/// Use [`Runner::builder()`] for a compile-time-safe way to construct this.
22pub struct RunnerConfig {
23    /// Application name used for session scoping.
24    pub app_name: String,
25    /// The root agent to execute.
26    pub agent: Arc<dyn Agent>,
27    /// Session persistence backend.
28    pub session_service: Arc<dyn SessionService>,
29    #[cfg(feature = "artifacts")]
30    /// Optional artifact storage service.
31    pub artifact_service: Option<Arc<dyn ArtifactService>>,
32    /// Optional memory/RAG service.
33    pub memory_service: Option<Arc<dyn Memory>>,
34    #[cfg(feature = "plugins")]
35    /// Optional plugin manager for lifecycle hooks.
36    pub plugin_manager: Option<Arc<PluginManager>>,
37    /// Optional run configuration (streaming mode, etc.)
38    /// If not provided, uses default (SSE streaming)
39    #[allow(dead_code)]
40    pub run_config: Option<RunConfig>,
41    /// Optional context compaction configuration.
42    /// When set, the runner will periodically summarize older events
43    /// to reduce context size sent to the LLM.
44    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
45    /// Optional context cache configuration for automatic prompt caching lifecycle.
46    /// When set alongside `cache_capable`, the runner will automatically create and
47    /// manage cached content resources for supported providers.
48    ///
49    /// When `cache_capable` is set but this field is `None`, the runner
50    /// automatically uses [`ContextCacheConfig::default()`] (4096 min tokens,
51    /// 600s TTL, refresh every 3 invocations).
52    pub context_cache_config: Option<ContextCacheConfig>,
53    /// Optional cache-capable model reference for automatic cache management.
54    /// Set this to the same model used by the agent if it supports caching.
55    pub cache_capable: Option<Arc<dyn CacheCapable>>,
56    /// Optional request context from the server's auth middleware bridge.
57    /// When set, the runner passes it to `InvocationContext` so that
58    /// `user_scopes()` and `user_id()` reflect the authenticated identity.
59    pub request_context: Option<adk_core::RequestContext>,
60    /// Optional cooperative cancellation token for externally managed runs.
61    pub cancellation_token: Option<CancellationToken>,
62    /// Optional intra-invocation compaction configuration.
63    /// When set, the runner estimates token count before each agent run
64    /// and triggers mid-invocation summarization when the threshold is exceeded.
65    pub intra_compaction_config: Option<adk_core::IntraCompactionConfig>,
66    /// Optional summarizer for intra-invocation compaction.
67    /// Required when `intra_compaction_config` is set.
68    pub intra_compaction_summarizer: Option<Arc<dyn adk_core::BaseEventsSummarizer>>,
69    /// Optional context compaction configuration for token-budget overflow handling.
70    ///
71    /// When set, the runner applies the configured [`CompactionStrategy`](crate::compaction::CompactionStrategy)
72    /// to shrink the event history when the context exceeds the token budget,
73    /// retrying the model request up to `max_retries` times.
74    ///
75    /// This field is only available when the `context-compaction` feature is enabled.
76    #[cfg(feature = "context-compaction")]
77    pub context_compaction: Option<crate::compaction::CompactionConfig>,
78}
79
80/// Agent execution runtime.
81///
82/// Orchestrates session retrieval, agent dispatch, event streaming, context
83/// caching, and compaction. Construct via [`Runner::builder()`] or
84/// [`Runner::new()`].
85pub struct Runner {
86    app_name: String,
87    root_agent: Arc<dyn Agent>,
88    session_service: Arc<dyn SessionService>,
89    #[cfg(feature = "artifacts")]
90    artifact_service: Option<Arc<dyn ArtifactService>>,
91    memory_service: Option<Arc<dyn Memory>>,
92    #[cfg(feature = "plugins")]
93    plugin_manager: Option<Arc<PluginManager>>,
94    #[cfg(feature = "skills")]
95    skill_injector: Option<Arc<SkillInjector>>,
96    run_config: RunConfig,
97    compaction_config: Option<adk_core::EventsCompactionConfig>,
98    context_cache_config: Option<ContextCacheConfig>,
99    cache_capable: Option<Arc<dyn CacheCapable>>,
100    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
101    request_context: Option<adk_core::RequestContext>,
102    cancellation_token: Option<CancellationToken>,
103    intra_compactor: Option<Arc<crate::intra_compaction::IntraInvocationCompactor>>,
104    /// Optional context compaction configuration for token-budget overflow handling.
105    #[cfg(feature = "context-compaction")]
106    context_compaction: Option<Arc<crate::compaction::CompactionConfig>>,
107    /// Per-session cancellation tokens for the interrupt API.
108    /// Each `run()` call registers a token here; `interrupt()` cancels it.
109    active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
110}
111
112impl Runner {
113    /// Create a typestate builder for constructing a `Runner`.
114    ///
115    /// The builder enforces at compile time that the three required fields
116    /// (`app_name`, `agent`, `session_service`) are set before `build()` is
117    /// callable.
118    ///
119    /// # Example
120    ///
121    /// ```rust,ignore
122    /// let runner = Runner::builder()
123    ///     .app_name("my-app")
124    ///     .agent(agent)
125    ///     .session_service(session_service)
126    ///     .build()?;
127    /// ```
128    pub fn builder() -> crate::builder::RunnerConfigBuilder<
129        crate::builder::NoAppName,
130        crate::builder::NoAgent,
131        crate::builder::NoSessionService,
132    > {
133        crate::builder::RunnerConfigBuilder::new()
134    }
135
136    /// Create a new runner from a [`RunnerConfig`].
137    ///
138    /// Prefer [`Runner::builder()`] for a compile-time-safe construction API.
139    pub fn new(config: RunnerConfig) -> Result<Self> {
140        let run_config = config.run_config.unwrap_or_default();
141
142        // When a cache-capable model is provided but no explicit cache config,
143        // use the default ContextCacheConfig to enable caching automatically.
144        let effective_cache_config = config
145            .context_cache_config
146            .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
147
148        let cache_manager = effective_cache_config
149            .as_ref()
150            .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
151
152        let intra_compactor = config.intra_compaction_config.as_ref().and_then(|ic_config| {
153            config.intra_compaction_summarizer.as_ref().map(|summarizer| {
154                Arc::new(crate::intra_compaction::IntraInvocationCompactor::new(
155                    ic_config.clone(),
156                    summarizer.clone(),
157                ))
158            })
159        });
160
161        Ok(Self {
162            app_name: config.app_name,
163            root_agent: config.agent,
164            session_service: config.session_service,
165            #[cfg(feature = "artifacts")]
166            artifact_service: config.artifact_service,
167            memory_service: config.memory_service,
168            #[cfg(feature = "plugins")]
169            plugin_manager: config.plugin_manager,
170            #[cfg(feature = "skills")]
171            skill_injector: None,
172            run_config,
173            compaction_config: config.compaction_config,
174            context_cache_config: effective_cache_config,
175            cache_capable: config.cache_capable,
176            cache_manager,
177            request_context: config.request_context,
178            cancellation_token: config.cancellation_token,
179            intra_compactor,
180            #[cfg(feature = "context-compaction")]
181            context_compaction: config.context_compaction.map(Arc::new),
182            active_sessions: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
183        })
184    }
185
186    /// Enable skill injection using a pre-built injector.
187    ///
188    /// Skill injection runs before plugin `on_user_message` callbacks.
189    #[cfg(feature = "skills")]
190    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
191        self.skill_injector = Some(Arc::new(injector));
192        self
193    }
194
195    /// Enable skill injection by auto-loading `.skills/` from the given root path.
196    #[cfg(feature = "skills")]
197    #[deprecated(note = "Use with_auto_skills_mut instead")]
198    pub fn with_auto_skills(
199        mut self,
200        root: impl AsRef<std::path::Path>,
201        config: SkillInjectorConfig,
202    ) -> adk_skill::SkillResult<Self> {
203        self.with_auto_skills_mut(root, config)?;
204        Ok(self)
205    }
206
207    /// Enable skill injection by auto-loading `.skills/` from the given root path.
208    ///
209    /// Unlike [`with_auto_skills`](Self::with_auto_skills), this method borrows
210    /// the Runner mutably instead of consuming it. On error, the Runner remains
211    /// valid with no skill injector configured.
212    #[cfg(feature = "skills")]
213    pub fn with_auto_skills_mut(
214        &mut self,
215        root: impl AsRef<std::path::Path>,
216        config: SkillInjectorConfig,
217    ) -> adk_skill::SkillResult<()> {
218        let injector = SkillInjector::from_root(root, config)?;
219        self.skill_injector = Some(Arc::new(injector));
220        Ok(())
221    }
222
223    /// Execute the root agent for the given user and session, returning an event stream.
224    ///
225    /// Retrieves (or creates) the session, resolves the target agent, runs
226    /// plugins/skills, and streams events as the agent executes.
227    pub async fn run(
228        &self,
229        user_id: UserId,
230        session_id: SessionId,
231        user_content: Content,
232    ) -> Result<EventStream> {
233        let app_name = self.app_name.clone();
234        let typed_app_name = AppName::try_from(app_name.clone())?;
235        let session_service = self.session_service.clone();
236        let root_agent = self.root_agent.clone();
237        #[cfg(feature = "artifacts")]
238        let artifact_service = self.artifact_service.clone();
239        let memory_service = self.memory_service.clone();
240        #[cfg(feature = "plugins")]
241        let plugin_manager = self.plugin_manager.clone();
242        #[cfg(feature = "skills")]
243        let skill_injector = self.skill_injector.clone();
244        let mut run_config = self.run_config.clone();
245        let compaction_config = self.compaction_config.clone();
246        let context_cache_config = self.context_cache_config.clone();
247        let cache_capable = self.cache_capable.clone();
248        let cache_manager_ref = self.cache_manager.clone();
249        let request_context = self.request_context.clone();
250        let cancellation_token = self.cancellation_token.clone();
251        let intra_compactor = self.intra_compactor.clone();
252        #[cfg(feature = "context-compaction")]
253        let context_compaction = self.context_compaction.clone();
254
255        // Register a per-session cancellation token for the interrupt API.
256        // If a global token is configured, create a child token so that
257        // either global cancellation OR per-session interrupt stops this run.
258        let session_token = CancellationToken::new();
259        let session_id_str = session_id.as_str().to_string();
260        {
261            let mut sessions = self.active_sessions.lock().unwrap_or_else(|e| e.into_inner());
262            sessions.insert(session_id_str.clone(), session_token.clone());
263        }
264        let active_sessions = self.active_sessions.clone();
265
266        // Effective token: cancelled if either the global token or the session token fires
267        let effective_token = if let Some(ref global) = cancellation_token {
268            let combined = CancellationToken::new();
269            let combined_clone = combined.clone();
270            let global_clone = global.clone();
271            let session_clone = session_token.clone();
272            // Watch both tokens — cancel the combined token when either fires
273            let combined_for_global = combined_clone.clone();
274            tokio::spawn(async move {
275                global_clone.cancelled().await;
276                combined_for_global.cancel();
277            });
278            let combined_for_session = combined_clone;
279            tokio::spawn(async move {
280                session_clone.cancelled().await;
281                combined_for_session.cancel();
282            });
283            Some(combined)
284        } else {
285            Some(session_token.clone())
286        };
287
288        let s = stream! {
289            // Clean up session tracking when the stream ends.
290            // We use a simple struct with Drop to ensure cleanup even on early return.
291            struct SessionCleanup {
292                active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
293                session_id: String,
294            }
295            impl Drop for SessionCleanup {
296                fn drop(&mut self) {
297                    let mut sessions = self.active_sessions.lock().unwrap_or_else(|e| e.into_inner());
298                    sessions.remove(&self.session_id);
299                }
300            }
301            let _cleanup = SessionCleanup {
302                active_sessions: active_sessions.clone(),
303                session_id: session_id_str,
304            };
305
306            // Use the effective token (combines global + per-session)
307            let cancellation_token = effective_token;
308            // Get or create session
309            let session = match session_service
310                .get(adk_session::GetRequest {
311                    app_name: app_name.clone(),
312                    user_id: user_id.to_string(),
313                    session_id: session_id.to_string(),
314                    num_recent_events: run_config.history_max_events,
315                    after: None,
316                })
317                .await
318            {
319                Ok(s) => s,
320                Err(e) => {
321                    yield Err(e);
322                    return;
323                }
324            };
325
326            // Find which agent should handle this request
327            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
328
329            // Clone services for potential reuse in transfer
330            #[cfg(feature = "artifacts")]
331            let artifact_service_clone = artifact_service.clone();
332            let memory_service_clone = memory_service.clone();
333
334            // Create invocation context with MutableSession
335            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
336            #[cfg(any(feature = "skills", feature = "plugins"))]
337            let mut effective_user_content = user_content.clone();
338            #[cfg(not(any(feature = "skills", feature = "plugins")))]
339            let effective_user_content = user_content.clone();
340            #[cfg(feature = "skills")]
341            let mut selected_skill_name = String::new();
342            #[cfg(not(feature = "skills"))]
343            let selected_skill_name = String::new();
344            #[cfg(feature = "skills")]
345            let mut selected_skill_id = String::new();
346            #[cfg(not(feature = "skills"))]
347            let selected_skill_id = String::new();
348
349            #[cfg(feature = "skills")]
350            if let Some(injector) = skill_injector.as_ref()
351                && let Some(matched) = adk_skill::apply_skill_injection(
352                    &mut effective_user_content,
353                    injector.index(),
354                    injector.policy(),
355                    injector.max_injected_chars(),
356                ) {
357                    selected_skill_name = matched.skill.name;
358                    selected_skill_id = matched.skill.id;
359                }
360
361            let mut invocation_ctx = match InvocationContext::new_typed(
362                invocation_id.clone(),
363                agent_to_run.clone(),
364                user_id.clone(),
365                typed_app_name.clone(),
366                session_id.clone(),
367                effective_user_content.clone(),
368                Arc::from(session),
369            ) {
370                Ok(ctx) => ctx,
371                Err(e) => {
372                    yield Err(e);
373                    return;
374                }
375            };
376
377            // Add optional services
378            #[cfg(feature = "artifacts")]
379            if let Some(service) = artifact_service {
380                // Wrap service with ScopedArtifacts to bind session context
381                let scoped = adk_artifact::ScopedArtifacts::new(
382                    service,
383                    app_name.clone(),
384                    user_id.to_string(),
385                    session_id.to_string(),
386                );
387                invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
388            }
389            if let Some(memory) = memory_service {
390                invocation_ctx = invocation_ctx.with_memory(memory);
391            }
392
393            // Apply run config (streaming mode, etc.)
394            invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
395
396            // Apply request context from auth middleware bridge if present
397            if let Some(rc) = request_context.clone() {
398                invocation_ctx = invocation_ctx.with_request_context(rc);
399            }
400
401            let mut ctx = Arc::new(invocation_ctx);
402
403            #[cfg(feature = "plugins")]
404            if let Some(manager) = plugin_manager.as_ref() {
405                match manager
406                    .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
407                    .await
408                {
409                    Ok(Some(content)) => {
410                        let mut early_event = adk_core::Event::new(ctx.invocation_id());
411                        early_event.author = agent_to_run.name().to_string();
412                        early_event.llm_response.content = Some(content);
413
414                        ctx.mutable_session().append_event(early_event.clone());
415                        if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
416                            yield Err(e);
417                            return;
418                        }
419
420                        yield Ok(early_event);
421                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
422                        return;
423                    }
424                    Ok(None) => {}
425                    Err(e) => {
426                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
427                        yield Err(e);
428                        return;
429                    }
430                }
431
432                match manager
433                    .run_on_user_message(
434                        ctx.clone() as Arc<dyn adk_core::InvocationContext>,
435                        effective_user_content.clone(),
436                    )
437                    .await
438                {
439                    Ok(Some(modified)) => {
440                        effective_user_content = modified;
441
442                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
443                            ctx.invocation_id().to_string(),
444                            agent_to_run.clone(),
445                            ctx.user_id().to_string(),
446                            ctx.app_name().to_string(),
447                            ctx.session_id().to_string(),
448                            effective_user_content.clone(),
449                            ctx.mutable_session().clone(),
450                        ) {
451                            Ok(ctx) => ctx,
452                            Err(e) => {
453                                yield Err(e);
454                                return;
455                            }
456                        };
457
458                        #[cfg(feature = "artifacts")]
459                        if let Some(service) = artifact_service_clone.clone() {
460                            let scoped = adk_artifact::ScopedArtifacts::new(
461                                service,
462                                ctx.app_name().to_string(),
463                                ctx.user_id().to_string(),
464                                ctx.session_id().to_string(),
465                            );
466                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
467                        }
468                        if let Some(memory) = memory_service_clone.clone() {
469                            refreshed_ctx = refreshed_ctx.with_memory(memory);
470                        }
471                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
472                        if let Some(rc) = request_context.clone() {
473                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
474                        }
475                        ctx = Arc::new(refreshed_ctx);
476                    }
477                    Ok(None) => {}
478                    Err(e) => {
479                        if let Some(manager) = plugin_manager.as_ref() {
480                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
481                        }
482                        yield Err(e);
483                        return;
484                    }
485                }
486            }
487
488            // Append user message to session service (persistent storage)
489            let mut user_event = adk_core::Event::new(ctx.invocation_id());
490            user_event.author = "user".to_string();
491            user_event.llm_response.content = Some(effective_user_content.clone());
492
493            // Also add to mutable session for immediate visibility
494            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
495            ctx.mutable_session().append_event(user_event.clone());
496
497            if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
498                #[cfg(feature = "plugins")]
499                if let Some(manager) = plugin_manager.as_ref() {
500                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
501                }
502                yield Err(e);
503                return;
504            }
505
506            // ===== CONTEXT CACHE LIFECYCLE =====
507            // If context caching is configured and a cache-capable model is available,
508            // create or refresh the cached content before agent execution.
509            // Cache failures are non-fatal — log a warning and proceed without cache.
510            if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
511                let should_refresh_cache = {
512                    let cm = cm_mutex.lock().await;
513                    cm.is_enabled() && (cm.active_cache_name().is_none() || cm.needs_refresh())
514                };
515
516                if should_refresh_cache {
517                    // Gather system instruction from the agent's description
518                    // (the full instruction is resolved inside the agent, but the
519                    // description provides a reasonable proxy for cache keying).
520                    let system_instruction = agent_to_run.description().to_string();
521                    let tools = std::collections::HashMap::new();
522                    let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
523
524                    match cache_model.create_cache(&system_instruction, &tools, ttl).await {
525                        Ok(name) => {
526                            let old_cache = {
527                                let mut cm = cm_mutex.lock().await;
528                                let old = cm.clear_active_cache();
529                                cm.set_active_cache(name);
530                                old
531                            };
532
533                            if let Some(old) = old_cache
534                                && let Err(e) = cache_model.delete_cache(&old).await {
535                                    tracing::warn!(
536                                        old_cache = %old,
537                                        error = %e,
538                                        "failed to delete old cache, proceeding with new cache"
539                                    );
540                                }
541                        }
542                        Err(e) => {
543                            tracing::warn!(
544                                error = %e,
545                                "cache creation failed, proceeding without cache"
546                            );
547                        }
548                    }
549                }
550
551                // Attach cache name to run config so agents can use it.
552                let cache_name = {
553                    let mut cm = cm_mutex.lock().await;
554                    if cm.is_enabled() {
555                        cm.record_invocation().map(str::to_string)
556                    } else {
557                        None
558                    }
559                };
560
561                if let Some(cache_name) = cache_name {
562                    run_config.cached_content = Some(cache_name);
563                    // Rebuild the invocation context with the updated run config.
564                    let mut refreshed_ctx = match InvocationContext::with_mutable_session(
565                        ctx.invocation_id().to_string(),
566                        agent_to_run.clone(),
567                        ctx.user_id().to_string(),
568                        ctx.app_name().to_string(),
569                        ctx.session_id().to_string(),
570                        effective_user_content.clone(),
571                        ctx.mutable_session().clone(),
572                    ) {
573                        Ok(ctx) => ctx,
574                        Err(e) => {
575                            yield Err(e);
576                            return;
577                        }
578                    };
579                    #[cfg(feature = "artifacts")]
580                    if let Some(service) = artifact_service_clone.clone() {
581                        let scoped = adk_artifact::ScopedArtifacts::new(
582                            service,
583                            ctx.app_name().to_string(),
584                            ctx.user_id().to_string(),
585                            ctx.session_id().to_string(),
586                        );
587                        refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
588                    }
589                    if let Some(memory) = memory_service_clone.clone() {
590                        refreshed_ctx = refreshed_ctx.with_memory(memory);
591                    }
592                    refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
593                    if let Some(rc) = request_context.clone() {
594                        refreshed_ctx = refreshed_ctx.with_request_context(rc);
595                    }
596                    ctx = Arc::new(refreshed_ctx);
597                }
598            }
599
600            // ===== INTRA-INVOCATION COMPACTION =====
601            // If intra-compaction is configured, check if the session events
602            // exceed the token threshold and compact them before the agent runs.
603            if let Some(ref compactor) = intra_compactor {
604                compactor.reset_cycle();
605                let session_events = ctx.mutable_session().as_ref().events_snapshot();
606                match compactor.maybe_compact(&session_events).await {
607                    Ok(Some(compacted_events)) => {
608                        ctx.mutable_session().replace_events(compacted_events);
609                        tracing::info!("intra-invocation compaction applied before agent execution");
610                    }
611                    Ok(None) => {} // No compaction needed
612                    Err(e) => {
613                        tracing::warn!(error = %e, "intra-invocation compaction check failed");
614                    }
615                }
616            }
617
618            // ===== CONTEXT COMPACTION (TOKEN BUDGET) =====
619            // If context-compaction is configured, proactively check the estimated
620            // token count before calling the agent. If it exceeds the budget,
621            // apply compaction to bring it under the limit.
622            #[cfg(feature = "context-compaction")]
623            if let Some(ref cc_config) = context_compaction {
624                let session_events = ctx.mutable_session().events_snapshot();
625                let estimated = crate::compaction::estimate_event_tokens(&session_events);
626                if estimated > cc_config.context_budget {
627                    tracing::info!(
628                        estimated_tokens = estimated,
629                        budget = cc_config.context_budget,
630                        "context exceeds budget, applying proactive compaction"
631                    );
632                    match crate::compaction::apply_compaction_with_retry(cc_config, session_events).await {
633                        Ok(compacted) => {
634                            ctx.mutable_session().replace_events(compacted);
635                            tracing::info!("proactive context compaction succeeded");
636                        }
637                        Err(e) => {
638                            // Proactive compaction failed — proceed anyway and let the
639                            // model reject the request if it's truly too large.
640                            tracing::warn!(error = %e, "proactive context compaction failed, proceeding with full context");
641                        }
642                    }
643                }
644            }
645
646            // Run the agent with instrumentation (ADK-Go style attributes)
647            let agent_span = tracing::info_span!(
648                "agent.execute",
649                "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
650                "gcp.vertex.agent.session_id" = ctx.session_id(),
651                "gcp.vertex.agent.event_id" = ctx.invocation_id(), // Use invocation_id as event_id for agent spans
652                "gen_ai.conversation.id" = ctx.session_id(),
653                "adk.app_name" = ctx.app_name(),
654                "adk.user_id" = ctx.user_id(),
655                "agent.name" = %agent_to_run.name(),
656                "adk.skills.selected_name" = %selected_skill_name,
657                "adk.skills.selected_id" = %selected_skill_id
658            );
659
660            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span.clone()).await {
661                Ok(s) => s,
662                #[cfg(feature = "context-compaction")]
663                Err(e) if context_compaction.is_some() && crate::compaction::is_token_limit_error(&e) => {
664                    // Token limit error on agent.run() — apply compaction and retry
665                    let cc_config = context_compaction.as_ref().unwrap();
666                    tracing::warn!(
667                        error = %e,
668                        "agent execution failed with token limit error, attempting compaction"
669                    );
670                    let session_events = ctx.mutable_session().events_snapshot();
671                    match crate::compaction::apply_compaction_with_retry(cc_config, session_events).await {
672                        Ok(compacted) => {
673                            ctx.mutable_session().replace_events(compacted);
674                            tracing::info!("context compaction succeeded after token limit error, retrying agent");
675                            // Retry the agent call with compacted context
676                            match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
677                                Ok(s) => s,
678                                Err(retry_err) => {
679                                    #[cfg(feature = "plugins")]
680                                    if let Some(manager) = plugin_manager.as_ref() {
681                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
682                                    }
683                                    yield Err(retry_err);
684                                    return;
685                                }
686                            }
687                        }
688                        Err(compaction_err) => {
689                            #[cfg(feature = "plugins")]
690                            if let Some(manager) = plugin_manager.as_ref() {
691                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
692                            }
693                            yield Err(compaction_err);
694                            return;
695                        }
696                    }
697                }
698                Err(e) => {
699                    #[cfg(feature = "plugins")]
700                    if let Some(manager) = plugin_manager.as_ref() {
701                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
702                    }
703                    yield Err(e);
704                    return;
705                }
706            };
707
708            // Stream events and check for transfers
709            use futures::StreamExt;
710            let mut transfer_target: Option<String> = None;
711
712            while let Some(result) = {
713                if let Some(token) = cancellation_token.as_ref()
714                    && token.is_cancelled() {
715                        #[cfg(feature = "plugins")]
716                        if let Some(manager) = plugin_manager.as_ref() {
717                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
718                        }
719                        return;
720                    }
721                agent_stream.next().await
722            } {
723                match result {
724                    Ok(event) => {
725                        #[cfg(feature = "plugins")]
726                        let mut event = event;
727
728                        #[cfg(feature = "plugins")]
729                        if let Some(manager) = plugin_manager.as_ref() {
730                            match manager
731                                .run_on_event(
732                                    ctx.clone() as Arc<dyn adk_core::InvocationContext>,
733                                    event.clone(),
734                                )
735                                .await
736                            {
737                                Ok(Some(modified)) => {
738                                    event = modified;
739                                }
740                                Ok(None) => {}
741                                Err(e) => {
742                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
743                                    yield Err(e);
744                                    return;
745                                }
746                            }
747                        }
748
749                        // Check for transfer action
750                        if let Some(target) = &event.actions.transfer_to_agent {
751                            transfer_target = Some(target.clone());
752                        }
753
754                        // CRITICAL: Apply state_delta to the mutable session immediately.
755                        // This is the key fix for state propagation between sequential agents.
756                        // When an agent sets output_key, it emits an event with state_delta.
757                        // We must apply this to the mutable session so downstream agents
758                        // can read the value via ctx.session().state().get().
759                        if !event.actions.state_delta.is_empty() {
760                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
761                        }
762
763                        // Also add the event to the mutable session's event list
764                        ctx.mutable_session().append_event(event.clone());
765
766                        // Append event to session service (persistent storage)
767                        // Skip partial streaming chunks — only persist the final
768                        // event. Streaming chunks share the same event ID, so
769                        // persisting each one would violate the primary key
770                        // constraint. The final chunk (partial=false) carries the
771                        // complete accumulated content.
772                        if !event.llm_response.partial
773                            && let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
774                                #[cfg(feature = "plugins")]
775                                if let Some(manager) = plugin_manager.as_ref() {
776                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
777                                }
778                                yield Err(e);
779                                return;
780                            }
781                        yield Ok(event);
782                    }
783                    Err(e) => {
784                        #[cfg(feature = "plugins")]
785                        if let Some(manager) = plugin_manager.as_ref() {
786                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
787                        }
788                        yield Err(e);
789                        return;
790                    }
791                }
792            }
793
794            // ===== TRANSFER LOOP =====
795            // Support multi-hop transfers with a max-depth guard.
796            // When an agent emits transfer_to_agent, the runner resolves the
797            // target from the root agent tree, computes transfer_targets
798            // (parent + peers) for the new agent, and runs it. This repeats
799            // until no further transfer is requested or the depth limit is hit.
800            const DEFAULT_MAX_TRANSFER_DEPTH: u32 = 10;
801            let max_depth = run_config.max_transfer_depth.unwrap_or(DEFAULT_MAX_TRANSFER_DEPTH);
802            let mut transfer_depth: u32 = 0;
803            let mut current_transfer_target = transfer_target;
804
805            while let Some(target_name) = current_transfer_target.take() {
806                transfer_depth += 1;
807                if transfer_depth > max_depth {
808                    tracing::warn!(
809                        depth = transfer_depth,
810                        target = %target_name,
811                        "max transfer depth exceeded, stopping transfer chain"
812                    );
813                    break;
814                }
815
816                let target_agent = match Self::find_agent(&root_agent, &target_name) {
817                    Some(a) => a,
818                    None => {
819                        tracing::warn!(target = %target_name, "transfer target not found in agent tree");
820                        break;
821                    }
822                };
823
824                // Compute transfer_targets for the target agent:
825                // - parent: the agent that transferred to it (or root if applicable)
826                // - peers: siblings in the agent tree
827                // - children: handled by the agent itself via sub_agents()
828                let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
829
830                let mut transfer_run_config = run_config.clone();
831                let mut targets = Vec::new();
832                if let Some(ref parent) = parent_name {
833                    targets.push(parent.clone());
834                }
835                targets.extend(peer_names);
836                transfer_run_config.transfer_targets = targets;
837                transfer_run_config.parent_agent = parent_name;
838
839                // For transfers, we reuse the same mutable session to preserve state
840                let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
841                let mut transfer_ctx = match InvocationContext::with_mutable_session(
842                    transfer_invocation_id.clone(),
843                    target_agent.clone(),
844                    ctx.user_id().to_string(),
845                    ctx.app_name().to_string(),
846                    ctx.session_id().to_string(),
847                    effective_user_content.clone(),
848                    ctx.mutable_session().clone(),
849                ) {
850                    Ok(ctx) => ctx,
851                    Err(e) => {
852                        yield Err(e);
853                        return;
854                    }
855                };
856
857                #[cfg(feature = "artifacts")]
858                if let Some(ref service) = artifact_service_clone {
859                    let scoped = adk_artifact::ScopedArtifacts::new(
860                        service.clone(),
861                        ctx.app_name().to_string(),
862                        ctx.user_id().to_string(),
863                        ctx.session_id().to_string(),
864                    );
865                    transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
866                }
867                if let Some(ref memory) = memory_service_clone {
868                    transfer_ctx = transfer_ctx.with_memory(memory.clone());
869                }
870                transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
871                if let Some(rc) = request_context.clone() {
872                    transfer_ctx = transfer_ctx.with_request_context(rc);
873                }
874
875                let transfer_ctx = Arc::new(transfer_ctx);
876
877                // Run the transferred agent
878                let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
879                    Ok(s) => s,
880                    Err(e) => {
881                        #[cfg(feature = "plugins")]
882                        if let Some(manager) = plugin_manager.as_ref() {
883                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
884                        }
885                        yield Err(e);
886                        return;
887                    }
888                };
889
890                // Stream events from the transferred agent, capturing any further transfer
891                while let Some(result) = {
892                    if let Some(token) = cancellation_token.as_ref()
893                        && token.is_cancelled() {
894                            #[cfg(feature = "plugins")]
895                            if let Some(manager) = plugin_manager.as_ref() {
896                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
897                            }
898                            return;
899                        }
900                    transfer_stream.next().await
901                } {
902                    match result {
903                        Ok(event) => {
904                            #[cfg(feature = "plugins")]
905                            let mut event = event;
906                            #[cfg(feature = "plugins")]
907                            if let Some(manager) = plugin_manager.as_ref() {
908                                match manager
909                                    .run_on_event(
910                                        transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
911                                        event.clone(),
912                                    )
913                                    .await
914                                {
915                                    Ok(Some(modified)) => {
916                                        event = modified;
917                                    }
918                                    Ok(None) => {}
919                                    Err(e) => {
920                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
921                                        yield Err(e);
922                                        return;
923                                    }
924                                }
925                            }
926
927                            // Capture further transfer requests
928                            if let Some(target) = &event.actions.transfer_to_agent {
929                                current_transfer_target = Some(target.clone());
930                            }
931
932                            // Apply state delta for transferred agent too
933                            if !event.actions.state_delta.is_empty() {
934                                transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
935                            }
936
937                            // Add to mutable session
938                            transfer_ctx.mutable_session().append_event(event.clone());
939
940                            if !event.llm_response.partial
941                                && let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
942                                    #[cfg(feature = "plugins")]
943                                    if let Some(manager) = plugin_manager.as_ref() {
944                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
945                                    }
946                                    yield Err(e);
947                                    return;
948                                }
949                            yield Ok(event);
950                        }
951                        Err(e) => {
952                            #[cfg(feature = "plugins")]
953                            if let Some(manager) = plugin_manager.as_ref() {
954                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
955                            }
956                            yield Err(e);
957                            return;
958                        }
959                    }
960                }
961            }
962
963            // ===== CONTEXT COMPACTION =====
964            // After all events have been processed, check if compaction should trigger.
965            // This runs in the background after the invocation completes.
966            if let Some(ref compaction_cfg) = compaction_config {
967                let event_count = ctx.mutable_session().as_ref().events_len();
968
969                if event_count > 0 {
970                    let all_events = ctx.mutable_session().as_ref().events_snapshot();
971                    let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
972                        as u32;
973
974                    if invocation_count > 0
975                        && invocation_count.is_multiple_of(compaction_cfg.compaction_interval)
976                    {
977                        // Determine the window of events to compact
978                        // We compact all events except the most recent overlap_size invocations
979                        let overlap = compaction_cfg.overlap_size as usize;
980
981                        // Find the boundary: keep the last `overlap` user messages and everything after
982                        let user_msg_indices: Vec<usize> = all_events.iter()
983                            .enumerate()
984                            .filter(|(_, e)| e.author == "user")
985                            .map(|(i, _)| i)
986                            .collect();
987
988                        // Keep the last `overlap` user messages intact.
989                        // When overlap is 0, compact everything.
990                        let compact_up_to = if overlap == 0 {
991                            all_events.len()
992                        } else if user_msg_indices.len() > overlap {
993                            // Compact up to (but not including) the overlap-th-from-last user message
994                            user_msg_indices[user_msg_indices.len() - overlap]
995                        } else {
996                            // Not enough user messages to satisfy overlap — skip compaction
997                            0
998                        };
999
1000                        if compact_up_to > 0 {
1001                            let events_to_compact = &all_events[..compact_up_to];
1002
1003                            match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
1004                                Ok(Some(compaction_event)) => {
1005                                    // Persist the compaction event
1006                                    if let Err(e) = session_service.append_event(
1007                                        ctx.session_id(),
1008                                        compaction_event.clone(),
1009                                    ).await {
1010                                        tracing::warn!(error = %e, "Failed to persist compaction event");
1011                                    } else {
1012                                        tracing::info!(
1013                                            compacted_events = compact_up_to,
1014                                            "Context compaction completed"
1015                                        );
1016                                    }
1017                                }
1018                                Ok(None) => {
1019                                    tracing::debug!("Compaction summarizer returned no result");
1020                                }
1021                                Err(e) => {
1022                                    // Compaction failure is non-fatal — log and continue
1023                                    tracing::warn!(error = %e, "Context compaction failed");
1024                                }
1025                            }
1026                        }
1027                    }
1028                }
1029            }
1030
1031            #[cfg(feature = "plugins")]
1032            if let Some(manager) = plugin_manager.as_ref() {
1033                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
1034            }
1035        };
1036
1037        Ok(Box::pin(s))
1038    }
1039
1040    /// Convenience method that accepts string arguments.
1041    ///
1042    /// Converts `user_id` and `session_id` to their typed equivalents
1043    /// and delegates to [`run()`](Self::run).
1044    ///
1045    /// # Errors
1046    ///
1047    /// Returns an error if either string fails identity validation
1048    /// (empty, contains null bytes, or exceeds length limit).
1049    pub async fn run_str(
1050        &self,
1051        user_id: &str,
1052        session_id: &str,
1053        user_content: Content,
1054    ) -> Result<EventStream> {
1055        let user_id = UserId::try_from(user_id)?;
1056        let session_id = SessionId::try_from(session_id)?;
1057        self.run(user_id, session_id, user_content).await
1058    }
1059
1060    /// Interrupt a running agent for the given session.
1061    ///
1062    /// Cancels the agent's current execution within the event loop. Events
1063    /// already produced and appended to the session are preserved — only
1064    /// future events are stopped. The caller can then issue a new `run()`
1065    /// call with a different instruction to redirect the agent.
1066    ///
1067    /// Returns `true` if a running session was found and interrupted,
1068    /// `false` if no active run exists for that session ID.
1069    ///
1070    /// # Example
1071    ///
1072    /// ```rust,ignore
1073    /// // Start a run in the background
1074    /// let mut stream = runner.run(user_id, session_id, content).await?;
1075    /// tokio::spawn(async move { while stream.next().await.is_some() {} });
1076    ///
1077    /// // Later, interrupt it
1078    /// let was_running = runner.interrupt("session-1");
1079    /// assert!(was_running);
1080    ///
1081    /// // Redirect with a new instruction
1082    /// let mut stream = runner.run(user_id, session_id, new_content).await?;
1083    /// ```
1084    pub fn interrupt(&self, session_id: &str) -> bool {
1085        let sessions = self.active_sessions.lock().unwrap_or_else(|e| e.into_inner());
1086        if let Some(token) = sessions.get(session_id) {
1087            tracing::info!(session.id = session_id, "interrupting running agent");
1088            token.cancel();
1089            true
1090        } else {
1091            tracing::debug!(session.id = session_id, "no active run to interrupt");
1092            false
1093        }
1094    }
1095
1096    /// Returns the session IDs of all currently running agent executions.
1097    pub fn active_session_ids(&self) -> Vec<String> {
1098        let sessions = self.active_sessions.lock().unwrap_or_else(|e| e.into_inner());
1099        sessions.keys().cloned().collect()
1100    }
1101
1102    /// Returns a reference to the context compaction configuration, if set.
1103    ///
1104    /// This is used by the runner's generate_content loop to detect token limit
1105    /// errors and apply compaction strategies before retrying.
1106    #[cfg(feature = "context-compaction")]
1107    pub fn context_compaction(&self) -> Option<&crate::compaction::CompactionConfig> {
1108        self.context_compaction.as_deref()
1109    }
1110
1111    /// Find which agent should handle the request based on session history
1112    pub fn find_agent_to_run(
1113        root_agent: &Arc<dyn Agent>,
1114        session: &dyn adk_session::Session,
1115    ) -> Arc<dyn Agent> {
1116        // Look at recent events to find last agent that responded
1117        let events = session.events();
1118        for i in (0..events.len()).rev() {
1119            if let Some(event) = events.at(i) {
1120                // Check for explicit transfer
1121                if let Some(target_name) = &event.actions.transfer_to_agent
1122                    && let Some(agent) = Self::find_agent(root_agent, target_name)
1123                {
1124                    return agent;
1125                }
1126
1127                if event.author == "user" {
1128                    continue;
1129                }
1130
1131                // Try to find this agent in the tree
1132                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
1133                    // Check if agent allows transfer up the tree
1134                    if Self::is_transferable(root_agent, &agent) {
1135                        return agent;
1136                    }
1137                }
1138            }
1139        }
1140
1141        // Default to root agent
1142        root_agent.clone()
1143    }
1144
1145    /// Check if an agent found in session history can be resumed for the next
1146    /// user message.
1147    ///
1148    /// This always returns `true` because the transfer-policy enforcement
1149    /// (`disallow_transfer_to_parent` / `disallow_transfer_to_peers`) is
1150    /// handled inside `LlmAgent::run()` when it builds the `transfer_to_agent`
1151    /// tool's valid-target list. The runner does not need to duplicate that
1152    /// check here — it only needs to know whether the agent is a valid
1153    /// resumption target, which it always is if it exists in the tree.
1154    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
1155        true
1156    }
1157
1158    /// Recursively search agent tree for agent with given name
1159    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
1160        if current.name() == target_name {
1161            return Some(current.clone());
1162        }
1163
1164        for sub_agent in current.sub_agents() {
1165            if let Some(found) = Self::find_agent(sub_agent, target_name) {
1166                return Some(found);
1167            }
1168        }
1169
1170        None
1171    }
1172
1173    /// Compute the parent name and peer names for a given agent in the tree.
1174    /// Returns `(parent_name, peer_names)`.
1175    ///
1176    /// Walks the agent tree to find the parent of `target_name`, then collects
1177    /// the parent's name and the sibling agent names (excluding the target itself).
1178    pub fn compute_transfer_context(
1179        root: &Arc<dyn Agent>,
1180        target_name: &str,
1181    ) -> (Option<String>, Vec<String>) {
1182        // If the target is the root itself, there's no parent or peers
1183        if root.name() == target_name {
1184            return (None, Vec::new());
1185        }
1186
1187        // BFS/DFS to find the parent of target_name
1188        fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
1189            for sub in current.sub_agents() {
1190                if sub.name() == target {
1191                    return Some(current.clone());
1192                }
1193                if let Some(found) = find_parent(sub, target) {
1194                    return Some(found);
1195                }
1196            }
1197            None
1198        }
1199
1200        match find_parent(root, target_name) {
1201            Some(parent) => {
1202                let parent_name = parent.name().to_string();
1203                let peers: Vec<String> = parent
1204                    .sub_agents()
1205                    .iter()
1206                    .filter(|a| a.name() != target_name)
1207                    .map(|a| a.name().to_string())
1208                    .collect();
1209                (Some(parent_name), peers)
1210            }
1211            None => (None, Vec::new()),
1212        }
1213    }
1214}