1use async_trait::async_trait;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
9use openclaw_core::secrets::ApiKey;
10use openclaw_core::types::{
11 Attachment, AttachmentKind, ChannelId, DeliveryResult, Message, PeerId, PeerType,
12};
13
14use crate::traits::{
15 Channel, ChannelCapabilities, ChannelContext, ChannelError, ChannelInbound, ChannelOutbound,
16 ChannelProbe, DeliveryMode, OutboundContext,
17};
18
19pub struct MatrixChannel {
21 client: Client,
22 homeserver_url: String,
23 access_token: ApiKey,
24 state: Arc<RwLock<MatrixState>>,
25}
26
27#[derive(Debug, Default)]
28struct MatrixState {
29 user_id: Option<String>,
30 device_id: Option<String>,
31 connected: bool,
32 next_batch: Option<String>,
33}
34
35impl MatrixChannel {
36 #[must_use]
42 pub fn new(homeserver_url: impl Into<String>, access_token: ApiKey) -> Self {
43 Self {
44 client: Client::new(),
45 homeserver_url: homeserver_url.into(),
46 access_token,
47 state: Arc::new(RwLock::new(MatrixState::default())),
48 }
49 }
50
51 fn api_url(&self, path: &str) -> String {
53 format!("{}/_matrix/client/v3{}", self.homeserver_url, path)
54 }
55
56 async fn call<T: for<'de> Deserialize<'de>>(
58 &self,
59 method: reqwest::Method,
60 path: &str,
61 body: Option<&impl Serialize>,
62 ) -> Result<T, ChannelError> {
63 let url = self.api_url(path);
64
65 let mut request = self
66 .client
67 .request(method, &url)
68 .header(
69 "Authorization",
70 format!("Bearer {}", self.access_token.expose()),
71 )
72 .header("Content-Type", "application/json");
73
74 if let Some(b) = body {
75 request = request.json(b);
76 }
77
78 let response = request
79 .send()
80 .await
81 .map_err(|e| ChannelError::Network(e.to_string()))?;
82
83 if !response.status().is_success() {
84 let status = response.status();
85 if status.as_u16() == 429 {
86 return Err(ChannelError::RateLimited);
87 }
88 let text = response.text().await.unwrap_or_default();
89 return Err(ChannelError::Network(format!("{status}: {text}")));
90 }
91
92 response
93 .json()
94 .await
95 .map_err(|e| ChannelError::Network(e.to_string()))
96 }
97
98 fn txn_id() -> String {
100 format!("openclaw_{}", uuid::Uuid::new_v4())
101 }
102}
103
104#[async_trait]
105impl Channel for MatrixChannel {
106 fn id(&self) -> &'static str {
107 "matrix"
108 }
109
110 fn label(&self) -> &'static str {
111 "Matrix"
112 }
113
114 fn capabilities(&self) -> ChannelCapabilities {
115 ChannelCapabilities {
116 text: true,
117 images: true,
118 videos: true,
119 voice: true,
120 files: true,
121 threads: true, reactions: true,
123 editing: true,
124 deletion: true,
125 }
126 }
127
128 async fn start(&self, _ctx: ChannelContext) -> Result<(), ChannelError> {
129 let whoami: WhoAmIResponse = self
131 .call(reqwest::Method::GET, "/account/whoami", None::<&()>)
132 .await?;
133
134 let mut state = self.state.write().await;
135 state.user_id = Some(whoami.user_id.clone());
136 state.device_id = whoami.device_id;
137 state.connected = true;
138
139 tracing::info!("Matrix connected: {}", whoami.user_id);
140 Ok(())
141 }
142
143 async fn stop(&self) -> Result<(), ChannelError> {
144 let mut state = self.state.write().await;
145 state.connected = false;
146 Ok(())
147 }
148
149 async fn probe(&self) -> Result<ChannelProbe, ChannelError> {
150 match self
151 .call::<WhoAmIResponse>(reqwest::Method::GET, "/account/whoami", None::<&()>)
152 .await
153 {
154 Ok(whoami) => Ok(ChannelProbe {
155 connected: true,
156 account_id: Some(whoami.user_id.clone()),
157 display_name: Some(whoami.user_id),
158 error: None,
159 }),
160 Err(e) => Ok(ChannelProbe {
161 connected: false,
162 account_id: None,
163 display_name: None,
164 error: Some(e.to_string()),
165 }),
166 }
167 }
168}
169
170#[async_trait]
171impl ChannelOutbound for MatrixChannel {
172 async fn send_text(
173 &self,
174 ctx: OutboundContext,
175 text: &str,
176 ) -> Result<DeliveryResult, ChannelError> {
177 let room_id = urlencoding::encode(&ctx.chat_id);
178 let txn_id = Self::txn_id();
179 let path = format!("/rooms/{room_id}/send/m.room.message/{txn_id}");
180
181 let mut content = MessageContent {
182 msgtype: "m.text".to_string(),
183 body: text.to_string(),
184 format: Some("org.matrix.custom.html".to_string()),
185 formatted_body: Some(text.to_string()),
186 relates_to: None,
187 };
188
189 if let Some(reply_to) = ctx.reply_to {
191 content.relates_to = Some(RelatesTo {
192 in_reply_to: Some(InReplyTo { event_id: reply_to }),
193 rel_type: None,
194 event_id: None,
195 });
196 }
197
198 let result: SendEventResponse = self
199 .call(reqwest::Method::PUT, &path, Some(&content))
200 .await?;
201
202 Ok(DeliveryResult {
203 message_id: result.event_id,
204 channel: ChannelId::matrix(),
205 timestamp: chrono::Utc::now(),
206 chat_id: Some(ctx.chat_id),
207 meta: None,
208 })
209 }
210
211 async fn send_media(
212 &self,
213 ctx: OutboundContext,
214 media: &[Attachment],
215 ) -> Result<DeliveryResult, ChannelError> {
216 let room_id = urlencoding::encode(&ctx.chat_id);
217 let mut last_event_id = String::new();
218
219 for attachment in media {
220 let txn_id = Self::txn_id();
221 let path = format!("/rooms/{room_id}/send/m.room.message/{txn_id}");
222
223 let msgtype = match attachment.kind {
224 AttachmentKind::Image => "m.image",
225 AttachmentKind::Video => "m.video",
226 AttachmentKind::Audio | AttachmentKind::Voice => "m.audio",
227 _ => "m.file",
228 };
229
230 let content = MediaMessageContent {
231 msgtype: msgtype.to_string(),
232 body: attachment
233 .filename
234 .clone()
235 .unwrap_or_else(|| "file".to_string()),
236 url: attachment.url.clone(),
237 info: Some(MediaInfo {
238 mimetype: attachment.mime_type.clone(),
239 size: attachment.size.map(|s| s as i64),
240 thumbnail_url: None,
241 }),
242 };
243
244 let result: SendEventResponse = self
245 .call(reqwest::Method::PUT, &path, Some(&content))
246 .await?;
247
248 last_event_id = result.event_id;
249 }
250
251 Ok(DeliveryResult {
252 message_id: last_event_id,
253 channel: ChannelId::matrix(),
254 timestamp: chrono::Utc::now(),
255 chat_id: Some(ctx.chat_id),
256 meta: None,
257 })
258 }
259
260 fn text_chunk_limit(&self) -> usize {
261 60000
263 }
264
265 fn delivery_mode(&self) -> DeliveryMode {
266 DeliveryMode::Immediate
267 }
268}
269
270#[async_trait]
271impl ChannelInbound for MatrixChannel {
272 type RawMessage = MatrixEvent;
273
274 fn normalize(&self, raw: Self::RawMessage) -> Result<Message, ChannelError> {
275 let raw_value = serde_json::to_value(&raw).unwrap_or_default();
276
277 let sender = raw
278 .sender
279 .ok_or_else(|| ChannelError::Config("No sender in event".to_string()))?;
280
281 let _room_id = raw
282 .room_id
283 .ok_or_else(|| ChannelError::Config("No room_id in event".to_string()))?;
284
285 let content = raw
286 .content
287 .ok_or_else(|| ChannelError::Config("No content in event".to_string()))?;
288
289 let state = futures::executor::block_on(self.state.read());
290 let account_id = state.user_id.clone().unwrap_or_default();
291
292 let text = content.body.unwrap_or_default();
294
295 let attachments = if let Some(url) = content.url {
297 let kind = match content.msgtype.as_deref() {
298 Some("m.image") => AttachmentKind::Image,
299 Some("m.video") => AttachmentKind::Video,
300 Some("m.audio") => AttachmentKind::Audio,
301 _ => AttachmentKind::Document,
302 };
303
304 vec![Attachment {
305 kind,
306 url,
307 mime_type: content.info.as_ref().and_then(|i| i.mimetype.clone()),
308 filename: Some(text.clone()),
309 size: content.info.as_ref().and_then(|i| i.size.map(|s| s as u64)),
310 thumbnail_url: content.info.and_then(|i| i.thumbnail_url),
311 }]
312 } else {
313 Vec::new()
314 };
315
316 let peer_type = PeerType::Group; let timestamp = raw
322 .origin_server_ts
323 .and_then(chrono::DateTime::from_timestamp_millis)
324 .unwrap_or_else(chrono::Utc::now);
325
326 let reply_to = content
328 .relates_to
329 .and_then(|r| r.in_reply_to)
330 .map(|r| r.event_id);
331
332 Ok(Message {
333 id: raw.event_id.unwrap_or_default(),
334 channel: ChannelId::matrix(),
335 account_id,
336 peer_id: PeerId::new(sender),
337 peer_type,
338 content: text,
339 attachments,
340 timestamp,
341 reply_to,
342 thread_id: None, mentions: Vec::new(),
344 raw: Some(raw_value),
345 })
346 }
347
348 async fn acknowledge(&self, _message_id: &str) -> Result<(), ChannelError> {
349 Ok(())
351 }
352}
353
354#[derive(Debug, Deserialize)]
358struct WhoAmIResponse {
359 user_id: String,
360 device_id: Option<String>,
361}
362
363#[derive(Debug, Deserialize)]
365struct SendEventResponse {
366 event_id: String,
367}
368
369#[derive(Debug, Serialize)]
371struct MessageContent {
372 msgtype: String,
373 body: String,
374 #[serde(skip_serializing_if = "Option::is_none")]
375 format: Option<String>,
376 #[serde(skip_serializing_if = "Option::is_none")]
377 formatted_body: Option<String>,
378 #[serde(rename = "m.relates_to")]
379 #[serde(skip_serializing_if = "Option::is_none")]
380 relates_to: Option<RelatesTo>,
381}
382
383#[derive(Debug, Serialize)]
385struct MediaMessageContent {
386 msgtype: String,
387 body: String,
388 url: String,
389 #[serde(skip_serializing_if = "Option::is_none")]
390 info: Option<MediaInfo>,
391}
392
393#[derive(Debug, Clone, Serialize, Deserialize)]
395struct MediaInfo {
396 #[serde(skip_serializing_if = "Option::is_none")]
397 mimetype: Option<String>,
398 #[serde(skip_serializing_if = "Option::is_none")]
399 size: Option<i64>,
400 #[serde(skip_serializing_if = "Option::is_none")]
401 thumbnail_url: Option<String>,
402}
403
404#[derive(Debug, Clone, Serialize, Deserialize)]
406struct RelatesTo {
407 #[serde(rename = "m.in_reply_to")]
408 #[serde(skip_serializing_if = "Option::is_none")]
409 in_reply_to: Option<InReplyTo>,
410 #[serde(skip_serializing_if = "Option::is_none")]
411 rel_type: Option<String>,
412 #[serde(skip_serializing_if = "Option::is_none")]
413 event_id: Option<String>,
414}
415
416#[derive(Debug, Clone, Serialize, Deserialize)]
418struct InReplyTo {
419 event_id: String,
420}
421
422#[derive(Debug, Clone, Serialize, Deserialize)]
424pub struct MatrixEvent {
425 pub event_id: Option<String>,
427 #[serde(rename = "type")]
429 pub event_type: Option<String>,
430 pub room_id: Option<String>,
432 pub sender: Option<String>,
434 pub origin_server_ts: Option<i64>,
436 pub content: Option<MatrixMessageContent>,
438}
439
440#[derive(Debug, Clone, Serialize, Deserialize)]
442pub struct MatrixMessageContent {
443 pub msgtype: Option<String>,
445 pub body: Option<String>,
447 pub url: Option<String>,
449 pub info: Option<MediaInfo>,
451 pub formatted_body: Option<String>,
453 #[serde(rename = "m.relates_to")]
455 pub relates_to: Option<RelatesTo>,
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461
462 #[test]
463 fn test_channel_id() {
464 let channel = MatrixChannel::new("https://matrix.org", ApiKey::new("test".to_string()));
465 assert_eq!(channel.id(), "matrix");
466 }
467
468 #[test]
469 fn test_capabilities() {
470 let channel = MatrixChannel::new("https://matrix.org", ApiKey::new("test".to_string()));
471 let caps = channel.capabilities();
472 assert!(caps.text);
473 assert!(caps.images);
474 assert!(caps.reactions);
475 assert!(caps.threads);
476 assert!(caps.editing);
477 }
478}