Skip to main content

brainwires_tool_builtins/email/
gmail_push.rs

1//! Gmail push ingestion via Google Cloud Pub/Sub.
2//!
3//! This module implements the "push" path for inbound Gmail:
4//!
5//! 1. The operator registers a `users.watch` on a mailbox, pointing at a
6//!    Pub/Sub topic.
7//! 2. Pub/Sub POSTs each notification to the BrainClaw gateway webhook
8//!    `/webhooks/gmail-push`, carrying a Google-signed JWT in the
9//!    `Authorization` header.
10//! 3. The gateway calls [`GmailPushHandler::verify_push_jwt`] and
11//!    [`GmailPushHandler::parse_envelope`] to authenticate the request and
12//!    extract the watched mailbox plus history id.
13//! 4. [`GmailPushHandler::fetch_new_messages`] then calls `users.history.list`
14//!    + `users.messages.get` on the Gmail REST API to pull the actual
15//!      messages, returning them as [`EmailMessage`] records suitable for
16//!      dispatching to an agent.
17//!
18//! Watch registration is time-limited (7 days), so callers must call
19//! [`GmailPushHandler::register_watch`] on a timer (see the daemon's
20//! `app.rs` for the background task that owns this cadence).
21
22use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Context, Result, anyhow, bail};
26use base64::Engine as _;
27use base64::engine::general_purpose::{STANDARD, URL_SAFE_NO_PAD};
28use chrono::{DateTime, TimeZone, Utc};
29use jsonwebtoken::{Algorithm, DecodingKey, Validation, decode, decode_header};
30use parking_lot::RwLock;
31use serde::{Deserialize, Serialize};
32
/// Base URL of the production Gmail REST API. A handler can be pointed
/// elsewhere (e.g. at a local mock server in tests) through
/// `GmailPushConfig::gmail_base_url`.
const DEFAULT_GMAIL_BASE: &str = "https://gmail.googleapis.com";

/// Issuer (`iss`) required on Pub/Sub push tokens: Google's OpenID Connect
/// issuer.
const GOOGLE_ISSUER: &str = "https://accounts.google.com";

/// Endpoint serving Google's public keys (JWKs) used to verify RS256 push
/// tokens.
pub const GOOGLE_JWKS_URL: &str = "https://www.googleapis.com/oauth2/v3/certs";

/// Maximum age of a cached JWKs set before it is fetched again (one hour).
const JWKS_CACHE_TTL: Duration = Duration::from_secs(60 * 60);
45
/// Configuration for a [`GmailPushHandler`].
///
/// `Debug` is implemented by hand rather than derived so that the OAuth
/// bearer token never ends up in logs or panic messages.
#[derive(Clone)]
pub struct GmailPushConfig {
    /// GCP project id that owns the Pub/Sub topic.
    pub project_id: String,
    /// Fully-qualified topic name, `projects/<proj>/topics/<topic>`.
    pub topic_name: String,
    /// Expected `aud` claim on the Google-signed push JWT. Configured on
    /// the Pub/Sub subscription.
    pub push_audience: String,
    /// Gmail labels to watch — typically `["INBOX"]`.
    pub watched_label_ids: Vec<String>,
    /// OAuth 2.0 access token (bearer) for the watched mailbox. This is
    /// the *user's* token — not a service-account one — and must have the
    /// `https://www.googleapis.com/auth/gmail.modify` scope at minimum.
    pub oauth_token: String,
    /// Override for the Gmail REST API base URL. Production code should
    /// leave this at the default; tests point it at a mock server.
    pub gmail_base_url: Option<String>,
}

