Skip to main content

adk_runner/
runner.rs

1use crate::InvocationContext;
2use adk_artifact::ArtifactService;
3use adk_core::{Agent, Content, EventStream, Memory, Result, RunConfig};
4use adk_plugin::PluginManager;
5use adk_session::SessionService;
6use adk_skill::{SkillInjector, SkillInjectorConfig};
7use async_stream::stream;
8use std::sync::Arc;
9use tracing::Instrument;
10
11pub struct RunnerConfig {
12    pub app_name: String,
13    pub agent: Arc<dyn Agent>,
14    pub session_service: Arc<dyn SessionService>,
15    pub artifact_service: Option<Arc<dyn ArtifactService>>,
16    pub memory_service: Option<Arc<dyn Memory>>,
17    pub plugin_manager: Option<Arc<PluginManager>>,
18    /// Optional run configuration (streaming mode, etc.)
19    /// If not provided, uses default (SSE streaming)
20    #[allow(dead_code)]
21    pub run_config: Option<RunConfig>,
22    /// Optional context compaction configuration.
23    /// When set, the runner will periodically summarize older events
24    /// to reduce context size sent to the LLM.
25    pub compaction_config: Option<adk_core::EventsCompactionConfig>,
26}
27
28pub struct Runner {
29    app_name: String,
30    root_agent: Arc<dyn Agent>,
31    session_service: Arc<dyn SessionService>,
32    artifact_service: Option<Arc<dyn ArtifactService>>,
33    memory_service: Option<Arc<dyn Memory>>,
34    plugin_manager: Option<Arc<PluginManager>>,
35    skill_injector: Option<Arc<SkillInjector>>,
36    run_config: RunConfig,
37    compaction_config: Option<adk_core::EventsCompactionConfig>,
38}
39
40impl Runner {
41    pub fn new(config: RunnerConfig) -> Result<Self> {
42        Ok(Self {
43            app_name: config.app_name,
44            root_agent: config.agent,
45            session_service: config.session_service,
46            artifact_service: config.artifact_service,
47            memory_service: config.memory_service,
48            plugin_manager: config.plugin_manager,
49            skill_injector: None,
50            run_config: config.run_config.unwrap_or_default(),
51            compaction_config: config.compaction_config,
52        })
53    }
54
55    /// Enable skill injection using a pre-built injector.
56    ///
57    /// Skill injection runs before plugin `on_user_message` callbacks.
58    pub fn with_skill_injector(mut self, injector: SkillInjector) -> Self {
59        self.skill_injector = Some(Arc::new(injector));
60        self
61    }
62
63    /// Enable skill injection by auto-loading `.skills/` from the given root path.
64    pub fn with_auto_skills(
65        mut self,
66        root: impl AsRef<std::path::Path>,
67        config: SkillInjectorConfig,
68    ) -> adk_skill::SkillResult<Self> {
69        let injector = SkillInjector::from_root(root, config)?;
70        self.skill_injector = Some(Arc::new(injector));
71        Ok(self)
72    }
73
74    pub async fn run(
75        &self,
76        user_id: String,
77        session_id: String,
78        user_content: Content,
79    ) -> Result<EventStream> {
80        let app_name = self.app_name.clone();
81        let session_service = self.session_service.clone();
82        let root_agent = self.root_agent.clone();
83        let artifact_service = self.artifact_service.clone();
84        let memory_service = self.memory_service.clone();
85        let plugin_manager = self.plugin_manager.clone();
86        let skill_injector = self.skill_injector.clone();
87        let run_config = self.run_config.clone();
88        let compaction_config = self.compaction_config.clone();
89
90        let s = stream! {
91            // Get or create session
92            let session = match session_service
93                .get(adk_session::GetRequest {
94                    app_name: app_name.clone(),
95                    user_id: user_id.clone(),
96                    session_id: session_id.clone(),
97                    num_recent_events: None,
98                    after: None,
99                })
100                .await
101            {
102                Ok(s) => s,
103                Err(e) => {
104                    yield Err(e);
105                    return;
106                }
107            };
108
109            // Find which agent should handle this request
110            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
111
112            // Clone services for potential reuse in transfer
113            let artifact_service_clone = artifact_service.clone();
114            let memory_service_clone = memory_service.clone();
115
116            // Create invocation context with MutableSession
117            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
118            let mut effective_user_content = user_content.clone();
119            let mut selected_skill_name = String::new();
120            let mut selected_skill_id = String::new();
121
122            if let Some(injector) = skill_injector.as_ref() {
123                if let Some(matched) = adk_skill::apply_skill_injection(
124                    &mut effective_user_content,
125                    injector.index(),
126                    injector.policy(),
127                    injector.max_injected_chars(),
128                ) {
129                    selected_skill_name = matched.skill.name;
130                    selected_skill_id = matched.skill.id;
131                }
132            }
133
134            let mut invocation_ctx = InvocationContext::new(
135                invocation_id.clone(),
136                agent_to_run.clone(),
137                user_id.clone(),
138                app_name.clone(),
139                session_id.clone(),
140                effective_user_content.clone(),
141                Arc::from(session),
142            );
143
144            // Add optional services
145            if let Some(service) = artifact_service {
146                // Wrap service with ScopedArtifacts to bind session context
147                let scoped = adk_artifact::ScopedArtifacts::new(
148                    service,
149                    app_name.clone(),
150                    user_id.clone(),
151                    session_id.clone(),
152                );
153                invocation_ctx = invocation_ctx.with_artifacts(Arc::new(scoped));
154            }
155            if let Some(memory) = memory_service {
156                invocation_ctx = invocation_ctx.with_memory(memory);
157            }
158
159            // Apply run config (streaming mode, etc.)
160            invocation_ctx = invocation_ctx.with_run_config(run_config.clone());
161
162            let mut ctx = Arc::new(invocation_ctx);
163
164            if let Some(manager) = plugin_manager.as_ref() {
165                match manager
166                    .run_before_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>)
167                    .await
168                {
169                    Ok(Some(content)) => {
170                        let mut early_event = adk_core::Event::new(&invocation_id);
171                        early_event.author = agent_to_run.name().to_string();
172                        early_event.llm_response.content = Some(content);
173
174                        ctx.mutable_session().append_event(early_event.clone());
175                        if let Err(e) = session_service.append_event(&session_id, early_event.clone()).await {
176                            yield Err(e);
177                            return;
178                        }
179
180                        yield Ok(early_event);
181                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
182                        return;
183                    }
184                    Ok(None) => {}
185                    Err(e) => {
186                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
187                        yield Err(e);
188                        return;
189                    }
190                }
191
192                match manager
193                    .run_on_user_message(
194                        ctx.clone() as Arc<dyn adk_core::InvocationContext>,
195                        effective_user_content.clone(),
196                    )
197                    .await
198                {
199                    Ok(Some(modified)) => {
200                        effective_user_content = modified;
201
202                        let mut refreshed_ctx = InvocationContext::with_mutable_session(
203                            invocation_id.clone(),
204                            agent_to_run.clone(),
205                            user_id.clone(),
206                            app_name.clone(),
207                            session_id.clone(),
208                            effective_user_content.clone(),
209                            ctx.mutable_session().clone(),
210                        );
211
212                        if let Some(service) = artifact_service_clone.clone() {
213                            let scoped = adk_artifact::ScopedArtifacts::new(
214                                service,
215                                app_name.clone(),
216                                user_id.clone(),
217                                session_id.clone(),
218                            );
219                            refreshed_ctx = refreshed_ctx.with_artifacts(Arc::new(scoped));
220                        }
221                        if let Some(memory) = memory_service_clone.clone() {
222                            refreshed_ctx = refreshed_ctx.with_memory(memory);
223                        }
224                        refreshed_ctx = refreshed_ctx.with_run_config(run_config.clone());
225                        ctx = Arc::new(refreshed_ctx);
226                    }
227                    Ok(None) => {}
228                    Err(e) => {
229                        if let Some(manager) = plugin_manager.as_ref() {
230                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
231                        }
232                        yield Err(e);
233                        return;
234                    }
235                }
236            }
237
238            // Append user message to session service (persistent storage)
239            let mut user_event = adk_core::Event::new(&invocation_id);
240            user_event.author = "user".to_string();
241            user_event.llm_response.content = Some(effective_user_content.clone());
242
243            // Also add to mutable session for immediate visibility
244            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
245            ctx.mutable_session().append_event(user_event.clone());
246
247            if let Err(e) = session_service.append_event(&session_id, user_event).await {
248                if let Some(manager) = plugin_manager.as_ref() {
249                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
250                }
251                yield Err(e);
252                return;
253            }
254
255            // Run the agent with instrumentation (ADK-Go style attributes)
256            let agent_span = tracing::info_span!(
257                "agent.execute",
258                "gcp.vertex.agent.invocation_id" = %invocation_id,
259                "gcp.vertex.agent.session_id" = %session_id,
260                "gcp.vertex.agent.event_id" = %invocation_id, // Use invocation_id as event_id for agent spans
261                "gen_ai.conversation.id" = %session_id,
262                "agent.name" = %agent_to_run.name(),
263                "adk.skills.selected_name" = %selected_skill_name,
264                "adk.skills.selected_id" = %selected_skill_id
265            );
266
267            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
268                Ok(s) => s,
269                Err(e) => {
270                    if let Some(manager) = plugin_manager.as_ref() {
271                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
272                    }
273                    yield Err(e);
274                    return;
275                }
276            };
277
278            // Stream events and check for transfers
279            use futures::StreamExt;
280            let mut transfer_target: Option<String> = None;
281
282            while let Some(result) = agent_stream.next().await {
283                match result {
284                    Ok(event) => {
285                        let mut event = event;
286
287                        if let Some(manager) = plugin_manager.as_ref() {
288                            match manager
289                                .run_on_event(
290                                    ctx.clone() as Arc<dyn adk_core::InvocationContext>,
291                                    event.clone(),
292                                )
293                                .await
294                            {
295                                Ok(Some(modified)) => {
296                                    event = modified;
297                                }
298                                Ok(None) => {}
299                                Err(e) => {
300                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
301                                    yield Err(e);
302                                    return;
303                                }
304                            }
305                        }
306
307                        // Check for transfer action
308                        if let Some(target) = &event.actions.transfer_to_agent {
309                            transfer_target = Some(target.clone());
310                        }
311
312                        // CRITICAL: Apply state_delta to the mutable session immediately.
313                        // This is the key fix for state propagation between sequential agents.
314                        // When an agent sets output_key, it emits an event with state_delta.
315                        // We must apply this to the mutable session so downstream agents
316                        // can read the value via ctx.session().state().get().
317                        if !event.actions.state_delta.is_empty() {
318                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
319                        }
320
321                        // Also add the event to the mutable session's event list
322                        ctx.mutable_session().append_event(event.clone());
323
324                        // Append event to session service (persistent storage)
325                        if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
326                            if let Some(manager) = plugin_manager.as_ref() {
327                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
328                            }
329                            yield Err(e);
330                            return;
331                        }
332                        yield Ok(event);
333                    }
334                    Err(e) => {
335                        if let Some(manager) = plugin_manager.as_ref() {
336                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
337                        }
338                        yield Err(e);
339                        return;
340                    }
341                }
342            }
343
344            // If a transfer was requested, automatically invoke the target agent
345            if let Some(target_name) = transfer_target {
346                if let Some(target_agent) = Self::find_agent(&root_agent, &target_name) {
347                    // For transfers, we reuse the same mutable session to preserve state
348                    let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
349                    let mut transfer_ctx = InvocationContext::with_mutable_session(
350                        transfer_invocation_id.clone(),
351                        target_agent.clone(),
352                        user_id.clone(),
353                        app_name.clone(),
354                        session_id.clone(),
355                        effective_user_content.clone(),
356                        ctx.mutable_session().clone(),
357                    );
358
359                    if let Some(service) = artifact_service_clone {
360                        let scoped = adk_artifact::ScopedArtifacts::new(
361                            service,
362                            app_name.clone(),
363                            user_id.clone(),
364                            session_id.clone(),
365                        );
366                        transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
367                    }
368                    if let Some(memory) = memory_service_clone {
369                        transfer_ctx = transfer_ctx.with_memory(memory);
370                    }
371
372                    let transfer_ctx = Arc::new(transfer_ctx);
373
374                    // Run the transferred agent
375                    let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
376                        Ok(s) => s,
377                        Err(e) => {
378                            if let Some(manager) = plugin_manager.as_ref() {
379                                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
380                            }
381                            yield Err(e);
382                            return;
383                        }
384                    };
385
386                    // Stream events from the transferred agent
387                    while let Some(result) = transfer_stream.next().await {
388                        match result {
389                            Ok(event) => {
390                                let mut event = event;
391                                if let Some(manager) = plugin_manager.as_ref() {
392                                    match manager
393                                        .run_on_event(
394                                            transfer_ctx.clone() as Arc<dyn adk_core::InvocationContext>,
395                                            event.clone(),
396                                        )
397                                        .await
398                                    {
399                                        Ok(Some(modified)) => {
400                                            event = modified;
401                                        }
402                                        Ok(None) => {}
403                                        Err(e) => {
404                                            manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
405                                            yield Err(e);
406                                            return;
407                                        }
408                                    }
409                                }
410
411                                // Apply state delta for transferred agent too
412                                if !event.actions.state_delta.is_empty() {
413                                    transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
414                                }
415
416                                // Add to mutable session
417                                transfer_ctx.mutable_session().append_event(event.clone());
418
419                                if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
420                                    if let Some(manager) = plugin_manager.as_ref() {
421                                        manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
422                                    }
423                                    yield Err(e);
424                                    return;
425                                }
426                                yield Ok(event);
427                            }
428                            Err(e) => {
429                                if let Some(manager) = plugin_manager.as_ref() {
430                                    manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
431                                }
432                                yield Err(e);
433                                return;
434                            }
435                        }
436                    }
437                }
438            }
439
440            // ===== CONTEXT COMPACTION =====
441            // After all events have been processed, check if compaction should trigger.
442            // This runs in the background after the invocation completes.
443            if let Some(ref compaction_cfg) = compaction_config {
444                // Count invocations by counting user events in the session
445                let all_events = ctx.mutable_session().as_ref().events_snapshot();
446                let invocation_count = all_events.iter()
447                    .filter(|e| e.author == "user")
448                    .count() as u32;
449
450                if invocation_count > 0 && invocation_count % compaction_cfg.compaction_interval == 0 {
451                    // Determine the window of events to compact
452                    // We compact all events except the most recent overlap_size invocations
453                    let overlap = compaction_cfg.overlap_size as usize;
454
455                    // Find the boundary: keep the last `overlap` user messages and everything after
456                    let user_msg_indices: Vec<usize> = all_events.iter()
457                        .enumerate()
458                        .filter(|(_, e)| e.author == "user")
459                        .map(|(i, _)| i)
460                        .collect();
461
462                    // Keep the last `overlap` user messages intact.
463                    // When overlap is 0, compact everything.
464                    let compact_up_to = if overlap == 0 {
465                        all_events.len()
466                    } else if user_msg_indices.len() > overlap {
467                        // Compact up to (but not including) the overlap-th-from-last user message
468                        user_msg_indices[user_msg_indices.len() - overlap]
469                    } else {
470                        // Not enough user messages to satisfy overlap — skip compaction
471                        0
472                    };
473
474                    if compact_up_to > 0 {
475                        let events_to_compact = &all_events[..compact_up_to];
476
477                        match compaction_cfg.summarizer.summarize_events(events_to_compact).await {
478                            Ok(Some(compaction_event)) => {
479                                // Persist the compaction event
480                                if let Err(e) = session_service.append_event(
481                                    &session_id,
482                                    compaction_event.clone(),
483                                ).await {
484                                    tracing::warn!(error = %e, "Failed to persist compaction event");
485                                } else {
486                                    tracing::info!(
487                                        compacted_events = compact_up_to,
488                                        "Context compaction completed"
489                                    );
490                                }
491                            }
492                            Ok(None) => {
493                                tracing::debug!("Compaction summarizer returned no result");
494                            }
495                            Err(e) => {
496                                // Compaction failure is non-fatal — log and continue
497                                tracing::warn!(error = %e, "Context compaction failed");
498                            }
499                        }
500                    }
501                }
502            }
503
504            if let Some(manager) = plugin_manager.as_ref() {
505                manager.run_after_run(ctx.clone() as Arc<dyn adk_core::InvocationContext>).await;
506            }
507        };
508
509        Ok(Box::pin(s))
510    }
511
512    /// Find which agent should handle the request based on session history
513    pub fn find_agent_to_run(
514        root_agent: &Arc<dyn Agent>,
515        session: &dyn adk_session::Session,
516    ) -> Arc<dyn Agent> {
517        // Look at recent events to find last agent that responded
518        let events = session.events();
519        for i in (0..events.len()).rev() {
520            if let Some(event) = events.at(i) {
521                // Check for explicit transfer
522                if let Some(target_name) = &event.actions.transfer_to_agent {
523                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
524                        return agent;
525                    }
526                }
527
528                if event.author == "user" {
529                    continue;
530                }
531
532                // Try to find this agent in the tree
533                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
534                    // Check if agent allows transfer up the tree
535                    if Self::is_transferable(root_agent, &agent) {
536                        return agent;
537                    }
538                }
539            }
540        }
541
542        // Default to root agent
543        root_agent.clone()
544    }
545
546    /// Check if agent and its parent chain allow transfer up the tree
547    fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
548        // For now, always allow transfer
549        // TODO: Check DisallowTransferToParent flag when LlmAgent supports it
550        let _ = (root_agent, agent);
551        true
552    }
553
554    /// Recursively search agent tree for agent with given name
555    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
556        if current.name() == target_name {
557            return Some(current.clone());
558        }
559
560        for sub_agent in current.sub_agents() {
561            if let Some(found) = Self::find_agent(sub_agent, target_name) {
562                return Some(found);
563            }
564        }
565
566        None
567    }
568}