1use async_trait::async_trait;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
9use openclaw_core::types::{
10 Attachment, AttachmentKind, ChannelId, DeliveryResult, Message, PeerId, PeerType,
11};
12
13use crate::traits::{
14 Channel, ChannelCapabilities, ChannelContext, ChannelError, ChannelInbound, ChannelOutbound,
15 ChannelProbe, DeliveryMode, OutboundContext,
16};
17
18pub struct SignalChannel {
22 client: Client,
23 api_url: String,
24 phone_number: String,
25 state: Arc<RwLock<SignalState>>,
26}
27
/// Lifecycle flags tracked for a channel; both default to `false`.
#[derive(Debug, Default)]
struct SignalState {
    /// Set to `true` by `start` after the account query succeeds.
    registered: bool,
    /// Set to `true` by `start` and back to `false` by `stop`.
    connected: bool,
}
33
34impl SignalChannel {
35 #[must_use]
41 pub fn new(api_url: impl Into<String>, phone_number: impl Into<String>) -> Self {
42 Self {
43 client: Client::new(),
44 api_url: api_url.into(),
45 phone_number: phone_number.into(),
46 state: Arc::new(RwLock::new(SignalState::default())),
47 }
48 }
49
50 async fn call<T: for<'de> Deserialize<'de>>(
52 &self,
53 method: reqwest::Method,
54 endpoint: &str,
55 body: Option<&impl Serialize>,
56 ) -> Result<T, ChannelError> {
57 let url = format!("{}{}", self.api_url, endpoint);
58
59 let mut request = self
60 .client
61 .request(method, &url)
62 .header("Content-Type", "application/json");
63
64 if let Some(b) = body {
65 request = request.json(b);
66 }
67
68 let response = request
69 .send()
70 .await
71 .map_err(|e| ChannelError::Network(e.to_string()))?;
72
73 if !response.status().is_success() {
74 let status = response.status();
75 let text = response.text().await.unwrap_or_default();
76 return Err(ChannelError::Network(format!("{status}: {text}")));
77 }
78
79 response
80 .json()
81 .await
82 .map_err(|e| ChannelError::Network(e.to_string()))
83 }
84
85 async fn call_no_response(
87 &self,
88 method: reqwest::Method,
89 endpoint: &str,
90 body: Option<&impl Serialize>,
91 ) -> Result<(), ChannelError> {
92 let url = format!("{}{}", self.api_url, endpoint);
93
94 let mut request = self
95 .client
96 .request(method, &url)
97 .header("Content-Type", "application/json");
98
99 if let Some(b) = body {
100 request = request.json(b);
101 }
102
103 let response = request
104 .send()
105 .await
106 .map_err(|e| ChannelError::Network(e.to_string()))?;
107
108 if !response.status().is_success() {
109 let status = response.status();
110 let text = response.text().await.unwrap_or_default();
111 return Err(ChannelError::Network(format!("{status}: {text}")));
112 }
113
114 Ok(())
115 }
116}
117
118#[async_trait]
119impl Channel for SignalChannel {
120 fn id(&self) -> &'static str {
121 "signal"
122 }
123
124 fn label(&self) -> &'static str {
125 "Signal"
126 }
127
128 fn capabilities(&self) -> ChannelCapabilities {
129 ChannelCapabilities {
130 text: true,
131 images: true,
132 videos: true,
133 voice: true,
134 files: true,
135 threads: false, reactions: true, editing: false, deletion: true, }
140 }
141
142 async fn start(&self, _ctx: ChannelContext) -> Result<(), ChannelError> {
143 let endpoint = format!("/v1/about/{}", self.phone_number);
145 let _: SignalAbout = self
146 .call(reqwest::Method::GET, &endpoint, None::<&()>)
147 .await?;
148
149 let mut state = self.state.write().await;
150 state.registered = true;
151 state.connected = true;
152
153 tracing::info!("Signal connected: {}", self.phone_number);
154 Ok(())
155 }
156
157 async fn stop(&self) -> Result<(), ChannelError> {
158 let mut state = self.state.write().await;
159 state.connected = false;
160 Ok(())
161 }
162
163 async fn probe(&self) -> Result<ChannelProbe, ChannelError> {
164 let endpoint = format!("/v1/about/{}", self.phone_number);
165 match self
166 .call::<SignalAbout>(reqwest::Method::GET, &endpoint, None::<&()>)
167 .await
168 {
169 Ok(_) => Ok(ChannelProbe {
170 connected: true,
171 account_id: Some(self.phone_number.clone()),
172 display_name: Some(self.phone_number.clone()),
173 error: None,
174 }),
175 Err(e) => Ok(ChannelProbe {
176 connected: false,
177 account_id: None,
178 display_name: None,
179 error: Some(e.to_string()),
180 }),
181 }
182 }
183}
184
185#[async_trait]
186impl ChannelOutbound for SignalChannel {
187 async fn send_text(
188 &self,
189 ctx: OutboundContext,
190 text: &str,
191 ) -> Result<DeliveryResult, ChannelError> {
192 let endpoint = "/v2/send".to_string();
193
194 let params = SendMessageParams {
195 number: self.phone_number.clone(),
196 recipients: vec![ctx.chat_id.clone()],
197 message: text.to_string(),
198 base64_attachments: None,
199 };
200
201 let results: Vec<SendResult> = self
202 .call(reqwest::Method::POST, &endpoint, Some(¶ms))
203 .await?;
204
205 let timestamp = results
206 .first()
207 .map(|r| r.timestamp.to_string())
208 .unwrap_or_default();
209
210 Ok(DeliveryResult {
211 message_id: timestamp,
212 channel: ChannelId::signal(),
213 timestamp: chrono::Utc::now(),
214 chat_id: Some(ctx.chat_id),
215 meta: None,
216 })
217 }
218
219 async fn send_media(
220 &self,
221 ctx: OutboundContext,
222 media: &[Attachment],
223 ) -> Result<DeliveryResult, ChannelError> {
224 let urls: Vec<String> = media.iter().map(|a| a.url.clone()).collect();
228 let text = urls.join("\n");
229
230 self.send_text(ctx, &text).await
231 }
232
233 fn text_chunk_limit(&self) -> usize {
234 65536
236 }
237
238 fn delivery_mode(&self) -> DeliveryMode {
239 DeliveryMode::Immediate
240 }
241}
242
243#[async_trait]
244impl ChannelInbound for SignalChannel {
245 type RawMessage = SignalMessage;
246
247 fn normalize(&self, raw: Self::RawMessage) -> Result<Message, ChannelError> {
248 let raw_value = serde_json::to_value(&raw).unwrap_or_default();
249
250 let envelope = raw
251 .envelope
252 .ok_or_else(|| ChannelError::Config("No envelope in message".to_string()))?;
253
254 let source = envelope
255 .source
256 .ok_or_else(|| ChannelError::Config("No source in envelope".to_string()))?;
257
258 let data_message = envelope
259 .data_message
260 .ok_or_else(|| ChannelError::Config("No data message".to_string()))?;
261
262 let (peer_type, peer_id_str) = if let Some(group_info) = &data_message.group_info {
264 (PeerType::Group, group_info.group_id.clone())
265 } else {
266 (PeerType::Dm, source)
267 };
268
269 let attachments = data_message
271 .attachments
272 .unwrap_or_default()
273 .into_iter()
274 .map(|a| {
275 let kind = if a.content_type.starts_with("image/") {
276 AttachmentKind::Image
277 } else if a.content_type.starts_with("video/") {
278 AttachmentKind::Video
279 } else if a.content_type.starts_with("audio/") {
280 AttachmentKind::Audio
281 } else {
282 AttachmentKind::Document
283 };
284
285 Attachment {
286 kind,
287 url: a.id.clone(),
288 mime_type: Some(a.content_type),
289 filename: a.filename,
290 size: Some(a.size as u64),
291 thumbnail_url: None,
292 }
293 })
294 .collect();
295
296 let timestamp = chrono::DateTime::from_timestamp_millis(envelope.timestamp.unwrap_or(0))
297 .unwrap_or_else(chrono::Utc::now);
298
299 Ok(Message {
300 id: envelope.timestamp.unwrap_or(0).to_string(),
301 channel: ChannelId::signal(),
302 account_id: self.phone_number.clone(),
303 peer_id: PeerId::new(peer_id_str),
304 peer_type,
305 content: data_message.message.unwrap_or_default(),
306 attachments,
307 timestamp,
308 reply_to: data_message.quote.map(|q| q.id.to_string()),
309 thread_id: None,
310 mentions: data_message
311 .mentions
312 .map(|m| m.into_iter().map(|mention| mention.uuid).collect())
313 .unwrap_or_default(),
314 raw: Some(raw_value),
315 })
316 }
317
318 async fn acknowledge(&self, _message_id: &str) -> Result<(), ChannelError> {
319 Ok(())
321 }
322}
323
324#[derive(Debug, Deserialize)]
328struct SignalAbout {
329 versions: Option<serde_json::Value>,
330}
331
332#[derive(Debug, Serialize)]
334struct SendMessageParams {
335 number: String,
336 recipients: Vec<String>,
337 message: String,
338 #[serde(skip_serializing_if = "Option::is_none")]
339 base64_attachments: Option<Vec<String>>,
340}
341
342#[derive(Debug, Deserialize)]
344struct SendResult {
345 timestamp: i64,
346}
347
348#[derive(Debug, Clone, Serialize, Deserialize)]
350pub struct SignalMessage {
351 pub account: Option<String>,
353 pub envelope: Option<SignalEnvelope>,
355}
356
357#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct SignalEnvelope {
360 pub source: Option<String>,
362 #[serde(rename = "sourceUuid")]
364 pub source_uuid: Option<String>,
365 #[serde(rename = "sourceDevice")]
367 pub source_device: Option<i32>,
368 pub timestamp: Option<i64>,
370 #[serde(rename = "dataMessage")]
372 pub data_message: Option<SignalDataMessage>,
373}
374
375#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct SignalDataMessage {
378 pub message: Option<String>,
380 pub timestamp: Option<i64>,
382 pub attachments: Option<Vec<SignalAttachment>>,
384 #[serde(rename = "groupInfo")]
386 pub group_info: Option<SignalGroupInfo>,
387 pub quote: Option<SignalQuote>,
389 pub mentions: Option<Vec<SignalMention>>,
391}
392
393#[derive(Debug, Clone, Serialize, Deserialize)]
395pub struct SignalAttachment {
396 pub id: String,
398 #[serde(rename = "contentType")]
400 pub content_type: String,
401 pub filename: Option<String>,
403 pub size: i64,
405}
406
407#[derive(Debug, Clone, Serialize, Deserialize)]
409pub struct SignalGroupInfo {
410 #[serde(rename = "groupId")]
412 pub group_id: String,
413 #[serde(rename = "type")]
415 pub group_type: Option<String>,
416}
417
418#[derive(Debug, Clone, Serialize, Deserialize)]
420pub struct SignalQuote {
421 pub id: i64,
423 pub author: Option<String>,
425}
426
427#[derive(Debug, Clone, Serialize, Deserialize)]
429pub struct SignalMention {
430 pub uuid: String,
432 pub start: Option<i32>,
434 pub length: Option<i32>,
436}
437
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the channel used by the tests below; constructing it contacts nothing.
    fn sample_channel() -> SignalChannel {
        SignalChannel::new("http://localhost:8080", "+1234567890")
    }

    /// The channel reports the fixed identifier `"signal"`.
    #[test]
    fn test_channel_id() {
        assert_eq!(sample_channel().id(), "signal");
    }

    /// Text, images and reactions are advertised; threads and editing are not.
    #[test]
    fn test_capabilities() {
        let caps = sample_channel().capabilities();

        assert!(caps.text);
        assert!(caps.images);
        assert!(caps.reactions);

        assert!(!caps.threads);
        assert!(!caps.editing);
    }
}