1use crate::message::MessageType;
4use crate::{Message, Messenger};
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use chrono::DateTime;
8use reqwest::Client;
9use serde_json::{Value, json};
10use tokio::sync::Mutex;
11
12pub struct GoogleChatMessenger {
13 name: String,
14 mode: GoogleChatMode,
15 client: Client,
16 connected: bool,
17}
18
19enum GoogleChatMode {
20 Webhook {
21 webhook_url: String,
22 },
23 Api {
24 token: String,
25 space_id: String,
26 api_base_url: String,
27 last_seen_message_name: Mutex<Option<String>>,
28 },
29}
30
31impl GoogleChatMessenger {
32 pub fn new(name: impl Into<String>, webhook_url: impl Into<String>) -> Self {
33 Self {
34 name: name.into(),
35 mode: GoogleChatMode::Webhook {
36 webhook_url: webhook_url.into(),
37 },
38 client: Client::new(),
39 connected: false,
40 }
41 }
42
43 pub fn new_api(
44 name: impl Into<String>,
45 token: impl Into<String>,
46 space_id: impl Into<String>,
47 ) -> Self {
48 Self {
49 name: name.into(),
50 mode: GoogleChatMode::Api {
51 token: token.into(),
52 space_id: space_id.into(),
53 api_base_url: "https://chat.googleapis.com/v1".to_string(),
54 last_seen_message_name: Mutex::new(None),
55 },
56 client: Client::new(),
57 connected: false,
58 }
59 }
60
61 pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
62 if let GoogleChatMode::Api { api_base_url, .. } = &mut self.mode {
63 *api_base_url = url.into();
64 }
65 self
66 }
67
68 fn api_url(api_base_url: &str, path: impl AsRef<str>) -> String {
69 format!(
70 "{}/{}",
71 api_base_url.trim_end_matches('/'),
72 path.as_ref().trim_start_matches('/')
73 )
74 }
75
76 async fn api_get_json(&self, path: impl AsRef<str>) -> Result<Value> {
77 let (token, api_base_url) = match &self.mode {
78 GoogleChatMode::Api {
79 token,
80 api_base_url,
81 ..
82 } => (token, api_base_url),
83 GoogleChatMode::Webhook { .. } => {
84 anyhow::bail!("Google Chat API requested in webhook mode")
85 }
86 };
87
88 let response = self
89 .client
90 .get(Self::api_url(api_base_url, path))
91 .bearer_auth(token)
92 .send()
93 .await
94 .context("Google Chat API request failed")?;
95 let status = response.status();
96 let body = response
97 .text()
98 .await
99 .context("Failed to read Google Chat API response body")?;
100
101 if !status.is_success() {
102 anyhow::bail!("Google Chat API request failed {}: {}", status, body);
103 }
104
105 serde_json::from_str(&body).context("Invalid Google Chat API response")
106 }
107
108 async fn api_post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
109 let (token, api_base_url) = match &self.mode {
110 GoogleChatMode::Api {
111 token,
112 api_base_url,
113 ..
114 } => (token, api_base_url),
115 GoogleChatMode::Webhook { .. } => {
116 anyhow::bail!("Google Chat API requested in webhook mode")
117 }
118 };
119
120 let response = self
121 .client
122 .post(Self::api_url(api_base_url, path))
123 .bearer_auth(token)
124 .json(&body)
125 .send()
126 .await
127 .context("Google Chat API request failed")?;
128 let status = response.status();
129 let response_body = response
130 .text()
131 .await
132 .context("Failed to read Google Chat API response body")?;
133
134 if !status.is_success() {
135 anyhow::bail!(
136 "Google Chat API request failed {}: {}",
137 status,
138 response_body
139 );
140 }
141
142 if response_body.trim().is_empty() {
143 Ok(Value::Null)
144 } else {
145 serde_json::from_str(&response_body).context("Invalid Google Chat API response")
146 }
147 }
148
149 fn space_path(space_id: &str) -> String {
150 format!("spaces/{space_id}")
151 }
152
153 fn space_messages_path(space_id: &str) -> String {
154 format!("spaces/{space_id}/messages")
155 }
156
157 async fn api_receive_messages(&self) -> Result<Vec<Message>> {
158 let space_id = match &self.mode {
159 GoogleChatMode::Api { space_id, .. } => space_id.clone(),
160 GoogleChatMode::Webhook { .. } => return Ok(Vec::new()),
161 };
162
163 let last_seen = match &self.mode {
164 GoogleChatMode::Api {
165 last_seen_message_name,
166 ..
167 } => last_seen_message_name.lock().await.clone(),
168 GoogleChatMode::Webhook { .. } => None,
169 };
170
171 let data = self
172 .api_get_json(Self::space_messages_path(&space_id))
173 .await?;
174 let mut messages = Vec::new();
175 let mut newest_name = last_seen.clone();
176
177 if let Some(entries) = data["messages"].as_array() {
178 let mut parsed = Vec::new();
179
180 for entry in entries {
181 let Some(name) = entry["name"].as_str() else {
182 continue;
183 };
184 let content = entry["text"].as_str().unwrap_or("").to_string();
185 if content.is_empty() {
186 continue;
187 }
188
189 let timestamp = entry["createTime"]
190 .as_str()
191 .and_then(|value| DateTime::parse_from_rfc3339(value).ok())
192 .map(|value| value.timestamp())
193 .unwrap_or_else(|| chrono::Utc::now().timestamp());
194 let sender = entry["sender"]["displayName"]
195 .as_str()
196 .or_else(|| entry["sender"]["name"].as_str())
197 .unwrap_or("unknown")
198 .to_string();
199 let is_direct = entry["space"]["type"].as_str() == Some("DM");
200
201 parsed.push(Message {
202 id: name.to_string(),
203 sender,
204 content,
205 timestamp,
206 channel: Some(space_id.clone()),
207 reply_to: entry["thread"]["name"].as_str().map(ToString::to_string),
208 thread_id: None,
209 media: None,
210 is_direct,
211 message_type: MessageType::Text,
212 edited_timestamp: None,
213 reactions: None,
214 });
215 }
216
217 if let Some(first) = parsed.first() {
218 newest_name = Some(first.id.clone());
219 }
220
221 if let Some(seen_name) = &last_seen {
222 for message in parsed {
223 if message.id == *seen_name {
224 break;
225 }
226 messages.push(message);
227 }
228 messages.reverse();
229 } else {
230 messages.extend(parsed.into_iter().rev());
231 }
232 }
233
234 if let GoogleChatMode::Api {
235 last_seen_message_name,
236 ..
237 } = &self.mode
238 {
239 *last_seen_message_name.lock().await = newest_name;
240 }
241
242 Ok(messages)
243 }
244}
245
246#[async_trait]
247impl Messenger for GoogleChatMessenger {
248 fn name(&self) -> &str {
249 &self.name
250 }
251
252 fn messenger_type(&self) -> &str {
253 "googlechat"
254 }
255
256 async fn initialize(&mut self) -> Result<()> {
257 if matches!(&self.mode, GoogleChatMode::Api { .. }) {
258 let space_id = match &self.mode {
259 GoogleChatMode::Api { space_id, .. } => space_id.clone(),
260 GoogleChatMode::Webhook { .. } => String::new(),
261 };
262 self.api_get_json(Self::space_path(&space_id)).await?;
263 }
264 self.connected = true;
265 Ok(())
266 }
267
268 async fn send_message(&self, space: &str, content: &str) -> Result<String> {
269 match &self.mode {
270 GoogleChatMode::Webhook { webhook_url } => {
271 let body = json!({ "text": content });
272
273 let resp = self.client.post(webhook_url).json(&body).send().await?;
274
275 if resp.status().is_success() {
276 Ok(format!(
277 "googlechat:{}",
278 chrono::Utc::now().timestamp_millis()
279 ))
280 } else {
281 let status = resp.status();
282 let text = resp.text().await.unwrap_or_default();
283 anyhow::bail!("Google Chat webhook failed {}: {}", status, text);
284 }
285 }
286 GoogleChatMode::Api { space_id, .. } => {
287 let target_space = if space.is_empty() { space_id } else { space };
288 let data = self
289 .api_post_json(
290 Self::space_messages_path(target_space),
291 json!({ "text": content }),
292 )
293 .await?;
294
295 Ok(data["name"].as_str().unwrap_or_default().to_string())
296 }
297 }
298 }
299
300 async fn receive_messages(&self) -> Result<Vec<Message>> {
301 self.api_receive_messages().await
302 }
303
304 fn is_connected(&self) -> bool {
305 self.connected
306 }
307
308 async fn disconnect(&mut self) -> Result<()> {
309 if let GoogleChatMode::Api {
310 last_seen_message_name,
311 ..
312 } = &self.mode
313 {
314 *last_seen_message_name.lock().await = None;
315 }
316 self.connected = false;
317 Ok(())
318 }
319}