impl std::fmt::Debug for GmailPushConfig {
    /// Formats every field except `oauth_token`, which is replaced by a
    /// fixed placeholder because it is a live credential.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GmailPushConfig")
            .field("project_id", &self.project_id)
            .field("topic_name", &self.topic_name)
            .field("push_audience", &self.push_audience)
            .field("watched_label_ids", &self.watched_label_ids)
            .field("oauth_token", &"<redacted>")
            .field("gmail_base_url", &self.gmail_base_url)
            .finish()
    }
}
66
67impl GmailPushConfig {
68    fn gmail_base(&self) -> &str {
69        self.gmail_base_url.as_deref().unwrap_or(DEFAULT_GMAIL_BASE)
70    }
71}
72
73/// Cached JWKs set with a fetched-at timestamp.
74#[derive(Default)]
75struct JwksCache {
76    fetched_at: Option<std::time::Instant>,
77    keys: Vec<JwkEntry>,
78}
79
80/// A single RSA JWK entry we care about.
81#[derive(Debug, Clone, Deserialize)]
82struct JwkEntry {
83    kid: String,
84    #[serde(default)]
85    alg: Option<String>,
86    n: String,
87    e: String,
88}
89
90#[derive(Debug, Deserialize)]
91struct JwksResponse {
92    keys: Vec<JwkEntry>,
93}
94
95/// Pub/Sub push handler for one watched Gmail account.
96pub struct GmailPushHandler {
97    config: GmailPushConfig,
98    http: reqwest::Client,
99    jwks: Arc<RwLock<JwksCache>>,
100    jwks_url: String,
101}
102
103impl GmailPushHandler {
104    /// Construct a new push handler.
105    pub fn new(config: GmailPushConfig) -> Self {
106        let http = reqwest::Client::builder()
107            .timeout(Duration::from_secs(30))
108            .build()
109            .expect("reqwest::Client default build");
110        Self {
111            config,
112            http,
113            jwks: Arc::new(RwLock::new(JwksCache::default())),
114            jwks_url: GOOGLE_JWKS_URL.to_string(),
115        }
116    }
117
118    /// Inject a custom HTTP client — for tests that need routing to a
119    /// mock server.
120    pub fn with_http_client(mut self, http: reqwest::Client) -> Self {
121        self.http = http;
122        self
123    }
124
125    /// Override the JWKs endpoint. Production code should never call this;
126    /// tests point it at a local mock server that serves RSA public keys
127    /// matching a hand-signed test JWT.
128    pub fn with_jwks_url(mut self, url: impl Into<String>) -> Self {
129        self.jwks_url = url.into();
130        self
131    }
132
133    /// Expose the configured push audience so webhook code can double-check
134    /// claims without reaching into the private config.
135    pub fn push_audience(&self) -> &str {
136        &self.config.push_audience
137    }
138
139    /// Fetch Google's JWKs set, using the 1-hour cache.
140    async fn jwks(&self) -> Result<Vec<JwkEntry>> {
141        // Fast path: fresh cache.
142        {
143            let cache = self.jwks.read();
144            if let Some(fetched) = cache.fetched_at
145                && fetched.elapsed() < JWKS_CACHE_TTL
146                && !cache.keys.is_empty()
147            {
148                return Ok(cache.keys.clone());
149            }
150        }
151
152        let resp = self
153            .http
154            .get(&self.jwks_url)
155            .send()
156            .await
157            .context("fetch Google JWKs")?;
158        if !resp.status().is_success() {
159            bail!("JWKs endpoint returned {}", resp.status());
160        }
161        let body: JwksResponse = resp.json().await.context("parse JWKs JSON")?;
162        let keys = body.keys;
163
164        {
165            let mut cache = self.jwks.write();
166            cache.fetched_at = Some(std::time::Instant::now());
167            cache.keys = keys.clone();
168        }
169        Ok(keys)
170    }
171
172    /// Verify the Google-signed JWT delivered in the `Authorization`
173    /// header of a Pub/Sub push request.
174    ///
175    /// Returns the decoded claims on success. Fails on signature
176    /// mismatch, wrong issuer, wrong audience, or expired token.
177    pub async fn verify_push_jwt(&self, bearer_token: &str) -> Result<VerifiedPush> {
178        let token = bearer_token.trim();
179        let token = token.strip_prefix("Bearer ").unwrap_or(token).trim();
180        if token.is_empty() {
181            bail!("empty bearer token");
182        }
183
184        let header = decode_header(token).context("decode JWT header")?;
185        if header.alg != Algorithm::RS256 {
186            bail!("unexpected JWT alg {:?}; expected RS256", header.alg);
187        }
188        let kid = header
189            .kid
190            .as_ref()
191            .ok_or_else(|| anyhow!("JWT header missing kid"))?;
192
193        let jwks = self.jwks().await?;
194        let jwk = jwks
195            .iter()
196            .find(|k| &k.kid == kid)
197            .ok_or_else(|| anyhow!("no matching JWK for kid {}", kid))?;
198        if let Some(alg) = &jwk.alg
199            && alg != "RS256"
200        {
201            bail!("JWK alg {} is not RS256", alg);
202        }
203
204        let decoding_key = DecodingKey::from_rsa_components(&jwk.n, &jwk.e)
205            .context("build decoding key from JWK")?;
206
207        let mut validation = Validation::new(Algorithm::RS256);
208        validation.set_issuer(&[GOOGLE_ISSUER]);
209        let audience = [self.config.push_audience.as_str()];
210        validation.set_audience(&audience);
211        validation.validate_exp = true;
212
213        let data = decode::<GoogleJwtClaims>(token, &decoding_key, &validation)
214            .context("verify Google push JWT")?;
215
216        // `parse_envelope` operates on the body — the JWT itself only
217        // authenticates the sender. We still return the claims so the
218        // caller can cross-reference (e.g. log the service account email).
219        Ok(VerifiedPush {
220            aud: data.claims.aud,
221            sub: data.claims.email.unwrap_or(data.claims.sub),
222        })
223    }
224
225    /// Parse the Pub/Sub HTTP push envelope.
226    ///
227    /// Pub/Sub delivers JSON of the shape:
228    ///
229    /// ```json
230    /// {
231    ///   "message": { "data": "<base64>", "messageId": "...", "publishTime": "..." },
232    ///   "subscription": "projects/.../subscriptions/..."
233    /// }
234    /// ```
235    ///
236    /// The inner `data` decodes to `{ "emailAddress": ..., "historyId": ... }`.
237    pub fn parse_envelope(body: &[u8]) -> Result<PushEnvelope> {
238        #[derive(Deserialize)]
239        struct Outer {
240            message: OuterMessage,
241            #[serde(default)]
242            subscription: Option<String>,
243        }
244        #[derive(Deserialize)]
245        struct OuterMessage {
246            data: String,
247            #[serde(rename = "messageId", default)]
248            message_id: Option<String>,
249            #[serde(rename = "publishTime", default)]
250            publish_time: Option<String>,
251        }
252        #[derive(Deserialize)]
253        struct Inner {
254            #[serde(rename = "emailAddress")]
255            email_address: String,
256            #[serde(rename = "historyId")]
257            history_id: serde_json::Value,
258        }
259
260        let outer: Outer =
261            serde_json::from_slice(body).context("parse Pub/Sub push envelope JSON")?;
262        let decoded = STANDARD
263            .decode(outer.message.data.as_bytes())
264            .context("base64-decode Pub/Sub message data")?;
265        let inner: Inner =
266            serde_json::from_slice(&decoded).context("parse Gmail push inner JSON")?;
267
268        let history_id = match &inner.history_id {
269            serde_json::Value::Number(n) => n
270                .as_u64()
271                .ok_or_else(|| anyhow!("historyId is not u64: {n}"))?,
272            serde_json::Value::String(s) => s
273                .parse::<u64>()
274                .with_context(|| format!("historyId string is not u64: {s}"))?,
275            other => bail!("historyId has unexpected type: {other:?}"),
276        };
277
278        let publish_time = match &outer.message.publish_time {
279            Some(ts) => DateTime::parse_from_rfc3339(ts)
280                .map(|dt| dt.with_timezone(&Utc))
281                .unwrap_or_else(|_| Utc::now()),
282            None => Utc::now(),
283        };
284
285        Ok(PushEnvelope {
286            email_address: inner.email_address,
287            history_id,
288            publish_time,
289            message_id: outer.message.message_id,
290            subscription: outer.subscription,
291        })
292    }
293
294    /// Fetch messages added since `since_history_id` for `verified.envelope.email_address`.
295    ///
296    /// Returns the fetched messages plus the new "latest" history id that
297    /// the caller should persist. On a dry history window (no messages),
298    /// the returned history id equals `since_history_id`.
299    pub async fn fetch_new_messages(
300        &self,
301        envelope: &PushEnvelope,
302        since_history_id: u64,
303    ) -> Result<(Vec<EmailMessage>, u64)> {
304        let base = self.config.gmail_base();
305        let email = &envelope.email_address;
306
307        // Step 1: GET /gmail/v1/users/{email}/history
308        //   ?startHistoryId=<since>&historyTypes=messageAdded[&labelId=<label>]
309        let mut url = format!(
310            "{base}/gmail/v1/users/{email}/history?startHistoryId={since}&historyTypes=messageAdded",
311            base = base,
312            email = urlencoding::encode(email),
313            since = since_history_id,
314        );
315        for label in &self.config.watched_label_ids {
316            url.push_str("&labelId=");
317            url.push_str(&urlencoding::encode(label));
318        }
319
320        let hist_resp = self
321            .http
322            .get(&url)
323            .bearer_auth(&self.config.oauth_token)
324            .send()
325            .await
326            .context("Gmail history.list request")?;
327
328        if hist_resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
329            bail!("Gmail rate-limited history.list (429)");
330        }
331        if !hist_resp.status().is_success() {
332            let status = hist_resp.status();
333            let body = hist_resp.text().await.unwrap_or_default();
334            bail!("Gmail history.list returned {status}: {body}");
335        }
336        let hist_json: serde_json::Value = hist_resp
337            .json()
338            .await
339            .context("parse Gmail history.list JSON")?;
340
341        let mut message_ids: Vec<String> = Vec::new();
342        let mut new_history_id = since_history_id;
343
344        if let Some(top_hid) = hist_json.get("historyId").and_then(|v| v.as_str())
345            && let Ok(h) = top_hid.parse::<u64>()
346        {
347            new_history_id = new_history_id.max(h);
348        }
349
350        if let Some(entries) = hist_json.get("history").and_then(|v| v.as_array()) {
351            for entry in entries {
352                if let Some(hid) = entry.get("id").and_then(|v| v.as_str())
353                    && let Ok(h) = hid.parse::<u64>()
354                {
355                    new_history_id = new_history_id.max(h);
356                }
357                if let Some(added) = entry.get("messagesAdded").and_then(|v| v.as_array()) {
358                    for item in added {
359                        if let Some(id) = item
360                            .get("message")
361                            .and_then(|m| m.get("id"))
362                            .and_then(|v| v.as_str())
363                            && !message_ids.iter().any(|x| x == id)
364                        {
365                            message_ids.push(id.to_string());
366                        }
367                    }
368                }
369            }
370        }
371
372        // Step 2: fetch each message via /gmail/v1/users/{email}/messages/{id}?format=full
373        let mut out = Vec::with_capacity(message_ids.len());
374        for id in &message_ids {
375            let url = format!(
376                "{base}/gmail/v1/users/{email}/messages/{id}?format=full",
377                base = base,
378                email = urlencoding::encode(email),
379                id = urlencoding::encode(id),
380            );
381            let resp = self
382                .http
383                .get(&url)
384                .bearer_auth(&self.config.oauth_token)
385                .send()
386                .await
387                .context("Gmail messages.get request")?;
388            if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
389                // Soft-fail: return what we have so far.
390                tracing::warn!(
391                    email = %email,
392                    message_id = %id,
393                    "Gmail rate-limited messages.get (429); returning partial batch"
394                );
395                break;
396            }
397            if !resp.status().is_success() {
398                let status = resp.status();
399                let body = resp.text().await.unwrap_or_default();
400                tracing::warn!(
401                    email = %email,
402                    message_id = %id,
403                    %status,
404                    %body,
405                    "Gmail messages.get failed"
406                );
407                continue;
408            }
409            let msg_json: serde_json::Value =
410                resp.json().await.context("parse Gmail messages.get JSON")?;
411            match parse_gmail_message(&msg_json) {
412                Ok(msg) => out.push(msg),
413                Err(e) => {
414                    tracing::warn!(error = %e, message_id = %id, "parse Gmail message failed")
415                }
416            }
417        }
418
419        Ok((out, new_history_id))
420    }
421
422    /// Register (or renew) the Gmail watch. Returns the new starting history
423    /// id and the watch's expiry.
424    pub async fn register_watch(&self) -> Result<WatchResponse> {
425        let base = self.config.gmail_base();
426        let url = format!("{base}/gmail/v1/users/me/watch");
427
428        let body = serde_json::json!({
429            "topicName": self.config.topic_name,
430            "labelIds": self.config.watched_label_ids,
431        });
432
433        let resp = self
434            .http
435            .post(&url)
436            .bearer_auth(&self.config.oauth_token)
437            .json(&body)
438            .send()
439            .await
440            .context("Gmail users.watch request")?;
441
442        if !resp.status().is_success() {
443            let status = resp.status();
444            let body = resp.text().await.unwrap_or_default();
445            bail!("Gmail users.watch returned {status}: {body}");
446        }
447        let resp_json: serde_json::Value = resp.json().await.context("parse users.watch JSON")?;
448        let history_id = resp_json
449            .get("historyId")
450            .and_then(|v| v.as_str())
451            .ok_or_else(|| anyhow!("users.watch response missing historyId"))?
452            .parse::<u64>()
453            .context("historyId is not u64")?;
454        let expiration_ms = resp_json
455            .get("expiration")
456            .and_then(|v| v.as_str())
457            .ok_or_else(|| anyhow!("users.watch response missing expiration"))?
458            .parse::<i64>()
459            .context("expiration is not i64")?;
460
461        let expiration = Utc
462            .timestamp_millis_opt(expiration_ms)
463            .single()
464            .ok_or_else(|| anyhow!("invalid expiration ms: {expiration_ms}"))?;
465
466        Ok(WatchResponse {
467            history_id,
468            expiration,
469        })
470    }
471}
472
/// Claims taken from a push JWT that passed verification.
#[derive(Debug, Clone)]
pub struct VerifiedPush {
    /// The token's `aud` claim; verification requires it to equal the
    /// configured push audience.
    pub aud: String,
    /// The token's `email` claim when present (the service-account email),
    /// otherwise its `sub` claim.
    pub sub: String,
}
481
482/// Parsed Pub/Sub push envelope.
483#[derive(Debug, Clone)]
484pub struct PushEnvelope {
485    /// The mailbox that received a new message.
486    pub email_address: String,
487    /// Opaque monotonic history cursor for this mailbox.
488    pub history_id: u64,
489    /// When Pub/Sub published the notification.
490    pub publish_time: DateTime<Utc>,
491    /// The Pub/Sub message id — used for best-effort de-dup by callers.
492    pub message_id: Option<String>,
493    /// Subscription the push came from, if reported by Pub/Sub.
494    pub subscription: Option<String>,
495}
496
497/// Parsed Gmail message suitable for agent dispatch.
498#[derive(Debug, Clone, Serialize, Deserialize)]
499pub struct EmailMessage {
500    /// Gmail message id.
501    pub id: String,
502    /// Gmail thread id.
503    pub thread_id: String,
504    /// `From:` header value.
505    pub from: String,
506    /// Parsed `To:` recipients.
507    #[serde(default)]
508    pub to: Vec<String>,
509    /// Parsed `Cc:` recipients.
510    #[serde(default)]
511    pub cc: Vec<String>,
512    /// `Subject:` header value.
513    #[serde(default)]
514    pub subject: String,
515    /// Plain-text body (best-effort extraction).
516    #[serde(default)]
517    pub body_text: String,
518    /// HTML body, when available.
519    #[serde(default)]
520    pub body_html: Option<String>,
521    /// Arrival timestamp (`internalDate` from Gmail).
522    pub received_at: DateTime<Utc>,
523    /// Gmail labels on the message.
524    #[serde(default)]
525    pub labels: Vec<String>,
526}
527
528/// Response from `users.watch`.
529#[derive(Debug, Clone, Serialize, Deserialize)]
530pub struct WatchResponse {
531    /// Starting history id — pass this as `since_history_id` on the next
532    /// push notification.
533    pub history_id: u64,
534    /// When the watch expires. Call `register_watch` again before this.
535    pub expiration: DateTime<Utc>,
536}
537
538// ── Internals ───────────────────────────────────────────────────────────
539
540#[derive(Debug, Deserialize)]
541struct GoogleJwtClaims {
542    aud: String,
543    #[allow(dead_code)]
544    iss: String,
545    sub: String,
546    #[serde(default)]
547    email: Option<String>,
548    #[allow(dead_code)]
549    #[serde(default)]
550    exp: Option<i64>,
551}
552
553fn parse_gmail_message(json: &serde_json::Value) -> Result<EmailMessage> {
554    let id = json
555        .get("id")
556        .and_then(|v| v.as_str())
557        .ok_or_else(|| anyhow!("message missing id"))?
558        .to_string();
559    let thread_id = json
560        .get("threadId")
561        .and_then(|v| v.as_str())
562        .unwrap_or("")
563        .to_string();
564    let labels: Vec<String> = json
565        .get("labelIds")
566        .and_then(|v| v.as_array())
567        .map(|a| {
568            a.iter()
569                .filter_map(|v| v.as_str().map(|s| s.to_string()))
570                .collect()
571        })
572        .unwrap_or_default();
573
574    let payload = json
575        .get("payload")
576        .ok_or_else(|| anyhow!("message missing payload"))?;
577
578    let mut from = String::new();
579    let mut subject = String::new();
580    let mut to: Vec<String> = Vec::new();
581    let mut cc: Vec<String> = Vec::new();
582
583    if let Some(headers) = payload.get("headers").and_then(|v| v.as_array()) {
584        for h in headers {
585            let name = h.get("name").and_then(|v| v.as_str()).unwrap_or("");
586            let value = h.get("value").and_then(|v| v.as_str()).unwrap_or("");
587            match name.to_ascii_lowercase().as_str() {
588                "from" => from = value.to_string(),
589                "subject" => subject = value.to_string(),
590                "to" => to = split_addresses(value),
591                "cc" => cc = split_addresses(value),
592                _ => {}
593            }
594        }
595    }
596
597    let (body_text, body_html) = extract_bodies(payload);
598
599    let received_at = json
600        .get("internalDate")
601        .and_then(|v| v.as_str())
602        .and_then(|s| s.parse::<i64>().ok())
603        .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
604        .unwrap_or_else(Utc::now);
605
606    Ok(EmailMessage {
607        id,
608        thread_id,
609        from,
610        to,
611        cc,
612        subject,
613        body_text,
614        body_html,
615        received_at,
616        labels,
617    })
618}
619
/// Split an address-list header value (`To:`, `Cc:`) into its individual
/// addresses.
///
/// Addresses are separated by commas, but a comma inside a double-quoted
/// display name (`"Doe, John" <john@example.com>`) does not split. Inside
/// quotes a backslash escapes the next character. Each piece is trimmed
/// and empty pieces are dropped.
///
/// If the quotes are unbalanced the value is malformed, and the function
/// falls back to splitting on every comma.
fn split_addresses(s: &str) -> Vec<String> {
    // Trim a raw piece and keep it only if something is left.
    let keep = |piece: &str| {
        let piece = piece.trim();
        (!piece.is_empty()).then(|| piece.to_string())
    };

    let mut out = Vec::new();
    let mut start = 0;
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in s.char_indices() {
        if escaped {
            // Previous character was a backslash inside quotes: this one
            // is literal, whatever it is.
            escaped = false;
            continue;
        }
        match ch {
            '\\' if in_quotes => escaped = true,
            '"' => in_quotes = !in_quotes,
            ',' if !in_quotes => {
                out.extend(keep(&s[start..idx]));
                // ',' is one byte, so the next piece starts right after it.
                start = idx + 1;
            }
            _ => {}
        }
    }

    if in_quotes {
        return s.split(',').filter_map(keep).collect();
    }
    out.extend(keep(&s[start..]));
    out
}
627
628/// Walk the Gmail payload tree and pick out the first `text/plain` and
629/// `text/html` parts. Gmail base64url-encodes the body data.
630fn extract_bodies(payload: &serde_json::Value) -> (String, Option<String>) {
631    let mut text: Option<String> = None;
632    let mut html: Option<String> = None;
633    walk_parts(payload, &mut text, &mut html);
634    (text.unwrap_or_default(), html)
635}
636
637fn walk_parts(part: &serde_json::Value, text: &mut Option<String>, html: &mut Option<String>) {
638    let mime = part.get("mimeType").and_then(|v| v.as_str()).unwrap_or("");
639    let body = part.get("body");
640    let data_b64 = body
641        .and_then(|b| b.get("data"))
642        .and_then(|v| v.as_str())
643        .filter(|s| !s.is_empty());
644
645    if let Some(b64) = data_b64
646        // Gmail uses URL-safe base64 without padding.
647        && let Ok(bytes) = URL_SAFE_NO_PAD.decode(b64.trim_end_matches('='))
648        && let Ok(s) = String::from_utf8(bytes)
649    {
650        match mime {
651            "text/plain" if text.is_none() => *text = Some(s),
652            "text/html" if html.is_none() => *html = Some(s),
653            _ => {}
654        }
655    }
656
657    if let Some(parts) = part.get("parts").and_then(|v| v.as_array()) {
658        for p in parts {
659            walk_parts(p, text, html);
660            if text.is_some() && html.is_some() {
661                return;
662            }
663        }
664    }
665}
666
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Encode a Gmail notification the way it appears in `message.data`.
    fn encode_notification(notification: &serde_json::Value) -> String {
        let raw = serde_json::to_vec(notification).unwrap();
        STANDARD.encode(raw)
    }

    /// Encode a message-part body (URL-safe base64, no padding).
    fn encode_part(raw: &[u8]) -> String {
        URL_SAFE_NO_PAD.encode(raw)
    }

    #[test]
    fn parse_envelope_roundtrip() {
        let data = encode_notification(&json!({
            "emailAddress": "alice@example.com",
            "historyId": 12345u64,
        }));
        let body = serde_json::to_vec(&json!({
            "message": {
                "data": data,
                "messageId": "pubsub-abc",
                "publishTime": "2025-01-01T00:00:00Z",
            },
            "subscription": "projects/p/subscriptions/s",
        }))
        .unwrap();

        let env = GmailPushHandler::parse_envelope(&body).unwrap();

        assert_eq!(env.email_address, "alice@example.com");
        assert_eq!(env.history_id, 12345);
        assert_eq!(env.message_id.as_deref(), Some("pubsub-abc"));
        assert_eq!(
            env.subscription.as_deref(),
            Some("projects/p/subscriptions/s")
        );
    }

    #[test]
    fn parse_envelope_accepts_string_history_id() {
        // historyId is documented as uint64, but it is often serialized as
        // a decimal string because JavaScript cannot round-trip uint64
        // exactly. Both forms must parse.
        let data = encode_notification(&json!({
            "emailAddress": "bob@example.com",
            "historyId": "99999999999999",
        }));
        let body = serde_json::to_vec(&json!({ "message": { "data": data } })).unwrap();

        let env = GmailPushHandler::parse_envelope(&body).unwrap();

        assert_eq!(env.email_address, "bob@example.com");
        assert_eq!(env.history_id, 99_999_999_999_999u64);
    }

    #[test]
    fn parse_gmail_message_plain_text() {
        let message = json!({
            "id": "m-1",
            "threadId": "t-1",
            "labelIds": ["INBOX", "UNREAD"],
            "internalDate": "1700000000000",
            "payload": {
                "mimeType": "text/plain",
                "headers": [
                    { "name": "From", "value": "alice@example.com" },
                    { "name": "To",   "value": "bob@example.com, carol@example.com" },
                    { "name": "Subject", "value": "hi" },
                ],
                "body": { "data": encode_part(b"hello world") },
            }
        });

        let msg = parse_gmail_message(&message).unwrap();

        assert_eq!(msg.id, "m-1");
        assert_eq!(msg.thread_id, "t-1");
        assert_eq!(msg.from, "alice@example.com");
        assert_eq!(msg.to.len(), 2);
        assert_eq!(msg.subject, "hi");
        assert_eq!(msg.body_text, "hello world");
        assert!(msg.body_html.is_none());
        assert_eq!(msg.labels, vec!["INBOX", "UNREAD"]);
    }

    #[test]
    fn parse_gmail_message_multipart() {
        let message = json!({
            "id": "m-2",
            "threadId": "t-2",
            "internalDate": "1700000000000",
            "payload": {
                "mimeType": "multipart/alternative",
                "headers": [
                    { "name": "From",    "value": "dave@example.com" },
                    { "name": "Subject", "value": "multi" },
                ],
                "parts": [
                    {
                        "mimeType": "text/plain",
                        "body": { "data": encode_part(b"plain body") },
                    },
                    {
                        "mimeType": "text/html",
                        "body": { "data": encode_part(b"<p>html body</p>") },
                    }
                ]
            }
        });

        let msg = parse_gmail_message(&message).unwrap();

        assert_eq!(msg.body_text, "plain body");
        assert_eq!(msg.body_html.as_deref(), Some("<p>html body</p>"));
    }

    #[test]
    fn extract_bodies_short_circuits_on_complete() {
        let payload = json!({
            "mimeType": "multipart/mixed",
            "parts": [
                {
                    "mimeType": "multipart/alternative",
                    "parts": [
                        { "mimeType": "text/plain", "body": { "data": encode_part(b"T") } },
                        { "mimeType": "text/html",  "body": { "data": encode_part(b"H") } },
                    ]
                }
            ]
        });

        let (plain, rich) = extract_bodies(&payload);

        assert_eq!(plain, "T");
        assert_eq!(rich.as_deref(), Some("H"));
    }
}
796}