adk_server/a2a/
processor.rs

1use crate::a2a::{
2    metadata::{to_event_meta, InvocationMeta},
3    parts::adk_parts_to_a2a,
4    Artifact, TaskArtifactUpdateEvent, TaskState, TaskStatus, TaskStatusUpdateEvent,
5};
6use adk_core::{Event, EventActions, Result};
7
8pub struct EventProcessor {
9    context_id: String,
10    task_id: String,
11    meta: InvocationMeta,
12    terminal_actions: EventActions,
13    response_id: Option<String>,
14    terminal_state: Option<TaskState>,
15    has_artifacts: bool,
16}
17
18impl EventProcessor {
19    pub fn new(context_id: String, task_id: String, meta: InvocationMeta) -> Self {
20        Self {
21            context_id,
22            task_id,
23            meta,
24            terminal_actions: EventActions::default(),
25            response_id: None,
26            terminal_state: None,
27            has_artifacts: false,
28        }
29    }
30
31    pub fn process(&mut self, event: &Event) -> Result<Option<TaskArtifactUpdateEvent>> {
32        self.update_terminal_actions(event);
33
34        let event_meta = to_event_meta(&self.meta, event);
35        let event_meta_map: serde_json::Map<String, serde_json::Value> =
36            event_meta.into_iter().collect();
37
38        // Get content
39        let content = match &event.llm_response.content {
40            Some(c) => c,
41            None => return Ok(None),
42        };
43
44        if content.parts.is_empty() {
45            return Ok(None);
46        }
47
48        // Convert parts
49        let parts = adk_parts_to_a2a(&content.parts, &[])?;
50
51        if parts.is_empty() {
52            return Ok(None);
53        }
54
55        self.has_artifacts = true;
56
57        let artifact_event = if let Some(response_id) = &self.response_id {
58            TaskArtifactUpdateEvent {
59                task_id: self.task_id.clone(),
60                context_id: Some(self.context_id.clone()),
61                artifact: Artifact {
62                    artifact_id: response_id.clone(),
63                    name: None,
64                    description: None,
65                    parts,
66                    metadata: Some(event_meta_map),
67                    extensions: None,
68                },
69                append: true,
70                last_chunk: false,
71            }
72        } else {
73            let artifact_id = uuid::Uuid::new_v4().to_string();
74            self.response_id = Some(artifact_id.clone());
75
76            TaskArtifactUpdateEvent {
77                task_id: self.task_id.clone(),
78                context_id: Some(self.context_id.clone()),
79                artifact: Artifact {
80                    artifact_id,
81                    name: None,
82                    description: None,
83                    parts,
84                    metadata: Some(event_meta_map),
85                    extensions: None,
86                },
87                append: true,
88                last_chunk: false,
89            }
90        };
91
92        Ok(Some(artifact_event))
93    }
94
95    pub fn make_terminal_events(&self) -> Vec<TaskStatusUpdateEvent> {
96        let mut events = vec![];
97
98        // Terminal status
99        let state = self.terminal_state.clone().unwrap_or(TaskState::Completed);
100
101        events.push(TaskStatusUpdateEvent {
102            task_id: self.task_id.clone(),
103            context_id: Some(self.context_id.clone()),
104            status: TaskStatus { state, message: None },
105            final_update: true,
106        });
107
108        events
109    }
110
111    fn update_terminal_actions(&mut self, event: &Event) {
112        self.terminal_actions.escalate = self.terminal_actions.escalate || event.actions.escalate;
113        if let Some(agent) = &event.actions.transfer_to_agent {
114            self.terminal_actions.transfer_to_agent = Some(agent.clone());
115        }
116    }
117}