1use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Context, Result, anyhow, bail};
26use base64::Engine as _;
27use base64::engine::general_purpose::{STANDARD, URL_SAFE_NO_PAD};
28use chrono::{DateTime, TimeZone, Utc};
29use jsonwebtoken::{Algorithm, DecodingKey, Validation, decode, decode_header};
30use parking_lot::RwLock;
31use serde::{Deserialize, Serialize};
32
/// Origin of the Gmail REST API, used when `GmailPushConfig::gmail_base_url` is unset.
const DEFAULT_GMAIL_BASE: &str = "https://gmail.googleapis.com";

/// Issuer (`iss`) expected on the Google-signed tokens attached to push requests.
const GOOGLE_ISSUER: &str = "https://accounts.google.com";

/// Google's public JSON Web Key Set used to check those tokens' signatures.
pub const GOOGLE_JWKS_URL: &str = "https://www.googleapis.com/oauth2/v3/certs";

/// How long a downloaded key set is reused before it is fetched again (one hour).
const JWKS_CACHE_TTL: Duration = Duration::from_secs(60 * 60);
45
/// Configuration for Gmail push (Pub/Sub) ingestion.
///
/// `Debug` is implemented by hand so that `oauth_token` is never written to logs
/// or panic messages; every other field is printed as a derived `Debug` would.
#[derive(Clone)]
pub struct GmailPushConfig {
    /// Google Cloud project identifier (not read by the handler code itself).
    pub project_id: String,
    /// Sent as `topicName` in the `users.watch` request.
    pub topic_name: String,
    /// Required `aud` claim of incoming push JWTs.
    pub push_audience: String,
    /// Label ids sent to `users.watch` and used to filter `history.list`.
    pub watched_label_ids: Vec<String>,
    /// Bearer credential sent on every Gmail API request. Redacted in `Debug`.
    pub oauth_token: String,
    /// Overrides the Gmail API origin (e.g. a local mock); `None` uses the default.
    pub gmail_base_url: Option<String>,
}

