Skip to main content

chat_system/messengers/
google_chat.rs

1//! Google Chat messenger — Incoming Webhook and Google Chat API implementation.
2
3use crate::message::MessageType;
4use crate::{Message, Messenger};
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use chrono::DateTime;
8use reqwest::Client;
9use serde_json::{Value, json};
10use tokio::sync::Mutex;
11
12pub struct GoogleChatMessenger {
13    name: String,
14    mode: GoogleChatMode,
15    client: Client,
16    connected: bool,
17}
18
19enum GoogleChatMode {
20    Webhook {
21        webhook_url: String,
22    },
23    Api {
24        token: String,
25        space_id: String,
26        api_base_url: String,
27        last_seen_message_name: Mutex<Option<String>>,
28    },
29}
30
31impl GoogleChatMessenger {
32    pub fn new(name: impl Into<String>, webhook_url: impl Into<String>) -> Self {
33        Self {
34            name: name.into(),
35            mode: GoogleChatMode::Webhook {
36                webhook_url: webhook_url.into(),
37            },
38            client: Client::new(),
39            connected: false,
40        }
41    }
42
43    pub fn new_api(
44        name: impl Into<String>,
45        token: impl Into<String>,
46        space_id: impl Into<String>,
47    ) -> Self {
48        Self {
49            name: name.into(),
50            mode: GoogleChatMode::Api {
51                token: token.into(),
52                space_id: space_id.into(),
53                api_base_url: "https://chat.googleapis.com/v1".to_string(),
54                last_seen_message_name: Mutex::new(None),
55            },
56            client: Client::new(),
57            connected: false,
58        }
59    }
60
61    pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
62        if let GoogleChatMode::Api { api_base_url, .. } = &mut self.mode {
63            *api_base_url = url.into();
64        }
65        self
66    }
67
68    fn api_url(api_base_url: &str, path: impl AsRef<str>) -> String {
69        format!(
70            "{}/{}",
71            api_base_url.trim_end_matches('/'),
72            path.as_ref().trim_start_matches('/')
73        )
74    }
75
76    async fn api_get_json(&self, path: impl AsRef<str>) -> Result<Value> {
77        let (token, api_base_url) = match &self.mode {
78            GoogleChatMode::Api {
79                token,
80                api_base_url,
81                ..
82            } => (token, api_base_url),
83            GoogleChatMode::Webhook { .. } => {
84                anyhow::bail!("Google Chat API requested in webhook mode")
85            }
86        };
87
88        let response = self
89            .client
90            .get(Self::api_url(api_base_url, path))
91            .bearer_auth(token)
92            .send()
93            .await
94            .context("Google Chat API request failed")?;
95        let status = response.status();
96        let body = response
97            .text()
98            .await
99            .context("Failed to read Google Chat API response body")?;
100
101        if !status.is_success() {
102            anyhow::bail!("Google Chat API request failed {}: {}", status, body);
103        }
104
105        serde_json::from_str(&body).context("Invalid Google Chat API response")
106    }
107
108    async fn api_post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
109        let (token, api_base_url) = match &self.mode {
110            GoogleChatMode::Api {
111                token,
112                api_base_url,
113                ..
114            } => (token, api_base_url),
115            GoogleChatMode::Webhook { .. } => {
116                anyhow::bail!("Google Chat API requested in webhook mode")
117            }
118        };
119
120        let response = self
121            .client
122            .post(Self::api_url(api_base_url, path))
123            .bearer_auth(token)
124            .json(&body)
125            .send()
126            .await
127            .context("Google Chat API request failed")?;
128        let status = response.status();
129        let response_body = response
130            .text()
131            .await
132            .context("Failed to read Google Chat API response body")?;
133
134        if !status.is_success() {
135            anyhow::bail!(
136                "Google Chat API request failed {}: {}",
137                status,
138                response_body
139            );
140        }
141
142        if response_body.trim().is_empty() {
143            Ok(Value::Null)
144        } else {
145            serde_json::from_str(&response_body).context("Invalid Google Chat API response")
146        }
147    }
148
149    fn space_path(space_id: &str) -> String {
150        format!("spaces/{space_id}")
151    }
152
153    fn space_messages_path(space_id: &str) -> String {
154        format!("spaces/{space_id}/messages")
155    }
156
157    async fn api_receive_messages(&self) -> Result<Vec<Message>> {
158        let space_id = match &self.mode {
159            GoogleChatMode::Api { space_id, .. } => space_id.clone(),
160            GoogleChatMode::Webhook { .. } => return Ok(Vec::new()),
161        };
162
163        let last_seen = match &self.mode {
164            GoogleChatMode::Api {
165                last_seen_message_name,
166                ..
167            } => last_seen_message_name.lock().await.clone(),
168            GoogleChatMode::Webhook { .. } => None,
169        };
170
171        let data = self
172            .api_get_json(Self::space_messages_path(&space_id))
173            .await?;
174        let mut messages = Vec::new();
175        let mut newest_name = last_seen.clone();
176
177        if let Some(entries) = data["messages"].as_array() {
178            let mut parsed = Vec::new();
179
180            for entry in entries {
181                let Some(name) = entry["name"].as_str() else {
182                    continue;
183                };
184                let content = entry["text"].as_str().unwrap_or("").to_string();
185                if content.is_empty() {
186                    continue;
187                }
188
189                let timestamp = entry["createTime"]
190                    .as_str()
191                    .and_then(|value| DateTime::parse_from_rfc3339(value).ok())
192                    .map(|value| value.timestamp())
193                    .unwrap_or_else(|| chrono::Utc::now().timestamp());
194                let sender = entry["sender"]["displayName"]
195                    .as_str()
196                    .or_else(|| entry["sender"]["name"].as_str())
197                    .unwrap_or("unknown")
198                    .to_string();
199                let is_direct = entry["space"]["type"].as_str() == Some("DM");
200
201                parsed.push(Message {
202                    id: name.to_string(),
203                    sender,
204                    content,
205                    timestamp,
206                    channel: Some(space_id.clone()),
207                    reply_to: entry["thread"]["name"].as_str().map(ToString::to_string),
208                    thread_id: None,
209                    media: None,
210                    is_direct,
211                    message_type: MessageType::Text,
212                    edited_timestamp: None,
213                    reactions: None,
214                });
215            }
216
217            if let Some(first) = parsed.first() {
218                newest_name = Some(first.id.clone());
219            }
220
221            if let Some(seen_name) = &last_seen {
222                for message in parsed {
223                    if message.id == *seen_name {
224                        break;
225                    }
226                    messages.push(message);
227                }
228                messages.reverse();
229            } else {
230                messages.extend(parsed.into_iter().rev());
231            }
232        }
233
234        if let GoogleChatMode::Api {
235            last_seen_message_name,
236            ..
237        } = &self.mode
238        {
239            *last_seen_message_name.lock().await = newest_name;
240        }
241
242        Ok(messages)
243    }
244}
245
246#[async_trait]
247impl Messenger for GoogleChatMessenger {
248    fn name(&self) -> &str {
249        &self.name
250    }
251
252    fn messenger_type(&self) -> &str {
253        "googlechat"
254    }
255
256    async fn initialize(&mut self) -> Result<()> {
257        if matches!(&self.mode, GoogleChatMode::Api { .. }) {
258            let space_id = match &self.mode {
259                GoogleChatMode::Api { space_id, .. } => space_id.clone(),
260                GoogleChatMode::Webhook { .. } => String::new(),
261            };
262            self.api_get_json(Self::space_path(&space_id)).await?;
263        }
264        self.connected = true;
265        Ok(())
266    }
267
268    async fn send_message(&self, space: &str, content: &str) -> Result<String> {
269        match &self.mode {
270            GoogleChatMode::Webhook { webhook_url } => {
271                let body = json!({ "text": content });
272
273                let resp = self.client.post(webhook_url).json(&body).send().await?;
274
275                if resp.status().is_success() {
276                    Ok(format!(
277                        "googlechat:{}",
278                        chrono::Utc::now().timestamp_millis()
279                    ))
280                } else {
281                    let status = resp.status();
282                    let text = resp.text().await.unwrap_or_default();
283                    anyhow::bail!("Google Chat webhook failed {}: {}", status, text);
284                }
285            }
286            GoogleChatMode::Api { space_id, .. } => {
287                let target_space = if space.is_empty() { space_id } else { space };
288                let data = self
289                    .api_post_json(
290                        Self::space_messages_path(target_space),
291                        json!({ "text": content }),
292                    )
293                    .await?;
294
295                Ok(data["name"].as_str().unwrap_or_default().to_string())
296            }
297        }
298    }
299
300    async fn receive_messages(&self) -> Result<Vec<Message>> {
301        self.api_receive_messages().await
302    }
303
304    fn is_connected(&self) -> bool {
305        self.connected
306    }
307
308    async fn disconnect(&mut self) -> Result<()> {
309        if let GoogleChatMode::Api {
310            last_seen_message_name,
311            ..
312        } = &self.mode
313        {
314            *last_seen_message_name.lock().await = None;
315        }
316        self.connected = false;
317        Ok(())
318    }
319}