Skip to main content

adk_runner/
runner.rs

1use crate::InvocationContext;
2use crate::cache::CacheManager;
3#[cfg(feature = "artifacts")]
4use adk_artifact::ArtifactService;
5use adk_core::{
6    Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
7    ReadonlyContext, Result, RunConfig, SessionId, UserId,
8};
9#[cfg(feature = "plugins")]
10use adk_plugin::PluginManager;
11use adk_session::SessionService;
12#[cfg(feature = "skills")]
13use adk_skill::{SkillInjector, SkillInjectorConfig};
14use async_stream::stream;
15use std::sync::Arc;
16use tokio_util::sync::CancellationToken;
17use tracing::Instrument;
18
/// Configuration for constructing a [`Runner`].
///
/// Use [`Runner::builder()`] for a compile-time-safe way to construct this.
/// The struct is consumed by [`Runner::new()`], which resolves the optional
/// fields (default run config, default cache config, intra-invocation
/// compactor) into the runner's internal state.
#[non_exhaustive]
pub struct RunnerConfig {
    /// Application name used for session scoping.
    pub app_name: String,
    /// The root agent to execute.
    pub agent: Arc<dyn Agent>,
    /// Session persistence backend.
    pub session_service: Arc<dyn SessionService>,
    #[cfg(feature = "artifacts")]
    /// Optional artifact storage service.
    ///
    /// Only present when the `artifacts` feature is enabled.
    pub artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory/RAG service.
    pub memory_service: Option<Arc<dyn Memory>>,
    #[cfg(feature = "plugins")]
    /// Optional plugin manager for lifecycle hooks.
    ///
    /// Only present when the `plugins` feature is enabled.
    pub plugin_manager: Option<Arc<PluginManager>>,
    /// Optional run configuration (streaming mode, etc.)
    /// If not provided, [`Runner::new()`] falls back to `RunConfig::default()`
    /// (documented here as SSE streaming).
    // NOTE(review): `Runner::new` reads this field, so the `dead_code` allow
    // below looks unnecessary — confirm before removing.
    #[allow(dead_code)]
    pub run_config: Option<RunConfig>,
    /// Optional context compaction configuration.
    /// When set, the runner will periodically summarize older events
    /// to reduce context size sent to the LLM.
    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Optional context cache configuration for automatic prompt caching lifecycle.
    /// When set alongside `cache_capable`, the runner will automatically create and
    /// manage cached content resources for supported providers.
    ///
    /// When `cache_capable` is set but this field is `None`, the runner
    /// automatically uses [`ContextCacheConfig::default()`] (4096 min tokens,
    /// 600s TTL, refresh every 3 invocations).
    pub context_cache_config: Option<ContextCacheConfig>,
    /// Optional cache-capable model reference for automatic cache management.
    /// Set this to the same model used by the agent if it supports caching.
    pub cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Optional request context from the server's auth middleware bridge.
    /// When set, the runner passes it to `InvocationContext` so that
    /// `user_scopes()` and `user_id()` reflect the authenticated identity.
    pub request_context: Option<adk_core::RequestContext>,
    /// Optional cooperative cancellation token for externally managed runs.
    pub cancellation_token: Option<CancellationToken>,
    /// Optional intra-invocation compaction configuration.
    /// When set, the runner estimates token count before each agent run
    /// and triggers mid-invocation summarization when the threshold is exceeded.
    ///
    /// Only takes effect together with `intra_compaction_summarizer`.
    pub intra_compaction_config: Option<adk_core::IntraCompactionConfig>,
    /// Optional summarizer for intra-invocation compaction.
    /// Required when `intra_compaction_config` is set: if the config is
    /// present but this field is `None`, [`Runner::new()`] builds no
    /// intra-invocation compactor.
    pub intra_compaction_summarizer: Option<Arc<dyn adk_core::BaseEventsSummarizer>>,
    /// Optional context compaction configuration for token-budget overflow handling.
    ///
    /// When set, the runner applies the configured [`CompactionStrategy`](crate::compaction::CompactionStrategy)
    /// to shrink the event history when the context exceeds the token budget,
    /// retrying the model request up to `max_retries` times.
    ///
    /// This field is only available when the `context-compaction` feature is enabled.
    #[cfg(feature = "context-compaction")]
    pub context_compaction: Option<crate::compaction::CompactionConfig>,
}
80
/// Agent execution runtime.
///
/// Orchestrates session retrieval, agent dispatch, event streaming, context
/// caching, and compaction. Construct via [`Runner::builder()`] or
/// [`Runner::new()`].
pub struct Runner {
    /// Application name, copied from [`RunnerConfig::app_name`].
    app_name: String,
    /// Root agent, copied from [`RunnerConfig::agent`].
    root_agent: Arc<dyn Agent>,
    /// Session persistence backend used to load sessions and append events.
    session_service: Arc<dyn SessionService>,
    /// Optional artifact storage service (`artifacts` feature only).
    #[cfg(feature = "artifacts")]
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service attached to each invocation context.
    memory_service: Option<Arc<dyn Memory>>,
    /// Optional plugin manager for lifecycle hooks (`plugins` feature only).
    #[cfg(feature = "plugins")]
    plugin_manager: Option<Arc<PluginManager>>,
    /// Optional skill injector (`skills` feature only).
    ///
    /// `Runner::new` leaves this as `None`; it is set by
    /// `with_skill_injector` or `with_auto_skills_mut`.
    #[cfg(feature = "skills")]
    skill_injector: Option<Arc<SkillInjector>>,
    /// Resolved run configuration (the configured value or `RunConfig::default()`).
    run_config: RunConfig,
    /// Optional events compaction configuration, copied from the config.
    compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Effective cache configuration: the explicit one, or the default when
    /// only a cache-capable model was supplied.
    context_cache_config: Option<ContextCacheConfig>,
    /// Optional cache-capable model used to create and delete caches.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Cache bookkeeping, present whenever an effective cache configuration
    /// exists. Guarded by an async mutex.
    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
    /// Optional request context forwarded to each invocation context.
    request_context: Option<adk_core::RequestContext>,
    /// Optional externally supplied (global) cancellation token.
    cancellation_token: Option<CancellationToken>,
    /// Intra-invocation compactor, built only when both the intra-compaction
    /// config and its summarizer were supplied.
    intra_compactor: Option<Arc<crate::intra_compaction::IntraInvocationCompactor>>,
    /// Optional context compaction configuration for token-budget overflow handling.
    #[cfg(feature = "context-compaction")]
    context_compaction: Option<Arc<crate::compaction::CompactionConfig>>,
    /// Per-session cancellation tokens for the interrupt API, keyed by the
    /// session id string.
    /// Each `run()` call registers a token here; `interrupt()` cancels it.
    active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
}
112
113impl Runner {
114    /// Create a typestate builder for constructing a `Runner`.
115    ///
116    /// The builder enforces at compile time that the three required fields
117    /// (`app_name`, `agent`, `session_service`) are set before `build()` is
118    /// callable.
119    ///
120    /// # Example
121    ///
122    /// ```rust,ignore
123    /// let runner = Runner::builder()
124    ///     .app_name("my-app")
125    ///     .agent(agent)
126    ///     .session_service(session_service)
127    ///     .build()?;
128    /// ```
129    pub fn builder() -> crate::builder::RunnerConfigBuilder<
130        crate::builder::NoAppName,
131        crate::builder::NoAgent,
132        crate::builder::NoSessionService,
133    > {
134        crate::builder::RunnerConfigBuilder::new()
135    }
136
137    /// Create a new runner from a [`RunnerConfig`].
138    ///
139    /// Prefer [`Runner::builder()`] for a compile-time-safe construction API.
140    pub fn new(config: RunnerConfig) -> Result<Self> {
141        let run_config = config.run_config.unwrap_or_default();
142
143        // When a cache-capable model is provided but no explicit cache config,
144        // use the default ContextCacheConfig to enable caching automatically.
145        let effective_cache_config = config
146            .context_cache_config
147            .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
148
149        let cache_manager = effective_cache_config
150            .as_ref()
151            .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
152
153        let intra_compactor = config.intra_compaction_config.as_ref().and_then(|ic_config| {
154            config.intra_compaction_summarizer.as_ref().map(|summarizer| {
155                Arc::new(crate::intra_compaction::IntraInvocationCompactor::new(
156                    ic_config.clone(),
157                    summarizer.clone(),
158                ))
159            })
160        });
161
162        Ok(Self {
163            app_name: config.app_name,
164            root_agent: config.agent,
165            session_service: config.session_service,
166            #[cfg(feature = "artifacts")]
167            artifact_service: config.artifact_service,
168            memory_service: config.memory_service,
169            #[cfg(feature = "plugins")]
170            plugin_manager: config.plugin_manager,
171            #[cfg(feature = "skills")]
172            skill_injector: None,
173            run_config,
174            compaction_config: config.compaction_config,
175            context_cache_config: effective_cache_config,
176            cache_capable: config.cache_capable,
177            cache_manager,
178            request_context: config.request_context,
179            cancellation_token: config.cancellation_token,
180            intra_compactor,
181            #[cfg(feature = "context-compaction")]
182            context_compaction: config.context_compaction.map(Arc::new),
183            active_sessions: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
184        })
185    }
186
187    /// Enable skill injection using a pre-built injector.
188    ///
189    /// Skill injection runs before plugin `on_user_message` callbacks.
190    #[cfg(feature = "skills")]
191    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
192        self.skill_injector = Some(Arc::new(injector));
193        self
194    }
195
196    /// Enable skill injection by auto-loading `.skills/` from the given root path.
197    #[cfg(feature = "skills")]
198    #[deprecated(note = "Use with_auto_skills_mut instead")]
199    pub fn with_auto_skills(
200        mut self,
201        root: impl AsRef<std::path::Path>,
202        config: SkillInjectorConfig,
203    ) -> adk_skill::SkillResult<Self> {
204        self.with_auto_skills_mut(root, config)?;
205        Ok(self)
206    }
207
208    /// Enable skill injection by auto-loading `.skills/` from the given root path.
209    ///
210    /// Unlike [`with_auto_skills`](Self::with_auto_skills), this method borrows
211    /// the Runner mutably instead of consuming it. On error, the Runner remains
212    /// valid with no skill injector configured.
213    #[cfg(feature = "skills")]
214    pub fn with_auto_skills_mut(
215        &mut self,
216        root: impl AsRef<std::path::Path>,
217        config: SkillInjectorConfig,
218    ) -> adk_skill::SkillResult<()> {
219        let injector = SkillInjector::from_root(root, config)?;
220        self.skill_injector = Some(Arc::new(injector));
221        Ok(())
222    }
223
224    /// Execute the root agent for the given user and session, returning an event stream.
225    ///
226    /// Retrieves (or creates) the session, resolves the target agent, runs
227    /// plugins/skills, and streams events as the agent executes.
228    pub async fn run(
229        &self,
230        user_id: UserId,
231        session_id: SessionId,
232        user_content: Content,
233    ) -> Result<EventStream> {
234        let app_name = self.app_name.clone();
235        let typed_app_name = AppName::try_from(app_name.clone())?;
236        let session_service = self.session_service.clone();
237        let root_agent = self.root_agent.clone();
238        #[cfg(feature = "artifacts")]
239        let artifact_service = self.artifact_service.clone();
240        let memory_service = self.memory_service.clone();
241        #[cfg(feature = "plugins")]
242        let plugin_manager = self.plugin_manager.clone();
243        #[cfg(feature = "skills")]
244        let skill_injector = self.skill_injector.clone();
245        let mut run_config = self.run_config.clone();
246        let compaction_config = self.compaction_config.clone();
247        let context_cache_config = self.context_cache_config.clone();
248        let cache_capable = self.cache_capable.clone();
249        let cache_manager_ref = self.cache_manager.clone();
250        let request_context = self.request_context.clone();
251        let cancellation_token = self.cancellation_token.clone();
252        let intra_compactor = self.intra_compactor.clone();
253        #[cfg(feature = "context-compaction")]
254        let context_compaction = self.context_compaction.clone();
255
256        // Register a per-session cancellation token for the interrupt API.
257        // If a global token is configured, create a child token so that
258        // either global cancellation OR per-session interrupt stops this run.
259        let session_token = CancellationToken::new();
260        let session_id_str = session_id.as_str().to_string();
261        {
262            let mut sessions = self.active_sessions.lock().unwrap();
263            sessions.insert(session_id_str.clone(), session_token.clone());
264        }
265        let active_sessions = self.active_sessions.clone();
266
267        // Effective token: cancelled if either the global token or the session token fires
268        let effective_token = if let Some(ref global) = cancellation_token {
269            let combined = CancellationToken::new();
270            let combined_clone = combined.clone();
271            let global_clone = global.clone();
272            let session_clone = session_token.clone();
273            // Watch both tokens — cancel the combined token when either fires
274            let combined_for_global = combined_clone.clone();
275            tokio::spawn(async move {
276                global_clone.cancelled().await;
277                combined_for_global.cancel();
278            });
279            let combined_for_session = combined_clone;
280            tokio::spawn(async move {
281                session_clone.cancelled().await;
282                combined_for_session.cancel();
283            });
284            Some(combined)
285        } else {
286            Some(session_token.clone())
287        };
288
289        let s = stream! {
290            // Clean up session tracking when the stream ends.
291            // We use a simple struct with Drop to ensure cleanup even on early return.
292            struct SessionCleanup {
293                active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
294                session_id: String,
295            }
296            impl Drop for SessionCleanup {
297                fn drop(&mut self) {
298                    let mut sessions = self.active_sessions.lock().unwrap();
299                    sessions.remove(&self.session_id);
300                }
301            }
302            let _cleanup = SessionCleanup {
303                active_sessions: active_sessions.clone(),
304                session_id: session_id_str,
305            };
306
307            // Use the effective token (combines global + per-session)
308            let cancellation_token = effective_token;
309            // Get or create session
310            let session = match session_service
311                .get(adk_session::GetRequest {
312                    app_name: app_name.clone(),
313                    user_id: user_id.to_string(),
314                    session_id: session_id.to_string(),
315                    num_recent_events: run_config.history_max_events,
316                    after: None,
317                })
318                .await
319            {
320                Ok(s) => s,
321                Err(e) => {
322                    yield Err(e);
323                    return;
324                }
325            };
326
327            // Find which agent should handle this request
328            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
329
330            // Clone services for potential reuse in transfer
331            #[cfg(feature = "artifacts")]
332            let artifact_service_clone = artifact_service.clone();
333            let memory_service_clone = memory_service.clone();
334
335            // Create invocation context with MutableSession
336            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
337            #[cfg(any(feature = "skills", feature = "plugins"))]
338            let mut effective_user_content = user_content.clone();
339            #[cfg(not(any(feature = "skills", feature = "plugins")))]
340            let effective_user_content = user_content.clone();
341            #[cfg(feature = "skills")]
342            let mut selected_skill_name = String::new();
343            #[cfg(not(feature = "skills"))]
344            let selected_skill_name = String::new();
345            #[cfg(feature = "skills")]
346            let mut selected_skill_id = String::new();
347            #[cfg(not(feature = "skills"))]
348            let selected_skill_id = String::new();
349
350            #[cfg(feature = "skills")]
351            if let Some(injector) = skill_injector.as_ref() {
352                if let Some(matched) = adk_skill::apply_skill_injection(
353                    &mut effective_user_content,
354                    injector.index(),
355                    injector.policy(),
356                    injector.max_injected_chars(),
357                ) {
358                    selected_skill_name = matched.skill.name;
359                    selected_skill_id = matched.skill.id;
360                }
361            }
362
363            let mut invocation_ctx = match InvocationContext::new_typed(
364                invocation_id.clone(),
365                agent_to_run.clone(),
366                user_id.clone(),
367                typed_app_name.clone(),
368                session_id.clone(),
369                effective_user_content.clone(),
370                Arc::from(session),
371            ) {
372                Ok(ctx) => ctx,
373                Err(e) => {
374                    yield Err(e);
375                    return;
376                }
377            };
378
379            // Add optional services
380            #[cfg(feature = "artifacts")]
381            if let Some(service) = artifact_service {
382                // Wrap service with ScopedArtifacts to bind session context
383                let scoped = adk_artifact::ScopedArtifacts::new(
384                    service,
385                    app_name.clone(),
386                    user_id.to_string(),
387                    session_id.to_string(),
388                );
389                invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
390            }
391            if let Some(memory) = memory_service {
392                invocation_ctx = invocation_ctx.with_memory(memory);
393            }
394
395            // Apply run config (streaming mode, etc.)
396            invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
397
398            // Apply request context from auth middleware bridge if present
399            if let Some(rc) = request_context.clone() {
400                invocation_ctx = invocation_ctx.with_request_context(rc);
401            }
402
403            let mut ctx = Arc::new(invocation_ctx);
404
405            #[cfg(feature = "plugins")]
406            if let Some(manager) = plugin_manager.as_ref() {
407                match manager
408                    .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
409                    .await
410                {
411                    Ok(Some(content)) => {
412                        let mut early_event = adk_core::Event::new(ctx.invocation_id());
413                        early_event.author = agent_to_run.name().to_string();
414                        early_event.llm_response.content = Some(content);
415
416                        ctx.mutable_session().append_event(early_event.clone());
417                        if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
418                            yield Err(e);
419                            return;
420                        }
421
422                        yield Ok(early_event);
423                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
424                        return;
425                    }
426                    Ok(None) => {}
427                    Err(e) => {
428                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
429                        yield Err(e);
430                        return;
431                    }
432                }
433
434                match manager
435                    .run_on_user_message(
436                        ctx.clone() as Arc<dyn adk_core::InvocationContext>,
437                        effective_user_content.clone(),
438                    )
439                    .await
440                {
441                    Ok(Some(modified)) => {
442                        effective_user_content = modified;
443
444                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
445                            ctx.invocation_id().to_string(),
446                            agent_to_run.clone(),
447                            ctx.user_id().to_string(),
448                            ctx.app_name().to_string(),
449                            ctx.session_id().to_string(),
450                            effective_user_content.clone(),
451                            ctx.mutable_session().clone(),
452                        ) {
453                            Ok(ctx) => ctx,
454                            Err(e) => {
455                                yield Err(e);
456                                return;
457                            }
458                        };
459
460                        #[cfg(feature = "artifacts")]
461                        if let Some(service) = artifact_service_clone.clone() {
462                            let scoped = adk_artifact::ScopedArtifacts::new(
463                                service,
464                                ctx.app_name().to_string(),
465                                ctx.user_id().to_string(),
466                                ctx.session_id().to_string(),
467                            );
468                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
469                        }
470                        if let Some(memory) = memory_service_clone.clone() {
471                            refreshed_ctx = refreshed_ctx.with_memory(memory);
472                        }
473                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
474                        if let Some(rc) = request_context.clone() {
475                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
476                        }
477                        ctx = Arc::new(refreshed_ctx);
478                    }
479                    Ok(None) => {}
480                    Err(e) => {
481                        if let Some(manager) = plugin_manager.as_ref() {
482                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
483                        }
484                        yield Err(e);
485                        return;
486                    }
487                }
488            }
489
490            // Append user message to session service (persistent storage)
491            let mut user_event = adk_core::Event::new(ctx.invocation_id());
492            user_event.author = "user".to_string();
493            user_event.llm_response.content = Some(effective_user_content.clone());
494
495            // Also add to mutable session for immediate visibility
496            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
497            ctx.mutable_session().append_event(user_event.clone());
498
499            if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
500                #[cfg(feature = "plugins")]
501                if let Some(manager) = plugin_manager.as_ref() {
502                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
503                }
504                yield Err(e);
505                return;
506            }
507
508            // ===== CONTEXT CACHE LIFECYCLE =====
509            // If context caching is configured and a cache-capable model is available,
510            // create or refresh the cached content before agent execution.
511            // Cache failures are non-fatal — log a warning and proceed without cache.
512            if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
513                let should_refresh_cache = {
514                    let cm = cm_mutex.lock().await;
515                    cm.is_enabled() && (cm.active_cache_name().is_none() || cm.needs_refresh())
516                };
517
518                if should_refresh_cache {
519                    // Gather system instruction from the agent's description
520                    // (the full instruction is resolved inside the agent, but the
521                    // description provides a reasonable proxy for cache keying).
522                    let system_instruction = agent_to_run.description().to_string();
523                    let tools = std::collections::HashMap::new();
524                    let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
525
526                    match cache_model.create_cache(&system_instruction, &tools, ttl).await {
527                        Ok(name) => {
528                            let old_cache = {
529                                let mut cm = cm_mutex.lock().await;
530                                let old = cm.clear_active_cache();
531                                cm.set_active_cache(name);
532                                old
533                            };
534
535                            if let Some(old) = old_cache {
536                                if let Err(e) = cache_model.delete_cache(&old).await {
537                                    tracing::warn!(
538                                        old_cache = %old,
539                                        error = %e,
540                                        "failed to delete old cache, proceeding with new cache"
541                                    );
542                                }
543                            }
544                        }
545                        Err(e) => {
546                            tracing::warn!(
547                                error = %e,
548                                "cache creation failed, proceeding without cache"
549                            );
550                        }
551                    }
552                }
553
554                // Attach cache name to run config so agents can use it.
555                let cache_name = {
556                    let mut cm = cm_mutex.lock().await;
557                    if cm.is_enabled() {
558                        cm.record_invocation().map(str::to_string)
559                    } else {
560                        None
561                    }
562                };
563
564                if let Some(cache_name) = cache_name {
565                    run_config.cached_content = Some(cache_name);
566                    // Rebuild the invocation context with the updated run config.
567                    let mut refreshed_ctx = match InvocationContext::with_mutable_session(
568                        ctx.invocation_id().to_string(),
569                        agent_to_run.clone(),
570                        ctx.user_id().to_string(),
571                        ctx.app_name().to_string(),
572                        ctx.session_id().to_string(),
573                        effective_user_content.clone(),
574                        ctx.mutable_session().clone(),
575                    ) {
576                        Ok(ctx) => ctx,
577                        Err(e) => {
578                            yield Err(e);
579                            return;
580                        }
581                    };
582                    #[cfg(feature = "artifacts")]
583                    if let Some(service) = artifact_service_clone.clone() {
584                        let scoped = adk_artifact::ScopedArtifacts::new(
585                            service,
586                            ctx.app_name().to_string(),
587                            ctx.user_id().to_string(),
588                            ctx.session_id().to_string(),
589                        );
590                        refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
591                    }
592                    if let Some(memory) = memory_service_clone.clone() {
593                        refreshed_ctx = refreshed_ctx.with_memory(memory);
594                    }
595                    refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
596                    if let Some(rc) = request_context.clone() {
597                        refreshed_ctx = refreshed_ctx.with_request_context(rc);
598                    }
599                    ctx = Arc::new(refreshed_ctx);
600                }
601            }
602
603            // ===== INTRA-INVOCATION COMPACTION =====
604            // If intra-compaction is configured, check if the session events
605            // exceed the token threshold and compact them before the agent runs.
606            if let Some(ref compactor) = intra_compactor {
607                compactor.reset_cycle();
608                let session_events = ctx.mutable_session().as_ref().events_snapshot();
609                match compactor.maybe_compact(&session_events).await {
610                    Ok(Some(compacted_events)) => {
611                        ctx.mutable_session().replace_events(compacted_events);
612                        tracing::info!("intra-invocation compaction applied before agent execution");
613                    }
614                    Ok(None) => {} // No compaction needed
615                    Err(e) => {
616                        tracing::warn!(error = %e, "intra-invocation compaction check failed");
617                    }
618                }
619            }
620
621            // ===== CONTEXT COMPACTION (TOKEN BUDGET) =====
622            // If context-compaction is configured, proactively check the estimated
623            // token count before calling the agent. If it exceeds the budget,
624            // apply compaction to bring it under the limit.
625            #[cfg(feature = "context-compaction")]
626            if let Some(ref cc_config) = context_compaction {
627                let session_events = ctx.mutable_session().events_snapshot();
628                let estimated = crate::compaction::estimate_event_tokens(&session_events);
629                if estimated > cc_config.context_budget {
630                    tracing::info!(
631                        estimated_tokens = estimated,
632                        budget = cc_config.context_budget,
633                        "context exceeds budget, applying proactive compaction"
634                    );
635                    match crate::compaction::apply_compaction_with_retry(cc_config, session_events).await {
636                        Ok(compacted) => {
637                            ctx.mutable_session().replace_events(compacted);
638                            tracing::info!("proactive context compaction succeeded");
639                        }
640                        Err(e) => {
641                            // Proactive compaction failed — proceed anyway and let the
642                            // model reject the request if it's truly too large.
643                            tracing::warn!(error = %e, "proactive context compaction failed, proceeding with full context");
644                        }
645                    }
646                }
647            }
648
649            // Run the agent with instrumentation (ADK-Go style attributes)
650            let agent_span = tracing::info_span!(
651                "agent.execute",
652                "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
653                "gcp.vertex.agent.session_id" = ctx.session_id(),
654                "gcp.vertex.agent.event_id" = ctx.invocation_id(), // Use invocation_id as event_id for agent spans
655                "gen_ai.conversation.id" = ctx.session_id(),
656                "adk.app_name" = ctx.app_name(),
657                "adk.user_id" = ctx.user_id(),
658                "agent.name" = %agent_to_run.name(),
659                "adk.skills.selected_name" = %selected_skill_name,
660                "adk.skills.selected_id" = %selected_skill_id
661            );
662
663            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span.clone()).await {
664                Ok(s) => s,
665                #[cfg(feature = "context-compaction")]
666                Err(e) if context_compaction.is_some() && crate::compaction::is_token_limit_error(&e) => {
667                    // Token limit error on agent.run() — apply compaction and retry
668                    let cc_config = context_compaction.as_ref().unwrap();
669                    tracing::warn!(
670                        error = %e,
671                        "agent execution failed with token limit error, attempting compaction"
672                    );
673                    let session_events = ctx.mutable_session().events_snapshot();
674                    match crate::compaction::apply_compaction_with_retry(cc_config, session_events).await {
675                        Ok(compacted) => {
676                            ctx.mutable_session().replace_events(compacted);
677                            tracing::info!("context compaction succeeded after token limit error, retrying agent");
678                            // Retry the agent call with compacted context
679                            match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
680                                Ok(s) => s,
681                                Err(retry_err) => {
682                                    #[cfg(feature = "plugins")]
683                                    if let Some(manager) = plugin_manager.as_ref() {
684                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
685                                    }
686                                    yield Err(retry_err);
687                                    return;
688                                }
689                            }
690                        }
691                        Err(compaction_err) => {
692                            #[cfg(feature = "plugins")]
693                            if let Some(manager) = plugin_manager.as_ref() {
694                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
695                            }
696                            yield Err(compaction_err);
697                            return;
698                        }
699                    }
700                }
701                Err(e) => {
702                    #[cfg(feature = "plugins")]
703                    if let Some(manager) = plugin_manager.as_ref() {
704                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
705                    }
706                    yield Err(e);
707                    return;
708                }
709            };
710
711            // Stream events and check for transfers
712            use futures::StreamExt;
713            let mut transfer_target: Option<String> = None;
714
715            while let Some(result) = {
716                if let Some(token) = cancellation_token.as_ref() {
717                    if token.is_cancelled() {
718                        #[cfg(feature = "plugins")]
719                        if let Some(manager) = plugin_manager.as_ref() {
720                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
721                        }
722                        return;
723                    }
724                }
725                agent_stream.next().await
726            } {
727                match result {
728                    Ok(event) => {
729                        #[cfg(feature = "plugins")]
730                        let mut event = event;
731
732                        #[cfg(feature = "plugins")]
733                        if let Some(manager) = plugin_manager.as_ref() {
734                            match manager
735                                .run_on_event(
736                                    ctx.clone() as Arc<dyn adk_core::InvocationContext>,
737                                    event.clone(),
738                                )
739                                .await
740                            {
741                                Ok(Some(modified)) => {
742                                    event = modified;
743                                }
744                                Ok(None) => {}
745                                Err(e) => {
746                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
747                                    yield Err(e);
748                                    return;
749                                }
750                            }
751                        }
752
753                        // Check for transfer action
754                        if let Some(target) = &event.actions.transfer_to_agent {
755                            transfer_target = Some(target.clone());
756                        }
757
758                        // CRITICAL: Apply state_delta to the mutable session immediately.
759                        // This is the key fix for state propagation between sequential agents.
760                        // When an agent sets output_key, it emits an event with state_delta.
761                        // We must apply this to the mutable session so downstream agents
762                        // can read the value via ctx.session().state().get().
763                        if !event.actions.state_delta.is_empty() {
764                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
765                        }
766
767                        // Also add the event to the mutable session's event list
768                        ctx.mutable_session().append_event(event.clone());
769
770                        // Append event to session service (persistent storage)
771                        // Skip partial streaming chunks — only persist the final
772                        // event. Streaming chunks share the same event ID, so
773                        // persisting each one would violate the primary key
774                        // constraint. The final chunk (partial=false) carries the
775                        // complete accumulated content.
776                        if !event.llm_response.partial {
777                            if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
778                                #[cfg(feature = "plugins")]
779                                if let Some(manager) = plugin_manager.as_ref() {
780                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
781                                }
782                                yield Err(e);
783                                return;
784                            }
785                        }
786                        yield Ok(event);
787                    }
788                    Err(e) => {
789                        #[cfg(feature = "plugins")]
790                        if let Some(manager) = plugin_manager.as_ref() {
791                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
792                        }
793                        yield Err(e);
794                        return;
795                    }
796                }
797            }
798
799            // ===== TRANSFER LOOP =====
800            // Support multi-hop transfers with a max-depth guard.
801            // When an agent emits transfer_to_agent, the runner resolves the
802            // target from the root agent tree, computes transfer_targets
803            // (parent + peers) for the new agent, and runs it. This repeats
804            // until no further transfer is requested or the depth limit is hit.
805            const MAX_TRANSFER_DEPTH: u32 = 10;
806            let mut transfer_depth: u32 = 0;
807            let mut current_transfer_target = transfer_target;
808
809            while let Some(target_name) = current_transfer_target.take() {
810                transfer_depth += 1;
811                if transfer_depth > MAX_TRANSFER_DEPTH {
812                    tracing::warn!(
813                        depth = transfer_depth,
814                        target = %target_name,
815                        "max transfer depth exceeded, stopping transfer chain"
816                    );
817                    break;
818                }
819
820                let target_agent = match Self::find_agent(&root_agent, &target_name) {
821                    Some(a) => a,
822                    None => {
823                        tracing::warn!(target = %target_name, "transfer target not found in agent tree");
824                        break;
825                    }
826                };
827
828                // Compute transfer_targets for the target agent:
829                // - parent: the agent that transferred to it (or root if applicable)
830                // - peers: siblings in the agent tree
831                // - children: handled by the agent itself via sub_agents()
832                let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
833
834                let mut transfer_run_config = run_config.clone();
835                let mut targets = Vec::new();
836                if let Some(ref parent) = parent_name {
837                    targets.push(parent.clone());
838                }
839                targets.extend(peer_names);
840                transfer_run_config.transfer_targets = targets;
841                transfer_run_config.parent_agent = parent_name;
842
843                // For transfers, we reuse the same mutable session to preserve state
844                let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
845                let mut transfer_ctx = match InvocationContext::with_mutable_session(
846                    transfer_invocation_id.clone(),
847                    target_agent.clone(),
848                    ctx.user_id().to_string(),
849                    ctx.app_name().to_string(),
850                    ctx.session_id().to_string(),
851                    effective_user_content.clone(),
852                    ctx.mutable_session().clone(),
853                ) {
854                    Ok(ctx) => ctx,
855                    Err(e) => {
856                        yield Err(e);
857                        return;
858                    }
859                };
860
861                #[cfg(feature = "artifacts")]
862                if let Some(ref service) = artifact_service_clone {
863                    let scoped = adk_artifact::ScopedArtifacts::new(
864                        service.clone(),
865                        ctx.app_name().to_string(),
866                        ctx.user_id().to_string(),
867                        ctx.session_id().to_string(),
868                    );
869                    transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
870                }
871                if let Some(ref memory) = memory_service_clone {
872                    transfer_ctx = transfer_ctx.with_memory(memory.clone());
873                }
874                transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
875                if let Some(rc) = request_context.clone() {
876                    transfer_ctx = transfer_ctx.with_request_context(rc);
877                }
878
879                let transfer_ctx = Arc::new(transfer_ctx);
880
881                // Run the transferred agent
882                let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
883                    Ok(s) => s,
884                    Err(e) => {
885                        #[cfg(feature = "plugins")]
886                        if let Some(manager) = plugin_manager.as_ref() {
887                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
888                        }
889                        yield Err(e);
890                        return;
891                    }
892                };
893
894                // Stream events from the transferred agent, capturing any further transfer
895                while let Some(result) = {
896                    if let Some(token) = cancellation_token.as_ref() {
897                        if token.is_cancelled() {
898                            #[cfg(feature = "plugins")]
899                            if let Some(manager) = plugin_manager.as_ref() {
900                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
901                            }
902                            return;
903                        }
904                    }
905                    transfer_stream.next().await
906                } {
907                    match result {
908                        Ok(event) => {
909                            #[cfg(feature = "plugins")]
910                            let mut event = event;
911                            #[cfg(feature = "plugins")]
912                            if let Some(manager) = plugin_manager.as_ref() {
913                                match manager
914                                    .run_on_event(
915                                        transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
916                                        event.clone(),
917                                    )
918                                    .await
919                                {
920                                    Ok(Some(modified)) => {
921                                        event = modified;
922                                    }
923                                    Ok(None) => {}
924                                    Err(e) => {
925                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
926                                        yield Err(e);
927                                        return;
928                                    }
929                                }
930                            }
931
932                            // Capture further transfer requests
933                            if let Some(target) = &event.actions.transfer_to_agent {
934                                current_transfer_target = Some(target.clone());
935                            }
936
937                            // Apply state delta for transferred agent too
938                            if !event.actions.state_delta.is_empty() {
939                                transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
940                            }
941
942                            // Add to mutable session
943                            transfer_ctx.mutable_session().append_event(event.clone());
944
945                            if !event.llm_response.partial {
946                                if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
947                                    #[cfg(feature = "plugins")]
948                                    if let Some(manager) = plugin_manager.as_ref() {
949                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
950                                    }
951                                    yield Err(e);
952                                    return;
953                                }
954                            }
955                            yield Ok(event);
956                        }
957                        Err(e) => {
958                            #[cfg(feature = "plugins")]
959                            if let Some(manager) = plugin_manager.as_ref() {
960                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
961                            }
962                            yield Err(e);
963                            return;
964                        }
965                    }
966                }
967            }
968
969            // ===== CONTEXT COMPACTION =====
970            // After all events have been processed, check if compaction should trigger.
971            // This runs in the background after the invocation completes.
972            if let Some(ref compaction_cfg) = compaction_config {
973                let event_count = ctx.mutable_session().as_ref().events_len();
974
975                if event_count > 0 {
976                    let all_events = ctx.mutable_session().as_ref().events_snapshot();
977                    let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
978                        as u32;
979
980                    if invocation_count > 0
981                        && invocation_count % compaction_cfg.compaction_interval == 0
982                    {
983                        // Determine the window of events to compact
984                        // We compact all events except the most recent overlap_size invocations
985                        let overlap = compaction_cfg.overlap_size as usize;
986
987                        // Find the boundary: keep the last `overlap` user messages and everything after
988                        let user_msg_indices: Vec<usize> = all_events.iter()
989                            .enumerate()
990                            .filter(|(_, e)| e.author == "user")
991                            .map(|(i, _)| i)
992                            .collect();
993
994                        // Keep the last `overlap` user messages intact.
995                        // When overlap is 0, compact everything.
996                        let compact_up_to = if overlap == 0 {
997                            all_events.len()
998                        } else if user_msg_indices.len() > overlap {
999                            // Compact up to (but not including) the overlap-th-from-last user message
1000                            user_msg_indices[user_msg_indices.len() - overlap]
1001                        } else {
1002                            // Not enough user messages to satisfy overlap — skip compaction
1003                            0
1004                        };
1005
1006                        if compact_up_to > 0 {
1007                            let events_to_compact = &all_events[..compact_up_to];
1008
1009                            match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
1010                                Ok(Some(compaction_event)) => {
1011                                    // Persist the compaction event
1012                                    if let Err(e) = session_service.append_event(
1013                                        ctx.session_id(),
1014                                        compaction_event.clone(),
1015                                    ).await {
1016                                        tracing::warn!(error = %e, "Failed to persist compaction event");
1017                                    } else {
1018                                        tracing::info!(
1019                                            compacted_events = compact_up_to,
1020                                            "Context compaction completed"
1021                                        );
1022                                    }
1023                                }
1024                                Ok(None) => {
1025                                    tracing::debug!("Compaction summarizer returned no result");
1026                                }
1027                                Err(e) => {
1028                                    // Compaction failure is non-fatal — log and continue
1029                                    tracing::warn!(error = %e, "Context compaction failed");
1030                                }
1031                            }
1032                        }
1033                    }
1034                }
1035            }
1036
1037            #[cfg(feature = "plugins")]
1038            if let Some(manager) = plugin_manager.as_ref() {
1039                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
1040            }
1041        };
1042
1043        Ok(Box::pin(s))
1044    }
1045
1046    /// Convenience method that accepts string arguments.
1047    ///
1048    /// Converts `user_id` and `session_id` to their typed equivalents
1049    /// and delegates to [`run()`](Self::run).
1050    ///
1051    /// # Errors
1052    ///
1053    /// Returns an error if either string fails identity validation
1054    /// (empty, contains null bytes, or exceeds length limit).
1055    pub async fn run_str(
1056        &self,
1057        user_id: &str,
1058        session_id: &str,
1059        user_content: Content,
1060    ) -> Result<EventStream> {
1061        let user_id = UserId::try_from(user_id)?;
1062        let session_id = SessionId::try_from(session_id)?;
1063        self.run(user_id, session_id, user_content).await
1064    }
1065
1066    /// Interrupt a running agent for the given session.
1067    ///
1068    /// Cancels the agent's current execution within the event loop. Events
1069    /// already produced and appended to the session are preserved — only
1070    /// future events are stopped. The caller can then issue a new `run()`
1071    /// call with a different instruction to redirect the agent.
1072    ///
1073    /// Returns `true` if a running session was found and interrupted,
1074    /// `false` if no active run exists for that session ID.
1075    ///
1076    /// # Example
1077    ///
1078    /// ```rust,ignore
1079    /// // Start a run in the background
1080    /// let mut stream = runner.run(user_id, session_id, content).await?;
1081    /// tokio::spawn(async move { while stream.next().await.is_some() {} });
1082    ///
1083    /// // Later, interrupt it
1084    /// let was_running = runner.interrupt("session-1");
1085    /// assert!(was_running);
1086    ///
1087    /// // Redirect with a new instruction
1088    /// let mut stream = runner.run(user_id, session_id, new_content).await?;
1089    /// ```
1090    pub fn interrupt(&self, session_id: &str) -> bool {
1091        let sessions = self.active_sessions.lock().unwrap();
1092        if let Some(token) = sessions.get(session_id) {
1093            tracing::info!(session.id = session_id, "interrupting running agent");
1094            token.cancel();
1095            true
1096        } else {
1097            tracing::debug!(session.id = session_id, "no active run to interrupt");
1098            false
1099        }
1100    }
1101
1102    /// Returns the session IDs of all currently running agent executions.
1103    pub fn active_session_ids(&self) -> Vec<String> {
1104        let sessions = self.active_sessions.lock().unwrap();
1105        sessions.keys().cloned().collect()
1106    }
1107
1108    /// Returns a reference to the context compaction configuration, if set.
1109    ///
1110    /// This is used by the runner's generate_content loop to detect token limit
1111    /// errors and apply compaction strategies before retrying.
1112    #[cfg(feature = "context-compaction")]
1113    pub fn context_compaction(&self) -> Option<&crate::compaction::CompactionConfig> {
1114        self.context_compaction.as_deref()
1115    }
1116
1117    /// Find which agent should handle the request based on session history
1118    pub fn find_agent_to_run(
1119        root_agent: &Arc<dyn Agent>,
1120        session: &dyn adk_session::Session,
1121    ) -> Arc<dyn Agent> {
1122        // Look at recent events to find last agent that responded
1123        let events = session.events();
1124        for i in (0..events.len()).rev() {
1125            if let Some(event) = events.at(i) {
1126                // Check for explicit transfer
1127                if let Some(target_name) = &event.actions.transfer_to_agent {
1128                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
1129                        return agent;
1130                    }
1131                }
1132
1133                if event.author == "user" {
1134                    continue;
1135                }
1136
1137                // Try to find this agent in the tree
1138                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
1139                    // Check if agent allows transfer up the tree
1140                    if Self::is_transferable(root_agent, &agent) {
1141                        return agent;
1142                    }
1143                }
1144            }
1145        }
1146
1147        // Default to root agent
1148        root_agent.clone()
1149    }
1150
1151    /// Check if an agent found in session history can be resumed for the next
1152    /// user message.
1153    ///
1154    /// This always returns `true` because the transfer-policy enforcement
1155    /// (`disallow_transfer_to_parent` / `disallow_transfer_to_peers`) is
1156    /// handled inside `LlmAgent::run()` when it builds the `transfer_to_agent`
1157    /// tool's valid-target list. The runner does not need to duplicate that
1158    /// check here — it only needs to know whether the agent is a valid
1159    /// resumption target, which it always is if it exists in the tree.
1160    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
1161        true
1162    }
1163
1164    /// Recursively search agent tree for agent with given name
1165    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
1166        if current.name() == target_name {
1167            return Some(current.clone());
1168        }
1169
1170        for sub_agent in current.sub_agents() {
1171            if let Some(found) = Self::find_agent(sub_agent, target_name) {
1172                return Some(found);
1173            }
1174        }
1175
1176        None
1177    }
1178
1179    /// Compute the parent name and peer names for a given agent in the tree.
1180    /// Returns `(parent_name, peer_names)`.
1181    ///
1182    /// Walks the agent tree to find the parent of `target_name`, then collects
1183    /// the parent's name and the sibling agent names (excluding the target itself).
1184    pub fn compute_transfer_context(
1185        root: &Arc<dyn Agent>,
1186        target_name: &str,
1187    ) -> (Option<String>, Vec<String>) {
1188        // If the target is the root itself, there's no parent or peers
1189        if root.name() == target_name {
1190            return (None, Vec::new());
1191        }
1192
1193        // BFS/DFS to find the parent of target_name
1194        fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
1195            for sub in current.sub_agents() {
1196                if sub.name() == target {
1197                    return Some(current.clone());
1198                }
1199                if let Some(found) = find_parent(sub, target) {
1200                    return Some(found);
1201                }
1202            }
1203            None
1204        }
1205
1206        match find_parent(root, target_name) {
1207            Some(parent) => {
1208                let parent_name = parent.name().to_string();
1209                let peers: Vec<String> = parent
1210                    .sub_agents()
1211                    .iter()
1212                    .filter(|a| a.name() != target_name)
1213                    .map(|a| a.name().to_string())
1214                    .collect();
1215                (Some(parent_name), peers)
1216            }
1217            None => (None, Vec::new()),
1218        }
1219    }
1220}