impl std::fmt::Debug for GmailPushConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GmailPushConfig")
            .field("project_id", &self.project_id)
            .field("topic_name", &self.topic_name)
            .field("push_audience", &self.push_audience)
            .field("watched_label_ids", &self.watched_label_ids)
            // Never print the credential itself.
            .field("oauth_token", &"<redacted>")
            .field("gmail_base_url", &self.gmail_base_url)
            .finish()
    }
}
66
67impl GmailPushConfig {
68 fn gmail_base(&self) -> &str {
69 self.gmail_base_url.as_deref().unwrap_or(DEFAULT_GMAIL_BASE)
70 }
71}
72
73#[derive(Default)]
75struct JwksCache {
76 fetched_at: Option<std::time::Instant>,
77 keys: Vec<JwkEntry>,
78}
79
80#[derive(Debug, Clone, Deserialize)]
82struct JwkEntry {
83 kid: String,
84 #[serde(default)]
85 alg: Option<String>,
86 n: String,
87 e: String,
88}
89
90#[derive(Debug, Deserialize)]
91struct JwksResponse {
92 keys: Vec<JwkEntry>,
93}
94
95pub struct GmailPushHandler {
97 config: GmailPushConfig,
98 http: reqwest::Client,
99 jwks: Arc<RwLock<JwksCache>>,
100 jwks_url: String,
101}
102
103impl GmailPushHandler {
104 pub fn new(config: GmailPushConfig) -> Self {
106 let http = reqwest::Client::builder()
107 .timeout(Duration::from_secs(30))
108 .build()
109 .expect("reqwest::Client default build");
110 Self {
111 config,
112 http,
113 jwks: Arc::new(RwLock::new(JwksCache::default())),
114 jwks_url: GOOGLE_JWKS_URL.to_string(),
115 }
116 }
117
118 pub fn with_http_client(mut self, http: reqwest::Client) -> Self {
121 self.http = http;
122 self
123 }
124
125 pub fn with_jwks_url(mut self, url: impl Into<String>) -> Self {
129 self.jwks_url = url.into();
130 self
131 }
132
133 pub fn push_audience(&self) -> &str {
136 &self.config.push_audience
137 }
138
139 async fn jwks(&self) -> Result<Vec<JwkEntry>> {
141 {
143 let cache = self.jwks.read();
144 if let Some(fetched) = cache.fetched_at
145 && fetched.elapsed() < JWKS_CACHE_TTL
146 && !cache.keys.is_empty()
147 {
148 return Ok(cache.keys.clone());
149 }
150 }
151
152 let resp = self
153 .http
154 .get(&self.jwks_url)
155 .send()
156 .await
157 .context("fetch Google JWKs")?;
158 if !resp.status().is_success() {
159 bail!("JWKs endpoint returned {}", resp.status());
160 }
161 let body: JwksResponse = resp.json().await.context("parse JWKs JSON")?;
162 let keys = body.keys;
163
164 {
165 let mut cache = self.jwks.write();
166 cache.fetched_at = Some(std::time::Instant::now());
167 cache.keys = keys.clone();
168 }
169 Ok(keys)
170 }
171
172 pub async fn verify_push_jwt(&self, bearer_token: &str) -> Result<VerifiedPush> {
178 let token = bearer_token.trim();
179 let token = token.strip_prefix("Bearer ").unwrap_or(token).trim();
180 if token.is_empty() {
181 bail!("empty bearer token");
182 }
183
184 let header = decode_header(token).context("decode JWT header")?;
185 if header.alg != Algorithm::RS256 {
186 bail!("unexpected JWT alg {:?}; expected RS256", header.alg);
187 }
188 let kid = header
189 .kid
190 .as_ref()
191 .ok_or_else(|| anyhow!("JWT header missing kid"))?;
192
193 let jwks = self.jwks().await?;
194 let jwk = jwks
195 .iter()
196 .find(|k| &k.kid == kid)
197 .ok_or_else(|| anyhow!("no matching JWK for kid {}", kid))?;
198 if let Some(alg) = &jwk.alg
199 && alg != "RS256"
200 {
201 bail!("JWK alg {} is not RS256", alg);
202 }
203
204 let decoding_key = DecodingKey::from_rsa_components(&jwk.n, &jwk.e)
205 .context("build decoding key from JWK")?;
206
207 let mut validation = Validation::new(Algorithm::RS256);
208 validation.set_issuer(&[GOOGLE_ISSUER]);
209 let audience = [self.config.push_audience.as_str()];
210 validation.set_audience(&audience);
211 validation.validate_exp = true;
212
213 let data = decode::<GoogleJwtClaims>(token, &decoding_key, &validation)
214 .context("verify Google push JWT")?;
215
216 Ok(VerifiedPush {
220 aud: data.claims.aud,
221 sub: data.claims.email.unwrap_or(data.claims.sub),
222 })
223 }
224
225 pub fn parse_envelope(body: &[u8]) -> Result<PushEnvelope> {
238 #[derive(Deserialize)]
239 struct Outer {
240 message: OuterMessage,
241 #[serde(default)]
242 subscription: Option<String>,
243 }
244 #[derive(Deserialize)]
245 struct OuterMessage {
246 data: String,
247 #[serde(rename = "messageId", default)]
248 message_id: Option<String>,
249 #[serde(rename = "publishTime", default)]
250 publish_time: Option<String>,
251 }
252 #[derive(Deserialize)]
253 struct Inner {
254 #[serde(rename = "emailAddress")]
255 email_address: String,
256 #[serde(rename = "historyId")]
257 history_id: serde_json::Value,
258 }
259
260 let outer: Outer =
261 serde_json::from_slice(body).context("parse Pub/Sub push envelope JSON")?;
262 let decoded = STANDARD
263 .decode(outer.message.data.as_bytes())
264 .context("base64-decode Pub/Sub message data")?;
265 let inner: Inner =
266 serde_json::from_slice(&decoded).context("parse Gmail push inner JSON")?;
267
268 let history_id = match &inner.history_id {
269 serde_json::Value::Number(n) => n
270 .as_u64()
271 .ok_or_else(|| anyhow!("historyId is not u64: {n}"))?,
272 serde_json::Value::String(s) => s
273 .parse::<u64>()
274 .with_context(|| format!("historyId string is not u64: {s}"))?,
275 other => bail!("historyId has unexpected type: {other:?}"),
276 };
277
278 let publish_time = match &outer.message.publish_time {
279 Some(ts) => DateTime::parse_from_rfc3339(ts)
280 .map(|dt| dt.with_timezone(&Utc))
281 .unwrap_or_else(|_| Utc::now()),
282 None => Utc::now(),
283 };
284
285 Ok(PushEnvelope {
286 email_address: inner.email_address,
287 history_id,
288 publish_time,
289 message_id: outer.message.message_id,
290 subscription: outer.subscription,
291 })
292 }
293
294 pub async fn fetch_new_messages(
300 &self,
301 envelope: &PushEnvelope,
302 since_history_id: u64,
303 ) -> Result<(Vec<EmailMessage>, u64)> {
304 let base = self.config.gmail_base();
305 let email = &envelope.email_address;
306
307 let mut url = format!(
310 "{base}/gmail/v1/users/{email}/history?startHistoryId={since}&historyTypes=messageAdded",
311 base = base,
312 email = urlencoding::encode(email),
313 since = since_history_id,
314 );
315 for label in &self.config.watched_label_ids {
316 url.push_str("&labelId=");
317 url.push_str(&urlencoding::encode(label));
318 }
319
320 let hist_resp = self
321 .http
322 .get(&url)
323 .bearer_auth(&self.config.oauth_token)
324 .send()
325 .await
326 .context("Gmail history.list request")?;
327
328 if hist_resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
329 bail!("Gmail rate-limited history.list (429)");
330 }
331 if !hist_resp.status().is_success() {
332 let status = hist_resp.status();
333 let body = hist_resp.text().await.unwrap_or_default();
334 bail!("Gmail history.list returned {status}: {body}");
335 }
336 let hist_json: serde_json::Value = hist_resp
337 .json()
338 .await
339 .context("parse Gmail history.list JSON")?;
340
341 let mut message_ids: Vec<String> = Vec::new();
342 let mut new_history_id = since_history_id;
343
344 if let Some(top_hid) = hist_json.get("historyId").and_then(|v| v.as_str())
345 && let Ok(h) = top_hid.parse::<u64>()
346 {
347 new_history_id = new_history_id.max(h);
348 }
349
350 if let Some(entries) = hist_json.get("history").and_then(|v| v.as_array()) {
351 for entry in entries {
352 if let Some(hid) = entry.get("id").and_then(|v| v.as_str())
353 && let Ok(h) = hid.parse::<u64>()
354 {
355 new_history_id = new_history_id.max(h);
356 }
357 if let Some(added) = entry.get("messagesAdded").and_then(|v| v.as_array()) {
358 for item in added {
359 if let Some(id) = item
360 .get("message")
361 .and_then(|m| m.get("id"))
362 .and_then(|v| v.as_str())
363 && !message_ids.iter().any(|x| x == id)
364 {
365 message_ids.push(id.to_string());
366 }
367 }
368 }
369 }
370 }
371
372 let mut out = Vec::with_capacity(message_ids.len());
374 for id in &message_ids {
375 let url = format!(
376 "{base}/gmail/v1/users/{email}/messages/{id}?format=full",
377 base = base,
378 email = urlencoding::encode(email),
379 id = urlencoding::encode(id),
380 );
381 let resp = self
382 .http
383 .get(&url)
384 .bearer_auth(&self.config.oauth_token)
385 .send()
386 .await
387 .context("Gmail messages.get request")?;
388 if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
389 tracing::warn!(
391 email = %email,
392 message_id = %id,
393 "Gmail rate-limited messages.get (429); returning partial batch"
394 );
395 break;
396 }
397 if !resp.status().is_success() {
398 let status = resp.status();
399 let body = resp.text().await.unwrap_or_default();
400 tracing::warn!(
401 email = %email,
402 message_id = %id,
403 %status,
404 %body,
405 "Gmail messages.get failed"
406 );
407 continue;
408 }
409 let msg_json: serde_json::Value =
410 resp.json().await.context("parse Gmail messages.get JSON")?;
411 match parse_gmail_message(&msg_json) {
412 Ok(msg) => out.push(msg),
413 Err(e) => {
414 tracing::warn!(error = %e, message_id = %id, "parse Gmail message failed")
415 }
416 }
417 }
418
419 Ok((out, new_history_id))
420 }
421
422 pub async fn register_watch(&self) -> Result<WatchResponse> {
425 let base = self.config.gmail_base();
426 let url = format!("{base}/gmail/v1/users/me/watch");
427
428 let body = serde_json::json!({
429 "topicName": self.config.topic_name,
430 "labelIds": self.config.watched_label_ids,
431 });
432
433 let resp = self
434 .http
435 .post(&url)
436 .bearer_auth(&self.config.oauth_token)
437 .json(&body)
438 .send()
439 .await
440 .context("Gmail users.watch request")?;
441
442 if !resp.status().is_success() {
443 let status = resp.status();
444 let body = resp.text().await.unwrap_or_default();
445 bail!("Gmail users.watch returned {status}: {body}");
446 }
447 let resp_json: serde_json::Value = resp.json().await.context("parse users.watch JSON")?;
448 let history_id = resp_json
449 .get("historyId")
450 .and_then(|v| v.as_str())
451 .ok_or_else(|| anyhow!("users.watch response missing historyId"))?
452 .parse::<u64>()
453 .context("historyId is not u64")?;
454 let expiration_ms = resp_json
455 .get("expiration")
456 .and_then(|v| v.as_str())
457 .ok_or_else(|| anyhow!("users.watch response missing expiration"))?
458 .parse::<i64>()
459 .context("expiration is not i64")?;
460
461 let expiration = Utc
462 .timestamp_millis_opt(expiration_ms)
463 .single()
464 .ok_or_else(|| anyhow!("invalid expiration ms: {expiration_ms}"))?;
465
466 Ok(WatchResponse {
467 history_id,
468 expiration,
469 })
470 }
471}
472
/// Identity taken from a push JWT that passed verification.
#[derive(Clone, Debug)]
pub struct VerifiedPush {
    /// The token's `aud` claim (already checked against the configured audience).
    pub aud: String,
    /// The token's `email` claim when present, otherwise its `sub` claim.
    pub sub: String,
}
481
482#[derive(Debug, Clone)]
484pub struct PushEnvelope {
485 pub email_address: String,
487 pub history_id: u64,
489 pub publish_time: DateTime<Utc>,
491 pub message_id: Option<String>,
493 pub subscription: Option<String>,
495}
496
497#[derive(Debug, Clone, Serialize, Deserialize)]
499pub struct EmailMessage {
500 pub id: String,
502 pub thread_id: String,
504 pub from: String,
506 #[serde(default)]
508 pub to: Vec<String>,
509 #[serde(default)]
511 pub cc: Vec<String>,
512 #[serde(default)]
514 pub subject: String,
515 #[serde(default)]
517 pub body_text: String,
518 #[serde(default)]
520 pub body_html: Option<String>,
521 pub received_at: DateTime<Utc>,
523 #[serde(default)]
525 pub labels: Vec<String>,
526}
527
528#[derive(Debug, Clone, Serialize, Deserialize)]
530pub struct WatchResponse {
531 pub history_id: u64,
534 pub expiration: DateTime<Utc>,
536}
537
538#[derive(Debug, Deserialize)]
541struct GoogleJwtClaims {
542 aud: String,
543 #[allow(dead_code)]
544 iss: String,
545 sub: String,
546 #[serde(default)]
547 email: Option<String>,
548 #[allow(dead_code)]
549 #[serde(default)]
550 exp: Option<i64>,
551}
552
553fn parse_gmail_message(json: &serde_json::Value) -> Result<EmailMessage> {
554 let id = json
555 .get("id")
556 .and_then(|v| v.as_str())
557 .ok_or_else(|| anyhow!("message missing id"))?
558 .to_string();
559 let thread_id = json
560 .get("threadId")
561 .and_then(|v| v.as_str())
562 .unwrap_or("")
563 .to_string();
564 let labels: Vec<String> = json
565 .get("labelIds")
566 .and_then(|v| v.as_array())
567 .map(|a| {
568 a.iter()
569 .filter_map(|v| v.as_str().map(|s| s.to_string()))
570 .collect()
571 })
572 .unwrap_or_default();
573
574 let payload = json
575 .get("payload")
576 .ok_or_else(|| anyhow!("message missing payload"))?;
577
578 let mut from = String::new();
579 let mut subject = String::new();
580 let mut to: Vec<String> = Vec::new();
581 let mut cc: Vec<String> = Vec::new();
582
583 if let Some(headers) = payload.get("headers").and_then(|v| v.as_array()) {
584 for h in headers {
585 let name = h.get("name").and_then(|v| v.as_str()).unwrap_or("");
586 let value = h.get("value").and_then(|v| v.as_str()).unwrap_or("");
587 match name.to_ascii_lowercase().as_str() {
588 "from" => from = value.to_string(),
589 "subject" => subject = value.to_string(),
590 "to" => to = split_addresses(value),
591 "cc" => cc = split_addresses(value),
592 _ => {}
593 }
594 }
595 }
596
597 let (body_text, body_html) = extract_bodies(payload);
598
599 let received_at = json
600 .get("internalDate")
601 .and_then(|v| v.as_str())
602 .and_then(|s| s.parse::<i64>().ok())
603 .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
604 .unwrap_or_else(Utc::now);
605
606 Ok(EmailMessage {
607 id,
608 thread_id,
609 from,
610 to,
611 cc,
612 subject,
613 body_text,
614 body_html,
615 received_at,
616 labels,
617 })
618}
619
/// Splits an address-list header value (`To`, `Cc`) into trimmed, non-empty entries.
///
/// Commas only separate entries at the top level. Commas inside a quoted
/// display name (`"Doe, Jane" <jane@example.com>`), inside `<...>`, or inside a
/// `(...)` comment are kept, so a display name containing a comma is not torn
/// into two bogus entries. Inside quotes and comments, a backslash escapes the
/// next character (RFC 5322 quoted-pair). If the value is malformed (an
/// unterminated quote, comment, or angle bracket), this falls back to a plain
/// comma split so recipients are not merged into one entry.
fn split_addresses(s: &str) -> Vec<String> {
    let mut pieces: Vec<&str> = Vec::new();
    let mut start = 0;
    let mut in_quotes = false;
    let mut escaped = false;
    let mut angle_depth = 0usize;
    let mut paren_depth = 0usize;

    for (idx, ch) in s.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        match ch {
            '\\' if in_quotes || paren_depth > 0 => escaped = true,
            '"' if paren_depth == 0 => in_quotes = !in_quotes,
            // Everything else inside a quoted string is literal.
            _ if in_quotes => {}
            '(' => paren_depth += 1,
            ')' => paren_depth = paren_depth.saturating_sub(1),
            '<' if paren_depth == 0 => angle_depth += 1,
            '>' if paren_depth == 0 => angle_depth = angle_depth.saturating_sub(1),
            ',' if paren_depth == 0 && angle_depth == 0 => {
                pieces.push(&s[start..idx]);
                start = idx + ch.len_utf8();
            }
            _ => {}
        }
    }
    pieces.push(&s[start..]);

    let balanced = !in_quotes && paren_depth == 0 && angle_depth == 0;
    if !balanced {
        pieces = s.split(',').collect();
    }

    pieces
        .into_iter()
        .map(str::trim)
        .filter(|p| !p.is_empty())
        .map(str::to_owned)
        .collect()
}
627
628fn extract_bodies(payload: &serde_json::Value) -> (String, Option<String>) {
631 let mut text: Option<String> = None;
632 let mut html: Option<String> = None;
633 walk_parts(payload, &mut text, &mut html);
634 (text.unwrap_or_default(), html)
635}
636
637fn walk_parts(part: &serde_json::Value, text: &mut Option<String>, html: &mut Option<String>) {
638 let mime = part.get("mimeType").and_then(|v| v.as_str()).unwrap_or("");
639 let body = part.get("body");
640 let data_b64 = body
641 .and_then(|b| b.get("data"))
642 .and_then(|v| v.as_str())
643 .filter(|s| !s.is_empty());
644
645 if let Some(b64) = data_b64
646 && let Ok(bytes) = URL_SAFE_NO_PAD.decode(b64.trim_end_matches('='))
648 && let Ok(s) = String::from_utf8(bytes)
649 {
650 match mime {
651 "text/plain" if text.is_none() => *text = Some(s),
652 "text/html" if html.is_none() => *html = Some(s),
653 _ => {}
654 }
655 }
656
657 if let Some(parts) = part.get("parts").and_then(|v| v.as_array()) {
658 for p in parts {
659 walk_parts(p, text, html);
660 if text.is_some() && html.is_some() {
661 return;
662 }
663 }
664 }
665}
666
// Unit tests for envelope parsing and Gmail message/body extraction. All fixtures
// are built in-memory; no network access is involved.
#[cfg(test)]
mod tests {
    use super::*;

