Skip to main content

adk_runner/
runner.rs

1use crate::InvocationContext;
2use crate::cache::CacheManager;
3#[cfg(feature = "artifacts")]
4use adk_artifact::ArtifactService;
5use adk_core::{
6    Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
7    ReadonlyContext, Result, RunConfig, SessionId, UserId,
8};
9#[cfg(feature = "plugins")]
10use adk_plugin::PluginManager;
11use adk_session::SessionService;
12#[cfg(feature = "skills")]
13use adk_skill::{SkillInjector, SkillInjectorConfig};
14use async_stream::stream;
15use std::sync::Arc;
16use tokio_util::sync::CancellationToken;
17use tracing::Instrument;
18
19pub struct RunnerConfig {
20    pub app_name: String,
21    pub agent: Arc<dyn Agent>,
22    pub session_service: Arc<dyn SessionService>,
23    #[cfg(feature = "artifacts")]
24    pub artifact_service: Option<Arc<dyn ArtifactService>>,
25    pub memory_service: Option<Arc<dyn Memory>>,
26    #[cfg(feature = "plugins")]
27    pub plugin_manager: Option<Arc<PluginManager>>,
28    /// Optional run configuration (streaming mode, etc.)
29    /// If not provided, uses default (SSE streaming)
30    #[allow(dead_code)]
31    pub run_config: Option<RunConfig>,
32    /// Optional context compaction configuration.
33    /// When set, the runner will periodically summarize older events
34    /// to reduce context size sent to the LLM.
35    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
36    /// Optional context cache configuration for automatic prompt caching lifecycle.
37    /// When set alongside `cache_capable`, the runner will automatically create and
38    /// manage cached content resources for supported providers.
39    ///
40    /// When `cache_capable` is set but this field is `None`, the runner
41    /// automatically uses [`ContextCacheConfig::default()`] (4096 min tokens,
42    /// 600s TTL, refresh every 3 invocations).
43    pub context_cache_config: Option<ContextCacheConfig>,
44    /// Optional cache-capable model reference for automatic cache management.
45    /// Set this to the same model used by the agent if it supports caching.
46    pub cache_capable: Option<Arc<dyn CacheCapable>>,
47    /// Optional request context from the server's auth middleware bridge.
48    /// When set, the runner passes it to `InvocationContext` so that
49    /// `user_scopes()` and `user_id()` reflect the authenticated identity.
50    pub request_context: Option<adk_core::RequestContext>,
51    /// Optional cooperative cancellation token for externally managed runs.
52    pub cancellation_token: Option<CancellationToken>,
53    /// Optional intra-invocation compaction configuration.
54    /// When set, the runner estimates token count before each agent run
55    /// and triggers mid-invocation summarization when the threshold is exceeded.
56    pub intra_compaction_config: Option<adk_core::IntraCompactionConfig>,
57    /// Optional summarizer for intra-invocation compaction.
58    /// Required when `intra_compaction_config` is set.
59    pub intra_compaction_summarizer: Option<Arc<dyn adk_core::BaseEventsSummarizer>>,
60}
61
/// Executes an agent for a user/session pair and streams the resulting events.
///
/// Construct it with [`Runner::builder`] or [`Runner::new`].
pub struct Runner {
    /// Application name; converted to an `AppName` at the start of every `run`.
    app_name: String,
    /// Root of the agent tree; `run` picks the agent to execute from it based
    /// on the loaded session.
    root_agent: Arc<dyn Agent>,
    /// Session store used to load sessions and persist events.
    session_service: Arc<dyn SessionService>,
    /// Optional artifact service, wrapped per invocation in a session-scoped view.
    #[cfg(feature = "artifacts")]
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service attached to each invocation context.
    memory_service: Option<Arc<dyn Memory>>,
    /// Optional plugin manager whose callbacks run around each invocation.
    #[cfg(feature = "plugins")]
    plugin_manager: Option<Arc<PluginManager>>,
    /// Optional skill injector applied to the user message before plugin
    /// `on_user_message` callbacks; `new` leaves it unset.
    #[cfg(feature = "skills")]
    skill_injector: Option<Arc<SkillInjector>>,
    /// Resolved run configuration (defaulted in `new`); cloned for every run.
    run_config: RunConfig,
    /// Optional event compaction configuration.
    compaction_config: Option<adk_core::EventsCompactionConfig>,
    /// Effective cache configuration: the explicit one, or the default when
    /// only `cache_capable` was supplied.
    context_cache_config: Option<ContextCacheConfig>,
    /// Optional model used to create and delete cached content.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Cache lifecycle state shared across runs; present whenever
    /// `context_cache_config` is. Locked asynchronously from `run`.
    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
    /// Optional authenticated request context forwarded to invocation contexts.
    request_context: Option<adk_core::RequestContext>,
    /// Optional global cancellation token; `run` combines it with a
    /// per-session token so either one can stop a run.
    cancellation_token: Option<CancellationToken>,
    /// Intra-invocation compactor; present only when both the compaction
    /// config and a summarizer were supplied.
    intra_compactor: Option<Arc<crate::intra_compaction::IntraInvocationCompactor>>,
    /// Per-session cancellation tokens for the interrupt API.
    /// Each `run()` call registers a token here; `interrupt()` cancels it.
    /// Keyed by the session ID string; the entry is removed when the run's
    /// stream is dropped.
    /// NOTE(review): `run` uses `HashMap::insert`, so a second concurrent run
    /// on the same session replaces the first run's token, and whichever run
    /// finishes first removes the entry for both — confirm this is intended.
    active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
}
85
86impl Runner {
87    /// Create a typestate builder for constructing a `Runner`.
88    ///
89    /// The builder enforces at compile time that the three required fields
90    /// (`app_name`, `agent`, `session_service`) are set before `build()` is
91    /// callable.
92    ///
93    /// # Example
94    ///
95    /// ```rust,ignore
96    /// let runner = Runner::builder()
97    ///     .app_name("my-app")
98    ///     .agent(agent)
99    ///     .session_service(session_service)
100    ///     .build()?;
101    /// ```
102    pub fn builder() -> crate::builder::RunnerConfigBuilder<
103        crate::builder::NoAppName,
104        crate::builder::NoAgent,
105        crate::builder::NoSessionService,
106    > {
107        crate::builder::RunnerConfigBuilder::new()
108    }
109
110    pub fn new(config: RunnerConfig) -> Result<Self> {
111        let run_config = config.run_config.unwrap_or_default();
112
113        // When a cache-capable model is provided but no explicit cache config,
114        // use the default ContextCacheConfig to enable caching automatically.
115        let effective_cache_config = config
116            .context_cache_config
117            .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
118
119        let cache_manager = effective_cache_config
120            .as_ref()
121            .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
122
123        let intra_compactor = config.intra_compaction_config.as_ref().and_then(|ic_config| {
124            config.intra_compaction_summarizer.as_ref().map(|summarizer| {
125                Arc::new(crate::intra_compaction::IntraInvocationCompactor::new(
126                    ic_config.clone(),
127                    summarizer.clone(),
128                ))
129            })
130        });
131
132        Ok(Self {
133            app_name: config.app_name,
134            root_agent: config.agent,
135            session_service: config.session_service,
136            #[cfg(feature = "artifacts")]
137            artifact_service: config.artifact_service,
138            memory_service: config.memory_service,
139            #[cfg(feature = "plugins")]
140            plugin_manager: config.plugin_manager,
141            #[cfg(feature = "skills")]
142            skill_injector: None,
143            run_config,
144            compaction_config: config.compaction_config,
145            context_cache_config: effective_cache_config,
146            cache_capable: config.cache_capable,
147            cache_manager,
148            request_context: config.request_context,
149            cancellation_token: config.cancellation_token,
150            intra_compactor,
151            active_sessions: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
152        })
153    }
154
155    /// Enable skill injection using a pre-built injector.
156    ///
157    /// Skill injection runs before plugin `on_user_message` callbacks.
158    #[cfg(feature = "skills")]
159    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
160        self.skill_injector = Some(Arc::new(injector));
161        self
162    }
163
164    /// Enable skill injection by auto-loading `.skills/` from the given root path.
165    #[cfg(feature = "skills")]
166    #[deprecated(note = "Use with_auto_skills_mut instead")]
167    pub fn with_auto_skills(
168        mut self,
169        root: impl AsRef<std::path::Path>,
170        config: SkillInjectorConfig,
171    ) -> adk_skill::SkillResult<Self> {
172        self.with_auto_skills_mut(root, config)?;
173        Ok(self)
174    }
175
176    /// Enable skill injection by auto-loading `.skills/` from the given root path.
177    ///
178    /// Unlike [`with_auto_skills`](Self::with_auto_skills), this method borrows
179    /// the Runner mutably instead of consuming it. On error, the Runner remains
180    /// valid with no skill injector configured.
181    #[cfg(feature = "skills")]
182    pub fn with_auto_skills_mut(
183        &mut self,
184        root: impl AsRef<std::path::Path>,
185        config: SkillInjectorConfig,
186    ) -> adk_skill::SkillResult<()> {
187        let injector = SkillInjector::from_root(root, config)?;
188        self.skill_injector = Some(Arc::new(injector));
189        Ok(())
190    }
191
192    pub async fn run(
193        &self,
194        user_id: UserId,
195        session_id: SessionId,
196        user_content: Content,
197    ) -> Result<EventStream> {
198        let app_name = self.app_name.clone();
199        let typed_app_name = AppName::try_from(app_name.clone())?;
200        let session_service = self.session_service.clone();
201        let root_agent = self.root_agent.clone();
202        #[cfg(feature = "artifacts")]
203        let artifact_service = self.artifact_service.clone();
204        let memory_service = self.memory_service.clone();
205        #[cfg(feature = "plugins")]
206        let plugin_manager = self.plugin_manager.clone();
207        #[cfg(feature = "skills")]
208        let skill_injector = self.skill_injector.clone();
209        let mut run_config = self.run_config.clone();
210        let compaction_config = self.compaction_config.clone();
211        let context_cache_config = self.context_cache_config.clone();
212        let cache_capable = self.cache_capable.clone();
213        let cache_manager_ref = self.cache_manager.clone();
214        let request_context = self.request_context.clone();
215        let cancellation_token = self.cancellation_token.clone();
216        let intra_compactor = self.intra_compactor.clone();
217
218        // Register a per-session cancellation token for the interrupt API.
219        // If a global token is configured, create a child token so that
220        // either global cancellation OR per-session interrupt stops this run.
221        let session_token = CancellationToken::new();
222        let session_id_str = session_id.as_str().to_string();
223        {
224            let mut sessions = self.active_sessions.lock().unwrap();
225            sessions.insert(session_id_str.clone(), session_token.clone());
226        }
227        let active_sessions = self.active_sessions.clone();
228
229        // Effective token: cancelled if either the global token or the session token fires
230        let effective_token = if let Some(ref global) = cancellation_token {
231            let combined = CancellationToken::new();
232            let combined_clone = combined.clone();
233            let global_clone = global.clone();
234            let session_clone = session_token.clone();
235            // Watch both tokens — cancel the combined token when either fires
236            let combined_for_global = combined_clone.clone();
237            tokio::spawn(async move {
238                global_clone.cancelled().await;
239                combined_for_global.cancel();
240            });
241            let combined_for_session = combined_clone;
242            tokio::spawn(async move {
243                session_clone.cancelled().await;
244                combined_for_session.cancel();
245            });
246            Some(combined)
247        } else {
248            Some(session_token.clone())
249        };
250
251        let s = stream! {
252            // Clean up session tracking when the stream ends.
253            // We use a simple struct with Drop to ensure cleanup even on early return.
254            struct SessionCleanup {
255                active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
256                session_id: String,
257            }
258            impl Drop for SessionCleanup {
259                fn drop(&mut self) {
260                    let mut sessions = self.active_sessions.lock().unwrap();
261                    sessions.remove(&self.session_id);
262                }
263            }
264            let _cleanup = SessionCleanup {
265                active_sessions: active_sessions.clone(),
266                session_id: session_id_str,
267            };
268
269            // Use the effective token (combines global + per-session)
270            let cancellation_token = effective_token;
271            // Get or create session
272            let session = match session_service
273                .get(adk_session::GetRequest {
274                    app_name: app_name.clone(),
275                    user_id: user_id.to_string(),
276                    session_id: session_id.to_string(),
277                    num_recent_events: run_config.history_max_events,
278                    after: None,
279                })
280                .await
281            {
282                Ok(s) => s,
283                Err(e) => {
284                    yield Err(e);
285                    return;
286                }
287            };
288
289            // Find which agent should handle this request
290            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
291
292            // Clone services for potential reuse in transfer
293            #[cfg(feature = "artifacts")]
294            let artifact_service_clone = artifact_service.clone();
295            let memory_service_clone = memory_service.clone();
296
297            // Create invocation context with MutableSession
298            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
299            #[cfg(any(feature = "skills", feature = "plugins"))]
300            let mut effective_user_content = user_content.clone();
301            #[cfg(not(any(feature = "skills", feature = "plugins")))]
302            let effective_user_content = user_content.clone();
303            #[cfg(feature = "skills")]
304            let mut selected_skill_name = String::new();
305            #[cfg(not(feature = "skills"))]
306            let selected_skill_name = String::new();
307            #[cfg(feature = "skills")]
308            let mut selected_skill_id = String::new();
309            #[cfg(not(feature = "skills"))]
310            let selected_skill_id = String::new();
311
312            #[cfg(feature = "skills")]
313            if let Some(injector) = skill_injector.as_ref() {
314                if let Some(matched) = adk_skill::apply_skill_injection(
315                    &mut effective_user_content,
316                    injector.index(),
317                    injector.policy(),
318                    injector.max_injected_chars(),
319                ) {
320                    selected_skill_name = matched.skill.name;
321                    selected_skill_id = matched.skill.id;
322                }
323            }
324
325            let mut invocation_ctx = match InvocationContext::new_typed(
326                invocation_id.clone(),
327                agent_to_run.clone(),
328                user_id.clone(),
329                typed_app_name.clone(),
330                session_id.clone(),
331                effective_user_content.clone(),
332                Arc::from(session),
333            ) {
334                Ok(ctx) => ctx,
335                Err(e) => {
336                    yield Err(e);
337                    return;
338                }
339            };
340
341            // Add optional services
342            #[cfg(feature = "artifacts")]
343            if let Some(service) = artifact_service {
344                // Wrap service with ScopedArtifacts to bind session context
345                let scoped = adk_artifact::ScopedArtifacts::new(
346                    service,
347                    app_name.clone(),
348                    user_id.to_string(),
349                    session_id.to_string(),
350                );
351                invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
352            }
353            if let Some(memory) = memory_service {
354                invocation_ctx = invocation_ctx.with_memory(memory);
355            }
356
357            // Apply run config (streaming mode, etc.)
358            invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
359
360            // Apply request context from auth middleware bridge if present
361            if let Some(rc) = request_context.clone() {
362                invocation_ctx = invocation_ctx.with_request_context(rc);
363            }
364
365            let mut ctx = Arc::new(invocation_ctx);
366
367            #[cfg(feature = "plugins")]
368            if let Some(manager) = plugin_manager.as_ref() {
369                match manager
370                    .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
371                    .await
372                {
373                    Ok(Some(content)) => {
374                        let mut early_event = adk_core::Event::new(ctx.invocation_id());
375                        early_event.author = agent_to_run.name().to_string();
376                        early_event.llm_response.content = Some(content);
377
378                        ctx.mutable_session().append_event(early_event.clone());
379                        if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
380                            yield Err(e);
381                            return;
382                        }
383
384                        yield Ok(early_event);
385                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
386                        return;
387                    }
388                    Ok(None) => {}
389                    Err(e) => {
390                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
391                        yield Err(e);
392                        return;
393                    }
394                }
395
396                match manager
397                    .run_on_user_message(
398                        ctx.clone() as Arc<dyn adk_core::InvocationContext>,
399                        effective_user_content.clone(),
400                    )
401                    .await
402                {
403                    Ok(Some(modified)) => {
404                        effective_user_content = modified;
405
406                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
407                            ctx.invocation_id().to_string(),
408                            agent_to_run.clone(),
409                            ctx.user_id().to_string(),
410                            ctx.app_name().to_string(),
411                            ctx.session_id().to_string(),
412                            effective_user_content.clone(),
413                            ctx.mutable_session().clone(),
414                        ) {
415                            Ok(ctx) => ctx,
416                            Err(e) => {
417                                yield Err(e);
418                                return;
419                            }
420                        };
421
422                        #[cfg(feature = "artifacts")]
423                        if let Some(service) = artifact_service_clone.clone() {
424                            let scoped = adk_artifact::ScopedArtifacts::new(
425                                service,
426                                ctx.app_name().to_string(),
427                                ctx.user_id().to_string(),
428                                ctx.session_id().to_string(),
429                            );
430                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
431                        }
432                        if let Some(memory) = memory_service_clone.clone() {
433                            refreshed_ctx = refreshed_ctx.with_memory(memory);
434                        }
435                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
436                        if let Some(rc) = request_context.clone() {
437                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
438                        }
439                        ctx = Arc::new(refreshed_ctx);
440                    }
441                    Ok(None) => {}
442                    Err(e) => {
443                        if let Some(manager) = plugin_manager.as_ref() {
444                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
445                        }
446                        yield Err(e);
447                        return;
448                    }
449                }
450            }
451
452            // Append user message to session service (persistent storage)
453            let mut user_event = adk_core::Event::new(ctx.invocation_id());
454            user_event.author = "user".to_string();
455            user_event.llm_response.content = Some(effective_user_content.clone());
456
457            // Also add to mutable session for immediate visibility
458            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
459            ctx.mutable_session().append_event(user_event.clone());
460
461            if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
462                #[cfg(feature = "plugins")]
463                if let Some(manager) = plugin_manager.as_ref() {
464                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
465                }
466                yield Err(e);
467                return;
468            }
469
470            // ===== CONTEXT CACHE LIFECYCLE =====
471            // If context caching is configured and a cache-capable model is available,
472            // create or refresh the cached content before agent execution.
473            // Cache failures are non-fatal — log a warning and proceed without cache.
474            if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
475                let should_refresh_cache = {
476                    let cm = cm_mutex.lock().await;
477                    cm.is_enabled() && (cm.active_cache_name().is_none() || cm.needs_refresh())
478                };
479
480                if should_refresh_cache {
481                    // Gather system instruction from the agent's description
482                    // (the full instruction is resolved inside the agent, but the
483                    // description provides a reasonable proxy for cache keying).
484                    let system_instruction = agent_to_run.description().to_string();
485                    let tools = std::collections::HashMap::new();
486                    let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
487
488                    match cache_model.create_cache(&system_instruction, &tools, ttl).await {
489                        Ok(name) => {
490                            let old_cache = {
491                                let mut cm = cm_mutex.lock().await;
492                                let old = cm.clear_active_cache();
493                                cm.set_active_cache(name);
494                                old
495                            };
496
497                            if let Some(old) = old_cache {
498                                if let Err(e) = cache_model.delete_cache(&old).await {
499                                    tracing::warn!(
500                                        old_cache = %old,
501                                        error = %e,
502                                        "failed to delete old cache, proceeding with new cache"
503                                    );
504                                }
505                            }
506                        }
507                        Err(e) => {
508                            tracing::warn!(
509                                error = %e,
510                                "cache creation failed, proceeding without cache"
511                            );
512                        }
513                    }
514                }
515
516                // Attach cache name to run config so agents can use it.
517                let cache_name = {
518                    let mut cm = cm_mutex.lock().await;
519                    if cm.is_enabled() {
520                        cm.record_invocation().map(str::to_string)
521                    } else {
522                        None
523                    }
524                };
525
526                if let Some(cache_name) = cache_name {
527                    run_config.cached_content = Some(cache_name);
528                    // Rebuild the invocation context with the updated run config.
529                    let mut refreshed_ctx = match InvocationContext::with_mutable_session(
530                        ctx.invocation_id().to_string(),
531                        agent_to_run.clone(),
532                        ctx.user_id().to_string(),
533                        ctx.app_name().to_string(),
534                        ctx.session_id().to_string(),
535                        effective_user_content.clone(),
536                        ctx.mutable_session().clone(),
537                    ) {
538                        Ok(ctx) => ctx,
539                        Err(e) => {
540                            yield Err(e);
541                            return;
542                        }
543                    };
544                    #[cfg(feature = "artifacts")]
545                    if let Some(service) = artifact_service_clone.clone() {
546                        let scoped = adk_artifact::ScopedArtifacts::new(
547                            service,
548                            ctx.app_name().to_string(),
549                            ctx.user_id().to_string(),
550                            ctx.session_id().to_string(),
551                        );
552                        refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
553                    }
554                    if let Some(memory) = memory_service_clone.clone() {
555                        refreshed_ctx = refreshed_ctx.with_memory(memory);
556                    }
557                    refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
558                    if let Some(rc) = request_context.clone() {
559                        refreshed_ctx = refreshed_ctx.with_request_context(rc);
560                    }
561                    ctx = Arc::new(refreshed_ctx);
562                }
563            }
564
565            // ===== INTRA-INVOCATION COMPACTION =====
566            // If intra-compaction is configured, check if the session events
567            // exceed the token threshold and compact them before the agent runs.
568            if let Some(ref compactor) = intra_compactor {
569                compactor.reset_cycle();
570                let session_events = ctx.mutable_session().as_ref().events_snapshot();
571                match compactor.maybe_compact(&session_events).await {
572                    Ok(Some(compacted_events)) => {
573                        ctx.mutable_session().replace_events(compacted_events);
574                        tracing::info!("intra-invocation compaction applied before agent execution");
575                    }
576                    Ok(None) => {} // No compaction needed
577                    Err(e) => {
578                        tracing::warn!(error = %e, "intra-invocation compaction check failed");
579                    }
580                }
581            }
582
583            // Run the agent with instrumentation (ADK-Go style attributes)
584            let agent_span = tracing::info_span!(
585                "agent.execute",
586                "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
587                "gcp.vertex.agent.session_id" = ctx.session_id(),
588                "gcp.vertex.agent.event_id" = ctx.invocation_id(), // Use invocation_id as event_id for agent spans
589                "gen_ai.conversation.id" = ctx.session_id(),
590                "adk.app_name" = ctx.app_name(),
591                "adk.user_id" = ctx.user_id(),
592                "agent.name" = %agent_to_run.name(),
593                "adk.skills.selected_name" = %selected_skill_name,
594                "adk.skills.selected_id" = %selected_skill_id
595            );
596
597            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
598                Ok(s) => s,
599                Err(e) => {
600                    #[cfg(feature = "plugins")]
601                    if let Some(manager) = plugin_manager.as_ref() {
602                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
603                    }
604                    yield Err(e);
605                    return;
606                }
607            };
608
609            // Stream events and check for transfers
610            use futures::StreamExt;
611            let mut transfer_target: Option<String> = None;
612
613            while let Some(result) = {
614                if let Some(token) = cancellation_token.as_ref() {
615                    if token.is_cancelled() {
616                        #[cfg(feature = "plugins")]
617                        if let Some(manager) = plugin_manager.as_ref() {
618                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
619                        }
620                        return;
621                    }
622                }
623                agent_stream.next().await
624            } {
625                match result {
626                    Ok(event) => {
627                        #[cfg(feature = "plugins")]
628                        let mut event = event;
629
630                        #[cfg(feature = "plugins")]
631                        if let Some(manager) = plugin_manager.as_ref() {
632                            match manager
633                                .run_on_event(
634                                    ctx.clone() as Arc<dyn adk_core::InvocationContext>,
635                                    event.clone(),
636                                )
637                                .await
638                            {
639                                Ok(Some(modified)) => {
640                                    event = modified;
641                                }
642                                Ok(None) => {}
643                                Err(e) => {
644                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
645                                    yield Err(e);
646                                    return;
647                                }
648                            }
649                        }
650
651                        // Check for transfer action
652                        if let Some(target) = &event.actions.transfer_to_agent {
653                            transfer_target = Some(target.clone());
654                        }
655
656                        // CRITICAL: Apply state_delta to the mutable session immediately.
657                        // This is the key fix for state propagation between sequential agents.
658                        // When an agent sets output_key, it emits an event with state_delta.
659                        // We must apply this to the mutable session so downstream agents
660                        // can read the value via ctx.session().state().get().
661                        if !event.actions.state_delta.is_empty() {
662                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
663                        }
664
665                        // Also add the event to the mutable session's event list
666                        ctx.mutable_session().append_event(event.clone());
667
668                        // Append event to session service (persistent storage)
669                        // Skip partial streaming chunks — only persist the final
670                        // event. Streaming chunks share the same event ID, so
671                        // persisting each one would violate the primary key
672                        // constraint. The final chunk (partial=false) carries the
673                        // complete accumulated content.
674                        if !event.llm_response.partial {
675                            if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
676                                #[cfg(feature = "plugins")]
677                                if let Some(manager) = plugin_manager.as_ref() {
678                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
679                                }
680                                yield Err(e);
681                                return;
682                            }
683                        }
684                        yield Ok(event);
685                    }
686                    Err(e) => {
687                        #[cfg(feature = "plugins")]
688                        if let Some(manager) = plugin_manager.as_ref() {
689                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
690                        }
691                        yield Err(e);
692                        return;
693                    }
694                }
695            }
696
697            // ===== TRANSFER LOOP =====
698            // Support multi-hop transfers with a max-depth guard.
699            // When an agent emits transfer_to_agent, the runner resolves the
700            // target from the root agent tree, computes transfer_targets
701            // (parent + peers) for the new agent, and runs it. This repeats
702            // until no further transfer is requested or the depth limit is hit.
703            const MAX_TRANSFER_DEPTH: u32 = 10;
704            let mut transfer_depth: u32 = 0;
705            let mut current_transfer_target = transfer_target;
706
707            while let Some(target_name) = current_transfer_target.take() {
708                transfer_depth += 1;
709                if transfer_depth > MAX_TRANSFER_DEPTH {
710                    tracing::warn!(
711                        depth = transfer_depth,
712                        target = %target_name,
713                        "max transfer depth exceeded, stopping transfer chain"
714                    );
715                    break;
716                }
717
718                let target_agent = match Self::find_agent(&root_agent, &target_name) {
719                    Some(a) => a,
720                    None => {
721                        tracing::warn!(target = %target_name, "transfer target not found in agent tree");
722                        break;
723                    }
724                };
725
726                // Compute transfer_targets for the target agent:
727                // - parent: the agent that transferred to it (or root if applicable)
728                // - peers: siblings in the agent tree
729                // - children: handled by the agent itself via sub_agents()
730                let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
731
732                let mut transfer_run_config = run_config.clone();
733                let mut targets = Vec::new();
734                if let Some(ref parent) = parent_name {
735                    targets.push(parent.clone());
736                }
737                targets.extend(peer_names);
738                transfer_run_config.transfer_targets = targets;
739                transfer_run_config.parent_agent = parent_name;
740
741                // For transfers, we reuse the same mutable session to preserve state
742                let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
743                let mut transfer_ctx = match InvocationContext::with_mutable_session(
744                    transfer_invocation_id.clone(),
745                    target_agent.clone(),
746                    ctx.user_id().to_string(),
747                    ctx.app_name().to_string(),
748                    ctx.session_id().to_string(),
749                    effective_user_content.clone(),
750                    ctx.mutable_session().clone(),
751                ) {
752                    Ok(ctx) => ctx,
753                    Err(e) => {
754                        yield Err(e);
755                        return;
756                    }
757                };
758
759                #[cfg(feature = "artifacts")]
760                if let Some(ref service) = artifact_service_clone {
761                    let scoped = adk_artifact::ScopedArtifacts::new(
762                        service.clone(),
763                        ctx.app_name().to_string(),
764                        ctx.user_id().to_string(),
765                        ctx.session_id().to_string(),
766                    );
767                    transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
768                }
769                if let Some(ref memory) = memory_service_clone {
770                    transfer_ctx = transfer_ctx.with_memory(memory.clone());
771                }
772                transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
773                if let Some(rc) = request_context.clone() {
774                    transfer_ctx = transfer_ctx.with_request_context(rc);
775                }
776
777                let transfer_ctx = Arc::new(transfer_ctx);
778
779                // Run the transferred agent
780                let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
781                    Ok(s) => s,
782                    Err(e) => {
783                        #[cfg(feature = "plugins")]
784                        if let Some(manager) = plugin_manager.as_ref() {
785                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
786                        }
787                        yield Err(e);
788                        return;
789                    }
790                };
791
792                // Stream events from the transferred agent, capturing any further transfer
793                while let Some(result) = {
794                    if let Some(token) = cancellation_token.as_ref() {
795                        if token.is_cancelled() {
796                            #[cfg(feature = "plugins")]
797                            if let Some(manager) = plugin_manager.as_ref() {
798                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
799                            }
800                            return;
801                        }
802                    }
803                    transfer_stream.next().await
804                } {
805                    match result {
806                        Ok(event) => {
807                            #[cfg(feature = "plugins")]
808                            let mut event = event;
809                            #[cfg(feature = "plugins")]
810                            if let Some(manager) = plugin_manager.as_ref() {
811                                match manager
812                                    .run_on_event(
813                                        transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
814                                        event.clone(),
815                                    )
816                                    .await
817                                {
818                                    Ok(Some(modified)) => {
819                                        event = modified;
820                                    }
821                                    Ok(None) => {}
822                                    Err(e) => {
823                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
824                                        yield Err(e);
825                                        return;
826                                    }
827                                }
828                            }
829
830                            // Capture further transfer requests
831                            if let Some(target) = &event.actions.transfer_to_agent {
832                                current_transfer_target = Some(target.clone());
833                            }
834
835                            // Apply state delta for transferred agent too
836                            if !event.actions.state_delta.is_empty() {
837                                transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
838                            }
839
840                            // Add to mutable session
841                            transfer_ctx.mutable_session().append_event(event.clone());
842
843                            if !event.llm_response.partial {
844                                if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
845                                    #[cfg(feature = "plugins")]
846                                    if let Some(manager) = plugin_manager.as_ref() {
847                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
848                                    }
849                                    yield Err(e);
850                                    return;
851                                }
852                            }
853                            yield Ok(event);
854                        }
855                        Err(e) => {
856                            #[cfg(feature = "plugins")]
857                            if let Some(manager) = plugin_manager.as_ref() {
858                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
859                            }
860                            yield Err(e);
861                            return;
862                        }
863                    }
864                }
865            }
866
867            // ===== CONTEXT COMPACTION =====
868            // After all events have been processed, check if compaction should trigger.
869            // This runs in the background after the invocation completes.
870            if let Some(ref compaction_cfg) = compaction_config {
871                let event_count = ctx.mutable_session().as_ref().events_len();
872
873                if event_count > 0 {
874                    let all_events = ctx.mutable_session().as_ref().events_snapshot();
875                    let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
876                        as u32;
877
878                    if invocation_count > 0
879                        && invocation_count % compaction_cfg.compaction_interval == 0
880                    {
881                        // Determine the window of events to compact
882                        // We compact all events except the most recent overlap_size invocations
883                        let overlap = compaction_cfg.overlap_size as usize;
884
885                        // Find the boundary: keep the last `overlap` user messages and everything after
886                        let user_msg_indices: Vec<usize> = all_events.iter()
887                            .enumerate()
888                            .filter(|(_, e)| e.author == "user")
889                            .map(|(i, _)| i)
890                            .collect();
891
892                        // Keep the last `overlap` user messages intact.
893                        // When overlap is 0, compact everything.
894                        let compact_up_to = if overlap == 0 {
895                            all_events.len()
896                        } else if user_msg_indices.len() > overlap {
897                            // Compact up to (but not including) the overlap-th-from-last user message
898                            user_msg_indices[user_msg_indices.len() - overlap]
899                        } else {
900                            // Not enough user messages to satisfy overlap — skip compaction
901                            0
902                        };
903
904                        if compact_up_to > 0 {
905                            let events_to_compact = &all_events[..compact_up_to];
906
907                            match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
908                                Ok(Some(compaction_event)) => {
909                                    // Persist the compaction event
910                                    if let Err(e) = session_service.append_event(
911                                        ctx.session_id(),
912                                        compaction_event.clone(),
913                                    ).await {
914                                        tracing::warn!(error = %e, "Failed to persist compaction event");
915                                    } else {
916                                        tracing::info!(
917                                            compacted_events = compact_up_to,
918                                            "Context compaction completed"
919                                        );
920                                    }
921                                }
922                                Ok(None) => {
923                                    tracing::debug!("Compaction summarizer returned no result");
924                                }
925                                Err(e) => {
926                                    // Compaction failure is non-fatal — log and continue
927                                    tracing::warn!(error = %e, "Context compaction failed");
928                                }
929                            }
930                        }
931                    }
932                }
933            }
934
935            #[cfg(feature = "plugins")]
936            if let Some(manager) = plugin_manager.as_ref() {
937                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
938            }
939        };
940
941        Ok(Box::pin(s))
942    }
943
944    /// Convenience method that accepts string arguments.
945    ///
946    /// Converts `user_id` and `session_id` to their typed equivalents
947    /// and delegates to [`run()`](Self::run).
948    ///
949    /// # Errors
950    ///
951    /// Returns an error if either string fails identity validation
952    /// (empty, contains null bytes, or exceeds length limit).
953    pub async fn run_str(
954        &self,
955        user_id: &str,
956        session_id: &str,
957        user_content: Content,
958    ) -> Result<EventStream> {
959        let user_id = UserId::try_from(user_id)?;
960        let session_id = SessionId::try_from(session_id)?;
961        self.run(user_id, session_id, user_content).await
962    }
963
964    /// Interrupt a running agent for the given session.
965    ///
966    /// Cancels the agent's current execution within the event loop. Events
967    /// already produced and appended to the session are preserved — only
968    /// future events are stopped. The caller can then issue a new `run()`
969    /// call with a different instruction to redirect the agent.
970    ///
971    /// Returns `true` if a running session was found and interrupted,
972    /// `false` if no active run exists for that session ID.
973    ///
974    /// # Example
975    ///
976    /// ```rust,ignore
977    /// // Start a run in the background
978    /// let mut stream = runner.run(user_id, session_id, content).await?;
979    /// tokio::spawn(async move { while stream.next().await.is_some() {} });
980    ///
981    /// // Later, interrupt it
982    /// let was_running = runner.interrupt("session-1");
983    /// assert!(was_running);
984    ///
985    /// // Redirect with a new instruction
986    /// let mut stream = runner.run(user_id, session_id, new_content).await?;
987    /// ```
988    pub fn interrupt(&self, session_id: &str) -> bool {
989        let sessions = self.active_sessions.lock().unwrap();
990        if let Some(token) = sessions.get(session_id) {
991            tracing::info!(session.id = session_id, "interrupting running agent");
992            token.cancel();
993            true
994        } else {
995            tracing::debug!(session.id = session_id, "no active run to interrupt");
996            false
997        }
998    }
999
1000    /// Returns the session IDs of all currently running agent executions.
1001    pub fn active_session_ids(&self) -> Vec<String> {
1002        let sessions = self.active_sessions.lock().unwrap();
1003        sessions.keys().cloned().collect()
1004    }
1005
1006    /// Find which agent should handle the request based on session history
1007    pub fn find_agent_to_run(
1008        root_agent: &Arc<dyn Agent>,
1009        session: &dyn adk_session::Session,
1010    ) -> Arc<dyn Agent> {
1011        // Look at recent events to find last agent that responded
1012        let events = session.events();
1013        for i in (0..events.len()).rev() {
1014            if let Some(event) = events.at(i) {
1015                // Check for explicit transfer
1016                if let Some(target_name) = &event.actions.transfer_to_agent {
1017                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
1018                        return agent;
1019                    }
1020                }
1021
1022                if event.author == "user" {
1023                    continue;
1024                }
1025
1026                // Try to find this agent in the tree
1027                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
1028                    // Check if agent allows transfer up the tree
1029                    if Self::is_transferable(root_agent, &agent) {
1030                        return agent;
1031                    }
1032                }
1033            }
1034        }
1035
1036        // Default to root agent
1037        root_agent.clone()
1038    }
1039
1040    /// Check if an agent found in session history can be resumed for the next
1041    /// user message.
1042    ///
1043    /// This always returns `true` because the transfer-policy enforcement
1044    /// (`disallow_transfer_to_parent` / `disallow_transfer_to_peers`) is
1045    /// handled inside `LlmAgent::run()` when it builds the `transfer_to_agent`
1046    /// tool's valid-target list. The runner does not need to duplicate that
1047    /// check here — it only needs to know whether the agent is a valid
1048    /// resumption target, which it always is if it exists in the tree.
1049    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
1050        true
1051    }
1052
1053    /// Recursively search agent tree for agent with given name
1054    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
1055        if current.name() == target_name {
1056            return Some(current.clone());
1057        }
1058
1059        for sub_agent in current.sub_agents() {
1060            if let Some(found) = Self::find_agent(sub_agent, target_name) {
1061                return Some(found);
1062            }
1063        }
1064
1065        None
1066    }
1067
1068    /// Compute the parent name and peer names for a given agent in the tree.
1069    /// Returns `(parent_name, peer_names)`.
1070    ///
1071    /// Walks the agent tree to find the parent of `target_name`, then collects
1072    /// the parent's name and the sibling agent names (excluding the target itself).
1073    pub fn compute_transfer_context(
1074        root: &Arc<dyn Agent>,
1075        target_name: &str,
1076    ) -> (Option<String>, Vec<String>) {
1077        // If the target is the root itself, there's no parent or peers
1078        if root.name() == target_name {
1079            return (None, Vec::new());
1080        }
1081
1082        // BFS/DFS to find the parent of target_name
1083        fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
1084            for sub in current.sub_agents() {
1085                if sub.name() == target {
1086                    return Some(current.clone());
1087                }
1088                if let Some(found) = find_parent(sub, target) {
1089                    return Some(found);
1090                }
1091            }
1092            None
1093        }
1094
1095        match find_parent(root, target_name) {
1096            Some(parent) => {
1097                let parent_name = parent.name().to_string();
1098                let peers: Vec<String> = parent
1099                    .sub_agents()
1100                    .iter()
1101                    .filter(|a| a.name() != target_name)
1102                    .map(|a| a.name().to_string())
1103                    .collect();
1104                (Some(parent_name), peers)
1105            }
1106            None => (None, Vec::new()),
1107        }
1108    }
1109}