a2a_rs/adapter/business/
message_handler.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6
7use crate::{
8 domain::{A2AError, Message, Task, TaskState},
9 port::{AsyncMessageHandler, AsyncTaskManager},
10};
11
12#[derive(Clone)]
14pub struct DefaultMessageHandler<T>
15where
16 T: AsyncTaskManager + Send + Sync + 'static,
17{
18 task_manager: Arc<T>,
20}
21
22impl<T> DefaultMessageHandler<T>
23where
24 T: AsyncTaskManager + Send + Sync + 'static,
25{
26 pub fn new(task_manager: T) -> Self {
28 Self {
29 task_manager: Arc::new(task_manager),
30 }
31 }
32}
33
34#[async_trait]
35impl<T> AsyncMessageHandler for DefaultMessageHandler<T>
36where
37 T: AsyncTaskManager + Send + Sync + 'static,
38{
39 async fn process_message(
40 &self,
41 task_id: &str,
42 message: &Message,
43 session_id: Option<&str>,
44 ) -> Result<Task, A2AError> {
45 let task_exists = self.task_manager.task_exists(task_id).await?;
47
48 if !task_exists {
49 let context_id = session_id.unwrap_or("default");
51 self.task_manager.create_task(task_id, context_id).await?;
52 }
53
54 self.task_manager
56 .update_task_status(task_id, TaskState::Working, Some(message.clone()))
57 .await?;
58
59 let response_message = Message::builder()
61 .role(crate::domain::Role::Agent)
62 .parts(vec![crate::domain::Part::text(format!(
63 "Echo: {}",
64 message
65 .parts
66 .iter()
67 .filter_map(|p| match p {
68 crate::domain::Part::Text { text, .. } => Some(text.as_str()),
69 _ => None,
70 })
71 .collect::<Vec<_>>()
72 .join(" ")
73 ))])
74 .message_id(uuid::Uuid::new_v4().to_string())
75 .task_id(task_id.to_string())
76 .context_id(message.context_id.as_deref().unwrap_or("").to_string())
77 .build();
78
79 let final_task = self
82 .task_manager
83 .update_task_status(task_id, TaskState::Working, Some(response_message))
84 .await?;
85
86 Ok(final_task)
87 }
88}