Skip to main content

nexo_poller_google_calendar/
lib.rs

1//! Google Calendar v3 events incremental sync poller.
2//!
3//! Cursor stores the `nextSyncToken` Google returns. First tick
4//! fetches a window with `timeMin = now`, captures the
5//! `nextSyncToken`, dispatches nothing. Subsequent ticks pass
6//! `syncToken = <cursor>` and dispatch only the diff. Token expiry
7//! (HTTP 410 / invalid_grant) → `Permanent` error so the operator
8//! runs `agent pollers reset <id>` to re-baseline.
9//!
10//! Ported from `nexo-poller::builtins::google_calendar` (V1) during
11//! Phase 96. OAuth client + token refresh happen inside this
12//! subprocess; the daemon hands credential file paths over via
13//! `host.credentials_get("google")` (Phase 96.7 reverse-RPC).
14
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use chrono::Utc;
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde_json::{json, Value};
23
24use nexo_microapp_sdk::poller::{PollerHandler, TickRequest};
25use nexo_plugin_google::GoogleAuthClient;
26use nexo_poller::{PollerError, PollerHost, TickAck, TickMetrics};
27
28#[derive(Debug, Deserialize, Clone)]
29#[serde(deny_unknown_fields)]
30pub struct CalendarJobConfig {
31    /// Calendar id. `"primary"` resolves to the agent's primary
32    /// calendar.
33    #[serde(default = "default_calendar_id")]
34    pub calendar_id: String,
35    /// Mustache-light template. Fields: `{summary}`, `{start}`,
36    /// `{end}`, `{location}`, `{status}`, `{html_link}`.
37    #[serde(default = "default_template")]
38    pub message_template: String,
39    /// Skip events whose `status` is `"cancelled"`. Default true.
40    #[serde(default = "default_skip_cancelled")]
41    pub skip_cancelled: bool,
42    pub deliver: DeliverCfg,
43}
44
45#[derive(Debug, Deserialize, Clone)]
46#[serde(deny_unknown_fields)]
47pub struct DeliverCfg {
48    pub channel: String,
49    #[serde(alias = "recipient")]
50    pub to: String,
51}
52
53fn default_calendar_id() -> String {
54    "primary".into()
55}
56fn default_skip_cancelled() -> bool {
57    true
58}
59fn default_template() -> String {
60    "📅 {summary} — {start}\n{html_link}".to_string()
61}
62
63/// One Google OAuth client per account_id. Shared across ticks for
64/// the same agent so token refreshes amortise.
65pub struct GoogleCalendarHandler {
66    clients: DashMap<String, Arc<GoogleAuthClient>>,
67}
68
69impl GoogleCalendarHandler {
70    pub fn new() -> Self {
71        Self {
72            clients: DashMap::new(),
73        }
74    }
75}
76
77impl Default for GoogleCalendarHandler {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83#[derive(Debug, Deserialize)]
84struct GoogleAccountCreds {
85    account_id: String,
86    client_id_path: String,
87    client_secret_path: String,
88    token_path: String,
89    #[serde(default)]
90    scopes: Vec<String>,
91}
92
93#[async_trait]
94impl PollerHandler for GoogleCalendarHandler {
95    async fn tick(
96        &self,
97        req: TickRequest,
98        host: Arc<dyn PollerHost>,
99    ) -> Result<TickAck, PollerError> {
100        let cfg: CalendarJobConfig =
101            serde_json::from_value(req.config.clone()).map_err(|e| PollerError::Config {
102                job: req.job_id.clone(),
103                reason: e.to_string(),
104            })?;
105
106        let cred_value = host
107            .credentials_get("google".into())
108            .await
109            .map_err(|e| PollerError::Permanent(anyhow::anyhow!("credentials_get: {e}")))?;
110        let cred: GoogleAccountCreds =
111            serde_json::from_value(cred_value).map_err(|e| PollerError::Permanent(anyhow::anyhow!(
112                "credentials_get returned unexpected shape: {e}"
113            )))?;
114
115        let client = self.build_client(&cred).await?;
116
117        let sync_token = req
118            .cursor_bytes()?
119            .and_then(|b| String::from_utf8(b).ok());
120
121        let mut url = format!(
122            "https://www.googleapis.com/calendar/v3/calendars/{}/events?singleEvents=true&maxResults=250",
123            urlencode(&cfg.calendar_id)
124        );
125        if let Some(t) = sync_token.as_deref() {
126            url.push_str("&syncToken=");
127            url.push_str(&urlencode(t));
128        } else {
129            // First tick: only future events. Avoid back-fill of years.
130            url.push_str("&timeMin=");
131            url.push_str(&urlencode(&Utc::now().to_rfc3339()));
132        }
133
134        let resp: Value = client
135            .authorized_call("GET", &url, None)
136            .await
137            .map_err(classify_calendar_err)?;
138
139        let next_sync = resp
140            .get("nextSyncToken")
141            .and_then(Value::as_str)
142            .map(str::to_string);
143
144        let events = resp
145            .get("items")
146            .and_then(Value::as_array)
147            .cloned()
148            .unwrap_or_default();
149        let items_seen = events.len() as u32;
150
151        // Resolve outbound topic.
152        let target_cred = host
153            .credentials_get(cfg.deliver.channel.clone())
154            .await
155            .map_err(|e| PollerError::Permanent(anyhow::anyhow!("credentials_get outbound: {e}")))?;
156        let target_account = target_cred
157            .get("account_id")
158            .and_then(|v| v.as_str())
159            .ok_or_else(|| {
160                PollerError::Permanent(anyhow::anyhow!(
161                    "outbound credentials_get('{}') missing account_id",
162                    cfg.deliver.channel
163                ))
164            })?
165            .to_string();
166        let topic = format!("plugin.outbound.{}.{}", cfg.deliver.channel, target_account);
167
168        let mut items_dispatched = 0u32;
169        // Don't dispatch anything on the very first tick — we just
170        // want to capture nextSyncToken so subsequent ticks see
171        // incrementals.
172        if sync_token.is_some() {
173            for ev in &events {
174                if cfg.skip_cancelled
175                    && ev.get("status").and_then(Value::as_str) == Some("cancelled")
176                {
177                    continue;
178                }
179                let text = render_event_template(&cfg.message_template, ev);
180                let payload = json!({ "to": cfg.deliver.to, "text": text });
181                let payload_bytes = serde_json::to_vec(&payload)
182                    .map_err(|e| PollerError::Transient(anyhow::Error::from(e)))?;
183                host.broker_publish(topic.clone(), payload_bytes)
184                    .await
185                    .map_err(|e| PollerError::Transient(anyhow::anyhow!("broker_publish: {e}")))?;
186                items_dispatched += 1;
187            }
188        }
189
190        let cursor_bytes = next_sync.map(|t| t.into_bytes());
191        Ok(TickAck {
192            next_cursor: cursor_bytes,
193            next_interval_hint: None,
194            metrics: Some(TickMetrics {
195                items_seen,
196                items_dispatched,
197            }),
198        })
199    }
200}
201
202impl GoogleCalendarHandler {
203    async fn build_client(
204        &self,
205        cred: &GoogleAccountCreds,
206    ) -> Result<Arc<GoogleAuthClient>, PollerError> {
207        if let Some(c) = self.clients.get(&cred.account_id) {
208            return Ok(c.clone());
209        }
210        let cid = std::fs::read_to_string(&cred.client_id_path)
211            .map(|s| s.trim().to_string())
212            .map_err(|e| {
213                PollerError::Transient(anyhow::Error::from(e).context("read client_id_path"))
214            })?;
215        let cs = std::fs::read_to_string(&cred.client_secret_path)
216            .map(|s| s.trim().to_string())
217            .map_err(|e| {
218                PollerError::Transient(anyhow::Error::from(e).context("read client_secret_path"))
219            })?;
220        let auth_cfg = nexo_plugin_google::GoogleAuthConfig {
221            client_id: cid,
222            client_secret: cs,
223            scopes: cred.scopes.clone(),
224            token_file: cred.token_path.clone(),
225            redirect_port: 0,
226        };
227        let token_path = PathBuf::from(&cred.token_path);
228        let workspace = token_path
229            .parent()
230            .map(|p| p.to_path_buf())
231            .unwrap_or_else(|| std::path::PathBuf::from("."));
232        let client = GoogleAuthClient::new_with_sources(
233            auth_cfg,
234            &workspace,
235            Some(nexo_plugin_google::SecretSources {
236                client_id_path: PathBuf::from(&cred.client_id_path),
237                client_secret_path: PathBuf::from(&cred.client_secret_path),
238            }),
239        );
240        client
241            .load_from_disk()
242            .await
243            .map_err(|e| PollerError::Permanent(e.context("calendar: load_from_disk")))?;
244        self.clients.insert(cred.account_id.clone(), client.clone());
245        Ok(client)
246    }
247}
248
249fn render_event_template(template: &str, ev: &Value) -> String {
250    let summary = ev
251        .get("summary")
252        .and_then(Value::as_str)
253        .unwrap_or("(no title)");
254    let start = ev
255        .get("start")
256        .and_then(|s| s.get("dateTime").or_else(|| s.get("date")))
257        .and_then(Value::as_str)
258        .unwrap_or("");
259    let end = ev
260        .get("end")
261        .and_then(|s| s.get("dateTime").or_else(|| s.get("date")))
262        .and_then(Value::as_str)
263        .unwrap_or("");
264    let location = ev.get("location").and_then(Value::as_str).unwrap_or("");
265    let status = ev.get("status").and_then(Value::as_str).unwrap_or("");
266    let html_link = ev.get("htmlLink").and_then(Value::as_str).unwrap_or("");
267    template
268        .replace("{summary}", summary)
269        .replace("{start}", start)
270        .replace("{end}", end)
271        .replace("{location}", location)
272        .replace("{status}", status)
273        .replace("{html_link}", html_link)
274}
275
276fn classify_calendar_err(err: anyhow::Error) -> PollerError {
277    let m = err.to_string();
278    if m.contains("410") || m.contains("Gone") || m.contains("invalid_grant") {
279        PollerError::Permanent(err.context("calendar"))
280    } else if m.contains("401") || m.contains("403") {
281        PollerError::Permanent(err.context("calendar: auth"))
282    } else {
283        PollerError::Transient(err.context("calendar"))
284    }
285}
286
287fn urlencode(s: &str) -> String {
288    let mut out = String::with_capacity(s.len());
289    for ch in s.chars() {
290        if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.' | '~') {
291            out.push(ch);
292        } else {
293            for b in ch.to_string().as_bytes() {
294                out.push_str(&format!("%{:02X}", b));
295            }
296        }
297    }
298    out
299}
300
301#[cfg(test)]
302mod tests {
303    use super::*;
304
305    #[test]
306    fn parses_minimal_config() {
307        let cfg: CalendarJobConfig = serde_json::from_value(json!({
308            "deliver": { "channel": "whatsapp", "to": "+57300" },
309        }))
310        .unwrap();
311        assert_eq!(cfg.calendar_id, "primary");
312        assert!(cfg.skip_cancelled);
313    }
314
315    #[test]
316    fn config_accepts_recipient_alias() {
317        let cfg: CalendarJobConfig = serde_json::from_value(json!({
318            "deliver": { "channel": "telegram", "recipient": "-100" },
319        }))
320        .unwrap();
321        assert_eq!(cfg.deliver.to, "-100");
322    }
323
324    #[test]
325    fn render_substitutes_event_fields() {
326        let ev = json!({
327            "summary": "Standup",
328            "start": { "dateTime": "2026-05-17T10:00:00Z" },
329            "end":   { "dateTime": "2026-05-17T10:30:00Z" },
330            "location": "Zoom",
331            "status": "confirmed",
332            "htmlLink": "https://cal.google/event/abc"
333        });
334        let text = render_event_template(
335            "{summary} @ {start} ({location}) — {html_link}",
336            &ev,
337        );
338        assert_eq!(
339            text,
340            "Standup @ 2026-05-17T10:00:00Z (Zoom) — https://cal.google/event/abc"
341        );
342    }
343
344    #[test]
345    fn render_falls_back_to_date_field_for_all_day_events() {
346        let ev = json!({
347            "summary": "Holiday",
348            "start": { "date": "2026-12-25" },
349            "end":   { "date": "2026-12-26" },
350        });
351        let text = render_event_template("{summary} on {start}", &ev);
352        assert_eq!(text, "Holiday on 2026-12-25");
353    }
354
355    #[test]
356    fn urlencode_preserves_safe_chars_and_pcts_the_rest() {
357        assert_eq!(urlencode("primary"), "primary");
358        assert_eq!(urlencode("a b"), "a%20b");
359        assert_eq!(urlencode("name@host.com"), "name%40host.com");
360        assert_eq!(urlencode("AB-_.~12"), "AB-_.~12");
361    }
362
363    #[test]
364    fn classify_410_as_permanent() {
365        let err = anyhow::anyhow!("HTTP 410 Gone");
366        assert!(matches!(
367            classify_calendar_err(err),
368            PollerError::Permanent(_)
369        ));
370    }
371
372    #[test]
373    fn classify_invalid_grant_as_permanent() {
374        let err = anyhow::anyhow!("invalid_grant: token expired");
375        assert!(matches!(
376            classify_calendar_err(err),
377            PollerError::Permanent(_)
378        ));
379    }
380
381    #[test]
382    fn classify_500_as_transient() {
383        let err = anyhow::anyhow!("HTTP 500 server error");
384        assert!(matches!(
385            classify_calendar_err(err),
386            PollerError::Transient(_)
387        ));
388    }
389
390    #[test]
391    fn classify_401_as_permanent_auth() {
392        let err = anyhow::anyhow!("HTTP 401 unauthorized");
393        assert!(matches!(
394            classify_calendar_err(err),
395            PollerError::Permanent(_)
396        ));
397    }
398}