Skip to main content

adk_server/a2a/
executor.rs

1use crate::a2a::{
2    Message, TaskState, TaskStatus, TaskStatusUpdateEvent, UpdateEvent, events::message_to_event,
3    metadata::to_invocation_meta, processor::EventProcessor,
4};
5use adk_core::Result;
6use adk_runner::{Runner, RunnerConfig};
7use adk_session::{CreateRequest, GetRequest};
8use futures::StreamExt;
9use std::sync::Arc;
10use tokio_util::sync::CancellationToken;
11
12pub struct ExecutorConfig {
13    pub app_name: String,
14    pub runner_config: Arc<RunnerConfig>,
15    pub cancellation_token: Option<CancellationToken>,
16}
17
18pub struct Executor {
19    config: ExecutorConfig,
20}
21
22impl Executor {
23    pub fn new(config: ExecutorConfig) -> Self {
24        Self { config }
25    }
26
27    pub async fn execute(
28        &self,
29        context_id: &str,
30        task_id: &str,
31        message: &Message,
32    ) -> Result<Vec<UpdateEvent>> {
33        let meta = to_invocation_meta(&self.config.app_name, context_id, None);
34        let cancellation_token = self.config.cancellation_token.clone();
35
36        // Prepare session
37        self.prepare_session(&meta.user_id, &meta.session_id).await?;
38
39        // Convert message to event
40        let invocation_id = uuid::Uuid::new_v4().to_string();
41        let event = message_to_event(message, invocation_id)?;
42
43        // Create runner
44        let runner = Runner::new(RunnerConfig {
45            app_name: self.config.runner_config.app_name.clone(),
46            agent: self.config.runner_config.agent.clone(),
47            session_service: self.config.runner_config.session_service.clone(),
48            artifact_service: self.config.runner_config.artifact_service.clone(),
49            memory_service: self.config.runner_config.memory_service.clone(),
50            plugin_manager: self.config.runner_config.plugin_manager.clone(),
51            run_config: self.config.runner_config.run_config.clone(),
52            compaction_config: self.config.runner_config.compaction_config.clone(),
53            context_cache_config: self.config.runner_config.context_cache_config.clone(),
54            cache_capable: self.config.runner_config.cache_capable.clone(),
55            request_context: self.config.runner_config.request_context.clone(),
56            cancellation_token: cancellation_token.clone(),
57        })?;
58
59        // Create processor
60        let mut processor =
61            EventProcessor::new(context_id.to_string(), task_id.to_string(), meta.clone());
62
63        let mut results = vec![];
64
65        // Send submitted event
66        results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
67            task_id: task_id.to_string(),
68            context_id: Some(context_id.to_string()),
69            status: TaskStatus { state: TaskState::Submitted, message: None },
70            final_update: false,
71        }));
72
73        // Send working event
74        results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
75            task_id: task_id.to_string(),
76            context_id: Some(context_id.to_string()),
77            status: TaskStatus { state: TaskState::Working, message: None },
78            final_update: false,
79        }));
80
81        // Run agent
82        let content = event
83            .llm_response
84            .content
85            .ok_or_else(|| adk_core::AdkError::Agent("Event has no content".to_string()))?;
86
87        let mut event_stream =
88            runner.run(meta.user_id.clone(), meta.session_id.clone(), content).await?;
89
90        // Process events
91        while let Some(result) = event_stream.next().await {
92            if cancellation_token.as_ref().is_some_and(CancellationToken::is_cancelled) {
93                results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
94                    task_id: task_id.to_string(),
95                    context_id: Some(context_id.to_string()),
96                    status: TaskStatus { state: TaskState::Canceled, message: None },
97                    final_update: true,
98                }));
99                return Ok(results);
100            }
101
102            match result {
103                Ok(adk_event) => {
104                    if let Some(artifact_event) = processor.process(&adk_event)? {
105                        results.push(UpdateEvent::TaskArtifactUpdate(artifact_event));
106                    }
107                }
108                Err(e) => {
109                    // Send failed event
110                    results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
111                        task_id: task_id.to_string(),
112                        context_id: Some(context_id.to_string()),
113                        status: TaskStatus {
114                            state: TaskState::Failed,
115                            message: Some(e.to_string()),
116                        },
117                        final_update: true,
118                    }));
119                    return Ok(results);
120                }
121            }
122        }
123
124        if cancellation_token.as_ref().is_some_and(CancellationToken::is_cancelled) {
125            results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
126                task_id: task_id.to_string(),
127                context_id: Some(context_id.to_string()),
128                status: TaskStatus { state: TaskState::Canceled, message: None },
129                final_update: true,
130            }));
131            return Ok(results);
132        }
133
134        // Send terminal events
135        for terminal_event in processor.make_terminal_events() {
136            results.push(UpdateEvent::TaskStatusUpdate(terminal_event));
137        }
138
139        Ok(results)
140    }
141
142    pub async fn cancel(&self, context_id: &str, task_id: &str) -> Result<TaskStatusUpdateEvent> {
143        Ok(TaskStatusUpdateEvent {
144            task_id: task_id.to_string(),
145            context_id: Some(context_id.to_string()),
146            status: TaskStatus { state: TaskState::Canceled, message: None },
147            final_update: true,
148        })
149    }
150
151    async fn prepare_session(&self, user_id: &str, session_id: &str) -> Result<()> {
152        let session_service = &self.config.runner_config.session_service;
153
154        // Try to get existing session
155        let get_result = session_service
156            .get(GetRequest {
157                app_name: self.config.app_name.clone(),
158                user_id: user_id.to_string(),
159                session_id: session_id.to_string(),
160                num_recent_events: None,
161                after: None,
162            })
163            .await;
164
165        if get_result.is_ok() {
166            return Ok(());
167        }
168
169        // Create new session
170        session_service
171            .create(CreateRequest {
172                app_name: self.config.app_name.clone(),
173                user_id: user_id.to_string(),
174                session_id: Some(session_id.to_string()),
175                state: std::collections::HashMap::new(),
176            })
177            .await?;
178
179        Ok(())
180    }
181}