1use crate::{Message, Messenger};
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use chrono::DateTime;
7use reqwest::Client;
8use serde_json::{json, Value};
9use tokio::sync::Mutex;
10
11pub struct GoogleChatMessenger {
12 name: String,
13 mode: GoogleChatMode,
14 client: Client,
15 connected: bool,
16}
17
18enum GoogleChatMode {
19 Webhook {
20 webhook_url: String,
21 },
22 Api {
23 token: String,
24 space_id: String,
25 api_base_url: String,
26 last_seen_message_name: Mutex<Option<String>>,
27 },
28}
29
30impl GoogleChatMessenger {
31 pub fn new(name: impl Into<String>, webhook_url: impl Into<String>) -> Self {
32 Self {
33 name: name.into(),
34 mode: GoogleChatMode::Webhook {
35 webhook_url: webhook_url.into(),
36 },
37 client: Client::new(),
38 connected: false,
39 }
40 }
41
42 pub fn new_api(
43 name: impl Into<String>,
44 token: impl Into<String>,
45 space_id: impl Into<String>,
46 ) -> Self {
47 Self {
48 name: name.into(),
49 mode: GoogleChatMode::Api {
50 token: token.into(),
51 space_id: space_id.into(),
52 api_base_url: "https://chat.googleapis.com/v1".to_string(),
53 last_seen_message_name: Mutex::new(None),
54 },
55 client: Client::new(),
56 connected: false,
57 }
58 }
59
60 pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
61 if let GoogleChatMode::Api { api_base_url, .. } = &mut self.mode {
62 *api_base_url = url.into();
63 }
64 self
65 }
66
/// Joins `api_base_url` and `path` with exactly one `/` between them.
///
/// Trailing slashes on the base and leading slashes on the path are
/// dropped before joining.
fn api_url(api_base_url: &str, path: impl AsRef<str>) -> String {
    let base = api_base_url.trim_end_matches('/');
    let relative = path.as_ref().trim_start_matches('/');
    format!("{base}/{relative}")
}
74
75 async fn api_get_json(&self, path: impl AsRef<str>) -> Result<Value> {
76 let (token, api_base_url) = match &self.mode {
77 GoogleChatMode::Api {
78 token,
79 api_base_url,
80 ..
81 } => (token, api_base_url),
82 GoogleChatMode::Webhook { .. } => anyhow::bail!("Google Chat API requested in webhook mode"),
83 };
84
85 let response = self
86 .client
87 .get(Self::api_url(api_base_url, path))
88 .bearer_auth(token)
89 .send()
90 .await
91 .context("Google Chat API request failed")?;
92 let status = response.status();
93 let body = response
94 .text()
95 .await
96 .context("Failed to read Google Chat API response body")?;
97
98 if !status.is_success() {
99 anyhow::bail!("Google Chat API request failed {}: {}", status, body);
100 }
101
102 serde_json::from_str(&body).context("Invalid Google Chat API response")
103 }
104
105 async fn api_post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
106 let (token, api_base_url) = match &self.mode {
107 GoogleChatMode::Api {
108 token,
109 api_base_url,
110 ..
111 } => (token, api_base_url),
112 GoogleChatMode::Webhook { .. } => anyhow::bail!("Google Chat API requested in webhook mode"),
113 };
114
115 let response = self
116 .client
117 .post(Self::api_url(api_base_url, path))
118 .bearer_auth(token)
119 .json(&body)
120 .send()
121 .await
122 .context("Google Chat API request failed")?;
123 let status = response.status();
124 let response_body = response
125 .text()
126 .await
127 .context("Failed to read Google Chat API response body")?;
128
129 if !status.is_success() {
130 anyhow::bail!("Google Chat API request failed {}: {}", status, response_body);
131 }
132
133 if response_body.trim().is_empty() {
134 Ok(Value::Null)
135 } else {
136 serde_json::from_str(&response_body).context("Invalid Google Chat API response")
137 }
138 }
139
/// Returns the API resource path of a space, `spaces/{id}`.
///
/// `space_id` may be a bare id (`AAAA`) or already a resource name
/// (`spaces/AAAA`); surrounding slashes are ignored. Without this
/// normalisation a resource name would turn into `spaces/spaces/AAAA`.
fn space_path(space_id: &str) -> String {
    let trimmed = space_id.trim_matches('/');
    let bare_id = trimmed.strip_prefix("spaces/").unwrap_or(trimmed);
    format!("spaces/{bare_id}")
}
143
/// Returns the API resource path of a space's message collection,
/// `spaces/{id}/messages`.
///
/// `space_id` may be a bare id (`AAAA`) or already a resource name
/// (`spaces/AAAA`); surrounding slashes are ignored. Without this
/// normalisation a resource name would turn into
/// `spaces/spaces/AAAA/messages`.
fn space_messages_path(space_id: &str) -> String {
    let trimmed = space_id.trim_matches('/');
    let bare_id = trimmed.strip_prefix("spaces/").unwrap_or(trimmed);
    format!("spaces/{bare_id}/messages")
}
147
148 async fn api_receive_messages(&self) -> Result<Vec<Message>> {
149 let space_id = match &self.mode {
150 GoogleChatMode::Api { space_id, .. } => space_id.clone(),
151 GoogleChatMode::Webhook { .. } => return Ok(Vec::new()),
152 };
153
154 let last_seen = match &self.mode {
155 GoogleChatMode::Api {
156 last_seen_message_name,
157 ..
158 } => last_seen_message_name.lock().await.clone(),
159 GoogleChatMode::Webhook { .. } => None,
160 };
161
162 let data = self.api_get_json(Self::space_messages_path(&space_id)).await?;
163 let mut messages = Vec::new();
164 let mut newest_name = last_seen.clone();
165
166 if let Some(entries) = data["messages"].as_array() {
167 let mut parsed = Vec::new();
168
169 for entry in entries {
170 let Some(name) = entry["name"].as_str() else {
171 continue;
172 };
173 let content = entry["text"].as_str().unwrap_or("").to_string();
174 if content.is_empty() {
175 continue;
176 }
177
178 let timestamp = entry["createTime"]
179 .as_str()
180 .and_then(|value| DateTime::parse_from_rfc3339(value).ok())
181 .map(|value| value.timestamp())
182 .unwrap_or_else(|| chrono::Utc::now().timestamp());
183 let sender = entry["sender"]["displayName"]
184 .as_str()
185 .or_else(|| entry["sender"]["name"].as_str())
186 .unwrap_or("unknown")
187 .to_string();
188 let is_direct = entry["space"]["type"].as_str() == Some("DM");
189
190 parsed.push(Message {
191 id: name.to_string(),
192 sender,
193 content,
194 timestamp,
195 channel: Some(space_id.clone()),
196 reply_to: entry["thread"]["name"].as_str().map(ToString::to_string),
197 media: None,
198 is_direct,
199 reactions: None,
200 });
201 }
202
203 if let Some(first) = parsed.first() {
204 newest_name = Some(first.id.clone());
205 }
206
207 if let Some(seen_name) = &last_seen {
208 for message in parsed {
209 if message.id == *seen_name {
210 break;
211 }
212 messages.push(message);
213 }
214 messages.reverse();
215 } else {
216 messages.extend(parsed.into_iter().rev());
217 }
218 }
219
220 if let GoogleChatMode::Api {
221 last_seen_message_name,
222 ..
223 } = &self.mode
224 {
225 *last_seen_message_name.lock().await = newest_name;
226 }
227
228 Ok(messages)
229 }
230}
231
232#[async_trait]
233impl Messenger for GoogleChatMessenger {
234 fn name(&self) -> &str {
235 &self.name
236 }
237
238 fn messenger_type(&self) -> &str {
239 "googlechat"
240 }
241
242 async fn initialize(&mut self) -> Result<()> {
243 if matches!(&self.mode, GoogleChatMode::Api { .. }) {
244 let space_id = match &self.mode {
245 GoogleChatMode::Api { space_id, .. } => space_id.clone(),
246 GoogleChatMode::Webhook { .. } => String::new(),
247 };
248 self.api_get_json(Self::space_path(&space_id)).await?;
249 }
250 self.connected = true;
251 Ok(())
252 }
253
254 async fn send_message(&self, space: &str, content: &str) -> Result<String> {
255 match &self.mode {
256 GoogleChatMode::Webhook { webhook_url } => {
257 let body = json!({ "text": content });
258
259 let resp = self.client.post(webhook_url).json(&body).send().await?;
260
261 if resp.status().is_success() {
262 Ok(format!(
263 "googlechat:{}",
264 chrono::Utc::now().timestamp_millis()
265 ))
266 } else {
267 let status = resp.status();
268 let text = resp.text().await.unwrap_or_default();
269 anyhow::bail!("Google Chat webhook failed {}: {}", status, text);
270 }
271 }
272 GoogleChatMode::Api { space_id, .. } => {
273 let target_space = if space.is_empty() { space_id } else { space };
274 let data = self
275 .api_post_json(
276 Self::space_messages_path(target_space),
277 json!({ "text": content }),
278 )
279 .await?;
280
281 Ok(data["name"].as_str().unwrap_or_default().to_string())
282 }
283 }
284 }
285
286 async fn receive_messages(&self) -> Result<Vec<Message>> {
287 self.api_receive_messages().await
288 }
289
290 fn is_connected(&self) -> bool {
291 self.connected
292 }
293
294 async fn disconnect(&mut self) -> Result<()> {
295 if let GoogleChatMode::Api {
296 last_seen_message_name,
297 ..
298 } = &self.mode
299 {
300 *last_seen_message_name.lock().await = None;
301 }
302 self.connected = false;
303 Ok(())
304 }
305}