Skip to main content

chat_system/messengers/
google_chat.rs

1//! Google Chat messenger — Incoming Webhook and Google Chat API implementation.
2
3use crate::message::MessageType;
4use crate::{Message, Messenger};
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use chrono::DateTime;
8use reqwest::Client;
9use serde_json::{Value, json};
10use tokio::sync::Mutex;
11
12pub struct GoogleChatMessenger {
13    name: String,
14    mode: GoogleChatMode,
15    client: Client,
16    connected: bool,
17}
18
19enum GoogleChatMode {
20    Webhook {
21        webhook_url: String,
22    },
23    Api {
24        token: String,
25        space_id: String,
26        spaces: Vec<String>,
27        api_base_url: String,
28        last_seen_message_name: Mutex<Option<String>>,
29    },
30    ServiceAccount {
31        /// Path to service account JSON file; retained for future full auth implementation.
32        #[allow(dead_code)]
33        credentials_path: String,
34        spaces: Vec<String>,
35        api_base_url: String,
36        /// Cached access token from service account auth
37        access_token: Mutex<Option<String>>,
38        last_seen_message_name: Mutex<Option<String>>,
39    },
40}
41
42impl GoogleChatMessenger {
43    pub fn new(name: impl Into<String>, webhook_url: impl Into<String>) -> Self {
44        Self {
45            name: name.into(),
46            mode: GoogleChatMode::Webhook {
47                webhook_url: webhook_url.into(),
48            },
49            client: Client::new(),
50            connected: false,
51        }
52    }
53
54    pub fn new_api(
55        name: impl Into<String>,
56        token: impl Into<String>,
57        space_id: impl Into<String>,
58    ) -> Self {
59        let space = space_id.into();
60        Self {
61            name: name.into(),
62            mode: GoogleChatMode::Api {
63                token: token.into(),
64                space_id: space.clone(),
65                spaces: vec![space],
66                api_base_url: "https://chat.googleapis.com/v1".to_string(),
67                last_seen_message_name: Mutex::new(None),
68            },
69            client: Client::new(),
70            connected: false,
71        }
72    }
73
74    /// Create a Google Chat messenger using a service account credentials file.
75    ///
76    /// The credentials file should be a JSON file downloaded from the Google Cloud Console
77    /// containing the service account's private key and email.
78    ///
79    /// # Arguments
80    /// * `name` - Messenger instance name
81    /// * `credentials_path` - Path to the service account JSON credentials file
82    /// * `spaces` - List of space IDs to monitor (e.g., ["spaces/ABC123", "spaces/DEF456"])
83    pub fn with_credentials(
84        name: impl Into<String>,
85        credentials_path: impl Into<String>,
86        spaces: Vec<impl Into<String>>,
87    ) -> Self {
88        Self {
89            name: name.into(),
90            mode: GoogleChatMode::ServiceAccount {
91                credentials_path: credentials_path.into(),
92                spaces: spaces.into_iter().map(|s| s.into()).collect(),
93                api_base_url: "https://chat.googleapis.com/v1".to_string(),
94                access_token: Mutex::new(None),
95                last_seen_message_name: Mutex::new(None),
96            },
97            client: Client::new(),
98            connected: false,
99        }
100    }
101
102    /// Add additional spaces to monitor (for API or ServiceAccount modes).
103    pub fn with_spaces(mut self, spaces: Vec<impl Into<String>>) -> Self {
104        match &mut self.mode {
105            GoogleChatMode::Api { spaces: s, .. }
106            | GoogleChatMode::ServiceAccount { spaces: s, .. } => {
107                s.extend(spaces.into_iter().map(|x| x.into()));
108            }
109            GoogleChatMode::Webhook { .. } => {}
110        }
111        self
112    }
113
114    pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
115        if let GoogleChatMode::Api { api_base_url, .. } = &mut self.mode {
116            *api_base_url = url.into();
117        }
118        self
119    }
120
121    fn api_url(api_base_url: &str, path: impl AsRef<str>) -> String {
122        format!(
123            "{}/{}",
124            api_base_url.trim_end_matches('/'),
125            path.as_ref().trim_start_matches('/')
126        )
127    }
128
129    async fn api_get_json(&self, path: impl AsRef<str>) -> Result<Value> {
130        let (token, api_base_url) = match &self.mode {
131            GoogleChatMode::Api {
132                token,
133                api_base_url,
134                ..
135            } => (token.clone(), api_base_url.clone()),
136            GoogleChatMode::ServiceAccount {
137                api_base_url,
138                access_token,
139                ..
140            } => {
141                let token = access_token
142                    .lock()
143                    .await
144                    .clone()
145                    .ok_or_else(|| anyhow::anyhow!("Service account not initialized"))?;
146                (token, api_base_url.clone())
147            }
148            GoogleChatMode::Webhook { .. } => {
149                anyhow::bail!("Google Chat API requested in webhook mode")
150            }
151        };
152
153        let response = self
154            .client
155            .get(Self::api_url(&api_base_url, path))
156            .bearer_auth(&token)
157            .send()
158            .await
159            .context("Google Chat API request failed")?;
160        let status = response.status();
161        let body = response
162            .text()
163            .await
164            .context("Failed to read Google Chat API response body")?;
165
166        if !status.is_success() {
167            anyhow::bail!("Google Chat API request failed {}: {}", status, body);
168        }
169
170        serde_json::from_str(&body).context("Invalid Google Chat API response")
171    }
172
173    async fn api_post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
174        let (token, api_base_url) = match &self.mode {
175            GoogleChatMode::Api {
176                token,
177                api_base_url,
178                ..
179            } => (token.clone(), api_base_url.clone()),
180            GoogleChatMode::ServiceAccount {
181                api_base_url,
182                access_token,
183                ..
184            } => {
185                let token = access_token
186                    .lock()
187                    .await
188                    .clone()
189                    .ok_or_else(|| anyhow::anyhow!("Service account not initialized"))?;
190                (token, api_base_url.clone())
191            }
192            GoogleChatMode::Webhook { .. } => {
193                anyhow::bail!("Google Chat API requested in webhook mode")
194            }
195        };
196
197        let response = self
198            .client
199            .post(Self::api_url(&api_base_url, path))
200            .bearer_auth(&token)
201            .json(&body)
202            .send()
203            .await
204            .context("Google Chat API request failed")?;
205        let status = response.status();
206        let response_body = response
207            .text()
208            .await
209            .context("Failed to read Google Chat API response body")?;
210
211        if !status.is_success() {
212            anyhow::bail!(
213                "Google Chat API request failed {}: {}",
214                status,
215                response_body
216            );
217        }
218
219        if response_body.trim().is_empty() {
220            Ok(Value::Null)
221        } else {
222            serde_json::from_str(&response_body).context("Invalid Google Chat API response")
223        }
224    }
225
226    fn space_path(space_id: &str) -> String {
227        format!("spaces/{space_id}")
228    }
229
230    fn space_messages_path(space_id: &str) -> String {
231        format!("spaces/{space_id}/messages")
232    }
233
234    async fn api_receive_messages(&self) -> Result<Vec<Message>> {
235        let spaces = match &self.mode {
236            GoogleChatMode::Api {
237                space_id, spaces, ..
238            } => {
239                if spaces.is_empty() {
240                    vec![space_id.clone()]
241                } else {
242                    spaces.clone()
243                }
244            }
245            GoogleChatMode::ServiceAccount { spaces, .. } => spaces.clone(),
246            GoogleChatMode::Webhook { .. } => return Ok(Vec::new()),
247        };
248
249        let last_seen = match &self.mode {
250            GoogleChatMode::Api {
251                last_seen_message_name,
252                ..
253            }
254            | GoogleChatMode::ServiceAccount {
255                last_seen_message_name,
256                ..
257            } => last_seen_message_name.lock().await.clone(),
258            GoogleChatMode::Webhook { .. } => None,
259        };
260
261        let mut all_messages = Vec::new();
262        let mut newest_name = last_seen.clone();
263
264        for space_id in &spaces {
265            let data = self
266                .api_get_json(Self::space_messages_path(space_id))
267                .await?;
268            let mut messages = Vec::new();
269
270            if let Some(entries) = data["messages"].as_array() {
271                let mut parsed = Vec::new();
272
273                for entry in entries {
274                    let Some(name) = entry["name"].as_str() else {
275                        continue;
276                    };
277                    let content = entry["text"].as_str().unwrap_or("").to_string();
278                    if content.is_empty() {
279                        continue;
280                    }
281
282                    let timestamp = entry["createTime"]
283                        .as_str()
284                        .and_then(|value| DateTime::parse_from_rfc3339(value).ok())
285                        .map(|value| value.timestamp())
286                        .unwrap_or_else(|| chrono::Utc::now().timestamp());
287                    let sender = entry["sender"]["displayName"]
288                        .as_str()
289                        .or_else(|| entry["sender"]["name"].as_str())
290                        .unwrap_or("unknown")
291                        .to_string();
292                    let is_direct = entry["space"]["type"].as_str() == Some("DM");
293
294                    parsed.push(Message {
295                        id: name.to_string(),
296                        sender,
297                        content,
298                        timestamp,
299                        channel: Some(space_id.clone()),
300                        reply_to: entry["thread"]["name"].as_str().map(ToString::to_string),
301                        thread_id: None,
302                        media: None,
303                        is_direct,
304                        message_type: MessageType::Text,
305                        edited_timestamp: None,
306                        reactions: None,
307                    });
308                }
309
310                if let Some(first) = parsed.first() {
311                    if newest_name.is_none() || first.id > *newest_name.as_ref().unwrap() {
312                        newest_name = Some(first.id.clone());
313                    }
314                }
315
316                if let Some(seen_name) = &last_seen {
317                    for message in parsed {
318                        if message.id == *seen_name {
319                            break;
320                        }
321                        messages.push(message);
322                    }
323                    messages.reverse();
324                } else {
325                    messages.extend(parsed.into_iter().rev());
326                }
327            }
328
329            all_messages.extend(messages);
330        }
331
332        match &self.mode {
333            GoogleChatMode::Api {
334                last_seen_message_name,
335                ..
336            }
337            | GoogleChatMode::ServiceAccount {
338                last_seen_message_name,
339                ..
340            } => {
341                *last_seen_message_name.lock().await = newest_name;
342            }
343            GoogleChatMode::Webhook { .. } => {}
344        }
345
346        Ok(all_messages)
347    }
348}
349
350#[async_trait]
351impl Messenger for GoogleChatMessenger {
352    fn name(&self) -> &str {
353        &self.name
354    }
355
356    fn messenger_type(&self) -> &str {
357        "googlechat"
358    }
359
360    async fn initialize(&mut self) -> Result<()> {
361        match &self.mode {
362            GoogleChatMode::Api { space_id, .. } => {
363                self.api_get_json(Self::space_path(space_id)).await?;
364            }
365            GoogleChatMode::ServiceAccount { spaces, .. } => {
366                // Validate by checking at least one space
367                if let Some(space) = spaces.first() {
368                    self.api_get_json(Self::space_path(space)).await?;
369                }
370            }
371            GoogleChatMode::Webhook { .. } => {}
372        }
373        self.connected = true;
374        Ok(())
375    }
376
377    async fn send_message(&self, space: &str, content: &str) -> Result<String> {
378        match &self.mode {
379            GoogleChatMode::Webhook { webhook_url } => {
380                let body = json!({ "text": content });
381
382                let resp = self.client.post(webhook_url).json(&body).send().await?;
383
384                if resp.status().is_success() {
385                    Ok(format!(
386                        "googlechat:{}",
387                        chrono::Utc::now().timestamp_millis()
388                    ))
389                } else {
390                    let status = resp.status();
391                    let text = resp.text().await.unwrap_or_default();
392                    anyhow::bail!("Google Chat webhook failed {}: {}", status, text);
393                }
394            }
395            GoogleChatMode::Api { space_id, .. } => {
396                let target_space = if space.is_empty() { space_id } else { space };
397                let data = self
398                    .api_post_json(
399                        Self::space_messages_path(target_space),
400                        json!({ "text": content }),
401                    )
402                    .await?;
403
404                Ok(data["name"].as_str().unwrap_or_default().to_string())
405            }
406            GoogleChatMode::ServiceAccount { spaces, .. } => {
407                let target_space = if space.is_empty() {
408                    spaces
409                        .first()
410                        .ok_or_else(|| anyhow::anyhow!("No spaces configured"))?
411                } else {
412                    space
413                };
414                let data = self
415                    .api_post_json(
416                        Self::space_messages_path(target_space),
417                        json!({ "text": content }),
418                    )
419                    .await?;
420
421                Ok(data["name"].as_str().unwrap_or_default().to_string())
422            }
423        }
424    }
425
426    async fn receive_messages(&self) -> Result<Vec<Message>> {
427        self.api_receive_messages().await
428    }
429
430    fn is_connected(&self) -> bool {
431        self.connected
432    }
433
434    async fn disconnect(&mut self) -> Result<()> {
435        match &self.mode {
436            GoogleChatMode::Api {
437                last_seen_message_name,
438                ..
439            }
440            | GoogleChatMode::ServiceAccount {
441                last_seen_message_name,
442                ..
443            } => {
444                *last_seen_message_name.lock().await = None;
445            }
446            GoogleChatMode::Webhook { .. } => {}
447        }
448        self.connected = false;
449        Ok(())
450    }
451}