Skip to main content

construct/channels/
gmail_push.rs

//! Gmail Pub/Sub push notification channel.
//!
//! Instead of polling via IMAP, this channel uses Google's Gmail Pub/Sub push
//! notifications. Google sends a POST to our webhook endpoint whenever the
//! user's mailbox changes. The notification body contains a base64-encoded
//! JSON payload with `emailAddress` and `historyId`; we then call the Gmail
//! History API to fetch newly arrived messages.
//!
//! ## Setup
//!
//! 1. Create a Google Cloud Pub/Sub topic and grant `gmail-api-push@system.gserviceaccount.com`
//!    the **Pub/Sub Publisher** role on that topic.
//! 2. Create a push subscription pointing to `https://<your-domain>/webhook/gmail`.
//! 3. Configure `[channels_config.gmail_push]` in `config.toml` with `topic` and
//!    `oauth_token` (or set the `GMAIL_PUSH_OAUTH_TOKEN` environment variable).
//!
//! The channel automatically calls `users.watch` to register the subscription
//! and renews it before the 7-day expiry.
19
20use anyhow::{Result, anyhow};
21use async_trait::async_trait;
22use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
23use reqwest::Client;
24use schemars::JsonSchema;
25use serde::{Deserialize, Serialize};
26use std::fmt::Write as _;
27use std::sync::Arc;
28use std::time::{Duration, SystemTime, UNIX_EPOCH};
29use tokio::sync::{Mutex, mpsc};
30use tracing::{debug, error, info, warn};
31
32use super::traits::{Channel, ChannelMessage, SendMessage};
33
34// ── Configuration ────────────────────────────────────────────────
35
36/// Gmail Pub/Sub push notification channel configuration.
37#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
38pub struct GmailPushConfig {
39    /// Enable the Gmail push channel. Default: `false`.
40    #[serde(default)]
41    pub enabled: bool,
42    /// Google Cloud Pub/Sub topic in the form `projects/<project>/topics/<topic>`.
43    pub topic: String,
44    /// Gmail labels to watch. Default: `["INBOX"]`.
45    #[serde(default = "default_label_filter")]
46    pub label_filter: Vec<String>,
47    /// OAuth2 access token for the Gmail API.
48    /// Falls back to `GMAIL_PUSH_OAUTH_TOKEN` env var.
49    #[serde(default)]
50    pub oauth_token: String,
51    /// Allowed sender addresses/domains. Empty = deny all, `["*"]` = allow all.
52    #[serde(default)]
53    pub allowed_senders: Vec<String>,
54    /// Webhook URL that Google Pub/Sub should POST to.
55    /// Usually `https://<your-domain>/webhook/gmail`.
56    /// If empty, watch registration is skipped (useful when using external subscription management).
57    #[serde(default)]
58    pub webhook_url: String,
59    /// Shared secret for webhook authentication. If set, incoming webhook
60    /// requests must include `Authorization: Bearer <secret>`.
61    /// Falls back to `GMAIL_PUSH_WEBHOOK_SECRET` env var.
62    #[serde(default)]
63    pub webhook_secret: String,
64}
65
/// Serde default for `GmailPushConfig::label_filter`: watch only the inbox.
fn default_label_filter() -> Vec<String> {
    let inbox = String::from("INBOX");
    vec![inbox]
}
69
70impl crate::config::traits::ChannelConfig for GmailPushConfig {
71    fn name() -> &'static str {
72        "Gmail Push"
73    }
74    fn desc() -> &'static str {
75        "Gmail Pub/Sub real-time push notifications"
76    }
77}
78
79impl Default for GmailPushConfig {
80    fn default() -> Self {
81        Self {
82            enabled: false,
83            topic: String::new(),
84            label_filter: default_label_filter(),
85            oauth_token: String::new(),
86            allowed_senders: Vec::new(),
87            webhook_url: String::new(),
88            webhook_secret: String::new(),
89        }
90    }
91}
92
93// ── Pub/Sub notification payload ─────────────────────────────────
94
95/// The outer JSON envelope that Google Pub/Sub POSTs to the push endpoint.
96#[derive(Debug, Deserialize, Serialize)]
97pub struct PubSubEnvelope {
98    pub message: PubSubMessage,
99    /// Subscription name (informational).
100    #[serde(default)]
101    pub subscription: String,
102}
103
104/// A single Pub/Sub message inside the envelope.
105#[derive(Debug, Deserialize, Serialize)]
106pub struct PubSubMessage {
107    /// Base64-encoded JSON data from Gmail.
108    pub data: String,
109    /// Pub/Sub message ID.
110    #[serde(default, rename = "messageId")]
111    pub message_id: String,
112    /// Publish timestamp (RFC 3339).
113    #[serde(default, rename = "publishTime")]
114    pub publish_time: String,
115}
116
117/// The decoded payload inside `PubSubMessage.data`.
118#[derive(Debug, Deserialize, Serialize)]
119pub struct GmailNotification {
120    /// Email address of the affected mailbox.
121    #[serde(rename = "emailAddress")]
122    pub email_address: String,
123    /// History ID to use as `startHistoryId` for incremental sync.
124    #[serde(rename = "historyId")]
125    pub history_id: u64,
126}
127
128// ── Gmail API response types ─────────────────────────────────────
129
130/// Response from `GET /gmail/v1/users/me/history`.
131#[derive(Debug, Deserialize)]
132pub struct HistoryResponse {
133    pub history: Option<Vec<HistoryRecord>>,
134    #[serde(default, rename = "historyId")]
135    pub history_id: u64,
136    #[serde(default, rename = "nextPageToken")]
137    pub next_page_token: Option<String>,
138}
139
140/// A single history record containing messages added to the mailbox.
141#[derive(Debug, Deserialize)]
142pub struct HistoryRecord {
143    #[serde(default, rename = "messagesAdded")]
144    pub messages_added: Vec<MessageAdded>,
145}
146
147/// Wrapper for a newly added message reference.
148#[derive(Debug, Deserialize)]
149pub struct MessageAdded {
150    pub message: MessageRef,
151}
152
153/// Minimal message reference returned by the history API.
154#[derive(Debug, Deserialize)]
155pub struct MessageRef {
156    pub id: String,
157    #[serde(default, rename = "threadId")]
158    pub thread_id: String,
159}
160
161/// Full message returned by `GET /gmail/v1/users/me/messages/{id}`.
162#[derive(Debug, Deserialize)]
163pub struct GmailMessage {
164    pub id: String,
165    #[serde(default, rename = "threadId")]
166    pub thread_id: String,
167    #[serde(default)]
168    pub snippet: String,
169    pub payload: Option<MessagePayload>,
170    #[serde(default, rename = "internalDate")]
171    pub internal_date: String,
172}
173
174/// Message payload with headers and parts.
175#[derive(Debug, Deserialize)]
176pub struct MessagePayload {
177    #[serde(default)]
178    pub headers: Vec<MessageHeader>,
179    pub body: Option<MessageBody>,
180    #[serde(default)]
181    pub parts: Vec<MessagePart>,
182    #[serde(default, rename = "mimeType")]
183    pub mime_type: String,
184}
185
186/// A single email header (name/value pair).
187#[derive(Debug, Deserialize)]
188pub struct MessageHeader {
189    pub name: String,
190    pub value: String,
191}
192
193/// Message body with optional base64-encoded data.
194#[derive(Debug, Deserialize)]
195pub struct MessageBody {
196    #[serde(default)]
197    pub data: Option<String>,
198    #[serde(default)]
199    pub size: u64,
200}
201
202/// A MIME part of a multipart message.
203#[derive(Debug, Deserialize)]
204pub struct MessagePart {
205    #[serde(default, rename = "mimeType")]
206    pub mime_type: String,
207    pub body: Option<MessageBody>,
208    #[serde(default)]
209    pub parts: Vec<MessagePart>,
210    #[serde(default)]
211    pub filename: String,
212}
213
214/// Response from `POST /gmail/v1/users/me/watch`.
215#[derive(Debug, Deserialize)]
216pub struct WatchResponse {
217    #[serde(default, rename = "historyId")]
218    pub history_id: u64,
219    #[serde(default)]
220    pub expiration: String,
221}
222
223// ── Channel implementation ───────────────────────────────────────
224
225/// Gmail Pub/Sub push notification channel.
226///
227/// Incoming messages arrive via webhook (`POST /webhook/gmail`) and are
228/// dispatched to the agent.  The `listen` method registers the Gmail watch
229/// subscription and periodically renews it.
230pub struct GmailPushChannel {
231    pub config: GmailPushConfig,
232    http: Client,
233    last_history_id: Arc<Mutex<u64>>,
234    /// Sender half injected by the gateway to forward webhook-received messages.
235    pub tx: Arc<Mutex<Option<mpsc::Sender<ChannelMessage>>>>,
236}
237
238impl GmailPushChannel {
239    pub fn new(config: GmailPushConfig) -> Self {
240        let http = Client::builder()
241            .timeout(Duration::from_secs(30))
242            .build()
243            .expect("failed to build HTTP client");
244        Self {
245            config,
246            http,
247            last_history_id: Arc::new(Mutex::new(0)),
248            tx: Arc::new(Mutex::new(None)),
249        }
250    }
251
252    /// Resolve the webhook secret from config or environment.
253    pub fn resolve_webhook_secret(&self) -> String {
254        if !self.config.webhook_secret.is_empty() {
255            return self.config.webhook_secret.clone();
256        }
257        std::env::var("GMAIL_PUSH_WEBHOOK_SECRET").unwrap_or_default()
258    }
259
260    /// Resolve the OAuth token from config or environment.
261    pub fn resolve_oauth_token(&self) -> String {
262        if !self.config.oauth_token.is_empty() {
263            return self.config.oauth_token.clone();
264        }
265        std::env::var("GMAIL_PUSH_OAUTH_TOKEN").unwrap_or_default()
266    }
267
268    /// Register a Gmail watch subscription via `POST /gmail/v1/users/me/watch`.
269    pub async fn register_watch(&self) -> Result<WatchResponse> {
270        let token = self.resolve_oauth_token();
271        if token.is_empty() {
272            return Err(anyhow!("Gmail OAuth token is not configured"));
273        }
274
275        let body = serde_json::json!({
276            "topicName": self.config.topic,
277            "labelIds": self.config.label_filter,
278        });
279
280        let resp = self
281            .http
282            .post("https://gmail.googleapis.com/gmail/v1/users/me/watch")
283            .bearer_auth(&token)
284            .json(&body)
285            .send()
286            .await?;
287
288        if !resp.status().is_success() {
289            let status = resp.status();
290            let text = resp.text().await.unwrap_or_default();
291            return Err(anyhow!(
292                "Gmail watch registration failed ({}): {}",
293                status,
294                text
295            ));
296        }
297
298        let watch: WatchResponse = resp.json().await?;
299        let mut last_id = self.last_history_id.lock().await;
300        if *last_id == 0 {
301            *last_id = watch.history_id;
302        }
303        info!(
304            "Gmail watch registered — historyId={}, expiration={}",
305            watch.history_id, watch.expiration
306        );
307        Ok(watch)
308    }
309
310    /// Fetch new messages since the given `start_history_id` using the History API.
311    pub async fn fetch_history(&self, start_history_id: u64) -> Result<Vec<String>> {
312        let mut last_id = self.last_history_id.lock().await;
313        self.fetch_history_inner(start_history_id, &mut last_id)
314            .await
315    }
316
317    /// Inner history fetch that takes an already-locked history ID reference.
318    /// This allows callers that already hold the lock to avoid deadlock.
319    async fn fetch_history_inner(
320        &self,
321        start_history_id: u64,
322        last_id: &mut u64,
323    ) -> Result<Vec<String>> {
324        let token = self.resolve_oauth_token();
325        if token.is_empty() {
326            return Err(anyhow!("Gmail OAuth token is not configured"));
327        }
328
329        let mut message_ids = Vec::new();
330        let mut page_token: Option<String> = None;
331
332        loop {
333            let mut url = format!(
334                "https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId={}&historyTypes=messageAdded",
335                start_history_id
336            );
337            if let Some(ref pt) = page_token {
338                let _ = write!(url, "&pageToken={pt}");
339            }
340
341            let resp = self.http.get(&url).bearer_auth(&token).send().await?;
342
343            if !resp.status().is_success() {
344                let status = resp.status();
345                let text = resp.text().await.unwrap_or_default();
346                return Err(anyhow!("Gmail history fetch failed ({}): {}", status, text));
347            }
348
349            let history_resp: HistoryResponse = resp.json().await?;
350
351            if let Some(records) = history_resp.history {
352                for record in records {
353                    for added in record.messages_added {
354                        message_ids.push(added.message.id);
355                    }
356                }
357            }
358
359            // Update tracked history ID
360            if history_resp.history_id > 0 && history_resp.history_id > *last_id {
361                *last_id = history_resp.history_id;
362            }
363
364            match history_resp.next_page_token {
365                Some(token) => page_token = Some(token),
366                None => break,
367            }
368        }
369
370        Ok(message_ids)
371    }
372
373    /// Fetch a full message by ID from the Gmail API.
374    pub async fn fetch_message(&self, message_id: &str) -> Result<GmailMessage> {
375        let token = self.resolve_oauth_token();
376        let url = format!(
377            "https://gmail.googleapis.com/gmail/v1/users/me/messages/{}?format=full",
378            message_id
379        );
380
381        let resp = self.http.get(&url).bearer_auth(&token).send().await?;
382
383        if !resp.status().is_success() {
384            let status = resp.status();
385            let text = resp.text().await.unwrap_or_default();
386            return Err(anyhow!("Gmail message fetch failed ({}): {}", status, text));
387        }
388
389        Ok(resp.json().await?)
390    }
391
392    /// Check if a sender email is in the allowlist.
393    pub fn is_sender_allowed(&self, email: &str) -> bool {
394        if self.config.allowed_senders.is_empty() {
395            return false;
396        }
397        if self.config.allowed_senders.iter().any(|a| a == "*") {
398            return true;
399        }
400        let email_lower = email.to_lowercase();
401        self.config.allowed_senders.iter().any(|allowed| {
402            if allowed.starts_with('@') {
403                email_lower.ends_with(&allowed.to_lowercase())
404            } else if allowed.contains('@') {
405                allowed.eq_ignore_ascii_case(email)
406            } else {
407                email_lower.ends_with(&format!("@{}", allowed.to_lowercase()))
408            }
409        })
410    }
411
412    /// Process a Pub/Sub push notification and dispatch new messages to the agent.
413    pub async fn handle_notification(&self, envelope: &PubSubEnvelope) -> Result<()> {
414        let notification = parse_notification(&envelope.message)?;
415        debug!(
416            "Gmail push notification: email={}, historyId={}",
417            notification.email_address, notification.history_id
418        );
419
420        // Hold the lock across read-fetch-update to prevent duplicate
421        // processing when concurrent webhook notifications arrive.
422        let mut last_id = self.last_history_id.lock().await;
423
424        if *last_id == 0 {
425            // First notification — just record the history ID.
426            *last_id = notification.history_id;
427            info!(
428                "Gmail push: first notification, seeding historyId={}",
429                notification.history_id
430            );
431            return Ok(());
432        }
433
434        let start_id = *last_id;
435        let message_ids = self.fetch_history_inner(start_id, &mut last_id).await?;
436        // Explicitly drop the lock before doing network-heavy message fetching.
437        drop(last_id);
438
439        if message_ids.is_empty() {
440            debug!("Gmail push: no new messages in history");
441            return Ok(());
442        }
443
444        info!(
445            "Gmail push: {} new message(s) to process",
446            message_ids.len()
447        );
448
449        // Clone the sender and drop the mutex immediately to avoid holding it
450        // across network calls.
451        let tx = {
452            let tx_guard = self.tx.lock().await;
453            match tx_guard.clone() {
454                Some(tx) => tx,
455                None => {
456                    warn!("Gmail push: no listener registered, dropping messages");
457                    return Ok(());
458                }
459            }
460        };
461
462        for msg_id in message_ids {
463            match self.fetch_message(&msg_id).await {
464                Ok(gmail_msg) => {
465                    let sender = extract_header(&gmail_msg, "From").unwrap_or_default();
466                    let sender_email = extract_email_from_header(&sender);
467
468                    if !self.is_sender_allowed(&sender_email) {
469                        warn!("Gmail push: blocked message from {}", sender_email);
470                        continue;
471                    }
472
473                    let subject = extract_header(&gmail_msg, "Subject").unwrap_or_default();
474                    let body_text = extract_body_text(&gmail_msg);
475
476                    let content = format!("Subject: {subject}\n\n{body_text}");
477                    let timestamp = gmail_msg
478                        .internal_date
479                        .parse::<u64>()
480                        .map(|ms| ms / 1000)
481                        .unwrap_or_else(|_| {
482                            SystemTime::now()
483                                .duration_since(UNIX_EPOCH)
484                                .map(|d| d.as_secs())
485                                .unwrap_or(0)
486                        });
487
488                    let channel_msg = ChannelMessage {
489                        id: format!("gmail_{}", gmail_msg.id),
490                        reply_target: sender_email.clone(),
491                        sender: sender_email,
492                        content,
493                        channel: "gmail_push".to_string(),
494                        timestamp,
495                        thread_ts: Some(gmail_msg.thread_id),
496                        interruption_scope_id: None,
497                        attachments: Vec::new(),
498                    };
499
500                    if tx.send(channel_msg).await.is_err() {
501                        debug!("Gmail push: listener channel closed");
502                        return Ok(());
503                    }
504                }
505                Err(e) => {
506                    error!("Gmail push: failed to fetch message {}: {}", msg_id, e);
507                }
508            }
509        }
510
511        Ok(())
512    }
513}
514
515#[async_trait]
516impl Channel for GmailPushChannel {
517    fn name(&self) -> &str {
518        "gmail_push"
519    }
520
521    async fn send(&self, message: &SendMessage) -> Result<()> {
522        // Send via Gmail API (drafts.send or messages.send)
523        let token = self.resolve_oauth_token();
524        if token.is_empty() {
525            return Err(anyhow!("Gmail OAuth token is not configured for sending"));
526        }
527
528        let subject = message.subject.as_deref().unwrap_or("Construct Message");
529        // Sanitize headers to prevent CRLF injection attacks.
530        let safe_recipient = sanitize_header_value(&message.recipient);
531        let safe_subject = sanitize_header_value(subject);
532        let rfc2822 = format!(
533            "To: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n{}",
534            safe_recipient, safe_subject, message.content
535        );
536        let encoded = BASE64.encode(rfc2822.as_bytes());
537        // Gmail API uses URL-safe base64 with no padding
538        let url_safe = encoded.replace('+', "-").replace('/', "_").replace('=', "");
539
540        let body = serde_json::json!({
541            "raw": url_safe,
542        });
543
544        let resp = self
545            .http
546            .post("https://gmail.googleapis.com/gmail/v1/users/me/messages/send")
547            .bearer_auth(&token)
548            .json(&body)
549            .send()
550            .await?;
551
552        if !resp.status().is_success() {
553            let status = resp.status();
554            let text = resp.text().await.unwrap_or_default();
555            return Err(anyhow!("Gmail send failed ({}): {}", status, text));
556        }
557
558        info!("Gmail message sent to {}", message.recipient);
559        Ok(())
560    }
561
562    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
563        // Store the sender for webhook-driven message dispatch
564        {
565            let mut tx_guard = self.tx.lock().await;
566            *tx_guard = Some(tx);
567        }
568
569        info!("Gmail push channel started — registering watch subscription");
570
571        // Register initial watch
572        if !self.config.webhook_url.is_empty() {
573            if let Err(e) = self.register_watch().await {
574                error!("Gmail watch registration failed: {e:#}");
575                // Non-fatal — external subscription management may be in use
576            }
577        }
578
579        // Renewal loop: Gmail watch subscriptions expire after 7 days.
580        // Re-register every 6 days to maintain continuous coverage.
581        let renewal_interval = Duration::from_secs(6 * 24 * 60 * 60); // 6 days
582        loop {
583            tokio::time::sleep(renewal_interval).await;
584            info!("Gmail push: renewing watch subscription");
585            if let Err(e) = self.register_watch().await {
586                error!("Gmail watch renewal failed: {e:#}");
587            }
588        }
589    }
590
591    async fn health_check(&self) -> bool {
592        let token = self.resolve_oauth_token();
593        if token.is_empty() {
594            return false;
595        }
596
597        match self
598            .http
599            .get("https://gmail.googleapis.com/gmail/v1/users/me/profile")
600            .bearer_auth(&token)
601            .timeout(Duration::from_secs(10))
602            .send()
603            .await
604        {
605            Ok(resp) => resp.status().is_success(),
606            Err(_) => false,
607        }
608    }
609}
610
611// ── Helper functions ─────────────────────────────────────────────
612
613/// Parse and decode the Gmail notification from a Pub/Sub message.
614pub fn parse_notification(msg: &PubSubMessage) -> Result<GmailNotification> {
615    let decoded = BASE64
616        .decode(&msg.data)
617        .map_err(|e| anyhow!("Invalid base64 in Pub/Sub message: {e}"))?;
618    let notification: GmailNotification = serde_json::from_slice(&decoded)
619        .map_err(|e| anyhow!("Invalid JSON in Gmail notification: {e}"))?;
620    Ok(notification)
621}
622
623/// Extract a header value from a Gmail message by name.
624pub fn extract_header(msg: &GmailMessage, name: &str) -> Option<String> {
625    msg.payload.as_ref().and_then(|p| {
626        p.headers
627            .iter()
628            .find(|h| h.name.eq_ignore_ascii_case(name))
629            .map(|h| h.value.clone())
630    })
631}
632
/// Extract the plain email address from a `From` header value like `"Name <email@example.com>"`.
///
/// Per RFC 5322 the angle-bracketed address comes *last* in a `name-addr`,
/// so the last `<` is used together with the first `>` after it. A quoted
/// display name that itself contains angle brackets, such as
/// `"Boss <boss@corp.com>" <x@example.org>`, therefore yields the real
/// address (`x@example.org`). The extracted address is trimmed.
///
/// If there is no well-formed, non-empty `<...>` group (including malformed
/// input like `"a> <b"`), the whole header value is returned trimmed.
pub fn extract_email_from_header(from: &str) -> String {
    if let Some(open) = from.rfind('<') {
        let after_open = &from[open + 1..];
        if let Some(close) = after_open.find('>') {
            let address = after_open[..close].trim();
            if !address.is_empty() {
                return address.to_string();
            }
        }
    }
    from.trim().to_string()
}
646
/// Make `value` safe to embed in a single RFC 2822 header line.
///
/// Every carriage return and line feed is dropped (all other characters are
/// kept in order), so untrusted text cannot start a new header line.
pub fn sanitize_header_value(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        if !matches!(ch, '\r' | '\n') {
            cleaned.push(ch);
        }
    }
    cleaned
}
652
653/// Extract the plain-text body from a Gmail message.
654///
655/// Walks MIME parts looking for `text/plain`; falls back to `text/html`
656/// with basic tag stripping; finally falls back to the `snippet`.
657pub fn extract_body_text(msg: &GmailMessage) -> String {
658    if let Some(ref payload) = msg.payload {
659        // Single-part message
660        if payload.mime_type == "text/plain" {
661            if let Some(text) = decode_body(payload.body.as_ref()) {
662                return text;
663            }
664        }
665
666        // Multipart — walk parts
667        if let Some(text) = find_text_in_parts(&payload.parts, "text/plain") {
668            return text;
669        }
670        if let Some(html) = find_text_in_parts(&payload.parts, "text/html") {
671            return strip_html(&html);
672        }
673    }
674
675    // Fallback to snippet
676    msg.snippet.clone()
677}
678
679/// Recursively search MIME parts for a given content type.
680fn find_text_in_parts(parts: &[MessagePart], mime_type: &str) -> Option<String> {
681    for part in parts {
682        if part.mime_type == mime_type {
683            if let Some(text) = decode_body(part.body.as_ref()) {
684                return Some(text);
685            }
686        }
687        // Recurse into nested parts
688        if let Some(text) = find_text_in_parts(&part.parts, mime_type) {
689            return Some(text);
690        }
691    }
692    None
693}
694
695/// Decode a base64url-encoded Gmail message body.
696fn decode_body(body: Option<&MessageBody>) -> Option<String> {
697    body.and_then(|b| {
698        b.data.as_ref().and_then(|data| {
699            // Gmail API uses URL-safe base64 without padding
700            let standard = data.replace('-', "+").replace('_', "/");
701            // Restore padding stripped by Gmail API
702            let padded = match standard.len() % 4 {
703                2 => format!("{standard}=="),
704                3 => format!("{standard}="),
705                _ => standard,
706            };
707            BASE64
708                .decode(&padded)
709                .ok()
710                .and_then(|bytes| String::from_utf8(bytes).ok())
711        })
712    })
713}
714
/// Convert an HTML fragment into whitespace-normalized plain text.
///
/// - Tags are removed. A `>` that is not closing a tag is kept as text.
/// - The contents of `<script>` and `<style>` elements are dropped
///   entirely (code and CSS are not message text).
/// - Common character references are decoded: `&amp;`, `&lt;`, `&gt;`,
///   `&quot;`, `&apos;`, `&nbsp;`, and numeric `&#NN;` / `&#xNN;`. Unknown
///   references are left as-is.
/// - Runs of whitespace collapse to a single space.
///
/// Text after an unterminated `<` is dropped.
fn strip_html(html: &str) -> String {
    let mut text = String::with_capacity(html.len());
    let mut rest = html;

    while let Some(open) = rest.find('<') {
        text.push_str(&rest[..open]);
        let after_open = &rest[open + 1..];
        let Some(close) = after_open.find('>') else {
            rest = "";
            break;
        };
        let tag = &after_open[..close];
        rest = &after_open[close + 1..];

        let tag_name = tag
            .trim_start()
            .split(|c: char| c.is_whitespace() || c == '/')
            .next()
            .unwrap_or_default()
            .to_ascii_lowercase();
        let self_closing = tag.trim_end().ends_with('/');

        if (tag_name == "script" || tag_name == "style") && !self_closing {
            // Skip the element body; the closing tag itself is removed on the
            // next loop iteration. ASCII lowercasing keeps byte offsets intact.
            let closing = format!("</{tag_name}");
            rest = match rest.to_ascii_lowercase().find(&closing) {
                Some(pos) => &rest[pos..],
                None => "",
            };
        }
    }
    text.push_str(rest);

    // Decode entities only after tags are gone, so `&lt;b&gt;` stays text.
    let decoded = decode_html_entities(&text);
    decoded.split_whitespace().collect::<Vec<_>>().join(" ")
}

