Skip to main content

agentlink_sdk/services/
message_service.rs

1//! Message Service
2
3use std::sync::Arc;
4
5use serde_json::Value;
6use tokio::sync::RwLock;
7
8use crate::error::SdkResult;
9use crate::http::HttpClient;
10use crate::mqtt::client::MqttClient;
11use crate::protocols::message::{Message, MessagesResponse, SendMessageRequest};
12
13/// 消息服务
14pub struct MessageService {
15    http: Arc<HttpClient>,
16    mqtt: Option<Arc<RwLock<MqttClient>>>,
17}
18
19impl MessageService {
20    /// 创建新的消息服务
21    pub fn new(http: Arc<HttpClient>) -> Self {
22        Self { http, mqtt: None }
23    }
24
25    /// 使用 MQTT 客户端创建消息服务
26    pub fn with_mqtt(http: Arc<HttpClient>, mqtt: Arc<RwLock<MqttClient>>) -> Self {
27        Self {
28            http,
29            mqtt: Some(mqtt),
30        }
31    }
32
33    /// 获取会话消息列表(需要认证)
34    ///
35    /// # Arguments
36    /// * `conversation_id` - 会话 ID
37    /// * `cursor` - 分页游标(可选)
38    /// * `limit` - 每页数量(可选,默认 20)
39    ///
40    /// # Returns
41    /// * `MessagesResponse` - 消息列表
42    pub async fn get_conversation_messages(
43        &self,
44        conversation_id: &str,
45        cursor: Option<&str>,
46        limit: Option<i32>,
47    ) -> SdkResult<MessagesResponse> {
48        let mut path = format!(
49            "/conversations/{}/messages?limit={}",
50            conversation_id,
51            limit.unwrap_or(20)
52        );
53        
54        if let Some(c) = cursor {
55            path.push_str(&format!("&cursor={}", c));
56        }
57        
58        self.http.get(&path).await
59    }
60
61    /// 发送消息(需要认证)
62    ///
63    /// # Arguments
64    /// * `conversation_id` - 会话 ID
65    /// * `content` - 消息内容
66    /// * `message_type` - 消息类型(默认 "text")
67    /// * `reply_to_id` - 回复的消息 ID(可选)
68    /// * `data` - 附加数据(可选)
69    ///
70    /// # Returns
71    /// * `Message` - 发送的消息
72    pub async fn send_message(
73        &self,
74        conversation_id: &str,
75        content: &str,
76        message_type: Option<&str>,
77        reply_to_id: Option<&str>,
78        data: Option<Value>,
79    ) -> SdkResult<Message> {
80        let request = SendMessageRequest {
81            conversation_id: conversation_id.to_string(),
82            content: content.to_string(),
83            message_type: message_type.unwrap_or("text").to_string(),
84            reply_to_id: reply_to_id.map(|s| s.to_string()),
85            data,
86        };
87
88        // Protocol: http-api.md#4.1-发送消息
89        // POST /api/v1/messages
90        self.http.post("/messages", &request).await
91    }
92
93    /// 删除消息(需要认证)
94    ///
95    /// # Arguments
96    /// * `conversation_id` - 会话 ID
97    /// * `message_id` - 消息 ID
98    pub async fn delete_message(
99        &self,
100        conversation_id: &str,
101        message_id: &str,
102    ) -> SdkResult<()> {
103        let path = format!("/conversations/{}/messages/{}", conversation_id, message_id);
104        self.http.delete(&path).await
105    }
106
107    /// 标记消息为已读(需要认证)
108    ///
109    /// # Arguments
110    /// * `conversation_id` - 会话 ID
111    /// * `message_id` - 消息 ID
112    pub async fn mark_message_as_read(
113        &self,
114        conversation_id: &str,
115        message_id: &str,
116    ) -> SdkResult<()> {
117        let path = format!(
118            "/conversations/{}/messages/{}/read",
119            conversation_id, message_id
120        );
121        let _: serde_json::Value = self.http.post(&path, &serde_json::json!({})).await?;
122        Ok(())
123    }
124
125    /// 触发消息历史同步(需要认证)
126    ///
127    /// 调用此接口后,服务端会通过 MQTT 推送消息历史数据
128    ///
129    /// # Arguments
130    /// * `conversation_id` - 会话 ID(可选,不提供则同步所有会话)
131    pub async fn trigger_sync(&self, conversation_id: Option<&str>) -> SdkResult<()> {
132        let body = if let Some(cid) = conversation_id {
133            serde_json::json!({ "conversation_id": cid })
134        } else {
135            serde_json::json!({})
136        };
137        
138        let _: serde_json::Value = self.http.post("/sync/messages", &body).await?;
139        Ok(())
140    }
141
142    /// 通过 MQTT 发送消息(需要 MQTT 连接)
143    ///
144    /// # Arguments
145    /// * `conversation_id` - 会话 ID
146    /// * `payload` - 消息 payload
147    pub async fn publish_message(&self, conversation_id: &str, payload: &[u8]) -> SdkResult<()> {
148        if let Some(ref mqtt) = self.mqtt {
149            let topic = format!("conversations/{}/messages", conversation_id);
150            let mqtt = mqtt.read().await;
151            mqtt.publish(&topic, payload).await
152        } else {
153            Err(crate::error::SdkError::NotConnected)
154        }
155    }
156}