    // A full push body (numeric historyId, messageId, publishTime, subscription)
    // round-trips into a PushEnvelope.
    #[test]
    fn parse_envelope_roundtrip() {
        let inner = serde_json::json!({
            "emailAddress": "alice@example.com",
            "historyId": 12345u64,
        });
        // Pub/Sub wraps the Gmail notification as standard base64 in `message.data`.
        let encoded = STANDARD.encode(serde_json::to_vec(&inner).unwrap());
        let outer = serde_json::json!({
            "message": {
                "data": encoded,
                "messageId": "pubsub-abc",
                "publishTime": "2025-01-01T00:00:00Z",
            },
            "subscription": "projects/p/subscriptions/s",
        });
        let body = serde_json::to_vec(&outer).unwrap();
        let env = GmailPushHandler::parse_envelope(&body).unwrap();
        assert_eq!(env.email_address, "alice@example.com");
        assert_eq!(env.history_id, 12345);
        assert_eq!(env.message_id.as_deref(), Some("pubsub-abc"));
        assert_eq!(
            env.subscription.as_deref(),
            Some("projects/p/subscriptions/s")
        );
    }

    // historyId given as a decimal string is accepted, and the optional outer
    // fields (messageId, publishTime, subscription) may be absent.
    #[test]
    fn parse_envelope_accepts_string_history_id() {
        let inner = serde_json::json!({
            "emailAddress": "bob@example.com",
            "historyId": "99999999999999",
        });
        let encoded = STANDARD.encode(serde_json::to_vec(&inner).unwrap());
        let outer = serde_json::json!({
            "message": { "data": encoded },
        });
        let body = serde_json::to_vec(&outer).unwrap();
        let env = GmailPushHandler::parse_envelope(&body).unwrap();
        assert_eq!(env.email_address, "bob@example.com");
        assert_eq!(env.history_id, 99_999_999_999_999u64);
    }

