Skip to main content

adk_runner/
runner.rs

1use crate::InvocationContext;
2use crate::cache::CacheManager;
3use adk_artifact::ArtifactService;
4use adk_core::{
5    Agent, AppName, CacheCapable, Content, ContextCacheConfig, EventStream, Memory,
6    ReadonlyContext, Result, RunConfig, SessionId, UserId,
7};
8use adk_plugin::PluginManager;
9use adk_session::SessionService;
10use adk_skill::{SkillInjector, SkillInjectorConfig};
11use async_stream::stream;
12use std::sync::Arc;
13use tokio_util::sync::CancellationToken;
14use tracing::Instrument;
15
16pub struct RunnerConfig {
17    pub app_name: String,
18    pub agent: Arc<dyn Agent>,
19    pub session_service: Arc<dyn SessionService>,
20    pub artifact_service: Option<Arc<dyn ArtifactService>>,
21    pub memory_service: Option<Arc<dyn Memory>>,
22    pub plugin_manager: Option<Arc<PluginManager>>,
23    /// Optional run configuration (streaming mode, etc.)
24    /// If not provided, uses default (SSE streaming)
25    #[allow(dead_code)]
26    pub run_config: Option<RunConfig>,
27    /// Optional context compaction configuration.
28    /// When set, the runner will periodically summarize older events
29    /// to reduce context size sent to the LLM.
30    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
31    /// Optional context cache configuration for automatic prompt caching lifecycle.
32    /// When set alongside `cache_capable`, the runner will automatically create and
33    /// manage cached content resources for supported providers.
34    ///
35    /// When `cache_capable` is set but this field is `None`, the runner
36    /// automatically uses [`ContextCacheConfig::default()`] (4096 min tokens,
37    /// 600s TTL, refresh every 3 invocations).
38    pub context_cache_config: Option<ContextCacheConfig>,
39    /// Optional cache-capable model reference for automatic cache management.
40    /// Set this to the same model used by the agent if it supports caching.
41    pub cache_capable: Option<Arc<dyn CacheCapable>>,
42    /// Optional request context from the server's auth middleware bridge.
43    /// When set, the runner passes it to `InvocationContext` so that
44    /// `user_scopes()` and `user_id()` reflect the authenticated identity.
45    pub request_context: Option<adk_core::RequestContext>,
46    /// Optional cooperative cancellation token for externally managed runs.
47    pub cancellation_token: Option<CancellationToken>,
48    /// Optional intra-invocation compaction configuration.
49    /// When set, the runner estimates token count before each agent run
50    /// and triggers mid-invocation summarization when the threshold is exceeded.
51    pub intra_compaction_config: Option<adk_core::IntraCompactionConfig>,
52    /// Optional summarizer for intra-invocation compaction.
53    /// Required when `intra_compaction_config` is set.
54    pub intra_compaction_summarizer: Option<Arc<dyn adk_core::BaseEventsSummarizer>>,
55}
56
57pub struct Runner {
58    app_name: String,
59    root_agent: Arc<dyn Agent>,
60    session_service: Arc<dyn SessionService>,
61    artifact_service: Option<Arc<dyn ArtifactService>>,
62    memory_service: Option<Arc<dyn Memory>>,
63    plugin_manager: Option<Arc<PluginManager>>,
64    skill_injector: Option<Arc<SkillInjector>>,
65    run_config: RunConfig,
66    compaction_config: Option<adk_core::EventsCompactionConfig>,
67    context_cache_config: Option<ContextCacheConfig>,
68    cache_capable: Option<Arc<dyn CacheCapable>>,
69    cache_manager: Option<Arc<tokio::sync::Mutex<CacheManager>>>,
70    request_context: Option<adk_core::RequestContext>,
71    cancellation_token: Option<CancellationToken>,
72    intra_compactor: Option<Arc<crate::intra_compaction::IntraInvocationCompactor>>,
73    /// Per-session cancellation tokens for the interrupt API.
74    /// Each `run()` call registers a token here; `interrupt()` cancels it.
75    active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
76}
77
78impl Runner {
79    /// Create a typestate builder for constructing a `Runner`.
80    ///
81    /// The builder enforces at compile time that the three required fields
82    /// (`app_name`, `agent`, `session_service`) are set before `build()` is
83    /// callable.
84    ///
85    /// # Example
86    ///
87    /// ```rust,ignore
88    /// let runner = Runner::builder()
89    ///     .app_name("my-app")
90    ///     .agent(agent)
91    ///     .session_service(session_service)
92    ///     .build()?;
93    /// ```
94    pub fn builder() -> crate::builder::RunnerConfigBuilder<
95        crate::builder::NoAppName,
96        crate::builder::NoAgent,
97        crate::builder::NoSessionService,
98    > {
99        crate::builder::RunnerConfigBuilder::new()
100    }
101
102    pub fn new(config: RunnerConfig) -> Result<Self> {
103        let run_config = config.run_config.unwrap_or_default();
104
105        // When a cache-capable model is provided but no explicit cache config,
106        // use the default ContextCacheConfig to enable caching automatically.
107        let effective_cache_config = config
108            .context_cache_config
109            .or_else(|| config.cache_capable.as_ref().map(|_| ContextCacheConfig::default()));
110
111        let cache_manager = effective_cache_config
112            .as_ref()
113            .map(|c| Arc::new(tokio::sync::Mutex::new(CacheManager::new(c.clone()))));
114
115        let intra_compactor = config.intra_compaction_config.as_ref().and_then(|ic_config| {
116            config.intra_compaction_summarizer.as_ref().map(|summarizer| {
117                Arc::new(crate::intra_compaction::IntraInvocationCompactor::new(
118                    ic_config.clone(),
119                    summarizer.clone(),
120                ))
121            })
122        });
123
124        Ok(Self {
125            app_name: config.app_name,
126            root_agent: config.agent,
127            session_service: config.session_service,
128            artifact_service: config.artifact_service,
129            memory_service: config.memory_service,
130            plugin_manager: config.plugin_manager,
131            skill_injector: None,
132            run_config,
133            compaction_config: config.compaction_config,
134            context_cache_config: effective_cache_config,
135            cache_capable: config.cache_capable,
136            cache_manager,
137            request_context: config.request_context,
138            cancellation_token: config.cancellation_token,
139            intra_compactor,
140            active_sessions: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
141        })
142    }
143
144    /// Enable skill injection using a pre-built injector.
145    ///
146    /// Skill injection runs before plugin `on_user_message` callbacks.
147    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
148        self.skill_injector = Some(Arc::new(injector));
149        self
150    }
151
152    /// Enable skill injection by auto-loading `.skills/` from the given root path.
153    #[deprecated(note = "Use with_auto_skills_mut instead")]
154    pub fn with_auto_skills(
155        mut self,
156        root: impl AsRef<std::path::Path>,
157        config: SkillInjectorConfig,
158    ) -> adk_skill::SkillResult<Self> {
159        self.with_auto_skills_mut(root, config)?;
160        Ok(self)
161    }
162
163    /// Enable skill injection by auto-loading `.skills/` from the given root path.
164    ///
165    /// Unlike [`with_auto_skills`](Self::with_auto_skills), this method borrows
166    /// the Runner mutably instead of consuming it. On error, the Runner remains
167    /// valid with no skill injector configured.
168    pub fn with_auto_skills_mut(
169        &mut self,
170        root: impl AsRef<std::path::Path>,
171        config: SkillInjectorConfig,
172    ) -> adk_skill::SkillResult<()> {
173        let injector = SkillInjector::from_root(root, config)?;
174        self.skill_injector = Some(Arc::new(injector));
175        Ok(())
176    }
177
178    pub async fn run(
179        &self,
180        user_id: UserId,
181        session_id: SessionId,
182        user_content: Content,
183    ) -> Result<EventStream> {
184        let app_name = self.app_name.clone();
185        let typed_app_name = AppName::try_from(app_name.clone())?;
186        let session_service = self.session_service.clone();
187        let root_agent = self.root_agent.clone();
188        let artifact_service = self.artifact_service.clone();
189        let memory_service = self.memory_service.clone();
190        let plugin_manager = self.plugin_manager.clone();
191        let skill_injector = self.skill_injector.clone();
192        let mut run_config = self.run_config.clone();
193        let compaction_config = self.compaction_config.clone();
194        let context_cache_config = self.context_cache_config.clone();
195        let cache_capable = self.cache_capable.clone();
196        let cache_manager_ref = self.cache_manager.clone();
197        let request_context = self.request_context.clone();
198        let cancellation_token = self.cancellation_token.clone();
199        let intra_compactor = self.intra_compactor.clone();
200
201        // Register a per-session cancellation token for the interrupt API.
202        // If a global token is configured, create a child token so that
203        // either global cancellation OR per-session interrupt stops this run.
204        let session_token = CancellationToken::new();
205        let session_id_str = session_id.as_str().to_string();
206        {
207            let mut sessions = self.active_sessions.lock().unwrap();
208            sessions.insert(session_id_str.clone(), session_token.clone());
209        }
210        let active_sessions = self.active_sessions.clone();
211
212        // Effective token: cancelled if either the global token or the session token fires
213        let effective_token = if let Some(ref global) = cancellation_token {
214            let combined = CancellationToken::new();
215            let combined_clone = combined.clone();
216            let global_clone = global.clone();
217            let session_clone = session_token.clone();
218            // Watch both tokens — cancel the combined token when either fires
219            let combined_for_global = combined_clone.clone();
220            tokio::spawn(async move {
221                global_clone.cancelled().await;
222                combined_for_global.cancel();
223            });
224            let combined_for_session = combined_clone;
225            tokio::spawn(async move {
226                session_clone.cancelled().await;
227                combined_for_session.cancel();
228            });
229            Some(combined)
230        } else {
231            Some(session_token.clone())
232        };
233
234        let s = stream! {
235            // Clean up session tracking when the stream ends.
236            // We use a simple struct with Drop to ensure cleanup even on early return.
237            struct SessionCleanup {
238                active_sessions: Arc<std::sync::Mutex<std::collections::HashMap<String, CancellationToken>>>,
239                session_id: String,
240            }
241            impl Drop for SessionCleanup {
242                fn drop(&mut self) {
243                    let mut sessions = self.active_sessions.lock().unwrap();
244                    sessions.remove(&self.session_id);
245                }
246            }
247            let _cleanup = SessionCleanup {
248                active_sessions: active_sessions.clone(),
249                session_id: session_id_str,
250            };
251
252            // Use the effective token (combines global + per-session)
253            let cancellation_token = effective_token;
254            // Get or create session
255            let session = match session_service
256                .get(adk_session::GetRequest {
257                    app_name: app_name.clone(),
258                    user_id: user_id.to_string(),
259                    session_id: session_id.to_string(),
260                    num_recent_events: None,
261                    after: None,
262                })
263                .await
264            {
265                Ok(s) => s,
266                Err(e) => {
267                    yield Err(e);
268                    return;
269                }
270            };
271
272            // Find which agent should handle this request
273            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
274
275            // Clone services for potential reuse in transfer
276            let artifact_service_clone = artifact_service.clone();
277            let memory_service_clone = memory_service.clone();
278
279            // Create invocation context with MutableSession
280            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
281            let mut effective_user_content = user_content.clone();
282            let mut selected_skill_name = String::new();
283            let mut selected_skill_id = String::new();
284
285            if let Some(injector) = skill_injector.as_ref() {
286                if let Some(matched) = adk_skill::apply_skill_injection(
287                    &mut effective_user_content,
288                    injector.index(),
289                    injector.policy(),
290                    injector.max_injected_chars(),
291                ) {
292                    selected_skill_name = matched.skill.name;
293                    selected_skill_id = matched.skill.id;
294                }
295            }
296
297            let mut invocation_ctx = match InvocationContext::new_typed(
298                invocation_id.clone(),
299                agent_to_run.clone(),
300                user_id.clone(),
301                typed_app_name.clone(),
302                session_id.clone(),
303                effective_user_content.clone(),
304                Arc::from(session),
305            ) {
306                Ok(ctx) => ctx,
307                Err(e) => {
308                    yield Err(e);
309                    return;
310                }
311            };
312
313            // Add optional services
314            if let Some(service) = artifact_service {
315                // Wrap service with ScopedArtifacts to bind session context
316                let scoped = adk_artifact::ScopedArtifacts::new(
317                    service,
318                    app_name.clone(),
319                    user_id.to_string(),
320                    session_id.to_string(),
321                );
322                invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
323            }
324            if let Some(memory) = memory_service {
325                invocation_ctx = invocation_ctx.with_memory(memory);
326            }
327
328            // Apply run config (streaming mode, etc.)
329            invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
330
331            // Apply request context from auth middleware bridge if present
332            if let Some(rc) = request_context.clone() {
333                invocation_ctx = invocation_ctx.with_request_context(rc);
334            }
335
336            let mut ctx = Arc::new(invocation_ctx);
337
338            if let Some(manager) = plugin_manager.as_ref() {
339                match manager
340                    .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
341                    .await
342                {
343                    Ok(Some(content)) => {
344                        let mut early_event = adk_core::Event::new(ctx.invocation_id());
345                        early_event.author = agent_to_run.name().to_string();
346                        early_event.llm_response.content = Some(content);
347
348                        ctx.mutable_session().append_event(early_event.clone());
349                        if let Err(e) = session_service.append_event(ctx.session_id(), early_event.clone()).await {
350                            yield Err(e);
351                            return;
352                        }
353
354                        yield Ok(early_event);
355                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
356                        return;
357                    }
358                    Ok(None) => {}
359                    Err(e) => {
360                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
361                        yield Err(e);
362                        return;
363                    }
364                }
365
366                match manager
367                    .run_on_user_message(
368                        ctx.clone() as Arc<dyn adk_core::InvocationContext>,
369                        effective_user_content.clone(),
370                    )
371                    .await
372                {
373                    Ok(Some(modified)) => {
374                        effective_user_content = modified;
375
376                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
377                            ctx.invocation_id().to_string(),
378                            agent_to_run.clone(),
379                            ctx.user_id().to_string(),
380                            ctx.app_name().to_string(),
381                            ctx.session_id().to_string(),
382                            effective_user_content.clone(),
383                            ctx.mutable_session().clone(),
384                        ) {
385                            Ok(ctx) => ctx,
386                            Err(e) => {
387                                yield Err(e);
388                                return;
389                            }
390                        };
391
392                        if let Some(service) = artifact_service_clone.clone() {
393                            let scoped = adk_artifact::ScopedArtifacts::new(
394                                service,
395                                ctx.app_name().to_string(),
396                                ctx.user_id().to_string(),
397                                ctx.session_id().to_string(),
398                            );
399                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
400                        }
401                        if let Some(memory) = memory_service_clone.clone() {
402                            refreshed_ctx = refreshed_ctx.with_memory(memory);
403                        }
404                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
405                        if let Some(rc) = request_context.clone() {
406                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
407                        }
408                        ctx = Arc::new(refreshed_ctx);
409                    }
410                    Ok(None) => {}
411                    Err(e) => {
412                        if let Some(manager) = plugin_manager.as_ref() {
413                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
414                        }
415                        yield Err(e);
416                        return;
417                    }
418                }
419            }
420
421            // Append user message to session service (persistent storage)
422            let mut user_event = adk_core::Event::new(ctx.invocation_id());
423            user_event.author = "user".to_string();
424            user_event.llm_response.content = Some(effective_user_content.clone());
425
426            // Also add to mutable session for immediate visibility
427            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
428            ctx.mutable_session().append_event(user_event.clone());
429
430            if let Err(e) = session_service.append_event(ctx.session_id(), user_event).await {
431                if let Some(manager) = plugin_manager.as_ref() {
432                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
433                }
434                yield Err(e);
435                return;
436            }
437
438            // ===== CONTEXT CACHE LIFECYCLE =====
439            // If context caching is configured and a cache-capable model is available,
440            // create or refresh the cached content before agent execution.
441            // Cache failures are non-fatal — log a warning and proceed without cache.
442            if let (Some(cm_mutex), Some(cache_model)) = (&cache_manager_ref, &cache_capable) {
443                let mut cm = cm_mutex.lock().await;
444                if cm.is_enabled() {
445                    if cm.active_cache_name().is_none() || cm.needs_refresh() {
446                        // Gather system instruction from the agent's description
447                        // (the full instruction is resolved inside the agent, but the
448                        // description provides a reasonable proxy for cache keying)
449                        let system_instruction = agent_to_run.description().to_string();
450                        let tools = std::collections::HashMap::new();
451                        let ttl = context_cache_config.as_ref().map_or(600, |c| c.ttl_seconds);
452
453                        match cache_model.create_cache(&system_instruction, &tools, ttl).await {
454                            Ok(name) => {
455                                // Delete old cache if refreshing
456                                if let Some(old) = cm.clear_active_cache() {
457                                    if let Err(e) = cache_model.delete_cache(&old).await {
458                                        tracing::warn!(
459                                            old_cache = %old,
460                                            error = %e,
461                                            "failed to delete old cache, proceeding with new cache"
462                                        );
463                                    }
464                                }
465                                cm.set_active_cache(name);
466                            }
467                            Err(e) => {
468                                tracing::warn!(
469                                    error = %e,
470                                    "cache creation failed, proceeding without cache"
471                                );
472                            }
473                        }
474                    }
475
476                    // Attach cache name to run config so agents can use it
477                    if let Some(cache_name) = cm.record_invocation() {
478                        run_config.cached_content = Some(cache_name.to_string());
479                        // Rebuild the invocation context with the updated run config
480                        let mut refreshed_ctx = match InvocationContext::with_mutable_session(
481                            ctx.invocation_id().to_string(),
482                            agent_to_run.clone(),
483                            ctx.user_id().to_string(),
484                            ctx.app_name().to_string(),
485                            ctx.session_id().to_string(),
486                            effective_user_content.clone(),
487                            ctx.mutable_session().clone(),
488                        ) {
489                            Ok(ctx) => ctx,
490                            Err(e) => {
491                                yield Err(e);
492                                return;
493                            }
494                        };
495                        if let Some(service) = artifact_service_clone.clone() {
496                            let scoped = adk_artifact::ScopedArtifacts::new(
497                                service,
498                                ctx.app_name().to_string(),
499                                ctx.user_id().to_string(),
500                                ctx.session_id().to_string(),
501                            );
502                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
503                        }
504                        if let Some(memory) = memory_service_clone.clone() {
505                            refreshed_ctx = refreshed_ctx.with_memory(memory);
506                        }
507                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
508                        if let Some(rc) = request_context.clone() {
509                            refreshed_ctx = refreshed_ctx.with_request_context(rc);
510                        }
511                        ctx = Arc::new(refreshed_ctx);
512                    }
513                }
514            }
515
516            // ===== INTRA-INVOCATION COMPACTION =====
517            // If intra-compaction is configured, check if the session events
518            // exceed the token threshold and compact them before the agent runs.
519            if let Some(ref compactor) = intra_compactor {
520                compactor.reset_cycle();
521                let session_events = ctx.mutable_session().as_ref().events_snapshot();
522                match compactor.maybe_compact(&session_events).await {
523                    Ok(Some(compacted_events)) => {
524                        ctx.mutable_session().replace_events(compacted_events);
525                        tracing::info!("intra-invocation compaction applied before agent execution");
526                    }
527                    Ok(None) => {} // No compaction needed
528                    Err(e) => {
529                        tracing::warn!(error = %e, "intra-invocation compaction check failed");
530                    }
531                }
532            }
533
534            // Run the agent with instrumentation (ADK-Go style attributes)
535            let agent_span = tracing::info_span!(
536                "agent.execute",
537                "gcp.vertex.agent.invocation_id" = ctx.invocation_id(),
538                "gcp.vertex.agent.session_id" = ctx.session_id(),
539                "gcp.vertex.agent.event_id" = ctx.invocation_id(), // Use invocation_id as event_id for agent spans
540                "gen_ai.conversation.id" = ctx.session_id(),
541                "adk.app_name" = ctx.app_name(),
542                "adk.user_id" = ctx.user_id(),
543                "agent.name" = %agent_to_run.name(),
544                "adk.skills.selected_name" = %selected_skill_name,
545                "adk.skills.selected_id" = %selected_skill_id
546            );
547
548            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
549                Ok(s) => s,
550                Err(e) => {
551                    if let Some(manager) = plugin_manager.as_ref() {
552                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
553                    }
554                    yield Err(e);
555                    return;
556                }
557            };
558
559            // Stream events and check for transfers
560            use futures::StreamExt;
561            let mut transfer_target: Option<String> = None;
562
563            while let Some(result) = {
564                if let Some(token) = cancellation_token.as_ref() {
565                    if token.is_cancelled() {
566                        if let Some(manager) = plugin_manager.as_ref() {
567                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
568                        }
569                        return;
570                    }
571                }
572                agent_stream.next().await
573            } {
574                match result {
575                    Ok(event) => {
576                        let mut event = event;
577
578                        if let Some(manager) = plugin_manager.as_ref() {
579                            match manager
580                                .run_on_event(
581                                    ctx.clone() as Arc<dyn adk_core::InvocationContext>,
582                                    event.clone(),
583                                )
584                                .await
585                            {
586                                Ok(Some(modified)) => {
587                                    event = modified;
588                                }
589                                Ok(None) => {}
590                                Err(e) => {
591                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
592                                    yield Err(e);
593                                    return;
594                                }
595                            }
596                        }
597
598                        // Check for transfer action
599                        if let Some(target) = &event.actions.transfer_to_agent {
600                            transfer_target = Some(target.clone());
601                        }
602
603                        // CRITICAL: Apply state_delta to the mutable session immediately.
604                        // This is the key fix for state propagation between sequential agents.
605                        // When an agent sets output_key, it emits an event with state_delta.
606                        // We must apply this to the mutable session so downstream agents
607                        // can read the value via ctx.session().state().get().
608                        if !event.actions.state_delta.is_empty() {
609                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
610                        }
611
612                        // Also add the event to the mutable session's event list
613                        ctx.mutable_session().append_event(event.clone());
614
615                        // Append event to session service (persistent storage)
616                        // Skip partial streaming chunks — only persist the final
617                        // event. Streaming chunks share the same event ID, so
618                        // persisting each one would violate the primary key
619                        // constraint. The final chunk (partial=false) carries the
620                        // complete accumulated content.
621                        if !event.llm_response.partial {
622                            if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
623                                if let Some(manager) = plugin_manager.as_ref() {
624                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
625                                }
626                                yield Err(e);
627                                return;
628                            }
629                        }
630                        yield Ok(event);
631                    }
632                    Err(e) => {
633                        if let Some(manager) = plugin_manager.as_ref() {
634                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
635                        }
636                        yield Err(e);
637                        return;
638                    }
639                }
640            }
641
642            // ===== TRANSFER LOOP =====
643            // Support multi-hop transfers with a max-depth guard.
644            // When an agent emits transfer_to_agent, the runner resolves the
645            // target from the root agent tree, computes transfer_targets
646            // (parent + peers) for the new agent, and runs it. This repeats
647            // until no further transfer is requested or the depth limit is hit.
648            const MAX_TRANSFER_DEPTH: u32 = 10;
649            let mut transfer_depth: u32 = 0;
650            let mut current_transfer_target = transfer_target;
651
652            while let Some(target_name) = current_transfer_target.take() {
653                transfer_depth += 1;
654                if transfer_depth > MAX_TRANSFER_DEPTH {
655                    tracing::warn!(
656                        depth = transfer_depth,
657                        target = %target_name,
658                        "max transfer depth exceeded, stopping transfer chain"
659                    );
660                    break;
661                }
662
663                let target_agent = match Self::find_agent(&root_agent, &target_name) {
664                    Some(a) => a,
665                    None => {
666                        tracing::warn!(target = %target_name, "transfer target not found in agent tree");
667                        break;
668                    }
669                };
670
671                // Compute transfer_targets for the target agent:
672                // - parent: the agent that transferred to it (or root if applicable)
673                // - peers: siblings in the agent tree
674                // - children: handled by the agent itself via sub_agents()
675                let (parent_name, peer_names) = Self::compute_transfer_context(&root_agent, &target_name);
676
677                let mut transfer_run_config = run_config.clone();
678                let mut targets = Vec::new();
679                if let Some(ref parent) = parent_name {
680                    targets.push(parent.clone());
681                }
682                targets.extend(peer_names);
683                transfer_run_config.transfer_targets = targets;
684                transfer_run_config.parent_agent = parent_name;
685
686                // For transfers, we reuse the same mutable session to preserve state
687                let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
688                let mut transfer_ctx = match InvocationContext::with_mutable_session(
689                    transfer_invocation_id.clone(),
690                    target_agent.clone(),
691                    ctx.user_id().to_string(),
692                    ctx.app_name().to_string(),
693                    ctx.session_id().to_string(),
694                    effective_user_content.clone(),
695                    ctx.mutable_session().clone(),
696                ) {
697                    Ok(ctx) => ctx,
698                    Err(e) => {
699                        yield Err(e);
700                        return;
701                    }
702                };
703
704                if let Some(ref service) = artifact_service_clone {
705                    let scoped = adk_artifact::ScopedArtifacts::new(
706                        service.clone(),
707                        ctx.app_name().to_string(),
708                        ctx.user_id().to_string(),
709                        ctx.session_id().to_string(),
710                    );
711                    transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
712                }
713                if let Some(ref memory) = memory_service_clone {
714                    transfer_ctx = transfer_ctx.with_memory(memory.clone());
715                }
716                transfer_ctx = transfer_ctx.with_run_config(transfer_run_config);
717                if let Some(rc) = request_context.clone() {
718                    transfer_ctx = transfer_ctx.with_request_context(rc);
719                }
720
721                let transfer_ctx = Arc::new(transfer_ctx);
722
723                // Run the transferred agent
724                let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
725                    Ok(s) => s,
726                    Err(e) => {
727                        if let Some(manager) = plugin_manager.as_ref() {
728                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
729                        }
730                        yield Err(e);
731                        return;
732                    }
733                };
734
735                // Stream events from the transferred agent, capturing any further transfer
736                while let Some(result) = {
737                    if let Some(token) = cancellation_token.as_ref() {
738                        if token.is_cancelled() {
739                            if let Some(manager) = plugin_manager.as_ref() {
740                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
741                            }
742                            return;
743                        }
744                    }
745                    transfer_stream.next().await
746                } {
747                    match result {
748                        Ok(event) => {
749                            let mut event = event;
750                            if let Some(manager) = plugin_manager.as_ref() {
751                                match manager
752                                    .run_on_event(
753                                        transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
754                                        event.clone(),
755                                    )
756                                    .await
757                                {
758                                    Ok(Some(modified)) => {
759                                        event = modified;
760                                    }
761                                    Ok(None) => {}
762                                    Err(e) => {
763                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
764                                        yield Err(e);
765                                        return;
766                                    }
767                                }
768                            }
769
770                            // Capture further transfer requests
771                            if let Some(target) = &event.actions.transfer_to_agent {
772                                current_transfer_target = Some(target.clone());
773                            }
774
775                            // Apply state delta for transferred agent too
776                            if !event.actions.state_delta.is_empty() {
777                                transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
778                            }
779
780                            // Add to mutable session
781                            transfer_ctx.mutable_session().append_event(event.clone());
782
783                            if !event.llm_response.partial {
784                                if let Err(e) = session_service.append_event(ctx.session_id(), event.clone()).await {
785                                    if let Some(manager) = plugin_manager.as_ref() {
786                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
787                                    }
788                                    yield Err(e);
789                                    return;
790                                }
791                            }
792                            yield Ok(event);
793                        }
794                        Err(e) => {
795                            if let Some(manager) = plugin_manager.as_ref() {
796                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
797                            }
798                            yield Err(e);
799                            return;
800                        }
801                    }
802                }
803            }
804
805            // ===== CONTEXT COMPACTION =====
806            // After all events have been processed, check if compaction should trigger.
807            // This runs in the background after the invocation completes.
808            if let Some(ref compaction_cfg) = compaction_config {
809                let event_count = ctx.mutable_session().as_ref().events_len();
810
811                if event_count > 0 {
812                    let all_events = ctx.mutable_session().as_ref().events_snapshot();
813                    let invocation_count = all_events.iter().filter(|e| e.author == "user").count()
814                        as u32;
815
816                    if invocation_count > 0
817                        && invocation_count % compaction_cfg.compaction_interval == 0
818                    {
819                        // Determine the window of events to compact
820                        // We compact all events except the most recent overlap_size invocations
821                        let overlap = compaction_cfg.overlap_size as usize;
822
823                        // Find the boundary: keep the last `overlap` user messages and everything after
824                        let user_msg_indices: Vec<usize> = all_events.iter()
825                            .enumerate()
826                            .filter(|(_, e)| e.author == "user")
827                            .map(|(i, _)| i)
828                            .collect();
829
830                        // Keep the last `overlap` user messages intact.
831                        // When overlap is 0, compact everything.
832                        let compact_up_to = if overlap == 0 {
833                            all_events.len()
834                        } else if user_msg_indices.len() > overlap {
835                            // Compact up to (but not including) the overlap-th-from-last user message
836                            user_msg_indices[user_msg_indices.len() - overlap]
837                        } else {
838                            // Not enough user messages to satisfy overlap — skip compaction
839                            0
840                        };
841
842                        if compact_up_to > 0 {
843                            let events_to_compact = &all_events[..compact_up_to];
844
845                            match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
846                                Ok(Some(compaction_event)) => {
847                                    // Persist the compaction event
848                                    if let Err(e) = session_service.append_event(
849                                        ctx.session_id(),
850                                        compaction_event.clone(),
851                                    ).await {
852                                        tracing::warn!(error = %e, "Failed to persist compaction event");
853                                    } else {
854                                        tracing::info!(
855                                            compacted_events = compact_up_to,
856                                            "Context compaction completed"
857                                        );
858                                    }
859                                }
860                                Ok(None) => {
861                                    tracing::debug!("Compaction summarizer returned no result");
862                                }
863                                Err(e) => {
864                                    // Compaction failure is non-fatal — log and continue
865                                    tracing::warn!(error = %e, "Context compaction failed");
866                                }
867                            }
868                        }
869                    }
870                }
871            }
872
873            if let Some(manager) = plugin_manager.as_ref() {
874                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
875            }
876        };
877
878        Ok(Box::pin(s))
879    }
880
881    /// Convenience method that accepts string arguments.
882    ///
883    /// Converts `user_id` and `session_id` to their typed equivalents
884    /// and delegates to [`run()`](Self::run).
885    ///
886    /// # Errors
887    ///
888    /// Returns an error if either string fails identity validation
889    /// (empty, contains null bytes, or exceeds length limit).
890    pub async fn run_str(
891        &self,
892        user_id: &str,
893        session_id: &str,
894        user_content: Content,
895    ) -> Result<EventStream> {
896        let user_id = UserId::try_from(user_id)?;
897        let session_id = SessionId::try_from(session_id)?;
898        self.run(user_id, session_id, user_content).await
899    }
900
901    /// Interrupt a running agent for the given session.
902    ///
903    /// Cancels the agent's current execution within the event loop. Events
904    /// already produced and appended to the session are preserved — only
905    /// future events are stopped. The caller can then issue a new `run()`
906    /// call with a different instruction to redirect the agent.
907    ///
908    /// Returns `true` if a running session was found and interrupted,
909    /// `false` if no active run exists for that session ID.
910    ///
911    /// # Example
912    ///
913    /// ```rust,ignore
914    /// // Start a run in the background
915    /// let mut stream = runner.run(user_id, session_id, content).await?;
916    /// tokio::spawn(async move { while stream.next().await.is_some() {} });
917    ///
918    /// // Later, interrupt it
919    /// let was_running = runner.interrupt("session-1");
920    /// assert!(was_running);
921    ///
922    /// // Redirect with a new instruction
923    /// let mut stream = runner.run(user_id, session_id, new_content).await?;
924    /// ```
925    pub fn interrupt(&self, session_id: &str) -> bool {
926        let sessions = self.active_sessions.lock().unwrap();
927        if let Some(token) = sessions.get(session_id) {
928            tracing::info!(session.id = session_id, "interrupting running agent");
929            token.cancel();
930            true
931        } else {
932            tracing::debug!(session.id = session_id, "no active run to interrupt");
933            false
934        }
935    }
936
937    /// Returns the session IDs of all currently running agent executions.
938    pub fn active_session_ids(&self) -> Vec<String> {
939        let sessions = self.active_sessions.lock().unwrap();
940        sessions.keys().cloned().collect()
941    }
942
943    /// Find which agent should handle the request based on session history
944    pub fn find_agent_to_run(
945        root_agent: &Arc<dyn Agent>,
946        session: &dyn adk_session::Session,
947    ) -> Arc<dyn Agent> {
948        // Look at recent events to find last agent that responded
949        let events = session.events();
950        for i in (0..events.len()).rev() {
951            if let Some(event) = events.at(i) {
952                // Check for explicit transfer
953                if let Some(target_name) = &event.actions.transfer_to_agent {
954                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
955                        return agent;
956                    }
957                }
958
959                if event.author == "user" {
960                    continue;
961                }
962
963                // Try to find this agent in the tree
964                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
965                    // Check if agent allows transfer up the tree
966                    if Self::is_transferable(root_agent, &agent) {
967                        return agent;
968                    }
969                }
970            }
971        }
972
973        // Default to root agent
974        root_agent.clone()
975    }
976
977    /// Check if an agent found in session history can be resumed for the next
978    /// user message.
979    ///
980    /// This always returns `true` because the transfer-policy enforcement
981    /// (`disallow_transfer_to_parent` / `disallow_transfer_to_peers`) is
982    /// handled inside `LlmAgent::run()` when it builds the `transfer_to_agent`
983    /// tool's valid-target list. The runner does not need to duplicate that
984    /// check here — it only needs to know whether the agent is a valid
985    /// resumption target, which it always is if it exists in the tree.
986    fn is_transferable(_root_agent: &Arc<dyn Agent>, _agent: &Arc<dyn Agent>) -> bool {
987        true
988    }
989
990    /// Recursively search agent tree for agent with given name
991    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
992        if current.name() == target_name {
993            return Some(current.clone());
994        }
995
996        for sub_agent in current.sub_agents() {
997            if let Some(found) = Self::find_agent(sub_agent, target_name) {
998                return Some(found);
999            }
1000        }
1001
1002        None
1003    }
1004
1005    /// Compute the parent name and peer names for a given agent in the tree.
1006    /// Returns `(parent_name, peer_names)`.
1007    ///
1008    /// Walks the agent tree to find the parent of `target_name`, then collects
1009    /// the parent's name and the sibling agent names (excluding the target itself).
1010    pub fn compute_transfer_context(
1011        root: &Arc<dyn Agent>,
1012        target_name: &str,
1013    ) -> (Option<String>, Vec<String>) {
1014        // If the target is the root itself, there's no parent or peers
1015        if root.name() == target_name {
1016            return (None, Vec::new());
1017        }
1018
1019        // BFS/DFS to find the parent of target_name
1020        fn find_parent(current: &Arc<dyn Agent>, target: &str) -> Option<Arc<dyn Agent>> {
1021            for sub in current.sub_agents() {
1022                if sub.name() == target {
1023                    return Some(current.clone());
1024                }
1025                if let Some(found) = find_parent(sub, target) {
1026                    return Some(found);
1027                }
1028            }
1029            None
1030        }
1031
1032        match find_parent(root, target_name) {
1033            Some(parent) => {
1034                let parent_name = parent.name().to_string();
1035                let peers: Vec<String> = parent
1036                    .sub_agents()
1037                    .iter()
1038                    .filter(|a| a.name() != target_name)
1039                    .map(|a| a.name().to_string())
1040                    .collect();
1041                (Some(parent_name), peers)
1042            }
1043            None => (None, Vec::new()),
1044        }
1045    }
1046}