1use crate::message::MessageType;
4use crate::{Message, Messenger};
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use chrono::DateTime;
8use reqwest::Client;
9use serde_json::{Value, json};
10use tokio::sync::Mutex;
11
/// Messenger implementation for Google Chat.
///
/// The transport and authentication details live in `mode`; the remaining
/// fields are shared by every mode.
pub struct GoogleChatMessenger {
    /// Instance name, returned verbatim by `name()`.
    name: String,
    /// Selected transport (webhook, bearer-token API, or service account)
    /// together with its per-mode configuration and polling state.
    mode: GoogleChatMode,
    /// HTTP client used for every outgoing request.
    client: Client,
    /// Set to `true` by `initialize` and back to `false` by `disconnect`;
    /// reported by `is_connected`.
    connected: bool,
}
18
/// How a `GoogleChatMessenger` talks to Google Chat.
enum GoogleChatMode {
    /// Send-only mode: `send_message` POSTs to a webhook URL and polling
    /// returns no messages.
    Webhook {
        /// URL that receives the `{"text": ...}` payload.
        webhook_url: String,
    },
    /// REST mode authenticated with a caller-supplied bearer token.
    Api {
        /// Sent as the bearer token on every API request.
        token: String,
        /// Primary space: probed by `initialize` and used by `send_message`
        /// when the caller passes an empty space.
        space_id: String,
        /// Spaces polled for incoming messages; when empty, `space_id` is
        /// polled instead.
        spaces: Vec<String>,
        /// Prefix joined with each REST path; the constructor sets it to
        /// `https://chat.googleapis.com/v1`.
        api_base_url: String,
        /// Polling cursor: a message name recorded by the previous poll.
        /// Reset to `None` by `disconnect`.
        last_seen_message_name: Mutex<Option<String>>,
    },
    /// REST mode meant to authenticate through a service account.
    ServiceAccount {
        /// Path to the credentials file.
        /// NOTE(review): nothing in the visible code reads this field, hence
        /// the `dead_code` allowance — confirm where it is consumed.
        #[allow(dead_code)]
        credentials_path: String,
        /// Spaces polled for incoming messages; the first entry is probed by
        /// `initialize` and is the default target of `send_message`.
        spaces: Vec<String>,
        /// Prefix joined with each REST path; the constructor sets it to
        /// `https://chat.googleapis.com/v1`.
        api_base_url: String,
        /// Bearer token for API requests. Starts as `None`, and while it is
        /// `None` every API call fails with "Service account not initialized".
        /// NOTE(review): the visible code never assigns it — presumably the
        /// token exchange lives elsewhere or is still pending; confirm.
        access_token: Mutex<Option<String>>,
        /// Polling cursor: a message name recorded by the previous poll.
        /// Reset to `None` by `disconnect`.
        last_seen_message_name: Mutex<Option<String>>,
    },
}
41
42impl GoogleChatMessenger {
43 pub fn new(name: impl Into<String>, webhook_url: impl Into<String>) -> Self {
44 Self {
45 name: name.into(),
46 mode: GoogleChatMode::Webhook {
47 webhook_url: webhook_url.into(),
48 },
49 client: Client::new(),
50 connected: false,
51 }
52 }
53
54 pub fn new_api(
55 name: impl Into<String>,
56 token: impl Into<String>,
57 space_id: impl Into<String>,
58 ) -> Self {
59 let space = space_id.into();
60 Self {
61 name: name.into(),
62 mode: GoogleChatMode::Api {
63 token: token.into(),
64 space_id: space.clone(),
65 spaces: vec![space],
66 api_base_url: "https://chat.googleapis.com/v1".to_string(),
67 last_seen_message_name: Mutex::new(None),
68 },
69 client: Client::new(),
70 connected: false,
71 }
72 }
73
74 pub fn with_credentials(
84 name: impl Into<String>,
85 credentials_path: impl Into<String>,
86 spaces: Vec<impl Into<String>>,
87 ) -> Self {
88 Self {
89 name: name.into(),
90 mode: GoogleChatMode::ServiceAccount {
91 credentials_path: credentials_path.into(),
92 spaces: spaces.into_iter().map(|s| s.into()).collect(),
93 api_base_url: "https://chat.googleapis.com/v1".to_string(),
94 access_token: Mutex::new(None),
95 last_seen_message_name: Mutex::new(None),
96 },
97 client: Client::new(),
98 connected: false,
99 }
100 }
101
102 pub fn with_spaces(mut self, spaces: Vec<impl Into<String>>) -> Self {
104 match &mut self.mode {
105 GoogleChatMode::Api { spaces: s, .. }
106 | GoogleChatMode::ServiceAccount { spaces: s, .. } => {
107 s.extend(spaces.into_iter().map(|x| x.into()));
108 }
109 GoogleChatMode::Webhook { .. } => {}
110 }
111 self
112 }
113
114 pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
115 if let GoogleChatMode::Api { api_base_url, .. } = &mut self.mode {
116 *api_base_url = url.into();
117 }
118 self
119 }
120
121 fn api_url(api_base_url: &str, path: impl AsRef<str>) -> String {
122 format!(
123 "{}/{}",
124 api_base_url.trim_end_matches('/'),
125 path.as_ref().trim_start_matches('/')
126 )
127 }
128
129 async fn api_get_json(&self, path: impl AsRef<str>) -> Result<Value> {
130 let (token, api_base_url) = match &self.mode {
131 GoogleChatMode::Api {
132 token,
133 api_base_url,
134 ..
135 } => (token.clone(), api_base_url.clone()),
136 GoogleChatMode::ServiceAccount {
137 api_base_url,
138 access_token,
139 ..
140 } => {
141 let token = access_token
142 .lock()
143 .await
144 .clone()
145 .ok_or_else(|| anyhow::anyhow!("Service account not initialized"))?;
146 (token, api_base_url.clone())
147 }
148 GoogleChatMode::Webhook { .. } => {
149 anyhow::bail!("Google Chat API requested in webhook mode")
150 }
151 };
152
153 let response = self
154 .client
155 .get(Self::api_url(&api_base_url, path))
156 .bearer_auth(&token)
157 .send()
158 .await
159 .context("Google Chat API request failed")?;
160 let status = response.status();
161 let body = response
162 .text()
163 .await
164 .context("Failed to read Google Chat API response body")?;
165
166 if !status.is_success() {
167 anyhow::bail!("Google Chat API request failed {}: {}", status, body);
168 }
169
170 serde_json::from_str(&body).context("Invalid Google Chat API response")
171 }
172
173 async fn api_post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
174 let (token, api_base_url) = match &self.mode {
175 GoogleChatMode::Api {
176 token,
177 api_base_url,
178 ..
179 } => (token.clone(), api_base_url.clone()),
180 GoogleChatMode::ServiceAccount {
181 api_base_url,
182 access_token,
183 ..
184 } => {
185 let token = access_token
186 .lock()
187 .await
188 .clone()
189 .ok_or_else(|| anyhow::anyhow!("Service account not initialized"))?;
190 (token, api_base_url.clone())
191 }
192 GoogleChatMode::Webhook { .. } => {
193 anyhow::bail!("Google Chat API requested in webhook mode")
194 }
195 };
196
197 let response = self
198 .client
199 .post(Self::api_url(&api_base_url, path))
200 .bearer_auth(&token)
201 .json(&body)
202 .send()
203 .await
204 .context("Google Chat API request failed")?;
205 let status = response.status();
206 let response_body = response
207 .text()
208 .await
209 .context("Failed to read Google Chat API response body")?;
210
211 if !status.is_success() {
212 anyhow::bail!(
213 "Google Chat API request failed {}: {}",
214 status,
215 response_body
216 );
217 }
218
219 if response_body.trim().is_empty() {
220 Ok(Value::Null)
221 } else {
222 serde_json::from_str(&response_body).context("Invalid Google Chat API response")
223 }
224 }
225
226 fn space_path(space_id: &str) -> String {
227 format!("spaces/{space_id}")
228 }
229
230 fn space_messages_path(space_id: &str) -> String {
231 format!("spaces/{space_id}/messages")
232 }
233
234 async fn api_receive_messages(&self) -> Result<Vec<Message>> {
235 let spaces = match &self.mode {
236 GoogleChatMode::Api {
237 space_id, spaces, ..
238 } => {
239 if spaces.is_empty() {
240 vec![space_id.clone()]
241 } else {
242 spaces.clone()
243 }
244 }
245 GoogleChatMode::ServiceAccount { spaces, .. } => spaces.clone(),
246 GoogleChatMode::Webhook { .. } => return Ok(Vec::new()),
247 };
248
249 let last_seen = match &self.mode {
250 GoogleChatMode::Api {
251 last_seen_message_name,
252 ..
253 }
254 | GoogleChatMode::ServiceAccount {
255 last_seen_message_name,
256 ..
257 } => last_seen_message_name.lock().await.clone(),
258 GoogleChatMode::Webhook { .. } => None,
259 };
260
261 let mut all_messages = Vec::new();
262 let mut newest_name = last_seen.clone();
263
264 for space_id in &spaces {
265 let data = self
266 .api_get_json(Self::space_messages_path(space_id))
267 .await?;
268 let mut messages = Vec::new();
269
270 if let Some(entries) = data["messages"].as_array() {
271 let mut parsed = Vec::new();
272
273 for entry in entries {
274 let Some(name) = entry["name"].as_str() else {
275 continue;
276 };
277 let content = entry["text"].as_str().unwrap_or("").to_string();
278 if content.is_empty() {
279 continue;
280 }
281
282 let timestamp = entry["createTime"]
283 .as_str()
284 .and_then(|value| DateTime::parse_from_rfc3339(value).ok())
285 .map(|value| value.timestamp())
286 .unwrap_or_else(|| chrono::Utc::now().timestamp());
287 let sender = entry["sender"]["displayName"]
288 .as_str()
289 .or_else(|| entry["sender"]["name"].as_str())
290 .unwrap_or("unknown")
291 .to_string();
292 let is_direct = entry["space"]["type"].as_str() == Some("DM");
293
294 parsed.push(Message {
295 id: name.to_string(),
296 sender,
297 content,
298 timestamp,
299 channel: Some(space_id.clone()),
300 reply_to: entry["thread"]["name"].as_str().map(ToString::to_string),
301 thread_id: None,
302 media: None,
303 is_direct,
304 message_type: MessageType::Text,
305 edited_timestamp: None,
306 reactions: None,
307 });
308 }
309
310 if let Some(first) = parsed.first() {
311 if newest_name.is_none() || first.id > *newest_name.as_ref().unwrap() {
312 newest_name = Some(first.id.clone());
313 }
314 }
315
316 if let Some(seen_name) = &last_seen {
317 for message in parsed {
318 if message.id == *seen_name {
319 break;
320 }
321 messages.push(message);
322 }
323 messages.reverse();
324 } else {
325 messages.extend(parsed.into_iter().rev());
326 }
327 }
328
329 all_messages.extend(messages);
330 }
331
332 match &self.mode {
333 GoogleChatMode::Api {
334 last_seen_message_name,
335 ..
336 }
337 | GoogleChatMode::ServiceAccount {
338 last_seen_message_name,
339 ..
340 } => {
341 *last_seen_message_name.lock().await = newest_name;
342 }
343 GoogleChatMode::Webhook { .. } => {}
344 }
345
346 Ok(all_messages)
347 }
348}
349
350#[async_trait]
351impl Messenger for GoogleChatMessenger {
352 fn name(&self) -> &str {
353 &self.name
354 }
355
356 fn messenger_type(&self) -> &str {
357 "googlechat"
358 }
359
360 async fn initialize(&mut self) -> Result<()> {
361 match &self.mode {
362 GoogleChatMode::Api { space_id, .. } => {
363 self.api_get_json(Self::space_path(space_id)).await?;
364 }
365 GoogleChatMode::ServiceAccount { spaces, .. } => {
366 if let Some(space) = spaces.first() {
368 self.api_get_json(Self::space_path(space)).await?;
369 }
370 }
371 GoogleChatMode::Webhook { .. } => {}
372 }
373 self.connected = true;
374 Ok(())
375 }
376
377 async fn send_message(&self, space: &str, content: &str) -> Result<String> {
378 match &self.mode {
379 GoogleChatMode::Webhook { webhook_url } => {
380 let body = json!({ "text": content });
381
382 let resp = self.client.post(webhook_url).json(&body).send().await?;
383
384 if resp.status().is_success() {
385 Ok(format!(
386 "googlechat:{}",
387 chrono::Utc::now().timestamp_millis()
388 ))
389 } else {
390 let status = resp.status();
391 let text = resp.text().await.unwrap_or_default();
392 anyhow::bail!("Google Chat webhook failed {}: {}", status, text);
393 }
394 }
395 GoogleChatMode::Api { space_id, .. } => {
396 let target_space = if space.is_empty() { space_id } else { space };
397 let data = self
398 .api_post_json(
399 Self::space_messages_path(target_space),
400 json!({ "text": content }),
401 )
402 .await?;
403
404 Ok(data["name"].as_str().unwrap_or_default().to_string())
405 }
406 GoogleChatMode::ServiceAccount { spaces, .. } => {
407 let target_space = if space.is_empty() {
408 spaces
409 .first()
410 .ok_or_else(|| anyhow::anyhow!("No spaces configured"))?
411 } else {
412 space
413 };
414 let data = self
415 .api_post_json(
416 Self::space_messages_path(target_space),
417 json!({ "text": content }),
418 )
419 .await?;
420
421 Ok(data["name"].as_str().unwrap_or_default().to_string())
422 }
423 }
424 }
425
426 async fn receive_messages(&self) -> Result<Vec<Message>> {
427 self.api_receive_messages().await
428 }
429
430 fn is_connected(&self) -> bool {
431 self.connected
432 }
433
434 async fn disconnect(&mut self) -> Result<()> {
435 match &self.mode {
436 GoogleChatMode::Api {
437 last_seen_message_name,
438 ..
439 }
440 | GoogleChatMode::ServiceAccount {
441 last_seen_message_name,
442 ..
443 } => {
444 *last_seen_message_name.lock().await = None;
445 }
446 GoogleChatMode::Webhook { .. } => {}
447 }
448 self.connected = false;
449 Ok(())
450 }
451}