Skip to main content

a2a_rs/adapter/business/
message_handler.rs

1//! Default message handler implementation
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6
7use crate::{
8    domain::{A2AError, Message, Task, TaskState},
9    port::{AsyncMessageHandler, AsyncTaskManager},
10};
11
12/// Default message handler that processes messages and delegates to task manager
13#[derive(Clone)]
14pub struct DefaultMessageHandler<T>
15where
16    T: AsyncTaskManager + Send + Sync + 'static,
17{
18    /// Task manager for handling task operations
19    task_manager: Arc<T>,
20}
21
22impl<T> DefaultMessageHandler<T>
23where
24    T: AsyncTaskManager + Send + Sync + 'static,
25{
26    /// Create a new message handler with the given task manager
27    pub fn new(task_manager: T) -> Self {
28        Self {
29            task_manager: Arc::new(task_manager),
30        }
31    }
32}
33
34#[async_trait]
35impl<T> AsyncMessageHandler for DefaultMessageHandler<T>
36where
37    T: AsyncTaskManager + Send + Sync + 'static,
38{
39    async fn process_message(
40        &self,
41        task_id: &str,
42        message: &Message,
43        session_id: Option<&str>,
44    ) -> Result<Task, A2AError> {
45        // Check if task exists
46        let task_exists = self.task_manager.task_exists(task_id).await?;
47
48        if !task_exists {
49            // Create a new task
50            let context_id = session_id.unwrap_or("default");
51            self.task_manager.create_task(task_id, context_id).await?;
52        }
53
54        // First, update the task with the incoming message to add it to history
55        self.task_manager
56            .update_task_status(task_id, TaskState::Working, Some(message.clone()))
57            .await?;
58
59        // Create a simple echo response
60        let response_message = Message::builder()
61            .role(crate::domain::Role::Agent)
62            .parts(vec![crate::domain::Part::text(format!(
63                "Echo: {}",
64                message
65                    .parts
66                    .iter()
67                    .filter_map(|p| match p {
68                        crate::domain::Part::Text { text, .. } => Some(text.as_str()),
69                        _ => None,
70                    })
71                    .collect::<Vec<_>>()
72                    .join(" ")
73            ))])
74            .message_id(uuid::Uuid::new_v4().to_string())
75            .task_id(task_id.to_string())
76            .context_id(message.context_id.as_deref().unwrap_or("").to_string())
77            .build();
78
79        // For the default handler, we'll add the response message to history but keep the task in Working state
80        // Real agents would process the message and determine the appropriate final state
81        let final_task = self
82            .task_manager
83            .update_task_status(task_id, TaskState::Working, Some(response_message))
84            .await?;
85
86        Ok(final_task)
87    }
88}