    // Single-part text/plain message: headers, labels and body are all extracted,
    // and the To header is split into two recipients.
    #[test]
    fn parse_gmail_message_plain_text() {
        let body_data = URL_SAFE_NO_PAD.encode(b"hello world");
        let payload = serde_json::json!({
            "id": "m-1",
            "threadId": "t-1",
            "labelIds": ["INBOX", "UNREAD"],
            "internalDate": "1700000000000",
            "payload": {
                "mimeType": "text/plain",
                "headers": [
                    { "name": "From", "value": "alice@example.com" },
                    { "name": "To", "value": "bob@example.com, carol@example.com" },
                    { "name": "Subject", "value": "hi" },
                ],
                "body": { "data": body_data },
            }
        });
        let msg = parse_gmail_message(&payload).unwrap();
        assert_eq!(msg.id, "m-1");
        assert_eq!(msg.thread_id, "t-1");
        assert_eq!(msg.from, "alice@example.com");
        assert_eq!(msg.to.len(), 2);
        assert_eq!(msg.subject, "hi");
        assert_eq!(msg.body_text, "hello world");
        assert!(msg.body_html.is_none());
        assert_eq!(msg.labels, vec!["INBOX", "UNREAD"]);
    }

    // multipart/alternative: the plain and HTML bodies come from separate child parts.
    #[test]
    fn parse_gmail_message_multipart() {
        let text_data = URL_SAFE_NO_PAD.encode(b"plain body");
        let html_data = URL_SAFE_NO_PAD.encode(b"<p>html body</p>");
        let payload = serde_json::json!({
            "id": "m-2",
            "threadId": "t-2",
            "internalDate": "1700000000000",
            "payload": {
                "mimeType": "multipart/alternative",
                "headers": [
                    { "name": "From", "value": "dave@example.com" },
                    { "name": "Subject", "value": "multi" },
                ],
                "parts": [
                    {
                        "mimeType": "text/plain",
                        "body": { "data": text_data },
                    },
                    {
                        "mimeType": "text/html",
                        "body": { "data": html_data },
                    }
                ]
            }
        });
        let msg = parse_gmail_message(&payload).unwrap();
        assert_eq!(msg.body_text, "plain body");
        assert_eq!(msg.body_html.as_deref(), Some("<p>html body</p>"));
    }

    // Bodies nested two levels deep (mixed -> alternative -> leaf) are found
    // through recursion.
    #[test]
    fn extract_bodies_short_circuits_on_complete() {
        let text_data = URL_SAFE_NO_PAD.encode(b"T");
        let html_data = URL_SAFE_NO_PAD.encode(b"H");
        let payload = serde_json::json!({
            "mimeType": "multipart/mixed",
            "parts": [
                {
                    "mimeType": "multipart/alternative",
                    "parts": [
                        { "mimeType": "text/plain", "body": { "data": text_data } },
                        { "mimeType": "text/html", "body": { "data": html_data } },
                    ]
                }
            ]
        });
        let (t, h) = extract_bodies(&payload);
        assert_eq!(t, "T");
        assert_eq!(h.as_deref(), Some("H"));
    }
}