adk_server/a2a/
executor.rs

1use crate::a2a::{
2    Message, TaskState, TaskStatus, TaskStatusUpdateEvent, UpdateEvent, events::message_to_event,
3    metadata::to_invocation_meta, processor::EventProcessor,
4};
5use adk_core::Result;
6use adk_runner::{Runner, RunnerConfig};
7use adk_session::{CreateRequest, GetRequest};
8use futures::StreamExt;
9use std::sync::Arc;
10
11pub struct ExecutorConfig {
12    pub app_name: String,
13    pub runner_config: Arc<RunnerConfig>,
14}
15
16pub struct Executor {
17    config: ExecutorConfig,
18}
19
20impl Executor {
21    pub fn new(config: ExecutorConfig) -> Self {
22        Self { config }
23    }
24
25    pub async fn execute(
26        &self,
27        context_id: &str,
28        task_id: &str,
29        message: &Message,
30    ) -> Result<Vec<UpdateEvent>> {
31        let meta = to_invocation_meta(&self.config.app_name, context_id, None);
32
33        // Prepare session
34        self.prepare_session(&meta.user_id, &meta.session_id).await?;
35
36        // Convert message to event
37        let invocation_id = uuid::Uuid::new_v4().to_string();
38        let event = message_to_event(message, invocation_id)?;
39
40        // Create runner
41        let runner = Runner::new(RunnerConfig {
42            app_name: self.config.runner_config.app_name.clone(),
43            agent: self.config.runner_config.agent.clone(),
44            session_service: self.config.runner_config.session_service.clone(),
45            artifact_service: self.config.runner_config.artifact_service.clone(),
46            memory_service: self.config.runner_config.memory_service.clone(),
47            run_config: None,
48        })?;
49
50        // Create processor
51        let mut processor =
52            EventProcessor::new(context_id.to_string(), task_id.to_string(), meta.clone());
53
54        let mut results = vec![];
55
56        // Send submitted event
57        results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
58            task_id: task_id.to_string(),
59            context_id: Some(context_id.to_string()),
60            status: TaskStatus { state: TaskState::Submitted, message: None },
61            final_update: false,
62        }));
63
64        // Send working event
65        results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
66            task_id: task_id.to_string(),
67            context_id: Some(context_id.to_string()),
68            status: TaskStatus { state: TaskState::Working, message: None },
69            final_update: false,
70        }));
71
72        // Run agent
73        let content = event
74            .llm_response
75            .content
76            .ok_or_else(|| adk_core::AdkError::Agent("Event has no content".to_string()))?;
77
78        let mut event_stream =
79            runner.run(meta.user_id.clone(), meta.session_id.clone(), content).await?;
80
81        // Process events
82        while let Some(result) = event_stream.next().await {
83            match result {
84                Ok(adk_event) => {
85                    if let Some(artifact_event) = processor.process(&adk_event)? {
86                        results.push(UpdateEvent::TaskArtifactUpdate(artifact_event));
87                    }
88                }
89                Err(e) => {
90                    // Send failed event
91                    results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
92                        task_id: task_id.to_string(),
93                        context_id: Some(context_id.to_string()),
94                        status: TaskStatus {
95                            state: TaskState::Failed,
96                            message: Some(e.to_string()),
97                        },
98                        final_update: true,
99                    }));
100                    return Ok(results);
101                }
102            }
103        }
104
105        // Send terminal events
106        for terminal_event in processor.make_terminal_events() {
107            results.push(UpdateEvent::TaskStatusUpdate(terminal_event));
108        }
109
110        Ok(results)
111    }
112
113    pub async fn cancel(&self, context_id: &str, task_id: &str) -> Result<TaskStatusUpdateEvent> {
114        Ok(TaskStatusUpdateEvent {
115            task_id: task_id.to_string(),
116            context_id: Some(context_id.to_string()),
117            status: TaskStatus { state: TaskState::Canceled, message: None },
118            final_update: true,
119        })
120    }
121
122    async fn prepare_session(&self, user_id: &str, session_id: &str) -> Result<()> {
123        let session_service = &self.config.runner_config.session_service;
124
125        // Try to get existing session
126        let get_result = session_service
127            .get(GetRequest {
128                app_name: self.config.app_name.clone(),
129                user_id: user_id.to_string(),
130                session_id: session_id.to_string(),
131                num_recent_events: None,
132                after: None,
133            })
134            .await;
135
136        if get_result.is_ok() {
137            return Ok(());
138        }
139
140        // Create new session
141        session_service
142            .create(CreateRequest {
143                app_name: self.config.app_name.clone(),
144                user_id: user_id.to_string(),
145                session_id: Some(session_id.to_string()),
146                state: std::collections::HashMap::new(),
147            })
148            .await?;
149
150        Ok(())
151    }
152}