Skip to main content

construct/channels/
signal.rs

1use crate::channels::traits::{Channel, ChannelMessage, SendMessage};
2use async_trait::async_trait;
3use futures_util::StreamExt;
4use reqwest::Client;
5use serde::Deserialize;
6use std::time::Duration;
7use tokio::sync::mpsc;
8use uuid::Uuid;
9
10const GROUP_TARGET_PREFIX: &str = "group:";
11
/// Delivery target derived from a recipient string.
///
/// Built by `SignalChannel::parse_recipient_target`.
#[derive(Clone, Debug, Eq, PartialEq)]
enum RecipientTarget {
    /// A single account; holds the recipient string unchanged.
    Direct(String),
    /// A group; holds the group id (a leading `group:` prefix, when present,
    /// has already been removed).
    Group(String),
}
17
/// Signal channel backed by the signal-cli daemon's native JSON-RPC + SSE API.
///
/// Talks to a running `signal-cli daemon --http <host:port>`: incoming
/// messages arrive over SSE at `/api/v1/events`, outgoing requests are sent
/// as JSON-RPC to `/api/v1/rpc`.
#[derive(Clone)]
pub struct SignalChannel {
    /// Daemon base URL, stored without trailing slashes.
    http_url: String,
    /// Value sent as the `account` parameter on RPC calls and on the SSE URL.
    account: String,
    /// Optional group filter: `None` accepts everything, `"dm"` accepts only
    /// direct messages, any other value must equal the message's group id.
    group_id: Option<String>,
    /// Senders permitted to reach the channel; an entry of `"*"` allows anyone.
    allowed_from: Vec<String>,
    /// When set, messages that carry attachments but no text are dropped.
    ignore_attachments: bool,
    /// When set, envelopes carrying a story message are dropped.
    ignore_stories: bool,
    /// Per-channel proxy URL override.
    proxy_url: Option<String>,
}
34
35// ── signal-cli SSE event JSON shapes ────────────────────────────
36
37#[derive(Debug, Deserialize)]
38struct SseEnvelope {
39    #[serde(default)]
40    envelope: Option<Envelope>,
41}
42
43#[derive(Debug, Deserialize)]
44struct Envelope {
45    #[serde(default)]
46    source: Option<String>,
47    #[serde(rename = "sourceNumber", default)]
48    source_number: Option<String>,
49    #[serde(rename = "dataMessage", default)]
50    data_message: Option<DataMessage>,
51    #[serde(rename = "storyMessage", default)]
52    story_message: Option<serde_json::Value>,
53    #[serde(default)]
54    timestamp: Option<u64>,
55}
56
57#[derive(Debug, Deserialize)]
58struct DataMessage {
59    #[serde(default)]
60    message: Option<String>,
61    #[serde(default)]
62    timestamp: Option<u64>,
63    #[serde(rename = "groupInfo", default)]
64    group_info: Option<GroupInfo>,
65    #[serde(default)]
66    attachments: Option<Vec<serde_json::Value>>,
67}
68
69#[derive(Debug, Deserialize)]
70struct GroupInfo {
71    #[serde(rename = "groupId", default)]
72    group_id: Option<String>,
73}
74
75impl SignalChannel {
76    pub fn new(
77        http_url: String,
78        account: String,
79        group_id: Option<String>,
80        allowed_from: Vec<String>,
81        ignore_attachments: bool,
82        ignore_stories: bool,
83    ) -> Self {
84        let http_url = http_url.trim_end_matches('/').to_string();
85        Self {
86            http_url,
87            account,
88            group_id,
89            allowed_from,
90            ignore_attachments,
91            ignore_stories,
92            proxy_url: None,
93        }
94    }
95
96    /// Set a per-channel proxy URL that overrides the global proxy config.
97    pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
98        self.proxy_url = proxy_url;
99        self
100    }
101
102    fn http_client(&self) -> Client {
103        let builder = Client::builder().connect_timeout(Duration::from_secs(10));
104        let builder = crate::config::apply_channel_proxy_to_builder(
105            builder,
106            "channel.signal",
107            self.proxy_url.as_deref(),
108        );
109        builder.build().expect("Signal HTTP client should build")
110    }
111
112    /// Effective sender: prefer `sourceNumber` (E.164), fall back to `source`.
113    fn sender(envelope: &Envelope) -> Option<String> {
114        envelope
115            .source_number
116            .as_deref()
117            .or(envelope.source.as_deref())
118            .map(String::from)
119    }
120
121    fn is_sender_allowed(&self, sender: &str) -> bool {
122        if self.allowed_from.iter().any(|u| u == "*") {
123            return true;
124        }
125        self.allowed_from.iter().any(|u| u == sender)
126    }
127
128    fn is_e164(recipient: &str) -> bool {
129        let Some(number) = recipient.strip_prefix('+') else {
130            return false;
131        };
132        (2..=15).contains(&number.len()) && number.chars().all(|c| c.is_ascii_digit())
133    }
134
135    /// Check whether a string is a valid UUID (signal-cli uses these for
136    /// privacy-enabled users who have opted out of sharing their phone number).
137    fn is_uuid(s: &str) -> bool {
138        Uuid::parse_str(s).is_ok()
139    }
140
141    fn parse_recipient_target(recipient: &str) -> RecipientTarget {
142        if let Some(group_id) = recipient.strip_prefix(GROUP_TARGET_PREFIX) {
143            return RecipientTarget::Group(group_id.to_string());
144        }
145
146        if Self::is_e164(recipient) || Self::is_uuid(recipient) {
147            RecipientTarget::Direct(recipient.to_string())
148        } else {
149            RecipientTarget::Group(recipient.to_string())
150        }
151    }
152
153    /// Check whether the message targets the configured group.
154    /// If no `group_id` is configured (None), all DMs and groups are accepted.
155    /// Use "dm" to filter DMs only.
156    fn matches_group(&self, data_msg: &DataMessage) -> bool {
157        let Some(ref expected) = self.group_id else {
158            return true;
159        };
160        match data_msg
161            .group_info
162            .as_ref()
163            .and_then(|g| g.group_id.as_deref())
164        {
165            Some(gid) => gid == expected.as_str(),
166            None => expected.eq_ignore_ascii_case("dm"),
167        }
168    }
169
170    /// Determine the send target: group id or the sender's number.
171    fn reply_target(&self, data_msg: &DataMessage, sender: &str) -> String {
172        if let Some(group_id) = data_msg
173            .group_info
174            .as_ref()
175            .and_then(|g| g.group_id.as_deref())
176        {
177            format!("{GROUP_TARGET_PREFIX}{group_id}")
178        } else {
179            sender.to_string()
180        }
181    }
182
183    /// Send a JSON-RPC request to signal-cli daemon.
184    async fn rpc_request(
185        &self,
186        method: &str,
187        params: serde_json::Value,
188    ) -> anyhow::Result<Option<serde_json::Value>> {
189        let url = format!("{}/api/v1/rpc", self.http_url);
190        let id = Uuid::new_v4().to_string();
191
192        let body = serde_json::json!({
193            "jsonrpc": "2.0",
194            "method": method,
195            "params": params,
196            "id": id,
197        });
198
199        let resp = self
200            .http_client()
201            .post(&url)
202            .timeout(Duration::from_secs(30))
203            .header("Content-Type", "application/json")
204            .json(&body)
205            .send()
206            .await?;
207
208        // 201 = success with no body (e.g. typing indicators)
209        if resp.status().as_u16() == 201 {
210            return Ok(None);
211        }
212
213        let text = resp.text().await?;
214        if text.is_empty() {
215            return Ok(None);
216        }
217
218        let parsed: serde_json::Value = serde_json::from_str(&text)?;
219        if let Some(err) = parsed.get("error") {
220            let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(-1);
221            let msg = err
222                .get("message")
223                .and_then(|m| m.as_str())
224                .unwrap_or("unknown");
225            anyhow::bail!("Signal RPC error {code}: {msg}");
226        }
227
228        Ok(parsed.get("result").cloned())
229    }
230
231    /// Process a single SSE envelope, returning a ChannelMessage if valid.
232    fn process_envelope(&self, envelope: &Envelope) -> Option<ChannelMessage> {
233        // Skip story messages when configured
234        if self.ignore_stories && envelope.story_message.is_some() {
235            return None;
236        }
237
238        let data_msg = envelope.data_message.as_ref()?;
239
240        // Skip attachment-only messages when configured
241        if self.ignore_attachments {
242            let has_attachments = data_msg.attachments.as_ref().is_some_and(|a| !a.is_empty());
243            if has_attachments && data_msg.message.is_none() {
244                return None;
245            }
246        }
247
248        let text = data_msg.message.as_deref().filter(|t| !t.is_empty())?;
249        let sender = Self::sender(envelope)?;
250
251        if !self.is_sender_allowed(&sender) {
252            return None;
253        }
254
255        if !self.matches_group(data_msg) {
256            return None;
257        }
258
259        let target = self.reply_target(data_msg, &sender);
260
261        let timestamp = data_msg
262            .timestamp
263            .or(envelope.timestamp)
264            .unwrap_or_else(|| {
265                u64::try_from(
266                    std::time::SystemTime::now()
267                        .duration_since(std::time::UNIX_EPOCH)
268                        .unwrap_or_default()
269                        .as_millis(),
270                )
271                .unwrap_or(u64::MAX)
272            });
273
274        Some(ChannelMessage {
275            id: format!("sig_{timestamp}"),
276            sender: sender.clone(),
277            reply_target: target,
278            content: text.to_string(),
279            channel: "signal".to_string(),
280            timestamp: timestamp / 1000, // millis → secs
281            thread_ts: None,
282            interruption_scope_id: None,
283            attachments: vec![],
284        })
285    }
286}
287
288#[async_trait]
289impl Channel for SignalChannel {
290    fn name(&self) -> &str {
291        "signal"
292    }
293
294    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
295        let params = match Self::parse_recipient_target(&message.recipient) {
296            RecipientTarget::Direct(number) => serde_json::json!({
297                "recipient": [number],
298                "message": &message.content,
299                "account": &self.account,
300            }),
301            RecipientTarget::Group(group_id) => serde_json::json!({
302                "groupId": group_id,
303                "message": &message.content,
304                "account": &self.account,
305            }),
306        };
307
308        self.rpc_request("send", params).await?;
309        Ok(())
310    }
311
312    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
313        let mut url = reqwest::Url::parse(&format!("{}/api/v1/events", self.http_url))?;
314        url.query_pairs_mut().append_pair("account", &self.account);
315
316        tracing::info!("Signal channel listening via SSE on {}...", self.http_url);
317
318        let mut retry_delay_secs = 2u64;
319        let max_delay_secs = 60u64;
320
321        loop {
322            let resp = self
323                .http_client()
324                .get(url.clone())
325                .header("Accept", "text/event-stream")
326                .send()
327                .await;
328
329            let resp = match resp {
330                Ok(r) if r.status().is_success() => r,
331                Ok(r) => {
332                    let status = r.status();
333                    let body = r.text().await.unwrap_or_default();
334                    tracing::warn!("Signal SSE returned {status}: {body}");
335                    tokio::time::sleep(tokio::time::Duration::from_secs(retry_delay_secs)).await;
336                    retry_delay_secs = (retry_delay_secs * 2).min(max_delay_secs);
337                    continue;
338                }
339                Err(e) => {
340                    tracing::warn!("Signal SSE connect error: {e}, retrying...");
341                    tokio::time::sleep(tokio::time::Duration::from_secs(retry_delay_secs)).await;
342                    retry_delay_secs = (retry_delay_secs * 2).min(max_delay_secs);
343                    continue;
344                }
345            };
346
347            retry_delay_secs = 2;
348
349            let mut bytes_stream = resp.bytes_stream();
350            let mut buffer = String::new();
351            let mut current_data = String::new();
352
353            while let Some(chunk) = bytes_stream.next().await {
354                let chunk = match chunk {
355                    Ok(c) => c,
356                    Err(e) => {
357                        tracing::debug!("Signal SSE chunk error, reconnecting: {e}");
358                        break;
359                    }
360                };
361
362                let text = match String::from_utf8(chunk.to_vec()) {
363                    Ok(t) => t,
364                    Err(e) => {
365                        tracing::debug!("Signal SSE invalid UTF-8, skipping chunk: {}", e);
366                        continue;
367                    }
368                };
369
370                buffer.push_str(&text);
371
372                while let Some(newline_pos) = buffer.find('\n') {
373                    let line = buffer[..newline_pos].trim_end_matches('\r').to_string();
374                    buffer = buffer[newline_pos + 1..].to_string();
375
376                    // Skip SSE comments (keepalive)
377                    if line.starts_with(':') {
378                        continue;
379                    }
380
381                    if line.is_empty() {
382                        // Empty line = event boundary, dispatch accumulated data
383                        if !current_data.is_empty() {
384                            match serde_json::from_str::<SseEnvelope>(&current_data) {
385                                Ok(sse) => {
386                                    if let Some(ref envelope) = sse.envelope {
387                                        if let Some(msg) = self.process_envelope(envelope) {
388                                            if tx.send(msg).await.is_err() {
389                                                return Ok(());
390                                            }
391                                        }
392                                    }
393                                }
394                                Err(e) => {
395                                    tracing::debug!("Signal SSE parse skip: {e}");
396                                }
397                            }
398                            current_data.clear();
399                        }
400                    } else if let Some(data) = line.strip_prefix("data:") {
401                        if !current_data.is_empty() {
402                            current_data.push('\n');
403                        }
404                        current_data.push_str(data.trim_start());
405                    }
406                    // Ignore "event:", "id:", "retry:" lines
407                }
408            }
409
410            if !current_data.is_empty() {
411                match serde_json::from_str::<SseEnvelope>(&current_data) {
412                    Ok(sse) => {
413                        if let Some(ref envelope) = sse.envelope {
414                            if let Some(msg) = self.process_envelope(envelope) {
415                                let _ = tx.send(msg).await;
416                            }
417                        }
418                    }
419                    Err(e) => {
420                        tracing::debug!("Signal SSE trailing parse skip: {e}");
421                    }
422                }
423            }
424
425            tracing::debug!("Signal SSE stream ended, reconnecting...");
426            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
427        }
428    }
429
430    async fn health_check(&self) -> bool {
431        let url = format!("{}/api/v1/check", self.http_url);
432        let Ok(resp) = self
433            .http_client()
434            .get(&url)
435            .timeout(Duration::from_secs(10))
436            .send()
437            .await
438        else {
439            return false;
440        };
441        resp.status().is_success()
442    }
443
444    async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> {
445        let params = match Self::parse_recipient_target(recipient) {
446            RecipientTarget::Direct(number) => serde_json::json!({
447                "recipient": [number],
448                "account": &self.account,
449            }),
450            RecipientTarget::Group(group_id) => serde_json::json!({
451                "groupId": group_id,
452                "account": &self.account,
453            }),
454        };
455        self.rpc_request("sendTyping", params).await?;
456        Ok(())
457    }
458
459    async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
460        // signal-cli doesn't have a stop-typing RPC; typing indicators
461        // auto-expire after ~15s on the client side.
462        Ok(())
463    }
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    // ── shared fixtures ─────────────────────────────────────────

    const DAEMON_URL: &str = "http://127.0.0.1:8686";
    const ACCOUNT: &str = "+1234567890";
    const ALLOWED_SENDER: &str = "+1111111111";
    const PRIVACY_UUID: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
    const FIXTURE_TS_MS: u64 = 1_700_000_000_000;

    /// Channel for the fixture account with every other setting supplied by the caller.
    fn build_channel(
        url: &str,
        group_id: Option<&str>,
        allowed_from: &[&str],
        ignore_attachments: bool,
        ignore_stories: bool,
    ) -> SignalChannel {
        SignalChannel::new(
            url.to_string(),
            ACCOUNT.to_string(),
            group_id.map(String::from),
            allowed_from.iter().map(|s| (*s).to_string()).collect(),
            ignore_attachments,
            ignore_stories,
        )
    }

    /// No group filter, a single allowed sender, nothing ignored.
    fn make_channel() -> SignalChannel {
        build_channel(DAEMON_URL, None, &[ALLOWED_SENDER], false, false)
    }

    /// Group filter set, wildcard allowlist, attachments and stories ignored.
    fn make_channel_with_group(group_id: &str) -> SignalChannel {
        build_channel(DAEMON_URL, Some(group_id), &["*"], true, true)
    }

    /// Text-only data message, optionally addressed to a group.
    fn data_message(text: &str, timestamp: u64, group_id: Option<&str>) -> DataMessage {
        DataMessage {
            message: Some(text.to_string()),
            timestamp: Some(timestamp),
            group_info: group_id.map(|gid| GroupInfo {
                group_id: Some(gid.to_string()),
            }),
            attachments: None,
        }
    }

    /// The short "hi" message used by the filter/routing tests.
    fn hi_message(group_id: Option<&str>) -> DataMessage {
        data_message("hi", 1000, group_id)
    }

    /// Envelope without a story message.
    fn envelope_with(
        source: Option<&str>,
        source_number: Option<&str>,
        data_message: Option<DataMessage>,
        timestamp: Option<u64>,
    ) -> Envelope {
        Envelope {
            source: source.map(String::from),
            source_number: source_number.map(String::from),
            data_message,
            story_message: None,
            timestamp,
        }
    }

    /// Direct-message envelope whose `source` and `sourceNumber` are identical.
    fn make_envelope(source_number: Option<&str>, message: Option<&str>) -> Envelope {
        envelope_with(
            source_number,
            source_number,
            message.map(|m| data_message(m, FIXTURE_TS_MS, None)),
            Some(FIXTURE_TS_MS),
        )
    }

    // ── construction ────────────────────────────────────────────

    #[test]
    fn creates_with_correct_fields() {
        let ch = make_channel();
        assert_eq!(ch.http_url, "http://127.0.0.1:8686");
        assert_eq!(ch.account, "+1234567890");
        assert!(ch.group_id.is_none());
        assert_eq!(ch.allowed_from.len(), 1);
        assert!(!ch.ignore_attachments);
        assert!(!ch.ignore_stories);
    }

    #[test]
    fn strips_trailing_slash() {
        let ch = build_channel("http://127.0.0.1:8686/", None, &[], false, false);
        assert_eq!(ch.http_url, "http://127.0.0.1:8686");
    }

    // ── allowlist ───────────────────────────────────────────────

    #[test]
    fn wildcard_allows_anyone() {
        let ch = make_channel_with_group("dm");
        assert!(ch.is_sender_allowed("+9999999999"));
    }

    #[test]
    fn specific_sender_allowed() {
        let ch = make_channel();
        assert!(ch.is_sender_allowed("+1111111111"));
    }

    #[test]
    fn unknown_sender_denied() {
        let ch = make_channel();
        assert!(!ch.is_sender_allowed("+9999999999"));
    }

    #[test]
    fn empty_allowlist_denies_all() {
        let ch = build_channel(DAEMON_URL, None, &[], false, false);
        assert!(!ch.is_sender_allowed("+1111111111"));
    }

    #[test]
    fn name_returns_signal() {
        let ch = make_channel();
        assert_eq!(ch.name(), "signal");
    }

    // ── group filter and reply routing ──────────────────────────

    #[test]
    fn matches_group_no_group_id_accepts_all() {
        let ch = make_channel();
        assert!(ch.matches_group(&hi_message(None)));
        assert!(ch.matches_group(&hi_message(Some("group123"))));
    }

    #[test]
    fn matches_group_filters_group() {
        let ch = make_channel_with_group("group123");
        assert!(ch.matches_group(&hi_message(Some("group123"))));
        assert!(!ch.matches_group(&hi_message(Some("other_group"))));
    }

    #[test]
    fn matches_group_dm_keyword() {
        let ch = make_channel_with_group("dm");
        assert!(ch.matches_group(&hi_message(None)));
        assert!(!ch.matches_group(&hi_message(Some("group123"))));
    }

    #[test]
    fn reply_target_dm() {
        let ch = make_channel();
        let dm = hi_message(None);
        assert_eq!(ch.reply_target(&dm, "+1111111111"), "+1111111111");
    }

    #[test]
    fn reply_target_group() {
        let ch = make_channel();
        let group = hi_message(Some("group123"));
        assert_eq!(ch.reply_target(&group, "+1111111111"), "group:group123");
    }

    // ── recipient parsing ───────────────────────────────────────

    #[test]
    fn parse_recipient_target_e164_is_direct() {
        let parsed = SignalChannel::parse_recipient_target("+1234567890");
        assert_eq!(parsed, RecipientTarget::Direct("+1234567890".to_string()));
    }

    #[test]
    fn parse_recipient_target_prefixed_group_is_group() {
        let parsed = SignalChannel::parse_recipient_target("group:abc123");
        assert_eq!(parsed, RecipientTarget::Group("abc123".to_string()));
    }

    #[test]
    fn parse_recipient_target_uuid_is_direct() {
        let parsed = SignalChannel::parse_recipient_target(PRIVACY_UUID);
        assert_eq!(parsed, RecipientTarget::Direct(PRIVACY_UUID.to_string()));
    }

    #[test]
    fn parse_recipient_target_non_e164_plus_is_group() {
        let parsed = SignalChannel::parse_recipient_target("+abc123");
        assert_eq!(parsed, RecipientTarget::Group("+abc123".to_string()));
    }

    #[test]
    fn is_uuid_valid() {
        for candidate in [
            "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
            "00000000-0000-0000-0000-000000000000",
        ] {
            assert!(SignalChannel::is_uuid(candidate));
        }
    }

    #[test]
    fn is_uuid_invalid() {
        for candidate in ["+1234567890", "not-a-uuid", "group:abc123", ""] {
            assert!(!SignalChannel::is_uuid(candidate));
        }
    }

    // ── sender selection ────────────────────────────────────────

    #[test]
    fn sender_prefers_source_number() {
        let env = envelope_with(Some("uuid-123"), Some("+1111111111"), None, Some(1000));
        assert_eq!(SignalChannel::sender(&env), Some("+1111111111".to_string()));
    }

    #[test]
    fn sender_falls_back_to_source() {
        let env = envelope_with(Some("uuid-123"), None, None, Some(1000));
        assert_eq!(SignalChannel::sender(&env), Some("uuid-123".to_string()));
    }

    #[test]
    fn sender_none_when_both_missing() {
        let env = envelope_with(None, None, None, None);
        assert_eq!(SignalChannel::sender(&env), None);
    }

    // ── envelope processing ─────────────────────────────────────

    #[test]
    fn process_envelope_uuid_sender_dm() {
        let ch = build_channel(DAEMON_URL, None, &["*"], false, false);
        let env = envelope_with(
            Some(PRIVACY_UUID),
            None,
            Some(data_message("Hello from privacy user", FIXTURE_TS_MS, None)),
            Some(FIXTURE_TS_MS),
        );
        let msg = ch.process_envelope(&env).unwrap();
        assert_eq!(msg.sender, PRIVACY_UUID);
        assert_eq!(msg.reply_target, PRIVACY_UUID);
        assert_eq!(msg.content, "Hello from privacy user");

        // Verify reply routing: UUID sender in DM should route as Direct
        let target = SignalChannel::parse_recipient_target(&msg.reply_target);
        assert_eq!(target, RecipientTarget::Direct(PRIVACY_UUID.to_string()));
    }

    #[test]
    fn process_envelope_uuid_sender_in_group() {
        let ch = build_channel(DAEMON_URL, Some("testgroup"), &["*"], false, false);
        let env = envelope_with(
            Some(PRIVACY_UUID),
            None,
            Some(data_message(
                "Group msg from privacy user",
                FIXTURE_TS_MS,
                Some("testgroup"),
            )),
            Some(FIXTURE_TS_MS),
        );
        let msg = ch.process_envelope(&env).unwrap();
        assert_eq!(msg.sender, PRIVACY_UUID);
        assert_eq!(msg.reply_target, "group:testgroup");

        // Verify reply routing: group message should still route as Group
        let target = SignalChannel::parse_recipient_target(&msg.reply_target);
        assert_eq!(target, RecipientTarget::Group("testgroup".to_string()));
    }

    #[test]
    fn process_envelope_valid_dm() {
        let ch = make_channel();
        let env = make_envelope(Some("+1111111111"), Some("Hello!"));
        let msg = ch.process_envelope(&env).unwrap();
        assert_eq!(msg.content, "Hello!");
        assert_eq!(msg.sender, "+1111111111");
        assert_eq!(msg.channel, "signal");
    }

    #[test]
    fn process_envelope_denied_sender() {
        let ch = make_channel();
        let env = make_envelope(Some("+9999999999"), Some("Hello!"));
        assert!(ch.process_envelope(&env).is_none());
    }

    #[test]
    fn process_envelope_empty_message() {
        let ch = make_channel();
        let env = make_envelope(Some("+1111111111"), Some(""));
        assert!(ch.process_envelope(&env).is_none());
    }

    #[test]
    fn process_envelope_no_data_message() {
        let ch = make_channel();
        let env = make_envelope(Some("+1111111111"), None);
        assert!(ch.process_envelope(&env).is_none());
    }

    #[test]
    fn process_envelope_skips_stories() {
        let ch = make_channel_with_group("dm");
        let mut env = make_envelope(Some("+1111111111"), Some("story text"));
        env.story_message = Some(serde_json::json!({}));
        assert!(ch.process_envelope(&env).is_none());
    }

    #[test]
    fn process_envelope_skips_attachment_only() {
        let ch = make_channel_with_group("dm");
        let attachment_only = DataMessage {
            message: None,
            timestamp: Some(FIXTURE_TS_MS),
            group_info: None,
            attachments: Some(vec![serde_json::json!({"contentType": "image/png"})]),
        };
        let env = envelope_with(
            Some("+1111111111"),
            Some("+1111111111"),
            Some(attachment_only),
            Some(FIXTURE_TS_MS),
        );
        assert!(ch.process_envelope(&env).is_none());
    }

    // ── deserialization ─────────────────────────────────────────

    #[test]
    fn sse_envelope_deserializes() {
        let json = r#"{
            "envelope": {
                "source": "+1111111111",
                "sourceNumber": "+1111111111",
                "timestamp": 1700000000000,
                "dataMessage": {
                    "message": "Hello Signal!",
                    "timestamp": 1700000000000
                }
            }
        }"#;
        let sse: SseEnvelope = serde_json::from_str(json).unwrap();
        let env = sse.envelope.unwrap();
        assert_eq!(env.source_number.as_deref(), Some("+1111111111"));
        let dm = env.data_message.unwrap();
        assert_eq!(dm.message.as_deref(), Some("Hello Signal!"));
    }

    #[test]
    fn sse_envelope_deserializes_group() {
        let json = r#"{
            "envelope": {
                "sourceNumber": "+2222222222",
                "dataMessage": {
                    "message": "Group msg",
                    "groupInfo": {
                        "groupId": "abc123"
                    }
                }
            }
        }"#;
        let sse: SseEnvelope = serde_json::from_str(json).unwrap();
        let env = sse.envelope.unwrap();
        let dm = env.data_message.unwrap();
        assert_eq!(
            dm.group_info.as_ref().unwrap().group_id.as_deref(),
            Some("abc123")
        );
    }

    #[test]
    fn envelope_defaults() {
        let json = r#"{}"#;
        let env: Envelope = serde_json::from_str(json).unwrap();
        assert!(env.source.is_none());
        assert!(env.source_number.is_none());
        assert!(env.data_message.is_none());
        assert!(env.story_message.is_none());
        assert!(env.timestamp.is_none());
    }
}