adk_runner/
runner.rs

1use crate::InvocationContext;
2use adk_artifact::ArtifactService;
3use adk_core::{Agent, Content, EventStream, Memory, Result};
4use adk_session::SessionService;
5use async_stream::stream;
6use std::sync::Arc;
7
8pub struct RunnerConfig {
9    pub app_name: String,
10    pub agent: Arc<dyn Agent>,
11    pub session_service: Arc<dyn SessionService>,
12    pub artifact_service: Option<Arc<dyn ArtifactService>>,
13    pub memory_service: Option<Arc<dyn Memory>>,
14}
15
16pub struct Runner {
17    app_name: String,
18    root_agent: Arc<dyn Agent>,
19    session_service: Arc<dyn SessionService>,
20    artifact_service: Option<Arc<dyn ArtifactService>>,
21    memory_service: Option<Arc<dyn Memory>>,
22}
23
24impl Runner {
25    pub fn new(config: RunnerConfig) -> Result<Self> {
26        Ok(Self {
27            app_name: config.app_name,
28            root_agent: config.agent,
29            session_service: config.session_service,
30            artifact_service: config.artifact_service,
31            memory_service: config.memory_service,
32        })
33    }
34
35    pub async fn run(
36        &self,
37        user_id: String,
38        session_id: String,
39        user_content: Content,
40    ) -> Result<EventStream> {
41        let app_name = self.app_name.clone();
42        let session_service = self.session_service.clone();
43        let root_agent = self.root_agent.clone();
44        let artifact_service = self.artifact_service.clone();
45        let memory_service = self.memory_service.clone();
46
47        let s = stream! {
48            // Get or create session
49            let session = match session_service
50                .get(adk_session::GetRequest {
51                    app_name: app_name.clone(),
52                    user_id: user_id.clone(),
53                    session_id: session_id.clone(),
54                    num_recent_events: None,
55                    after: None,
56                })
57                .await
58            {
59                Ok(s) => s,
60                Err(e) => {
61                    yield Err(e);
62                    return;
63                }
64            };
65
66            // Find which agent should handle this request
67            let agent_to_run = Self::find_agent_to_run(&root_agent, session.as_ref());
68
69            // Clone services for potential reuse in transfer
70            let artifact_service_clone = artifact_service.clone();
71            let memory_service_clone = memory_service.clone();
72
73            // Create invocation context
74            let invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
75            let mut ctx = InvocationContext::new(
76                invocation_id.clone(),
77                agent_to_run.clone(),
78                user_id.clone(),
79                app_name.clone(),
80                session_id.clone(),
81                user_content.clone(),
82                Arc::from(session),
83            );
84
85            // Add optional services
86            if let Some(service) = artifact_service {
87                // Wrap service with ScopedArtifacts to bind session context
88                let scoped = adk_artifact::ScopedArtifacts::new(
89                    service,
90                    app_name.clone(),
91                    user_id.clone(),
92                    session_id.clone(),
93                );
94                ctx = ctx.with_artifacts(Arc::new(scoped));
95            }
96            if let Some(memory) = memory_service {
97                ctx = ctx.with_memory(memory);
98            }
99
100            let ctx = Arc::new(ctx);
101
102            // Append user message to session
103            let mut user_event = adk_core::Event::new(&invocation_id);
104            user_event.author = "user".to_string();
105            user_event.llm_response.content = Some(user_content.clone());
106
107            if let Err(e) = session_service.append_event(&session_id, user_event).await {
108                yield Err(e);
109                return;
110            }
111
112            // Run the agent
113            let mut agent_stream = match agent_to_run.run(ctx).await {
114                Ok(s) => s,
115                Err(e) => {
116                    yield Err(e);
117                    return;
118                }
119            };
120
121            // Stream events and check for transfers
122            use futures::StreamExt;
123            let mut transfer_target: Option<String> = None;
124
125            while let Some(result) = agent_stream.next().await {
126                match result {
127                    Ok(event) => {
128                        // Check for transfer action
129                        if let Some(target) = &event.actions.transfer_to_agent {
130                            transfer_target = Some(target.clone());
131                        }
132
133                        // Append event to session (Event types are now unified)
134                        if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
135                            yield Err(e);
136                            return;
137                        }
138                        yield Ok(event);
139                    }
140                    Err(e) => {
141                        yield Err(e);
142                        return;
143                    }
144                }
145            }
146
147            // If a transfer was requested, automatically invoke the target agent
148            if let Some(target_name) = transfer_target {
149                if let Some(target_agent) = Self::find_agent(&root_agent, &target_name) {
150                    // Get fresh session state
151                    let transfer_session = match session_service
152                        .get(adk_session::GetRequest {
153                            app_name: app_name.clone(),
154                            user_id: user_id.clone(),
155                            session_id: session_id.clone(),
156                            num_recent_events: None,
157                            after: None,
158                        })
159                        .await
160                    {
161                        Ok(s) => s,
162                        Err(e) => {
163                            yield Err(e);
164                            return;
165                        }
166                    };
167
168                    // Create new context for the transferred agent
169                    let transfer_invocation_id = format!("inv-{}", uuid::Uuid::new_v4());
170                    let mut transfer_ctx = InvocationContext::new(
171                        transfer_invocation_id.clone(),
172                        target_agent.clone(),
173                        user_id.clone(),
174                        app_name.clone(),
175                        session_id.clone(),
176                        user_content.clone(),
177                        Arc::from(transfer_session),
178                    );
179
180                    if let Some(service) = artifact_service_clone {
181                        let scoped = adk_artifact::ScopedArtifacts::new(
182                            service,
183                            app_name.clone(),
184                            user_id.clone(),
185                            session_id.clone(),
186                        );
187                        transfer_ctx = transfer_ctx.with_artifacts(Arc::new(scoped));
188                    }
189                    if let Some(memory) = memory_service_clone {
190                        transfer_ctx = transfer_ctx.with_memory(memory);
191                    }
192
193                    let transfer_ctx = Arc::new(transfer_ctx);
194
195                    // Run the transferred agent
196                    let mut transfer_stream = match target_agent.run(transfer_ctx).await {
197                        Ok(s) => s,
198                        Err(e) => {
199                            yield Err(e);
200                            return;
201                        }
202                    };
203
204                    // Stream events from the transferred agent
205                    while let Some(result) = transfer_stream.next().await {
206                        match result {
207                            Ok(event) => {
208                                if let Err(e) = session_service.append_event(&session_id, event.clone()).await {
209                                    yield Err(e);
210                                    return;
211                                }
212                                yield Ok(event);
213                            }
214                            Err(e) => {
215                                yield Err(e);
216                                return;
217                            }
218                        }
219                    }
220                }
221            }
222        };
223
224        Ok(Box::pin(s))
225    }
226
227    /// Find which agent should handle the request based on session history
228    pub fn find_agent_to_run(
229        root_agent: &Arc<dyn Agent>,
230        session: &dyn adk_session::Session,
231    ) -> Arc<dyn Agent> {
232        // Look at recent events to find last agent that responded
233        let events = session.events();
234        for i in (0..events.len()).rev() {
235            if let Some(event) = events.at(i) {
236                // Check for explicit transfer
237                if let Some(target_name) = &event.actions.transfer_to_agent {
238                    if let Some(agent) = Self::find_agent(root_agent, target_name) {
239                        return agent;
240                    }
241                }
242
243                if event.author == "user" {
244                    continue;
245                }
246
247                // Try to find this agent in the tree
248                if let Some(agent) = Self::find_agent(root_agent, &event.author) {
249                    // Check if agent allows transfer up the tree
250                    if Self::is_transferable(root_agent, &agent) {
251                        return agent;
252                    }
253                }
254            }
255        }
256
257        // Default to root agent
258        root_agent.clone()
259    }
260
261    /// Check if agent and its parent chain allow transfer up the tree
262    fn is_transferable(root_agent: &Arc<dyn Agent>, agent: &Arc<dyn Agent>) -> bool {
263        // For now, always allow transfer
264        // TODO: Check DisallowTransferToParent flag when LlmAgent supports it
265        let _ = (root_agent, agent);
266        true
267    }
268
269    /// Recursively search agent tree for agent with given name
270    pub fn find_agent(current: &Arc<dyn Agent>, target_name: &str) -> Option<Arc<dyn Agent>> {
271        if current.name() == target_name {
272            return Some(current.clone());
273        }
274
275        for sub_agent in current.sub_agents() {
276            if let Some(found) = Self::find_agent(sub_agent, target_name) {
277                return Some(found);
278            }
279        }
280
281        None
282    }
283}
284
285// TODO: Add unit tests for transfer logic