Skip to main content

chat_system/messengers/
google_chat.rs

1//! Google Chat messenger — Incoming Webhook and Google Chat API implementation.
2
3use crate::{Message, Messenger};
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use chrono::DateTime;
7use reqwest::Client;
8use serde_json::{json, Value};
9use tokio::sync::Mutex;
10
11pub struct GoogleChatMessenger {
12    name: String,
13    mode: GoogleChatMode,
14    client: Client,
15    connected: bool,
16}
17
18enum GoogleChatMode {
19    Webhook {
20        webhook_url: String,
21    },
22    Api {
23        token: String,
24        space_id: String,
25        api_base_url: String,
26        last_seen_message_name: Mutex<Option<String>>,
27    },
28}
29
30impl GoogleChatMessenger {
31    pub fn new(name: impl Into<String>, webhook_url: impl Into<String>) -> Self {
32        Self {
33            name: name.into(),
34            mode: GoogleChatMode::Webhook {
35                webhook_url: webhook_url.into(),
36            },
37            client: Client::new(),
38            connected: false,
39        }
40    }
41
42    pub fn new_api(
43        name: impl Into<String>,
44        token: impl Into<String>,
45        space_id: impl Into<String>,
46    ) -> Self {
47        Self {
48            name: name.into(),
49            mode: GoogleChatMode::Api {
50                token: token.into(),
51                space_id: space_id.into(),
52                api_base_url: "https://chat.googleapis.com/v1".to_string(),
53                last_seen_message_name: Mutex::new(None),
54            },
55            client: Client::new(),
56            connected: false,
57        }
58    }
59
60    pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
61        if let GoogleChatMode::Api { api_base_url, .. } = &mut self.mode {
62            *api_base_url = url.into();
63        }
64        self
65    }
66
67    fn api_url(api_base_url: &str, path: impl AsRef<str>) -> String {
68        format!(
69            "{}/{}",
70            api_base_url.trim_end_matches('/'),
71            path.as_ref().trim_start_matches('/')
72        )
73    }
74
75    async fn api_get_json(&self, path: impl AsRef<str>) -> Result<Value> {
76        let (token, api_base_url) = match &self.mode {
77            GoogleChatMode::Api {
78                token,
79                api_base_url,
80                ..
81            } => (token, api_base_url),
82            GoogleChatMode::Webhook { .. } => anyhow::bail!("Google Chat API requested in webhook mode"),
83        };
84
85        let response = self
86            .client
87            .get(Self::api_url(api_base_url, path))
88            .bearer_auth(token)
89            .send()
90            .await
91            .context("Google Chat API request failed")?;
92        let status = response.status();
93        let body = response
94            .text()
95            .await
96            .context("Failed to read Google Chat API response body")?;
97
98        if !status.is_success() {
99            anyhow::bail!("Google Chat API request failed {}: {}", status, body);
100        }
101
102        serde_json::from_str(&body).context("Invalid Google Chat API response")
103    }
104
105    async fn api_post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
106        let (token, api_base_url) = match &self.mode {
107            GoogleChatMode::Api {
108                token,
109                api_base_url,
110                ..
111            } => (token, api_base_url),
112            GoogleChatMode::Webhook { .. } => anyhow::bail!("Google Chat API requested in webhook mode"),
113        };
114
115        let response = self
116            .client
117            .post(Self::api_url(api_base_url, path))
118            .bearer_auth(token)
119            .json(&body)
120            .send()
121            .await
122            .context("Google Chat API request failed")?;
123        let status = response.status();
124        let response_body = response
125            .text()
126            .await
127            .context("Failed to read Google Chat API response body")?;
128
129        if !status.is_success() {
130            anyhow::bail!("Google Chat API request failed {}: {}", status, response_body);
131        }
132
133        if response_body.trim().is_empty() {
134            Ok(Value::Null)
135        } else {
136            serde_json::from_str(&response_body).context("Invalid Google Chat API response")
137        }
138    }
139
140    fn space_path(space_id: &str) -> String {
141        format!("spaces/{space_id}")
142    }
143
144    fn space_messages_path(space_id: &str) -> String {
145        format!("spaces/{space_id}/messages")
146    }
147
148    async fn api_receive_messages(&self) -> Result<Vec<Message>> {
149        let space_id = match &self.mode {
150            GoogleChatMode::Api { space_id, .. } => space_id.clone(),
151            GoogleChatMode::Webhook { .. } => return Ok(Vec::new()),
152        };
153
154        let last_seen = match &self.mode {
155            GoogleChatMode::Api {
156                last_seen_message_name,
157                ..
158            } => last_seen_message_name.lock().await.clone(),
159            GoogleChatMode::Webhook { .. } => None,
160        };
161
162        let data = self.api_get_json(Self::space_messages_path(&space_id)).await?;
163        let mut messages = Vec::new();
164        let mut newest_name = last_seen.clone();
165
166        if let Some(entries) = data["messages"].as_array() {
167            let mut parsed = Vec::new();
168
169            for entry in entries {
170                let Some(name) = entry["name"].as_str() else {
171                    continue;
172                };
173                let content = entry["text"].as_str().unwrap_or("").to_string();
174                if content.is_empty() {
175                    continue;
176                }
177
178                let timestamp = entry["createTime"]
179                    .as_str()
180                    .and_then(|value| DateTime::parse_from_rfc3339(value).ok())
181                    .map(|value| value.timestamp())
182                    .unwrap_or_else(|| chrono::Utc::now().timestamp());
183                let sender = entry["sender"]["displayName"]
184                    .as_str()
185                    .or_else(|| entry["sender"]["name"].as_str())
186                    .unwrap_or("unknown")
187                    .to_string();
188                let is_direct = entry["space"]["type"].as_str() == Some("DM");
189
190                parsed.push(Message {
191                    id: name.to_string(),
192                    sender,
193                    content,
194                    timestamp,
195                    channel: Some(space_id.clone()),
196                    reply_to: entry["thread"]["name"].as_str().map(ToString::to_string),
197                    media: None,
198                    is_direct,
199                    reactions: None,
200                });
201            }
202
203            if let Some(first) = parsed.first() {
204                newest_name = Some(first.id.clone());
205            }
206
207            if let Some(seen_name) = &last_seen {
208                for message in parsed {
209                    if message.id == *seen_name {
210                        break;
211                    }
212                    messages.push(message);
213                }
214                messages.reverse();
215            } else {
216                messages.extend(parsed.into_iter().rev());
217            }
218        }
219
220        if let GoogleChatMode::Api {
221            last_seen_message_name,
222            ..
223        } = &self.mode
224        {
225            *last_seen_message_name.lock().await = newest_name;
226        }
227
228        Ok(messages)
229    }
230}
231
232#[async_trait]
233impl Messenger for GoogleChatMessenger {
234    fn name(&self) -> &str {
235        &self.name
236    }
237
238    fn messenger_type(&self) -> &str {
239        "googlechat"
240    }
241
242    async fn initialize(&mut self) -> Result<()> {
243        if matches!(&self.mode, GoogleChatMode::Api { .. }) {
244            let space_id = match &self.mode {
245                GoogleChatMode::Api { space_id, .. } => space_id.clone(),
246                GoogleChatMode::Webhook { .. } => String::new(),
247            };
248            self.api_get_json(Self::space_path(&space_id)).await?;
249        }
250        self.connected = true;
251        Ok(())
252    }
253
254    async fn send_message(&self, space: &str, content: &str) -> Result<String> {
255        match &self.mode {
256            GoogleChatMode::Webhook { webhook_url } => {
257                let body = json!({ "text": content });
258
259                let resp = self.client.post(webhook_url).json(&body).send().await?;
260
261                if resp.status().is_success() {
262                    Ok(format!(
263                        "googlechat:{}",
264                        chrono::Utc::now().timestamp_millis()
265                    ))
266                } else {
267                    let status = resp.status();
268                    let text = resp.text().await.unwrap_or_default();
269                    anyhow::bail!("Google Chat webhook failed {}: {}", status, text);
270                }
271            }
272            GoogleChatMode::Api { space_id, .. } => {
273                let target_space = if space.is_empty() { space_id } else { space };
274                let data = self
275                    .api_post_json(
276                        Self::space_messages_path(target_space),
277                        json!({ "text": content }),
278                    )
279                    .await?;
280
281                Ok(data["name"].as_str().unwrap_or_default().to_string())
282            }
283        }
284    }
285
286    async fn receive_messages(&self) -> Result<Vec<Message>> {
287        self.api_receive_messages().await
288    }
289
290    fn is_connected(&self) -> bool {
291        self.connected
292    }
293
294    async fn disconnect(&mut self) -> Result<()> {
295        if let GoogleChatMode::Api {
296            last_seen_message_name,
297            ..
298        } = &self.mode
299        {
300            *last_seen_message_name.lock().await = None;
301        }
302        self.connected = false;
303        Ok(())
304    }
305}