Skip to main content

openclaw_channels/matrix/
mod.rs

1//! Matrix channel adapter using the Client-Server API.
2
3use async_trait::async_trait;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
9use openclaw_core::secrets::ApiKey;
10use openclaw_core::types::{
11    Attachment, AttachmentKind, ChannelId, DeliveryResult, Message, PeerId, PeerType,
12};
13
14use crate::traits::{
15    Channel, ChannelCapabilities, ChannelContext, ChannelError, ChannelInbound, ChannelOutbound,
16    ChannelProbe, DeliveryMode, OutboundContext,
17};
18
19/// Matrix channel adapter.
20pub struct MatrixChannel {
21    client: Client,
22    homeserver_url: String,
23    access_token: ApiKey,
24    state: Arc<RwLock<MatrixState>>,
25}
26
/// Runtime state of a [`MatrixChannel`]; everything starts out empty/`false`.
#[derive(Default, Debug)]
struct MatrixState {
    /// User ID reported by `/account/whoami`, recorded when the channel starts.
    user_id: Option<String>,
    /// Device ID reported by `/account/whoami`, if the server returned one.
    device_id: Option<String>,
    /// Set to `true` by `start` and back to `false` by `stop`.
    connected: bool,
    /// Not read or written by the code in this file.
    /// NOTE(review): presumably the `/sync` pagination token — confirm.
    next_batch: Option<String>,
}
34
35impl MatrixChannel {
36    /// Create a new Matrix channel.
37    ///
38    /// # Arguments
39    /// * `homeserver_url` - URL of the Matrix homeserver (e.g., "<https://matrix.org>")
40    /// * `access_token` - Access token for authentication
41    #[must_use]
42    pub fn new(homeserver_url: impl Into<String>, access_token: ApiKey) -> Self {
43        Self {
44            client: Client::new(),
45            homeserver_url: homeserver_url.into(),
46            access_token,
47            state: Arc::new(RwLock::new(MatrixState::default())),
48        }
49    }
50
51    /// Build API URL.
52    fn api_url(&self, path: &str) -> String {
53        format!("{}/_matrix/client/v3{}", self.homeserver_url, path)
54    }
55
56    /// Call a Matrix API endpoint.
57    async fn call<T: for<'de> Deserialize<'de>>(
58        &self,
59        method: reqwest::Method,
60        path: &str,
61        body: Option<&impl Serialize>,
62    ) -> Result<T, ChannelError> {
63        let url = self.api_url(path);
64
65        let mut request = self
66            .client
67            .request(method, &url)
68            .header(
69                "Authorization",
70                format!("Bearer {}", self.access_token.expose()),
71            )
72            .header("Content-Type", "application/json");
73
74        if let Some(b) = body {
75            request = request.json(b);
76        }
77
78        let response = request
79            .send()
80            .await
81            .map_err(|e| ChannelError::Network(e.to_string()))?;
82
83        if !response.status().is_success() {
84            let status = response.status();
85            if status.as_u16() == 429 {
86                return Err(ChannelError::RateLimited);
87            }
88            let text = response.text().await.unwrap_or_default();
89            return Err(ChannelError::Network(format!("{status}: {text}")));
90        }
91
92        response
93            .json()
94            .await
95            .map_err(|e| ChannelError::Network(e.to_string()))
96    }
97
98    /// Generate a transaction ID for idempotency.
99    fn txn_id() -> String {
100        format!("openclaw_{}", uuid::Uuid::new_v4())
101    }
102}
103
104#[async_trait]
105impl Channel for MatrixChannel {
106    fn id(&self) -> &'static str {
107        "matrix"
108    }
109
110    fn label(&self) -> &'static str {
111        "Matrix"
112    }
113
114    fn capabilities(&self) -> ChannelCapabilities {
115        ChannelCapabilities {
116            text: true,
117            images: true,
118            videos: true,
119            voice: true,
120            files: true,
121            threads: true, // Matrix has reply threads
122            reactions: true,
123            editing: true,
124            deletion: true,
125        }
126    }
127
128    async fn start(&self, _ctx: ChannelContext) -> Result<(), ChannelError> {
129        // Get whoami to verify credentials
130        let whoami: WhoAmIResponse = self
131            .call(reqwest::Method::GET, "/account/whoami", None::<&()>)
132            .await?;
133
134        let mut state = self.state.write().await;
135        state.user_id = Some(whoami.user_id.clone());
136        state.device_id = whoami.device_id;
137        state.connected = true;
138
139        tracing::info!("Matrix connected: {}", whoami.user_id);
140        Ok(())
141    }
142
143    async fn stop(&self) -> Result<(), ChannelError> {
144        let mut state = self.state.write().await;
145        state.connected = false;
146        Ok(())
147    }
148
149    async fn probe(&self) -> Result<ChannelProbe, ChannelError> {
150        match self
151            .call::<WhoAmIResponse>(reqwest::Method::GET, "/account/whoami", None::<&()>)
152            .await
153        {
154            Ok(whoami) => Ok(ChannelProbe {
155                connected: true,
156                account_id: Some(whoami.user_id.clone()),
157                display_name: Some(whoami.user_id),
158                error: None,
159            }),
160            Err(e) => Ok(ChannelProbe {
161                connected: false,
162                account_id: None,
163                display_name: None,
164                error: Some(e.to_string()),
165            }),
166        }
167    }
168}
169
170#[async_trait]
171impl ChannelOutbound for MatrixChannel {
172    async fn send_text(
173        &self,
174        ctx: OutboundContext,
175        text: &str,
176    ) -> Result<DeliveryResult, ChannelError> {
177        let room_id = urlencoding::encode(&ctx.chat_id);
178        let txn_id = Self::txn_id();
179        let path = format!("/rooms/{room_id}/send/m.room.message/{txn_id}");
180
181        let mut content = MessageContent {
182            msgtype: "m.text".to_string(),
183            body: text.to_string(),
184            format: Some("org.matrix.custom.html".to_string()),
185            formatted_body: Some(text.to_string()),
186            relates_to: None,
187        };
188
189        // Add reply relation if replying
190        if let Some(reply_to) = ctx.reply_to {
191            content.relates_to = Some(RelatesTo {
192                in_reply_to: Some(InReplyTo { event_id: reply_to }),
193                rel_type: None,
194                event_id: None,
195            });
196        }
197
198        let result: SendEventResponse = self
199            .call(reqwest::Method::PUT, &path, Some(&content))
200            .await?;
201
202        Ok(DeliveryResult {
203            message_id: result.event_id,
204            channel: ChannelId::matrix(),
205            timestamp: chrono::Utc::now(),
206            chat_id: Some(ctx.chat_id),
207            meta: None,
208        })
209    }
210
211    async fn send_media(
212        &self,
213        ctx: OutboundContext,
214        media: &[Attachment],
215    ) -> Result<DeliveryResult, ChannelError> {
216        let room_id = urlencoding::encode(&ctx.chat_id);
217        let mut last_event_id = String::new();
218
219        for attachment in media {
220            let txn_id = Self::txn_id();
221            let path = format!("/rooms/{room_id}/send/m.room.message/{txn_id}");
222
223            let msgtype = match attachment.kind {
224                AttachmentKind::Image => "m.image",
225                AttachmentKind::Video => "m.video",
226                AttachmentKind::Audio | AttachmentKind::Voice => "m.audio",
227                _ => "m.file",
228            };
229
230            let content = MediaMessageContent {
231                msgtype: msgtype.to_string(),
232                body: attachment
233                    .filename
234                    .clone()
235                    .unwrap_or_else(|| "file".to_string()),
236                url: attachment.url.clone(),
237                info: Some(MediaInfo {
238                    mimetype: attachment.mime_type.clone(),
239                    size: attachment.size.map(|s| s as i64),
240                    thumbnail_url: None,
241                }),
242            };
243
244            let result: SendEventResponse = self
245                .call(reqwest::Method::PUT, &path, Some(&content))
246                .await?;
247
248            last_event_id = result.event_id;
249        }
250
251        Ok(DeliveryResult {
252            message_id: last_event_id,
253            channel: ChannelId::matrix(),
254            timestamp: chrono::Utc::now(),
255            chat_id: Some(ctx.chat_id),
256            meta: None,
257        })
258    }
259
260    fn text_chunk_limit(&self) -> usize {
261        // Matrix doesn't have a strict limit, but events should be < 64KB
262        60000
263    }
264
265    fn delivery_mode(&self) -> DeliveryMode {
266        DeliveryMode::Immediate
267    }
268}
269
270#[async_trait]
271impl ChannelInbound for MatrixChannel {
272    type RawMessage = MatrixEvent;
273
274    fn normalize(&self, raw: Self::RawMessage) -> Result<Message, ChannelError> {
275        let raw_value = serde_json::to_value(&raw).unwrap_or_default();
276
277        let sender = raw
278            .sender
279            .ok_or_else(|| ChannelError::Config("No sender in event".to_string()))?;
280
281        let _room_id = raw
282            .room_id
283            .ok_or_else(|| ChannelError::Config("No room_id in event".to_string()))?;
284
285        let content = raw
286            .content
287            .ok_or_else(|| ChannelError::Config("No content in event".to_string()))?;
288
289        let state = futures::executor::block_on(self.state.read());
290        let account_id = state.user_id.clone().unwrap_or_default();
291
292        // Extract text content
293        let text = content.body.unwrap_or_default();
294
295        // Handle attachments (m.image, m.video, m.audio, m.file)
296        let attachments = if let Some(url) = content.url {
297            let kind = match content.msgtype.as_deref() {
298                Some("m.image") => AttachmentKind::Image,
299                Some("m.video") => AttachmentKind::Video,
300                Some("m.audio") => AttachmentKind::Audio,
301                _ => AttachmentKind::Document,
302            };
303
304            vec![Attachment {
305                kind,
306                url,
307                mime_type: content.info.as_ref().and_then(|i| i.mimetype.clone()),
308                filename: Some(text.clone()),
309                size: content.info.as_ref().and_then(|i| i.size.map(|s| s as u64)),
310                thumbnail_url: content.info.and_then(|i| i.thumbnail_url),
311            }]
312        } else {
313            Vec::new()
314        };
315
316        // Determine peer type (DM vs group)
317        // In Matrix, room membership determines this, but we simplify
318        let peer_type = PeerType::Group; // Most Matrix rooms are group-like
319
320        // Parse timestamp from origin_server_ts (milliseconds)
321        let timestamp = raw
322            .origin_server_ts
323            .and_then(chrono::DateTime::from_timestamp_millis)
324            .unwrap_or_else(chrono::Utc::now);
325
326        // Extract reply-to from m.relates_to
327        let reply_to = content
328            .relates_to
329            .and_then(|r| r.in_reply_to)
330            .map(|r| r.event_id);
331
332        Ok(Message {
333            id: raw.event_id.unwrap_or_default(),
334            channel: ChannelId::matrix(),
335            account_id,
336            peer_id: PeerId::new(sender),
337            peer_type,
338            content: text,
339            attachments,
340            timestamp,
341            reply_to,
342            thread_id: None, // Matrix uses reply chains, not explicit threads
343            mentions: Vec::new(),
344            raw: Some(raw_value),
345        })
346    }
347
348    async fn acknowledge(&self, _message_id: &str) -> Result<(), ChannelError> {
349        // Matrix uses read receipts, could implement with /receipt endpoint
350        Ok(())
351    }
352}
353
354// Matrix API types
355
356/// whoami response.
357#[derive(Debug, Deserialize)]
358struct WhoAmIResponse {
359    user_id: String,
360    device_id: Option<String>,
361}
362
363/// Send event response.
364#[derive(Debug, Deserialize)]
365struct SendEventResponse {
366    event_id: String,
367}
368
369/// Message content for m.room.message.
370#[derive(Debug, Serialize)]
371struct MessageContent {
372    msgtype: String,
373    body: String,
374    #[serde(skip_serializing_if = "Option::is_none")]
375    format: Option<String>,
376    #[serde(skip_serializing_if = "Option::is_none")]
377    formatted_body: Option<String>,
378    #[serde(rename = "m.relates_to")]
379    #[serde(skip_serializing_if = "Option::is_none")]
380    relates_to: Option<RelatesTo>,
381}
382
383/// Media message content.
384#[derive(Debug, Serialize)]
385struct MediaMessageContent {
386    msgtype: String,
387    body: String,
388    url: String,
389    #[serde(skip_serializing_if = "Option::is_none")]
390    info: Option<MediaInfo>,
391}
392
393/// Media info.
394#[derive(Debug, Clone, Serialize, Deserialize)]
395struct MediaInfo {
396    #[serde(skip_serializing_if = "Option::is_none")]
397    mimetype: Option<String>,
398    #[serde(skip_serializing_if = "Option::is_none")]
399    size: Option<i64>,
400    #[serde(skip_serializing_if = "Option::is_none")]
401    thumbnail_url: Option<String>,
402}
403
404/// Relates-to for replies/threads.
405#[derive(Debug, Clone, Serialize, Deserialize)]
406struct RelatesTo {
407    #[serde(rename = "m.in_reply_to")]
408    #[serde(skip_serializing_if = "Option::is_none")]
409    in_reply_to: Option<InReplyTo>,
410    #[serde(skip_serializing_if = "Option::is_none")]
411    rel_type: Option<String>,
412    #[serde(skip_serializing_if = "Option::is_none")]
413    event_id: Option<String>,
414}
415
416/// In-reply-to reference.
417#[derive(Debug, Clone, Serialize, Deserialize)]
418struct InReplyTo {
419    event_id: String,
420}
421
422/// Matrix room event.
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub struct MatrixEvent {
425    /// Event ID.
426    pub event_id: Option<String>,
427    /// Event type (e.g., "m.room.message").
428    #[serde(rename = "type")]
429    pub event_type: Option<String>,
430    /// Room ID.
431    pub room_id: Option<String>,
432    /// Sender user ID.
433    pub sender: Option<String>,
434    /// Origin server timestamp (milliseconds).
435    pub origin_server_ts: Option<i64>,
436    /// Event content.
437    pub content: Option<MatrixMessageContent>,
438}
439
440/// Matrix message content.
441#[derive(Debug, Clone, Serialize, Deserialize)]
442pub struct MatrixMessageContent {
443    /// Message type (e.g., "m.text", "m.image").
444    pub msgtype: Option<String>,
445    /// Message body.
446    pub body: Option<String>,
447    /// Media URL (for m.image, m.video, etc.).
448    pub url: Option<String>,
449    /// Media info.
450    pub info: Option<MediaInfo>,
451    /// Formatted body (HTML).
452    pub formatted_body: Option<String>,
453    /// Relations (replies, threads).
454    #[serde(rename = "m.relates_to")]
455    pub relates_to: Option<RelatesTo>,
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a channel with throwaway credentials; no network access happens.
    fn sample_channel() -> MatrixChannel {
        MatrixChannel::new("https://matrix.org", ApiKey::new("test".to_string()))
    }

    /// The channel reports its fixed identifier.
    #[test]
    fn test_channel_id() {
        assert_eq!(sample_channel().id(), "matrix");
    }

    /// The advertised capability flags checked here are all enabled.
    #[test]
    fn test_capabilities() {
        let caps = sample_channel().capabilities();
        assert!(caps.text);
        assert!(caps.images);
        assert!(caps.reactions);
        assert!(caps.threads);
        assert!(caps.editing);
    }
}