adk_server/a2a/
executor.rs1use crate::a2a::{
2 Message, TaskState, TaskStatus, TaskStatusUpdateEvent, UpdateEvent, events::message_to_event,
3 metadata::to_invocation_meta, processor::EventProcessor,
4};
5use adk_core::Result;
6use adk_runner::{Runner, RunnerConfig};
7use adk_session::{CreateRequest, GetRequest};
8use futures::StreamExt;
9use std::sync::Arc;
10
11pub struct ExecutorConfig {
12 pub app_name: String,
13 pub runner_config: Arc<RunnerConfig>,
14}
15
16pub struct Executor {
17 config: ExecutorConfig,
18}
19
20impl Executor {
21 pub fn new(config: ExecutorConfig) -> Self {
22 Self { config }
23 }
24
25 pub async fn execute(
26 &self,
27 context_id: &str,
28 task_id: &str,
29 message: &Message,
30 ) -> Result<Vec<UpdateEvent>> {
31 let meta = to_invocation_meta(&self.config.app_name, context_id, None);
32
33 self.prepare_session(&meta.user_id, &meta.session_id).await?;
35
36 let invocation_id = uuid::Uuid::new_v4().to_string();
38 let event = message_to_event(message, invocation_id)?;
39
40 let runner = Runner::new(RunnerConfig {
42 app_name: self.config.runner_config.app_name.clone(),
43 agent: self.config.runner_config.agent.clone(),
44 session_service: self.config.runner_config.session_service.clone(),
45 artifact_service: self.config.runner_config.artifact_service.clone(),
46 memory_service: self.config.runner_config.memory_service.clone(),
47 run_config: None,
48 })?;
49
50 let mut processor =
52 EventProcessor::new(context_id.to_string(), task_id.to_string(), meta.clone());
53
54 let mut results = vec![];
55
56 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
58 task_id: task_id.to_string(),
59 context_id: Some(context_id.to_string()),
60 status: TaskStatus { state: TaskState::Submitted, message: None },
61 final_update: false,
62 }));
63
64 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
66 task_id: task_id.to_string(),
67 context_id: Some(context_id.to_string()),
68 status: TaskStatus { state: TaskState::Working, message: None },
69 final_update: false,
70 }));
71
72 let content = event
74 .llm_response
75 .content
76 .ok_or_else(|| adk_core::AdkError::Agent("Event has no content".to_string()))?;
77
78 let mut event_stream =
79 runner.run(meta.user_id.clone(), meta.session_id.clone(), content).await?;
80
81 while let Some(result) = event_stream.next().await {
83 match result {
84 Ok(adk_event) => {
85 if let Some(artifact_event) = processor.process(&adk_event)? {
86 results.push(UpdateEvent::TaskArtifactUpdate(artifact_event));
87 }
88 }
89 Err(e) => {
90 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
92 task_id: task_id.to_string(),
93 context_id: Some(context_id.to_string()),
94 status: TaskStatus {
95 state: TaskState::Failed,
96 message: Some(e.to_string()),
97 },
98 final_update: true,
99 }));
100 return Ok(results);
101 }
102 }
103 }
104
105 for terminal_event in processor.make_terminal_events() {
107 results.push(UpdateEvent::TaskStatusUpdate(terminal_event));
108 }
109
110 Ok(results)
111 }
112
113 pub async fn cancel(&self, context_id: &str, task_id: &str) -> Result<TaskStatusUpdateEvent> {
114 Ok(TaskStatusUpdateEvent {
115 task_id: task_id.to_string(),
116 context_id: Some(context_id.to_string()),
117 status: TaskStatus { state: TaskState::Canceled, message: None },
118 final_update: true,
119 })
120 }
121
122 async fn prepare_session(&self, user_id: &str, session_id: &str) -> Result<()> {
123 let session_service = &self.config.runner_config.session_service;
124
125 let get_result = session_service
127 .get(GetRequest {
128 app_name: self.config.app_name.clone(),
129 user_id: user_id.to_string(),
130 session_id: session_id.to_string(),
131 num_recent_events: None,
132 after: None,
133 })
134 .await;
135
136 if get_result.is_ok() {
137 return Ok(());
138 }
139
140 session_service
142 .create(CreateRequest {
143 app_name: self.config.app_name.clone(),
144 user_id: user_id.to_string(),
145 session_id: Some(session_id.to_string()),
146 state: std::collections::HashMap::new(),
147 })
148 .await?;
149
150 Ok(())
151 }
152}