adk_server/a2a/
executor.rs

1use crate::a2a::{
2    events::message_to_event, metadata::to_invocation_meta, processor::EventProcessor, Message,
3    TaskState, TaskStatus, TaskStatusUpdateEvent, UpdateEvent,
4};
5use adk_core::Result;
6use adk_runner::{Runner, RunnerConfig};
7use adk_session::{CreateRequest, GetRequest};
8use futures::StreamExt;
9use std::sync::Arc;
10
11pub struct ExecutorConfig {
12    pub app_name: String,
13    pub runner_config: Arc<RunnerConfig>,
14}
15
16pub struct Executor {
17    config: ExecutorConfig,
18}
19
20impl Executor {
21    pub fn new(config: ExecutorConfig) -> Self {
22        Self { config }
23    }
24
25    pub async fn execute(
26        &self,
27        context_id: &str,
28        task_id: &str,
29        message: &Message,
30    ) -> Result<Vec<UpdateEvent>> {
31        let meta = to_invocation_meta(&self.config.app_name, context_id, None);
32
33        // Prepare session
34        self.prepare_session(&meta.user_id, &meta.session_id).await?;
35
36        // Convert message to event
37        let invocation_id = uuid::Uuid::new_v4().to_string();
38        let event = message_to_event(message, invocation_id)?;
39
40        // Create runner
41        let runner = Runner::new(RunnerConfig {
42            app_name: self.config.runner_config.app_name.clone(),
43            agent: self.config.runner_config.agent.clone(),
44            session_service: self.config.runner_config.session_service.clone(),
45            artifact_service: self.config.runner_config.artifact_service.clone(),
46            memory_service: self.config.runner_config.memory_service.clone(),
47        })?;
48
49        // Create processor
50        let mut processor =
51            EventProcessor::new(context_id.to_string(), task_id.to_string(), meta.clone());
52
53        let mut results = vec![];
54
55        // Send submitted event
56        results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
57            task_id: task_id.to_string(),
58            context_id: Some(context_id.to_string()),
59            status: TaskStatus { state: TaskState::Submitted, message: None },
60            final_update: false,
61        }));
62
63        // Send working event
64        results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
65            task_id: task_id.to_string(),
66            context_id: Some(context_id.to_string()),
67            status: TaskStatus { state: TaskState::Working, message: None },
68            final_update: false,
69        }));
70
71        // Run agent
72        let content = event
73            .llm_response
74            .content
75            .ok_or_else(|| adk_core::AdkError::Agent("Event has no content".to_string()))?;
76
77        let mut event_stream =
78            runner.run(meta.user_id.clone(), meta.session_id.clone(), content).await?;
79
80        // Process events
81        while let Some(result) = event_stream.next().await {
82            match result {
83                Ok(adk_event) => {
84                    if let Some(artifact_event) = processor.process(&adk_event)? {
85                        results.push(UpdateEvent::TaskArtifactUpdate(artifact_event));
86                    }
87                }
88                Err(e) => {
89                    // Send failed event
90                    results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
91                        task_id: task_id.to_string(),
92                        context_id: Some(context_id.to_string()),
93                        status: TaskStatus {
94                            state: TaskState::Failed,
95                            message: Some(e.to_string()),
96                        },
97                        final_update: true,
98                    }));
99                    return Ok(results);
100                }
101            }
102        }
103
104        // Send terminal events
105        for terminal_event in processor.make_terminal_events() {
106            results.push(UpdateEvent::TaskStatusUpdate(terminal_event));
107        }
108
109        Ok(results)
110    }
111
112    pub async fn cancel(&self, context_id: &str, task_id: &str) -> Result<TaskStatusUpdateEvent> {
113        Ok(TaskStatusUpdateEvent {
114            task_id: task_id.to_string(),
115            context_id: Some(context_id.to_string()),
116            status: TaskStatus { state: TaskState::Canceled, message: None },
117            final_update: true,
118        })
119    }
120
121    async fn prepare_session(&self, user_id: &str, session_id: &str) -> Result<()> {
122        let session_service = &self.config.runner_config.session_service;
123
124        // Try to get existing session
125        let get_result = session_service
126            .get(GetRequest {
127                app_name: self.config.app_name.clone(),
128                user_id: user_id.to_string(),
129                session_id: session_id.to_string(),
130                num_recent_events: None,
131                after: None,
132            })
133            .await;
134
135        if get_result.is_ok() {
136            return Ok(());
137        }
138
139        // Create new session
140        session_service
141            .create(CreateRequest {
142                app_name: self.config.app_name.clone(),
143                user_id: user_id.to_string(),
144                session_id: Some(session_id.to_string()),
145                state: std::collections::HashMap::new(),
146            })
147            .await?;
148
149        Ok(())
150    }
151}