adk_server/a2a/
executor.rs1use crate::a2a::{
2 Message, TaskState, TaskStatus, TaskStatusUpdateEvent, UpdateEvent, events::message_to_event,
3 metadata::to_invocation_meta, processor::EventProcessor,
4};
5use adk_core::Result;
6use adk_runner::{Runner, RunnerConfig};
7use adk_session::{CreateRequest, GetRequest};
8use futures::StreamExt;
9use std::sync::Arc;
10use tokio_util::sync::CancellationToken;
11
12pub struct ExecutorConfig {
13 pub app_name: String,
14 pub runner_config: Arc<RunnerConfig>,
15 pub cancellation_token: Option<CancellationToken>,
16}
17
18pub struct Executor {
19 config: ExecutorConfig,
20}
21
22impl Executor {
23 pub fn new(config: ExecutorConfig) -> Self {
24 Self { config }
25 }
26
27 pub async fn execute(
28 &self,
29 context_id: &str,
30 task_id: &str,
31 message: &Message,
32 ) -> Result<Vec<UpdateEvent>> {
33 let meta = to_invocation_meta(&self.config.app_name, context_id, None);
34 let cancellation_token = self.config.cancellation_token.clone();
35
36 self.prepare_session(&meta.user_id, &meta.session_id).await?;
38
39 let invocation_id = uuid::Uuid::new_v4().to_string();
41 let event = message_to_event(message, invocation_id)?;
42
43 let runner = Runner::new(RunnerConfig {
45 app_name: self.config.runner_config.app_name.clone(),
46 agent: self.config.runner_config.agent.clone(),
47 session_service: self.config.runner_config.session_service.clone(),
48 artifact_service: self.config.runner_config.artifact_service.clone(),
49 memory_service: self.config.runner_config.memory_service.clone(),
50 plugin_manager: self.config.runner_config.plugin_manager.clone(),
51 run_config: self.config.runner_config.run_config.clone(),
52 compaction_config: self.config.runner_config.compaction_config.clone(),
53 context_cache_config: self.config.runner_config.context_cache_config.clone(),
54 cache_capable: self.config.runner_config.cache_capable.clone(),
55 request_context: self.config.runner_config.request_context.clone(),
56 cancellation_token: cancellation_token.clone(),
57 })?;
58
59 let mut processor =
61 EventProcessor::new(context_id.to_string(), task_id.to_string(), meta.clone());
62
63 let mut results = vec![];
64
65 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
67 task_id: task_id.to_string(),
68 context_id: Some(context_id.to_string()),
69 status: TaskStatus { state: TaskState::Submitted, message: None },
70 final_update: false,
71 }));
72
73 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
75 task_id: task_id.to_string(),
76 context_id: Some(context_id.to_string()),
77 status: TaskStatus { state: TaskState::Working, message: None },
78 final_update: false,
79 }));
80
81 let content = event
83 .llm_response
84 .content
85 .ok_or_else(|| adk_core::AdkError::Agent("Event has no content".to_string()))?;
86
87 let mut event_stream =
88 runner.run(meta.user_id.clone(), meta.session_id.clone(), content).await?;
89
90 while let Some(result) = event_stream.next().await {
92 if cancellation_token.as_ref().is_some_and(CancellationToken::is_cancelled) {
93 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
94 task_id: task_id.to_string(),
95 context_id: Some(context_id.to_string()),
96 status: TaskStatus { state: TaskState::Canceled, message: None },
97 final_update: true,
98 }));
99 return Ok(results);
100 }
101
102 match result {
103 Ok(adk_event) => {
104 if let Some(artifact_event) = processor.process(&adk_event)? {
105 results.push(UpdateEvent::TaskArtifactUpdate(artifact_event));
106 }
107 }
108 Err(e) => {
109 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
111 task_id: task_id.to_string(),
112 context_id: Some(context_id.to_string()),
113 status: TaskStatus {
114 state: TaskState::Failed,
115 message: Some(e.to_string()),
116 },
117 final_update: true,
118 }));
119 return Ok(results);
120 }
121 }
122 }
123
124 if cancellation_token.as_ref().is_some_and(CancellationToken::is_cancelled) {
125 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
126 task_id: task_id.to_string(),
127 context_id: Some(context_id.to_string()),
128 status: TaskStatus { state: TaskState::Canceled, message: None },
129 final_update: true,
130 }));
131 return Ok(results);
132 }
133
134 for terminal_event in processor.make_terminal_events() {
136 results.push(UpdateEvent::TaskStatusUpdate(terminal_event));
137 }
138
139 Ok(results)
140 }
141
142 pub async fn cancel(&self, context_id: &str, task_id: &str) -> Result<TaskStatusUpdateEvent> {
143 Ok(TaskStatusUpdateEvent {
144 task_id: task_id.to_string(),
145 context_id: Some(context_id.to_string()),
146 status: TaskStatus { state: TaskState::Canceled, message: None },
147 final_update: true,
148 })
149 }
150
151 async fn prepare_session(&self, user_id: &str, session_id: &str) -> Result<()> {
152 let session_service = &self.config.runner_config.session_service;
153
154 let get_result = session_service
156 .get(GetRequest {
157 app_name: self.config.app_name.clone(),
158 user_id: user_id.to_string(),
159 session_id: session_id.to_string(),
160 num_recent_events: None,
161 after: None,
162 })
163 .await;
164
165 if get_result.is_ok() {
166 return Ok(());
167 }
168
169 session_service
171 .create(CreateRequest {
172 app_name: self.config.app_name.clone(),
173 user_id: user_id.to_string(),
174 session_id: Some(session_id.to_string()),
175 state: std::collections::HashMap::new(),
176 })
177 .await?;
178
179 Ok(())
180 }
181}