nexo_poller_google_calendar/
lib.rs1use std::path::PathBuf;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use chrono::Utc;
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde_json::{json, Value};
23
24use nexo_microapp_sdk::poller::{PollerHandler, TickRequest};
25use nexo_plugin_google::GoogleAuthClient;
26use nexo_poller::{PollerError, PollerHost, TickAck, TickMetrics};
27
28#[derive(Debug, Deserialize, Clone)]
29#[serde(deny_unknown_fields)]
30pub struct CalendarJobConfig {
31 #[serde(default = "default_calendar_id")]
34 pub calendar_id: String,
35 #[serde(default = "default_template")]
38 pub message_template: String,
39 #[serde(default = "default_skip_cancelled")]
41 pub skip_cancelled: bool,
42 pub deliver: DeliverCfg,
43}
44
45#[derive(Debug, Deserialize, Clone)]
46#[serde(deny_unknown_fields)]
47pub struct DeliverCfg {
48 pub channel: String,
49 #[serde(alias = "recipient")]
50 pub to: String,
51}
52
53fn default_calendar_id() -> String {
54 "primary".into()
55}
56fn default_skip_cancelled() -> bool {
57 true
58}
59fn default_template() -> String {
60 "📅 {summary} — {start}\n{html_link}".to_string()
61}
62
63pub struct GoogleCalendarHandler {
66 clients: DashMap<String, Arc<GoogleAuthClient>>,
67}
68
69impl GoogleCalendarHandler {
70 pub fn new() -> Self {
71 Self {
72 clients: DashMap::new(),
73 }
74 }
75}
76
77impl Default for GoogleCalendarHandler {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83#[derive(Debug, Deserialize)]
84struct GoogleAccountCreds {
85 account_id: String,
86 client_id_path: String,
87 client_secret_path: String,
88 token_path: String,
89 #[serde(default)]
90 scopes: Vec<String>,
91}
92
93#[async_trait]
94impl PollerHandler for GoogleCalendarHandler {
95 async fn tick(
96 &self,
97 req: TickRequest,
98 host: Arc<dyn PollerHost>,
99 ) -> Result<TickAck, PollerError> {
100 let cfg: CalendarJobConfig =
101 serde_json::from_value(req.config.clone()).map_err(|e| PollerError::Config {
102 job: req.job_id.clone(),
103 reason: e.to_string(),
104 })?;
105
106 let cred_value = host
107 .credentials_get("google".into())
108 .await
109 .map_err(|e| PollerError::Permanent(anyhow::anyhow!("credentials_get: {e}")))?;
110 let cred: GoogleAccountCreds =
111 serde_json::from_value(cred_value).map_err(|e| PollerError::Permanent(anyhow::anyhow!(
112 "credentials_get returned unexpected shape: {e}"
113 )))?;
114
115 let client = self.build_client(&cred).await?;
116
117 let sync_token = req
118 .cursor_bytes()?
119 .and_then(|b| String::from_utf8(b).ok());
120
121 let mut url = format!(
122 "https://www.googleapis.com/calendar/v3/calendars/{}/events?singleEvents=true&maxResults=250",
123 urlencode(&cfg.calendar_id)
124 );
125 if let Some(t) = sync_token.as_deref() {
126 url.push_str("&syncToken=");
127 url.push_str(&urlencode(t));
128 } else {
129 url.push_str("&timeMin=");
131 url.push_str(&urlencode(&Utc::now().to_rfc3339()));
132 }
133
134 let resp: Value = client
135 .authorized_call("GET", &url, None)
136 .await
137 .map_err(classify_calendar_err)?;
138
139 let next_sync = resp
140 .get("nextSyncToken")
141 .and_then(Value::as_str)
142 .map(str::to_string);
143
144 let events = resp
145 .get("items")
146 .and_then(Value::as_array)
147 .cloned()
148 .unwrap_or_default();
149 let items_seen = events.len() as u32;
150
151 let target_cred = host
153 .credentials_get(cfg.deliver.channel.clone())
154 .await
155 .map_err(|e| PollerError::Permanent(anyhow::anyhow!("credentials_get outbound: {e}")))?;
156 let target_account = target_cred
157 .get("account_id")
158 .and_then(|v| v.as_str())
159 .ok_or_else(|| {
160 PollerError::Permanent(anyhow::anyhow!(
161 "outbound credentials_get('{}') missing account_id",
162 cfg.deliver.channel
163 ))
164 })?
165 .to_string();
166 let topic = format!("plugin.outbound.{}.{}", cfg.deliver.channel, target_account);
167
168 let mut items_dispatched = 0u32;
169 if sync_token.is_some() {
173 for ev in &events {
174 if cfg.skip_cancelled
175 && ev.get("status").and_then(Value::as_str) == Some("cancelled")
176 {
177 continue;
178 }
179 let text = render_event_template(&cfg.message_template, ev);
180 let payload = json!({ "to": cfg.deliver.to, "text": text });
181 let payload_bytes = serde_json::to_vec(&payload)
182 .map_err(|e| PollerError::Transient(anyhow::Error::from(e)))?;
183 host.broker_publish(topic.clone(), payload_bytes)
184 .await
185 .map_err(|e| PollerError::Transient(anyhow::anyhow!("broker_publish: {e}")))?;
186 items_dispatched += 1;
187 }
188 }
189
190 let cursor_bytes = next_sync.map(|t| t.into_bytes());
191 Ok(TickAck {
192 next_cursor: cursor_bytes,
193 next_interval_hint: None,
194 metrics: Some(TickMetrics {
195 items_seen,
196 items_dispatched,
197 }),
198 })
199 }
200}
201
202impl GoogleCalendarHandler {
203 async fn build_client(
204 &self,
205 cred: &GoogleAccountCreds,
206 ) -> Result<Arc<GoogleAuthClient>, PollerError> {
207 if let Some(c) = self.clients.get(&cred.account_id) {
208 return Ok(c.clone());
209 }
210 let cid = std::fs::read_to_string(&cred.client_id_path)
211 .map(|s| s.trim().to_string())
212 .map_err(|e| {
213 PollerError::Transient(anyhow::Error::from(e).context("read client_id_path"))
214 })?;
215 let cs = std::fs::read_to_string(&cred.client_secret_path)
216 .map(|s| s.trim().to_string())
217 .map_err(|e| {
218 PollerError::Transient(anyhow::Error::from(e).context("read client_secret_path"))
219 })?;
220 let auth_cfg = nexo_plugin_google::GoogleAuthConfig {
221 client_id: cid,
222 client_secret: cs,
223 scopes: cred.scopes.clone(),
224 token_file: cred.token_path.clone(),
225 redirect_port: 0,
226 };
227 let token_path = PathBuf::from(&cred.token_path);
228 let workspace = token_path
229 .parent()
230 .map(|p| p.to_path_buf())
231 .unwrap_or_else(|| std::path::PathBuf::from("."));
232 let client = GoogleAuthClient::new_with_sources(
233 auth_cfg,
234 &workspace,
235 Some(nexo_plugin_google::SecretSources {
236 client_id_path: PathBuf::from(&cred.client_id_path),
237 client_secret_path: PathBuf::from(&cred.client_secret_path),
238 }),
239 );
240 client
241 .load_from_disk()
242 .await
243 .map_err(|e| PollerError::Permanent(e.context("calendar: load_from_disk")))?;
244 self.clients.insert(cred.account_id.clone(), client.clone());
245 Ok(client)
246 }
247}
248
249fn render_event_template(template: &str, ev: &Value) -> String {
250 let summary = ev
251 .get("summary")
252 .and_then(Value::as_str)
253 .unwrap_or("(no title)");
254 let start = ev
255 .get("start")
256 .and_then(|s| s.get("dateTime").or_else(|| s.get("date")))
257 .and_then(Value::as_str)
258 .unwrap_or("");
259 let end = ev
260 .get("end")
261 .and_then(|s| s.get("dateTime").or_else(|| s.get("date")))
262 .and_then(Value::as_str)
263 .unwrap_or("");
264 let location = ev.get("location").and_then(Value::as_str).unwrap_or("");
265 let status = ev.get("status").and_then(Value::as_str).unwrap_or("");
266 let html_link = ev.get("htmlLink").and_then(Value::as_str).unwrap_or("");
267 template
268 .replace("{summary}", summary)
269 .replace("{start}", start)
270 .replace("{end}", end)
271 .replace("{location}", location)
272 .replace("{status}", status)
273 .replace("{html_link}", html_link)
274}
275
276fn classify_calendar_err(err: anyhow::Error) -> PollerError {
277 let m = err.to_string();
278 if m.contains("410") || m.contains("Gone") || m.contains("invalid_grant") {
279 PollerError::Permanent(err.context("calendar"))
280 } else if m.contains("401") || m.contains("403") {
281 PollerError::Permanent(err.context("calendar: auth"))
282 } else {
283 PollerError::Transient(err.context("calendar"))
284 }
285}
286
287fn urlencode(s: &str) -> String {
288 let mut out = String::with_capacity(s.len());
289 for ch in s.chars() {
290 if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.' | '~') {
291 out.push(ch);
292 } else {
293 for b in ch.to_string().as_bytes() {
294 out.push_str(&format!("%{:02X}", b));
295 }
296 }
297 }
298 out
299}
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304
305 #[test]
306 fn parses_minimal_config() {
307 let cfg: CalendarJobConfig = serde_json::from_value(json!({
308 "deliver": { "channel": "whatsapp", "to": "+57300" },
309 }))
310 .unwrap();
311 assert_eq!(cfg.calendar_id, "primary");
312 assert!(cfg.skip_cancelled);
313 }
314
315 #[test]
316 fn config_accepts_recipient_alias() {
317 let cfg: CalendarJobConfig = serde_json::from_value(json!({
318 "deliver": { "channel": "telegram", "recipient": "-100" },
319 }))
320 .unwrap();
321 assert_eq!(cfg.deliver.to, "-100");
322 }
323
324 #[test]
325 fn render_substitutes_event_fields() {
326 let ev = json!({
327 "summary": "Standup",
328 "start": { "dateTime": "2026-05-17T10:00:00Z" },
329 "end": { "dateTime": "2026-05-17T10:30:00Z" },
330 "location": "Zoom",
331 "status": "confirmed",
332 "htmlLink": "https://cal.google/event/abc"
333 });
334 let text = render_event_template(
335 "{summary} @ {start} ({location}) — {html_link}",
336 &ev,
337 );
338 assert_eq!(
339 text,
340 "Standup @ 2026-05-17T10:00:00Z (Zoom) — https://cal.google/event/abc"
341 );
342 }
343
344 #[test]
345 fn render_falls_back_to_date_field_for_all_day_events() {
346 let ev = json!({
347 "summary": "Holiday",
348 "start": { "date": "2026-12-25" },
349 "end": { "date": "2026-12-26" },
350 });
351 let text = render_event_template("{summary} on {start}", &ev);
352 assert_eq!(text, "Holiday on 2026-12-25");
353 }
354
355 #[test]
356 fn urlencode_preserves_safe_chars_and_pcts_the_rest() {
357 assert_eq!(urlencode("primary"), "primary");
358 assert_eq!(urlencode("a b"), "a%20b");
359 assert_eq!(urlencode("name@host.com"), "name%40host.com");
360 assert_eq!(urlencode("AB-_.~12"), "AB-_.~12");
361 }
362
363 #[test]
364 fn classify_410_as_permanent() {
365 let err = anyhow::anyhow!("HTTP 410 Gone");
366 assert!(matches!(
367 classify_calendar_err(err),
368 PollerError::Permanent(_)
369 ));
370 }
371
372 #[test]
373 fn classify_invalid_grant_as_permanent() {
374 let err = anyhow::anyhow!("invalid_grant: token expired");
375 assert!(matches!(
376 classify_calendar_err(err),
377 PollerError::Permanent(_)
378 ));
379 }
380
381 #[test]
382 fn classify_500_as_transient() {
383 let err = anyhow::anyhow!("HTTP 500 server error");
384 assert!(matches!(
385 classify_calendar_err(err),
386 PollerError::Transient(_)
387 ));
388 }
389
390 #[test]
391 fn classify_401_as_permanent_auth() {
392 let err = anyhow::anyhow!("HTTP 401 unauthorized");
393 assert!(matches!(
394 classify_calendar_err(err),
395 PollerError::Permanent(_)
396 ));
397 }
398}