adk_runner/
runner.rs

1use crate::InvocationContext;
2use adk_artifact::ArtifactService;
3use adk_core::{Agent, Content, EventStream, Memory, Result, RunConfig};
4use adk_session::SessionService;
5use async_stream::stream;
6use std::sync::Arc;
7use tracing::Instrument;
8
9pub struct RunnerConfig {
10    pub app_name: String,
11    pub agent: Arc<dyn Agent>,
12    pub session_service: Arc<dyn SessionService>,
13    pub artifact_service: Option<Arc<dyn ArtifactService>>,
14    pub memory_service: Option<Arc<dyn Memory>>,
15    /// Optional run configuration (streaming mode, etc.)
16    /// If not provided, uses default (SSE streaming)
17    #[allow(dead_code)]
18    pub run_config: Option<RunConfig>,
19}
20
21pub struct Runner {
22    app_name: String,
23    root_agent: Arc<dyn Agent>,
24    session_service: Arc<dyn SessionService>,
25    artifact_service: Option<Arc<dyn ArtifactService>>,
26    memory_service: Option<Arc<dyn Memory>>,
27    run_config: RunConfig,
28}
29
30impl Runner {
31    pub fn new(config: RunnerConfig) -> Result<Self> {
32        Ok(Self {
33            app_name: config.app_name,
34            root_agent: config.agent,
35            session_service: config.session_service,
36            artifact_service: config.artifact_service,
37            memory_service: config.memory_service,
38            run_config: config.run_config.unwrap_or_default(),
39        })
40    }
41
42    pub async fn run(
43        &self,
44        user_id: String,
45        session_id: String,
46        user_content: Content,
47    ) -> Result<EventStream> {
48        let app_name = self.app_name.clone();
49        let session_service = self.session_service.clone();
50        let root_agent = self.root_agent.clone();
51        let artifact_service = self.artifact_service.clone();
52        let memory_service = self.memory_service.clone();
53        let run_config = self.run_config.clone();
54
55        let s = stream! {
56            // Get or create session
57            let session = match session_service
58                .get(adk_session::GetRequest {
59                    app_name: app_name.clone(),
60                    user_id: user_id.clone(),
61                    session_id: session_id.clone(),
62                    num_recent_events: None,
63                    after: None,
64                })
65                .await
66            {
67                Ok(s) => s,
68                Err(e) => {
69                    yield Err(e);
70                    return;
71                }
72            };
73
74            // Find which agent should handle this request
75            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
76
77            // Clone services for potential reuse in transfer
78            let artifact_service_clone = artifact_service.clone();
79            let memory_service_clone = memory_service.clone();
80
81            // Create invocation context with MutableSession
82            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
83            let mut ctx = InvocationContext::new(
84                invocation_id.clone(),
85                agent_to_run.clone(),
86                user_id.clone(),
87                app_name.clone(),
88                session_id.clone(),
89                user_content.clone(),
90                Arc::from(session),
91            );
92
93            // Add optional services
94            if let Some(service) = artifact_service {
95                // Wrap service with ScopedArtifacts to bind session context
96                let scoped = adk_artifact::ScopedArtifacts::new(
97                    service,
98                    app_name.clone(),
99                    user_id.clone(),
100                    session_id.clone(),
101                );
102                ctx = ctx.with_artifacts(Arc::new(scoped));
103            }
104            if let Some(memory) = memory_service {
105                ctx = ctx.with_memory(memory);
106            }
107
108            // Apply run config (streaming mode, etc.)
109            ctx = ctx.with_run_config(run_config.clone());
110
111            let ctx = Arc::new(ctx);
112
113            // Append user message to session service (persistent storage)
114            let mut user_event = adk_core::Event::new(&invocation_id);
115            user_event.author = "user".to_string();
116            user_event.llm_response.content = Some(user_content.clone());
117
118            // Also add to mutable session for immediate visibility
119            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
120            ctx.mutable_session().append_event(user_event.clone());
121
122            if let Err(e) = session_service.append_event(&session_id, user_event).await {
123                yield Err(e);
124                return;
125            }
126
127            // Run the agent with instrumentation (ADK-Go style attributes)
128            let agent_span = tracing::info_span!(
129                "agent.execute",
130                "gcp.vertex.agent.invocation_id" = %invocation_id,
131                "gcp.vertex.agent.session_id" = %session_id,
132                "gcp.vertex.agent.event_id" = %invocation_id, // Use invocation_id as event_id for agent spans
133                "agent.name" = %agent_to_run.name()
134            );
135
136            let mut agent_stream = match agent_to_run.run(ctx.clone()).instrument(agent_span).await {
137                Ok(s) => s,
138                Err(e) => {
139                    yield Err(e);
140                    return;
141                }
142            };
143
144            // Stream events and check for transfers
145            use futures::StreamExt;
146            let mut transfer_target: Option<String> = None;
147
148            while let Some(result) = agent_stream.next().await {
149                match result {
150                    Ok(event) => {
151                        // Check for transfer action
152                        if let Some(target) = &event.actions.transfer_to_agent {
153                            transfer_target = Some(target.clone());
154                        }
155
156                        // CRITICAL: Apply state_delta to the mutable session immediately.
157                        // This is the key fix for state propagation between sequential agents.
158                        // When an agent sets output_key, it emits an event with state_delta.
159                        // We must apply this to the mutable session so downstream agents
160                        // can read the value via ctx.session().state().get().
161                        if !event.actions.state_delta.is_empty() {
162                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
163                        }
164
165                        // Also add the event to the mutable session's event list
166                        ctx.mutable_session().append_event(event.clone());
167
168                        // Append event to session service (persistent storage)
169                        if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
170                            yield Err(e);
171                            return;
172                        }
173                        yield Ok(event);
174                    }
175                    Err(e) => {
176                        yield Err(e);
177                        return;
178                    }
179                }
180            }
181
182            // If a transfer was requested, automatically invoke the target agent
183            if let Some(target_name) = transfer_target {
184                if let Some(target_agent) = Self::find_agent(&root_agent, &target_name) {
185                    // For transfers, we reuse the same mutable session to preserve state
186                    let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
187                    let mut transfer_ctx = InvocationContext::with_mutable_session(
188                        transfer_invocation_id.clone(),
189                        target_agent.clone(),
190                        user_id.clone(),
191                        app_name.clone(),
192                        session_id.clone(),
193                        user_content.clone(),
194                        ctx.mutable_session().clone(),
195                    );
196
197                    if let Some(service) = artifact_service_clone {
198                        let scoped = adk_artifact::ScopedArtifacts::new(
199                            service,
200                            app_name.clone(),
201                            user_id.clone(),
202                            session_id.clone(),
203                        );
204                        transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
205                    }
206                    if let Some(memory) = memory_service_clone {
207                        transfer_ctx = transfer_ctx.with_memory(memory);
208                    }
209
210                    let transfer_ctx = Arc::new(transfer_ctx);
211
212                    // Run the transferred agent
213                    let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
214                        Ok(s) => s,
215                        Err(e) => {
216                            yield Err(e);
217                            return;
218                        }
219                    };
220
221                    // Stream events from the transferred agent
222                    while let Some(result) = transfer_stream.next().await {
223                        match result {
224                            Ok(event) => {
225                                // Apply state delta for transferred agent too
226                                if !event.actions.state_delta.is_empty() {
227                                    transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
228                                }
229
230                                // Add to mutable session
231                                transfer_ctx.mutable_session().append_event(event.clone());
232
233                                if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
234                                    yield Err(e);
235                                    return;
236                                }
237                                yield Ok(event);
238                            }
239                            Err(e) => {
240                                yield Err(e);
241                                return;
242                            }
243                        }
244                    }
245                }
246            }
247        };
248
249        Ok(Box::pin(s))
250    }
251
252    /// Find which agent should handle the request based on session history
253    pub fn find_agent_to_run(
254        root_agent: &Arc<dyn Agent>,
255        session: &dyn adk_session::Session,
256    ) -> Arc<dyn Agent> {
257        // Look at recent events to find last agent that responded
258        let events = session.events();
259        for i in (0..events.len()).rev() {
260            if let Some(event) = events.at(i) {
261                // Check for explicit transfer
262                if let Some(target_name) = &event.actions.transfer_to_agent {
263                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
264                        return agent;
265                    }
266                }
267
268                if event.author == "user" {
269                    continue;
270                }
271
272                // Try to find this agent in the tree
273                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
274                    // Check if agent allows transfer up the tree
275                    if Self::is_transferable(root_agent, &agent) {
276                        return agent;
277                    }
278                }
279            }
280        }
281
282        // Default to root agent
283        root_agent.clone()
284    }
285
286    /// Check if agent and its parent chain allow transfer up the tree
287    fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
288        // For now, always allow transfer
289        // TODO: Check DisallowTransferToParent flag when LlmAgent supports it
290        let _ = (root_agent, agent);
291        true
292    }
293
294    /// Recursively search agent tree for agent with given name
295    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
296        if current.name() == target_name {
297            return Some(current.clone());
298        }
299
300        for sub_agent in current.sub_agents() {
301            if let Some(found) = Self::find_agent(sub_agent, target_name) {
302                return Some(found);
303            }
304        }
305
306        None
307    }
308}