adk_server/a2a/
executor.rs1use crate::a2a::{
2 events::message_to_event, metadata::to_invocation_meta, processor::EventProcessor, Message,
3 TaskState, TaskStatus, TaskStatusUpdateEvent, UpdateEvent,
4};
5use adk_core::Result;
6use adk_runner::{Runner, RunnerConfig};
7use adk_session::{CreateRequest, GetRequest};
8use futures::StreamExt;
9use std::sync::Arc;
10
11pub struct ExecutorConfig {
12 pub app_name: String,
13 pub runner_config: Arc<RunnerConfig>,
14}
15
16pub struct Executor {
17 config: ExecutorConfig,
18}
19
20impl Executor {
21 pub fn new(config: ExecutorConfig) -> Self {
22 Self { config }
23 }
24
25 pub async fn execute(
26 &self,
27 context_id: &str,
28 task_id: &str,
29 message: &Message,
30 ) -> Result<Vec<UpdateEvent>> {
31 let meta = to_invocation_meta(&self.config.app_name, context_id, None);
32
33 self.prepare_session(&meta.user_id, &meta.session_id).await?;
35
36 let invocation_id = uuid::Uuid::new_v4().to_string();
38 let event = message_to_event(message, invocation_id)?;
39
40 let runner = Runner::new(RunnerConfig {
42 app_name: self.config.runner_config.app_name.clone(),
43 agent: self.config.runner_config.agent.clone(),
44 session_service: self.config.runner_config.session_service.clone(),
45 artifact_service: self.config.runner_config.artifact_service.clone(),
46 memory_service: self.config.runner_config.memory_service.clone(),
47 })?;
48
49 let mut processor =
51 EventProcessor::new(context_id.to_string(), task_id.to_string(), meta.clone());
52
53 let mut results = vec![];
54
55 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
57 task_id: task_id.to_string(),
58 context_id: Some(context_id.to_string()),
59 status: TaskStatus { state: TaskState::Submitted, message: None },
60 final_update: false,
61 }));
62
63 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
65 task_id: task_id.to_string(),
66 context_id: Some(context_id.to_string()),
67 status: TaskStatus { state: TaskState::Working, message: None },
68 final_update: false,
69 }));
70
71 let content = event
73 .llm_response
74 .content
75 .ok_or_else(|| adk_core::AdkError::Agent("Event has no content".to_string()))?;
76
77 let mut event_stream =
78 runner.run(meta.user_id.clone(), meta.session_id.clone(), content).await?;
79
80 while let Some(result) = event_stream.next().await {
82 match result {
83 Ok(adk_event) => {
84 if let Some(artifact_event) = processor.process(&adk_event)? {
85 results.push(UpdateEvent::TaskArtifactUpdate(artifact_event));
86 }
87 }
88 Err(e) => {
89 results.push(UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
91 task_id: task_id.to_string(),
92 context_id: Some(context_id.to_string()),
93 status: TaskStatus {
94 state: TaskState::Failed,
95 message: Some(e.to_string()),
96 },
97 final_update: true,
98 }));
99 return Ok(results);
100 }
101 }
102 }
103
104 for terminal_event in processor.make_terminal_events() {
106 results.push(UpdateEvent::TaskStatusUpdate(terminal_event));
107 }
108
109 Ok(results)
110 }
111
112 pub async fn cancel(&self, context_id: &str, task_id: &str) -> Result<TaskStatusUpdateEvent> {
113 Ok(TaskStatusUpdateEvent {
114 task_id: task_id.to_string(),
115 context_id: Some(context_id.to_string()),
116 status: TaskStatus { state: TaskState::Canceled, message: None },
117 final_update: true,
118 })
119 }
120
121 async fn prepare_session(&self, user_id: &str, session_id: &str) -> Result<()> {
122 let session_service = &self.config.runner_config.session_service;
123
124 let get_result = session_service
126 .get(GetRequest {
127 app_name: self.config.app_name.clone(),
128 user_id: user_id.to_string(),
129 session_id: session_id.to_string(),
130 num_recent_events: None,
131 after: None,
132 })
133 .await;
134
135 if get_result.is_ok() {
136 return Ok(());
137 }
138
139 session_service
141 .create(CreateRequest {
142 app_name: self.config.app_name.clone(),
143 user_id: user_id.to_string(),
144 session_id: Some(session_id.to_string()),
145 state: std::collections::HashMap::new(),
146 })
147 .await?;
148
149 Ok(())
150 }
151}