adk_runner/
runner.rs

1use crate::InvocationContext;
2use adk_artifact::ArtifactService;
3use adk_core::{Agent, Content, EventStream, Memory, Result};
4use adk_session::SessionService;
5use async_stream::stream;
6use std::sync::Arc;
7
/// Everything needed to build a [`Runner`].
///
/// [`Runner::new`] moves each field into the runner unchanged; `agent` becomes
/// the runner's root agent.
pub struct RunnerConfig {
    /// Application name. The runner passes it to session lookups, invocation
    /// contexts and artifact scoping.
    pub app_name: String,
    /// Root of the agent tree. Agent lookups by name start from this agent and
    /// descend through its sub-agents.
    pub agent: Arc<dyn Agent>,
    /// Service used to fetch the session and to append events to it.
    pub session_service: Arc<dyn SessionService>,
    /// Optional artifact service. When present it is wrapped in
    /// `adk_artifact::ScopedArtifacts` and attached to each invocation context.
    pub artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service. When present it is attached to each invocation
    /// context.
    pub memory_service: Option<Arc<dyn Memory>>,
}
15
/// Runs agents from a fixed agent tree against sessions of one application.
///
/// Built from a [`RunnerConfig`] via [`Runner::new`]. All fields are private
/// and are only read (cloned) when a run starts.
pub struct Runner {
    /// Application name used for session lookups, invocation contexts and
    /// artifact scoping.
    app_name: String,
    /// Root of the agent tree; also the fallback agent when session history
    /// does not select another one.
    root_agent: Arc<dyn Agent>,
    /// Service used to fetch the session and to append events to it.
    session_service: Arc<dyn SessionService>,
    /// Optional artifact service attached (scoped) to invocation contexts.
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service attached to invocation contexts.
    memory_service: Option<Arc<dyn Memory>>,
}
23
24impl Runner {
25    pub fn new(config: RunnerConfig) -> Result<Self> {
26        Ok(Self {
27            app_name: config.app_name,
28            root_agent: config.agent,
29            session_service: config.session_service,
30            artifact_service: config.artifact_service,
31            memory_service: config.memory_service,
32        })
33    }
34
35    pub async fn run(
36        &self,
37        user_id: String,
38        session_id: String,
39        user_content: Content,
40    ) -> Result<EventStream> {
41        let app_name = self.app_name.clone();
42        let session_service = self.session_service.clone();
43        let root_agent = self.root_agent.clone();
44        let artifact_service = self.artifact_service.clone();
45        let memory_service = self.memory_service.clone();
46
47        let s = stream! {
48            // Get or create session
49            let session = match session_service
50                .get(adk_session::GetRequest {
51                    app_name: app_name.clone(),
52                    user_id: user_id.clone(),
53                    session_id: session_id.clone(),
54                    num_recent_events: None,
55                    after: None,
56                })
57                .await
58            {
59                Ok(s) => s,
60                Err(e) => {
61                    yield Err(e);
62                    return;
63                }
64            };
65
66            // Find which agent should handle this request
67            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
68
69            // Clone services for potential reuse in transfer
70            let artifact_service_clone = artifact_service.clone();
71            let memory_service_clone = memory_service.clone();
72
73            // Create invocation context with MutableSession
74            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
75            let mut ctx = InvocationContext::new(
76                invocation_id.clone(),
77                agent_to_run.clone(),
78                user_id.clone(),
79                app_name.clone(),
80                session_id.clone(),
81                user_content.clone(),
82                Arc::from(session),
83            );
84
85            // Add optional services
86            if let Some(service) = artifact_service {
87                // Wrap service with ScopedArtifacts to bind session context
88                let scoped = adk_artifact::ScopedArtifacts::new(
89                    service,
90                    app_name.clone(),
91                    user_id.clone(),
92                    session_id.clone(),
93                );
94                ctx = ctx.with_artifacts(Arc::new(scoped));
95            }
96            if let Some(memory) = memory_service {
97                ctx = ctx.with_memory(memory);
98            }
99
100            let ctx = Arc::new(ctx);
101
102            // Append user message to session service (persistent storage)
103            let mut user_event = adk_core::Event::new(&invocation_id);
104            user_event.author = "user".to_string();
105            user_event.llm_response.content = Some(user_content.clone());
106
107            // Also add to mutable session for immediate visibility
108            // Note: adk_session::Event is a re-export of adk_core::Event, so we can use it directly
109            ctx.mutable_session().append_event(user_event.clone());
110
111            if let Err(e) = session_service.append_event(&session_id, user_event).await {
112                yield Err(e);
113                return;
114            }
115
116            // Run the agent
117            let mut agent_stream = match agent_to_run.run(ctx.clone()).await {
118                Ok(s) => s,
119                Err(e) => {
120                    yield Err(e);
121                    return;
122                }
123            };
124
125            // Stream events and check for transfers
126            use futures::StreamExt;
127            let mut transfer_target: Option<String> = None;
128
129            while let Some(result) = agent_stream.next().await {
130                match result {
131                    Ok(event) => {
132                        // Check for transfer action
133                        if let Some(target) = &event.actions.transfer_to_agent {
134                            transfer_target = Some(target.clone());
135                        }
136
137                        // CRITICAL: Apply state_delta to the mutable session immediately.
138                        // This is the key fix for state propagation between sequential agents.
139                        // When an agent sets output_key, it emits an event with state_delta.
140                        // We must apply this to the mutable session so downstream agents
141                        // can read the value via ctx.session().state().get().
142                        if !event.actions.state_delta.is_empty() {
143                            ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
144                        }
145
146                        // Also add the event to the mutable session's event list
147                        ctx.mutable_session().append_event(event.clone());
148
149                        // Append event to session service (persistent storage)
150                        if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
151                            yield Err(e);
152                            return;
153                        }
154                        yield Ok(event);
155                    }
156                    Err(e) => {
157                        yield Err(e);
158                        return;
159                    }
160                }
161            }
162
163            // If a transfer was requested, automatically invoke the target agent
164            if let Some(target_name) = transfer_target {
165                if let Some(target_agent) = Self::find_agent(&root_agent, &target_name) {
166                    // For transfers, we reuse the same mutable session to preserve state
167                    let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
168                    let mut transfer_ctx = InvocationContext::with_mutable_session(
169                        transfer_invocation_id.clone(),
170                        target_agent.clone(),
171                        user_id.clone(),
172                        app_name.clone(),
173                        session_id.clone(),
174                        user_content.clone(),
175                        ctx.mutable_session().clone(),
176                    );
177
178                    if let Some(service) = artifact_service_clone {
179                        let scoped = adk_artifact::ScopedArtifacts::new(
180                            service,
181                            app_name.clone(),
182                            user_id.clone(),
183                            session_id.clone(),
184                        );
185                        transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
186                    }
187                    if let Some(memory) = memory_service_clone {
188                        transfer_ctx = transfer_ctx.with_memory(memory);
189                    }
190
191                    let transfer_ctx = Arc::new(transfer_ctx);
192
193                    // Run the transferred agent
194                    let mut transfer_stream = match target_agent.run(transfer_ctx.clone()).await {
195                        Ok(s) => s,
196                        Err(e) => {
197                            yield Err(e);
198                            return;
199                        }
200                    };
201
202                    // Stream events from the transferred agent
203                    while let Some(result) = transfer_stream.next().await {
204                        match result {
205                            Ok(event) => {
206                                // Apply state delta for transferred agent too
207                                if !event.actions.state_delta.is_empty() {
208                                    transfer_ctx.mutable_session().apply_state_delta(&event.actions.state_delta);
209                                }
210
211                                // Add to mutable session
212                                transfer_ctx.mutable_session().append_event(event.clone());
213
214                                if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
215                                    yield Err(e);
216                                    return;
217                                }
218                                yield Ok(event);
219                            }
220                            Err(e) => {
221                                yield Err(e);
222                                return;
223                            }
224                        }
225                    }
226                }
227            }
228        };
229
230        Ok(Box::pin(s))
231    }
232
233    /// Find which agent should handle the request based on session history
234    pub fn find_agent_to_run(
235        root_agent: &Arc<dyn Agent>,
236        session: &dyn adk_session::Session,
237    ) -> Arc<dyn Agent> {
238        // Look at recent events to find last agent that responded
239        let events = session.events();
240        for i in (0..events.len()).rev() {
241            if let Some(event) = events.at(i) {
242                // Check for explicit transfer
243                if let Some(target_name) = &event.actions.transfer_to_agent {
244                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
245                        return agent;
246                    }
247                }
248
249                if event.author == "user" {
250                    continue;
251                }
252
253                // Try to find this agent in the tree
254                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
255                    // Check if agent allows transfer up the tree
256                    if Self::is_transferable(root_agent, &agent) {
257                        return agent;
258                    }
259                }
260            }
261        }
262
263        // Default to root agent
264        root_agent.clone()
265    }
266
267    /// Check if agent and its parent chain allow transfer up the tree
268    fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
269        // For now, always allow transfer
270        // TODO: Check DisallowTransferToParent flag when LlmAgent supports it
271        let _ = (root_agent, agent);
272        true
273    }
274
275    /// Recursively search agent tree for agent with given name
276    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
277        if current.name() == target_name {
278            return Some(current.clone());
279        }
280
281        for sub_agent in current.sub_agents() {
282            if let Some(found) = Self::find_agent(sub_agent, target_name) {
283                return Some(found);
284            }
285        }
286
287        None
288    }
289}