/// Replace recognised HTML character references in `text`; anything
/// unrecognised (or lacking a nearby `;`) is copied through unchanged.
fn decode_html_entities(text: &str) -> String {
    // Longest reference handled is `#x10FFFF` (8 chars); bound the `;` search
    // so text full of bare `&` stays linear.
    const MAX_ENTITY_LEN: usize = 10;

    let mut decoded = String::with_capacity(text.len());
    let mut rest = text;
    while let Some(amp) = rest.find('&') {
        decoded.push_str(&rest[..amp]);
        let after = &rest[amp + 1..];
        let entity = after
            .char_indices()
            .take(MAX_ENTITY_LEN + 1)
            .find(|&(_, c)| c == ';')
            .and_then(|(semi, _)| decode_entity(&after[..semi]).map(|ch| (ch, semi)));
        match entity {
            Some((ch, semi)) => {
                decoded.push(ch);
                rest = &after[semi + 1..];
            }
            None => {
                decoded.push('&');
                rest = after;
            }
        }
    }
    decoded.push_str(rest);
    decoded
}

/// Map a character-reference name (the text between `&` and `;`) to its
/// character, or `None` if it is not recognised or encodes NUL.
fn decode_entity(name: &str) -> Option<char> {
    match name {
        "amp" => Some('&'),
        "lt" => Some('<'),
        "gt" => Some('>'),
        "quot" => Some('"'),
        "apos" => Some('\''),
        "nbsp" => Some(' '),
        _ => {
            let digits = name.strip_prefix('#')?;
            let code = match digits.strip_prefix('x').or_else(|| digits.strip_prefix('X')) {
                Some(hex) => u32::from_str_radix(hex, 16).ok()?,
                None => digits.parse::<u32>().ok()?,
            };
            char::from_u32(code).filter(|c| *c != '\0')
        }
    }
}
736
737// ── Tests ────────────────────────────────────────────────────────
738
739#[cfg(test)]
740mod tests {
741    use super::*;
742
743    // ── Notification parsing ─────────────────────────────────────
744
745    #[test]
746    fn parse_notification_valid() {
747        let payload = serde_json::json!({
748            "emailAddress": "user@example.com",
749            "historyId": 12345
750        });
751        let encoded = BASE64.encode(serde_json::to_vec(&payload).unwrap());
752
753        let msg = PubSubMessage {
754            data: encoded,
755            message_id: "msg-1".into(),
756            publish_time: "2026-03-21T08:00:00Z".into(),
757        };
758
759        let notification = parse_notification(&msg).unwrap();
760        assert_eq!(notification.email_address, "user@example.com");
761        assert_eq!(notification.history_id, 12345);
762    }
763
764    #[test]
765    fn parse_notification_invalid_base64() {
766        let msg = PubSubMessage {
767            data: "!!!not-base64!!!".into(),
768            message_id: "msg-2".into(),
769            publish_time: String::new(),
770        };
771        assert!(parse_notification(&msg).is_err());
772    }
773
774    #[test]
775    fn parse_notification_invalid_json() {
776        let encoded = BASE64.encode(b"not json at all");
777        let msg = PubSubMessage {
778            data: encoded,
779            message_id: "msg-3".into(),
780            publish_time: String::new(),
781        };
782        assert!(parse_notification(&msg).is_err());
783    }
784
785    // ── Envelope deserialization ─────────────────────────────────
786
787    #[test]
788    fn pubsub_envelope_deserialize() {
789        let payload = serde_json::json!({
790            "emailAddress": "test@gmail.com",
791            "historyId": 999
792        });
793        let encoded = BASE64.encode(serde_json::to_vec(&payload).unwrap());
794
795        let json = serde_json::json!({
796            "message": {
797                "data": encoded,
798                "messageId": "pubsub-1",
799                "publishTime": "2026-03-21T10:00:00Z"
800            },
801            "subscription": "projects/my-project/subscriptions/gmail-push"
802        });
803
804        let envelope: PubSubEnvelope = serde_json::from_value(json).unwrap();
805        assert_eq!(envelope.message.message_id, "pubsub-1");
806        assert_eq!(
807            envelope.subscription,
808            "projects/my-project/subscriptions/gmail-push"
809        );
810
811        let notification = parse_notification(&envelope.message).unwrap();
812        assert_eq!(notification.email_address, "test@gmail.com");
813        assert_eq!(notification.history_id, 999);
814    }
815
816    // ── Email extraction from From header ────────────────────────
817
818    #[test]
819    fn extract_email_from_header_angle_brackets() {
820        assert_eq!(
821            extract_email_from_header("John Doe <john@example.com>"),
822            "john@example.com"
823        );
824    }
825
826    #[test]
827    fn extract_email_from_header_bare_email() {
828        assert_eq!(
829            extract_email_from_header("user@example.com"),
830            "user@example.com"
831        );
832    }
833
834    #[test]
835    fn extract_email_from_header_empty() {
836        assert_eq!(extract_email_from_header(""), "");
837    }
838
839    #[test]
840    fn extract_email_with_quotes() {
841        assert_eq!(
842            extract_email_from_header("\"Doe, John\" <john@example.com>"),
843            "john@example.com"
844        );
845    }
846
847    #[test]
848    fn extract_email_malformed_angle_brackets() {
849        // '>' before '<' with no proper closing — falls back to full trimmed string
850        assert_eq!(
851            extract_email_from_header("attacker> <victim@example.com"),
852            "attacker> <victim@example.com"
853        );
854        // Properly closed after the second '<'
855        assert_eq!(
856            extract_email_from_header("attacker> <victim@example.com>"),
857            "victim@example.com"
858        );
859        // No closing '>' at all
860        assert_eq!(extract_email_from_header("Name <broken"), "Name <broken");
861    }
862
863    #[test]
864    fn sanitize_header_strips_crlf() {
865        assert_eq!(
866            sanitize_header_value("normal@example.com"),
867            "normal@example.com"
868        );
869        assert_eq!(
870            sanitize_header_value("evil@example.com\r\nBcc: spy@evil.com"),
871            "evil@example.comBcc: spy@evil.com"
872        );
873        assert_eq!(
874            sanitize_header_value("inject\nSubject: fake"),
875            "injectSubject: fake"
876        );
877    }
878
879    // ── Header extraction ────────────────────────────────────────
880
881    #[test]
882    fn extract_header_found() {
883        let msg = GmailMessage {
884            id: "msg-1".into(),
885            thread_id: "thread-1".into(),
886            snippet: String::new(),
887            payload: Some(MessagePayload {
888                headers: vec![
889                    MessageHeader {
890                        name: "From".into(),
891                        value: "sender@example.com".into(),
892                    },
893                    MessageHeader {
894                        name: "Subject".into(),
895                        value: "Test Subject".into(),
896                    },
897                ],
898                body: None,
899                parts: Vec::new(),
900                mime_type: String::new(),
901            }),
902            internal_date: "0".into(),
903        };
904
905        assert_eq!(
906            extract_header(&msg, "Subject"),
907            Some("Test Subject".to_string())
908        );
909        assert_eq!(
910            extract_header(&msg, "from"), // case-insensitive
911            Some("sender@example.com".to_string())
912        );
913        assert_eq!(extract_header(&msg, "X-Missing"), None);
914    }
915
916    #[test]
917    fn extract_header_no_payload() {
918        let msg = GmailMessage {
919            id: "msg-2".into(),
920            thread_id: String::new(),
921            snippet: String::new(),
922            payload: None,
923            internal_date: "0".into(),
924        };
925        assert_eq!(extract_header(&msg, "Subject"), None);
926    }
927
928    // ── Body text extraction ─────────────────────────────────────
929
930    #[test]
931    fn extract_body_text_plain() {
932        let plain_b64 = BASE64
933            .encode(b"Hello, world!")
934            .replace('+', "-")
935            .replace('/', "_")
936            .replace('=', "");
937
938        let msg = GmailMessage {
939            id: "msg-3".into(),
940            thread_id: String::new(),
941            snippet: "snippet".into(),
942            payload: Some(MessagePayload {
943                headers: Vec::new(),
944                body: Some(MessageBody {
945                    data: Some(plain_b64),
946                    size: 13,
947                }),
948                parts: Vec::new(),
949                mime_type: "text/plain".into(),
950            }),
951            internal_date: "0".into(),
952        };
953
954        assert_eq!(extract_body_text(&msg), "Hello, world!");
955    }
956
957    #[test]
958    fn extract_body_text_multipart() {
959        let html_b64 = BASE64
960            .encode(b"<p>Hello</p>")
961            .replace('+', "-")
962            .replace('/', "_")
963            .replace('=', "");
964
965        let msg = GmailMessage {
966            id: "msg-4".into(),
967            thread_id: String::new(),
968            snippet: "snippet".into(),
969            payload: Some(MessagePayload {
970                headers: Vec::new(),
971                body: None,
972                parts: vec![MessagePart {
973                    mime_type: "text/html".into(),
974                    body: Some(MessageBody {
975                        data: Some(html_b64),
976                        size: 12,
977                    }),
978                    parts: Vec::new(),
979                    filename: String::new(),
980                }],
981                mime_type: "multipart/alternative".into(),
982            }),
983            internal_date: "0".into(),
984        };
985
986        assert_eq!(extract_body_text(&msg), "Hello");
987    }
988
989    #[test]
990    fn extract_body_text_fallback_to_snippet() {
991        let msg = GmailMessage {
992            id: "msg-5".into(),
993            thread_id: String::new(),
994            snippet: "My snippet text".into(),
995            payload: Some(MessagePayload {
996                headers: Vec::new(),
997                body: None,
998                parts: Vec::new(),
999                mime_type: "multipart/mixed".into(),
1000            }),
1001            internal_date: "0".into(),
1002        };
1003
1004        assert_eq!(extract_body_text(&msg), "My snippet text");
1005    }
1006
1007    // ── Sender allowlist ─────────────────────────────────────────
1008
1009    #[test]
1010    fn sender_allowed_empty_denies() {
1011        let ch = GmailPushChannel::new(GmailPushConfig::default());
1012        assert!(!ch.is_sender_allowed("anyone@example.com"));
1013    }
1014
1015    #[test]
1016    fn sender_allowed_wildcard() {
1017        let ch = GmailPushChannel::new(GmailPushConfig {
1018            allowed_senders: vec!["*".into()],
1019            ..Default::default()
1020        });
1021        assert!(ch.is_sender_allowed("anyone@example.com"));
1022    }
1023
1024    #[test]
1025    fn sender_allowed_specific_email() {
1026        let ch = GmailPushChannel::new(GmailPushConfig {
1027            allowed_senders: vec!["user@example.com".into()],
1028            ..Default::default()
1029        });
1030        assert!(ch.is_sender_allowed("user@example.com"));
1031        assert!(!ch.is_sender_allowed("other@example.com"));
1032    }
1033
1034    #[test]
1035    fn sender_allowed_domain_with_at() {
1036        let ch = GmailPushChannel::new(GmailPushConfig {
1037            allowed_senders: vec!["@example.com".into()],
1038            ..Default::default()
1039        });
1040        assert!(ch.is_sender_allowed("user@example.com"));
1041        assert!(ch.is_sender_allowed("admin@example.com"));
1042        assert!(!ch.is_sender_allowed("user@other.com"));
1043    }
1044
1045    #[test]
1046    fn sender_allowed_domain_without_at() {
1047        let ch = GmailPushChannel::new(GmailPushConfig {
1048            allowed_senders: vec!["example.com".into()],
1049            ..Default::default()
1050        });
1051        assert!(ch.is_sender_allowed("user@example.com"));
1052        assert!(!ch.is_sender_allowed("user@other.com"));
1053    }
1054
1055    // ── Strip HTML ───────────────────────────────────────────────
1056
1057    #[test]
1058    fn strip_html_basic() {
1059        assert_eq!(strip_html("<p>Hello</p>"), "Hello");
1060    }
1061
1062    #[test]
1063    fn strip_html_nested() {
1064        assert_eq!(
1065            strip_html("<div><p>Hello <b>World</b></p></div>"),
1066            "Hello World"
1067        );
1068    }
1069
1070    // ── Config defaults ──────────────────────────────────────────
1071
1072    #[test]
1073    fn config_default_values() {
1074        let config = GmailPushConfig::default();
1075        assert!(!config.enabled);
1076        assert!(config.topic.is_empty());
1077        assert_eq!(config.label_filter, vec!["INBOX"]);
1078        assert!(config.oauth_token.is_empty());
1079        assert!(config.allowed_senders.is_empty());
1080        assert!(config.webhook_url.is_empty());
1081    }
1082
1083    #[test]
1084    fn config_deserialize_with_defaults() {
1085        let json = r#"{"topic": "projects/my-proj/topics/gmail"}"#;
1086        let config: GmailPushConfig = serde_json::from_str(json).unwrap();
1087        assert!(!config.enabled);
1088        assert_eq!(config.topic, "projects/my-proj/topics/gmail");
1089        assert_eq!(config.label_filter, vec!["INBOX"]);
1090    }
1091
1092    #[test]
1093    fn config_serialize_roundtrip() {
1094        let config = GmailPushConfig {
1095            enabled: true,
1096            topic: "projects/test/topics/gmail".into(),
1097            label_filter: vec!["INBOX".into(), "IMPORTANT".into()],
1098            oauth_token: "test-token".into(),
1099            allowed_senders: vec!["@example.com".into()],
1100            webhook_url: "https://example.com/webhook/gmail".into(),
1101            webhook_secret: "my-secret".into(),
1102        };
1103        let json = serde_json::to_string(&config).unwrap();
1104        let deserialized: GmailPushConfig = serde_json::from_str(&json).unwrap();
1105        assert_eq!(deserialized.topic, config.topic);
1106        assert_eq!(deserialized.label_filter, config.label_filter);
1107        assert_eq!(deserialized.webhook_url, config.webhook_url);
1108    }
1109
1110    // ── Channel name ─────────────────────────────────────────────
1111
1112    #[test]
1113    fn channel_name() {
1114        let ch = GmailPushChannel::new(GmailPushConfig::default());
1115        assert_eq!(ch.name(), "gmail_push");
1116    }
1117
1118    // ── Decode body ──────────────────────────────────────────────
1119
1120    #[test]
1121    fn decode_body_none() {
1122        assert!(decode_body(None).is_none());
1123    }
1124
1125    #[test]
1126    fn decode_body_empty_data() {
1127        let body = MessageBody {
1128            data: None,
1129            size: 0,
1130        };
1131        assert!(decode_body(Some(&body)).is_none());
1132    }
1133
1134    #[test]
1135    fn decode_body_valid() {
1136        let b64 = BASE64
1137            .encode(b"test content")
1138            .replace('+', "-")
1139            .replace('/', "_")
1140            .replace('=', "");
1141        let body = MessageBody {
1142            data: Some(b64),
1143            size: 12,
1144        };
1145        assert_eq!(decode_body(Some(&body)), Some("test content".to_string()));
1146    }
1147}