1use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9 Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, HandlerStatus,
10 MercuryActivity,
11};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::{mpsc, Mutex};
15use tracing::{error, info, warn};
16
17#[derive(Debug, Clone)]
19pub enum HandlerEvent {
20 MessageCreated(DecryptedMessage),
22 MessageDeleted(DeletedMessage),
24 Connected,
26 Disconnected(String),
28 Reconnecting(u32),
30 Error(String),
32}
33
34pub struct WebexMessageHandler {
36 token: Arc<Mutex<String>>,
37 client: reqwest::Client,
38 device_manager: Arc<Mutex<DeviceManager>>,
39 mercury_socket: Arc<MercurySocket>,
40 kms_client: Arc<Mutex<Option<KmsClient>>>,
41 kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
43 registration: Arc<Mutex<Option<DeviceRegistration>>>,
44 connected: Arc<Mutex<bool>>,
45 connecting: Arc<Mutex<bool>>,
46
47 #[allow(dead_code)]
48 config: Config,
49 event_tx: mpsc::UnboundedSender<HandlerEvent>,
50 event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
51}
52
53impl WebexMessageHandler {
54 pub fn new(config: Config) -> Result<Self, WebexError> {
56 if config.token.is_empty() {
57 return Err(WebexError::Internal(
58 "WebexMessageHandler requires a non-empty token string".into(),
59 ));
60 }
61
62 let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
63
64 let mercury_socket = MercurySocket::new(
65 client.clone(),
66 Duration::from_secs_f64(config.ping_interval),
67 Duration::from_secs_f64(config.pong_timeout),
68 Duration::from_secs_f64(config.reconnect_backoff_max),
69 config.max_reconnect_attempts,
70 );
71
72 let (event_tx, event_rx) = mpsc::unbounded_channel();
73
74 Ok(Self {
75 token: Arc::new(Mutex::new(config.token.clone())),
76 client: client.clone(),
77 device_manager: Arc::new(Mutex::new(DeviceManager::new(client.clone()))),
78 mercury_socket: Arc::new(mercury_socket),
79 kms_client: Arc::new(Mutex::new(None)),
80 kms_response_handler: Arc::new(Mutex::new(None)),
81 registration: Arc::new(Mutex::new(None)),
82 connected: Arc::new(Mutex::new(false)),
83 connecting: Arc::new(Mutex::new(false)),
84 config,
85 event_tx,
86 event_rx: Arc::new(Mutex::new(Some(event_rx))),
87 })
88 }
89
90 pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
92 self.event_rx.lock().await.take()
93 }
94
95 pub async fn connect(&self) -> Result<(), WebexError> {
97 {
98 let connecting = self.connecting.lock().await;
99 if *connecting {
100 return Err(WebexError::Internal("connect() already in progress".into()));
101 }
102 }
103 {
104 let connected = self.connected.lock().await;
105 if *connected {
106 return Err(WebexError::Internal(
107 "Already connected. Call disconnect() first, or use reconnect().".into(),
108 ));
109 }
110 }
111
112 info!("Connecting to Webex...");
113 *self.connecting.lock().await = true;
114
115 let result = self.connect_internal().await;
116
117 *self.connecting.lock().await = false;
118
119 match result {
120 Ok(()) => {
121 *self.connected.lock().await = true;
122 info!("Connected to Webex");
123 let _ = self.event_tx.send(HandlerEvent::Connected);
124 Ok(())
125 }
126 Err(e) => Err(e),
127 }
128 }
129
130 async fn connect_internal(&self) -> Result<(), WebexError> {
131 let token = self.token.lock().await.clone();
132
133 let reg = {
135 let mut dm = self.device_manager.lock().await;
136 dm.register(&token).await?
137 };
138 info!("Device registered");
139
140 let kms = KmsClient::new(
142 self.client.clone(),
143 &token,
144 ®.device_url,
145 ®.user_id,
146 ®.encryption_service_url,
147 );
148
149 let response_handler = kms.response_handler();
152 *self.kms_response_handler.lock().await = Some(response_handler);
153 *self.kms_client.lock().await = Some(kms);
154
155 self.mercury_socket
157 .connect(®.web_socket_url, &token)
158 .await?;
159 info!("Mercury connected");
160
161 self.start_mercury_event_loop().await;
163
164 {
166 let mut kms_guard = self.kms_client.lock().await;
167 if let Some(ref mut kms) = *kms_guard {
168 kms.initialize().await?;
169 }
170 }
171 info!("KMS initialized");
172
173 *self.registration.lock().await = Some(reg);
175
176 Ok(())
177 }
178
179 async fn start_mercury_event_loop(&self) {
181 let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
182 Some(rx) => rx,
183 None => {
184 warn!("Mercury event receiver already taken");
185 return;
186 }
187 };
188
189 let kms_client = self.kms_client.clone();
190 let kms_response_handler = self.kms_response_handler.clone();
191 let event_tx = self.event_tx.clone();
192 let connected = self.connected.clone();
193 let registration = self.registration.clone();
194 let device_manager = self.device_manager.clone();
195 let token = self.token.clone();
196
197 tokio::spawn(async move {
198 while let Some(event) = mercury_rx.recv().await {
199 match event {
200 MercuryEvent::KmsResponse(data) => {
201 let handler_guard = kms_response_handler.lock().await;
204 if let Some(ref handler) = *handler_guard {
205 handler.handle_kms_message(&data).await;
206 }
207 }
208 MercuryEvent::Activity(activity) => {
209 let kms_client_clone = kms_client.clone();
212 let event_tx_clone = event_tx.clone();
213 tokio::spawn(async move {
214 let mut kms_guard = kms_client_clone.lock().await;
215 if let Some(ref mut kms) = *kms_guard {
216 Self::handle_activity_static(kms, &activity, &event_tx_clone).await;
217 } else {
218 warn!("Received activity but KMS client not initialized");
219 }
220 });
221 }
222 MercuryEvent::Connected => {
223 info!("Mercury reconnected, refreshing device and KMS");
224
225 let tok = token.lock().await.clone();
227 {
228 let reg_guard = registration.lock().await;
229 if reg_guard.is_some() {
230 let dm = device_manager.lock().await;
231 match dm.refresh(&tok).await {
232 Ok(new_reg) => {
233 drop(reg_guard);
234 *registration.lock().await = Some(new_reg);
235 }
236 Err(e) => {
237 warn!("Device refresh on reconnect failed: {e}");
238 }
239 }
240 }
241 }
242
243 {
245 let mut kms_guard = kms_client.lock().await;
246 if let Some(ref mut kms) = *kms_guard {
247 if let Err(e) = kms.initialize().await {
248 warn!("KMS re-init on reconnect failed: {e}");
249 }
250 }
251 }
252
253 *connected.lock().await = true;
254 let _ = event_tx.send(HandlerEvent::Connected);
255 }
256 MercuryEvent::Disconnected(reason) => {
257 *connected.lock().await = false;
258 let _ = event_tx.send(HandlerEvent::Disconnected(reason));
259 }
260 MercuryEvent::Reconnecting(attempt) => {
261 let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
262 }
263 MercuryEvent::Error(msg) => {
264 let _ = event_tx.send(HandlerEvent::Error(msg));
265 }
266 }
267 }
268 });
269 }
270
271 async fn handle_activity_static(
273 kms: &mut KmsClient,
274 activity: &MercuryActivity,
275 event_tx: &mpsc::UnboundedSender<HandlerEvent>,
276 ) {
277 if activity.verb == "post" && activity.object.object_type == "comment" {
279 let mut decryptor = MessageDecryptor::new(kms);
280 match decryptor.decrypt_activity(activity).await {
281 Ok(decrypted) => {
282 let msg = DecryptedMessage {
283 id: decrypted.object.id.clone(),
284 room_id: decrypted.target.id.clone(),
285 person_id: decrypted.actor.id.clone(),
286 person_email: decrypted
287 .actor
288 .email_address
289 .clone()
290 .unwrap_or_default(),
291 text: decrypted.object.display_name.clone().unwrap_or_default(),
292 html: decrypted.object.content.clone(),
293 created: decrypted.published.clone(),
294 room_type: infer_room_type(&decrypted),
295 raw: decrypted,
296 };
297 let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
298 }
299 Err(e) => {
300 error!("Error decrypting activity: {e}");
301 let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
302 }
303 }
304 return;
305 }
306
307 if activity.verb == "delete" && activity.object.object_type == "activity" {
309 let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
310 message_id: activity.object.id.clone(),
311 room_id: activity.target.id.clone(),
312 person_id: activity.actor.id.clone(),
313 }));
314 }
315 }
316
317 pub async fn disconnect(&self) {
319 info!("Disconnecting from Webex...");
320 *self.connected.lock().await = false;
321
322 self.mercury_socket.disconnect().await;
323
324 let token = self.token.lock().await.clone();
325 {
326 let reg = self.registration.lock().await;
327 if reg.is_some() {
328 let mut dm = self.device_manager.lock().await;
329 if let Err(e) = dm.unregister(&token).await {
330 warn!("Failed to unregister device: {e}");
331 } else {
332 info!("Device unregistered");
333 }
334 }
335 }
336
337 *self.registration.lock().await = None;
338 *self.kms_client.lock().await = None;
339 *self.kms_response_handler.lock().await = None;
340 }
341
342 pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
344 if new_token.is_empty() {
345 return Err(WebexError::Internal(
346 "reconnect() requires a non-empty token string".into(),
347 ));
348 }
349
350 info!("Reconnecting with new token...");
351 self.disconnect().await;
352
353 *self.token.lock().await = new_token.to_string();
354 self.connect().await
355 }
356
357 pub async fn connected(&self) -> bool {
359 let conn = *self.connected.lock().await;
360 conn && self.mercury_socket.connected().await
361 }
362
363 pub async fn status(&self) -> HandlerStatus {
365 let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
366 let ws_open = self.mercury_socket.connected().await;
367 let is_connected = *self.connected.lock().await;
368 let is_connecting = *self.connecting.lock().await;
369
370 let status = if is_connected && ws_open {
371 ConnectionStatus::Connected
372 } else if is_connecting {
373 ConnectionStatus::Connecting
374 } else if reconnect_attempt > 0 {
375 ConnectionStatus::Reconnecting
376 } else {
377 ConnectionStatus::Disconnected
378 };
379
380 HandlerStatus {
381 status,
382 web_socket_open: ws_open,
383 kms_initialized: self.kms_client.lock().await.is_some(),
384 device_registered: self.registration.lock().await.is_some(),
385 reconnect_attempt,
386 }
387 }
388}
389
390fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
391 let tags = &activity.target.tags;
392 if tags.contains(&"ONE_ON_ONE".to_string()) {
393 return Some("direct".to_string());
394 }
395 if tags.contains(&"TEAM".to_string())
396 || tags.contains(&"LOCKED".to_string())
397 || tags.contains(&"GROUP".to_string())
398 {
399 return Some("group".to_string());
400 }
401 None
402}