adk_server/a2a/
processor.rs1use crate::a2a::{
2 metadata::{to_event_meta, InvocationMeta},
3 parts::adk_parts_to_a2a,
4 Artifact, TaskArtifactUpdateEvent, TaskState, TaskStatus, TaskStatusUpdateEvent,
5};
6use adk_core::{Event, EventActions, Result};
7
8pub struct EventProcessor {
9 context_id: String,
10 task_id: String,
11 meta: InvocationMeta,
12 terminal_actions: EventActions,
13 response_id: Option<String>,
14 terminal_state: Option<TaskState>,
15 has_artifacts: bool,
16}
17
18impl EventProcessor {
19 pub fn new(context_id: String, task_id: String, meta: InvocationMeta) -> Self {
20 Self {
21 context_id,
22 task_id,
23 meta,
24 terminal_actions: EventActions::default(),
25 response_id: None,
26 terminal_state: None,
27 has_artifacts: false,
28 }
29 }
30
31 pub fn process(&mut self, event: &Event) -> Result<Option<TaskArtifactUpdateEvent>> {
32 self.update_terminal_actions(event);
33
34 let event_meta = to_event_meta(&self.meta, event);
35 let event_meta_map: serde_json::Map<String, serde_json::Value> =
36 event_meta.into_iter().collect();
37
38 let content = match &event.llm_response.content {
40 Some(c) => c,
41 None => return Ok(None),
42 };
43
44 if content.parts.is_empty() {
45 return Ok(None);
46 }
47
48 let parts = adk_parts_to_a2a(&content.parts, &[])?;
50
51 if parts.is_empty() {
52 return Ok(None);
53 }
54
55 self.has_artifacts = true;
56
57 let artifact_event = if let Some(response_id) = &self.response_id {
58 TaskArtifactUpdateEvent {
59 task_id: self.task_id.clone(),
60 context_id: Some(self.context_id.clone()),
61 artifact: Artifact {
62 artifact_id: response_id.clone(),
63 name: None,
64 description: None,
65 parts,
66 metadata: Some(event_meta_map),
67 extensions: None,
68 },
69 append: true,
70 last_chunk: false,
71 }
72 } else {
73 let artifact_id = uuid::Uuid::new_v4().to_string();
74 self.response_id = Some(artifact_id.clone());
75
76 TaskArtifactUpdateEvent {
77 task_id: self.task_id.clone(),
78 context_id: Some(self.context_id.clone()),
79 artifact: Artifact {
80 artifact_id,
81 name: None,
82 description: None,
83 parts,
84 metadata: Some(event_meta_map),
85 extensions: None,
86 },
87 append: true,
88 last_chunk: false,
89 }
90 };
91
92 Ok(Some(artifact_event))
93 }
94
95 pub fn make_terminal_events(&self) -> Vec<TaskStatusUpdateEvent> {
96 let mut events = vec![];
97
98 let state = self.terminal_state.clone().unwrap_or(TaskState::Completed);
100
101 events.push(TaskStatusUpdateEvent {
102 task_id: self.task_id.clone(),
103 context_id: Some(self.context_id.clone()),
104 status: TaskStatus { state, message: None },
105 final_update: true,
106 });
107
108 events
109 }
110
111 fn update_terminal_actions(&mut self, event: &Event) {
112 self.terminal_actions.escalate = self.terminal_actions.escalate || event.actions.escalate;
113 if let Some(agent) = &event.actions.transfer_to_agent {
114 self.terminal_actions.transfer_to_agent = Some(agent.clone());
115 }
116 }
117}