1use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9 Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10 FetchResponse, HandlerStatus, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
19type HttpDoFn = Arc<
21 dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
22 + Send
23 + Sync,
24>;
25
26#[derive(Debug, Clone)]
28pub enum HandlerEvent {
29 MessageCreated(DecryptedMessage),
31 MessageDeleted(DeletedMessage),
33 Connected,
35 Disconnected(String),
37 Reconnecting(u32),
39 Error(String),
41}
42
43fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
45 Arc::new(move |req: FetchRequest| {
46 let client = client.clone();
47 Box::pin(async move {
48 let mut request_builder = match req.method.as_str() {
49 "GET" => client.get(&req.url),
50 "POST" => client.post(&req.url),
51 "PUT" => client.put(&req.url),
52 "DELETE" => client.delete(&req.url),
53 _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
54 };
55
56 for (key, value) in req.headers {
57 request_builder = request_builder.header(key, value);
58 }
59
60 if let Some(body) = req.body {
61 request_builder = request_builder.body(body);
62 }
63
64 let response = request_builder
65 .send()
66 .await
67 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
68
69 let status = response.status().as_u16();
70 let ok = response.status().is_success();
71 let body_bytes = response
72 .bytes()
73 .await
74 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
75 .to_vec();
76
77 Ok(FetchResponse {
78 status,
79 ok,
80 body: body_bytes,
81 })
82 })
83 })
84}
85
86pub struct WebexMessageHandler {
88 token: Arc<Mutex<String>>,
89 http_do: HttpDoFn,
90 device_manager: Arc<Mutex<DeviceManager>>,
91 mercury_socket: Arc<MercurySocket>,
92 kms_client: Arc<Mutex<Option<KmsClient>>>,
93 kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
95 registration: Arc<Mutex<Option<DeviceRegistration>>>,
96 connected: Arc<Mutex<bool>>,
97 connecting: Arc<Mutex<bool>>,
98
99 #[allow(dead_code)]
100 config: Config,
101 event_tx: mpsc::UnboundedSender<HandlerEvent>,
102 event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
103}
104
105impl WebexMessageHandler {
106 pub fn new(config: Config) -> Result<Self, WebexError> {
108 if config.token.is_empty() {
109 return Err(WebexError::Internal(
110 "WebexMessageHandler requires a non-empty token string".into(),
111 ));
112 }
113
114 match config.mode {
116 NetworkMode::Injected => {
117 if config.fetch.is_none() || config.web_socket_factory.is_none() {
118 return Err(WebexError::Internal(
119 "Injected mode requires both fetch and web_socket_factory".into(),
120 ));
121 }
122 if config.client.is_some() {
123 return Err(WebexError::Internal(
124 "Cannot use native proxy parameters (client) in injected mode".into(),
125 ));
126 }
127 }
128 NetworkMode::Native => {
129 if config.fetch.is_some() || config.web_socket_factory.is_some() {
130 return Err(WebexError::Internal(
131 "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
132 ));
133 }
134 }
135 }
136
137 let (http_do, ws_factory) = match config.mode {
139 NetworkMode::Native => {
140 let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
141 let http_adapter = create_native_http_adapter(client.clone());
142 (http_adapter, None)
143 }
144 NetworkMode::Injected => {
145 let http_adapter = config.fetch.clone().unwrap();
146 let ws_factory = config.web_socket_factory.clone();
147 (http_adapter, ws_factory)
148 }
149 };
150
151 let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
152
153 let mercury_socket = MercurySocket::new(
154 ws_factory,
155 Duration::from_secs_f64(config.ping_interval),
156 Duration::from_secs_f64(config.pong_timeout),
157 Duration::from_secs_f64(config.reconnect_backoff_max),
158 config.max_reconnect_attempts,
159 );
160
161 let (event_tx, event_rx) = mpsc::unbounded_channel();
162
163 Ok(Self {
164 token: Arc::new(Mutex::new(config.token.clone())),
165 http_do: http_do.clone(),
166 device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
167 mercury_socket: Arc::new(mercury_socket),
168 kms_client: Arc::new(Mutex::new(None)),
169 kms_response_handler: Arc::new(Mutex::new(None)),
170 registration: Arc::new(Mutex::new(None)),
171 connected: Arc::new(Mutex::new(false)),
172 connecting: Arc::new(Mutex::new(false)),
173 config,
174 event_tx,
175 event_rx: Arc::new(Mutex::new(Some(event_rx))),
176 })
177 }
178
179 pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
181 self.event_rx.lock().await.take()
182 }
183
184 pub async fn connect(&self) -> Result<(), WebexError> {
186 {
187 let connecting = self.connecting.lock().await;
188 if *connecting {
189 return Err(WebexError::Internal("connect() already in progress".into()));
190 }
191 }
192 {
193 let connected = self.connected.lock().await;
194 if *connected {
195 return Err(WebexError::Internal(
196 "Already connected. Call disconnect() first, or use reconnect().".into(),
197 ));
198 }
199 }
200
201 info!("Connecting to Webex...");
202 *self.connecting.lock().await = true;
203
204 let result = self.connect_internal().await;
205
206 *self.connecting.lock().await = false;
207
208 match result {
209 Ok(()) => {
210 *self.connected.lock().await = true;
211 info!("Connected to Webex");
212 let _ = self.event_tx.send(HandlerEvent::Connected);
213 Ok(())
214 }
215 Err(e) => Err(e),
216 }
217 }
218
219 async fn connect_internal(&self) -> Result<(), WebexError> {
220 let token = self.token.lock().await.clone();
221
222 let reg = {
224 let mut dm = self.device_manager.lock().await;
225 dm.register(&token).await?
226 };
227 info!("Device registered");
228
229 let kms = KmsClient::new(
231 self.http_do.clone(),
232 &token,
233 ®.device_url,
234 ®.user_id,
235 ®.encryption_service_url,
236 );
237
238 let response_handler = kms.response_handler();
241 *self.kms_response_handler.lock().await = Some(response_handler);
242 *self.kms_client.lock().await = Some(kms);
243
244 self.mercury_socket
246 .connect(®.web_socket_url, &token)
247 .await?;
248 info!("Mercury connected");
249
250 self.start_mercury_event_loop().await;
252
253 {
255 let mut kms_guard = self.kms_client.lock().await;
256 if let Some(ref mut kms) = *kms_guard {
257 kms.initialize().await?;
258 }
259 }
260 info!("KMS initialized");
261
262 *self.registration.lock().await = Some(reg);
264
265 Ok(())
266 }
267
268 async fn start_mercury_event_loop(&self) {
270 let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
271 Some(rx) => rx,
272 None => {
273 warn!("Mercury event receiver already taken");
274 return;
275 }
276 };
277
278 let kms_client = self.kms_client.clone();
279 let kms_response_handler = self.kms_response_handler.clone();
280 let event_tx = self.event_tx.clone();
281 let connected = self.connected.clone();
282 let registration = self.registration.clone();
283 let device_manager = self.device_manager.clone();
284 let token = self.token.clone();
285
286 tokio::spawn(async move {
287 while let Some(event) = mercury_rx.recv().await {
288 match event {
289 MercuryEvent::KmsResponse(data) => {
290 let handler_guard = kms_response_handler.lock().await;
293 if let Some(ref handler) = *handler_guard {
294 handler.handle_kms_message(&data).await;
295 }
296 }
297 MercuryEvent::Activity(activity) => {
298 let kms_client_clone = kms_client.clone();
301 let event_tx_clone = event_tx.clone();
302 tokio::spawn(async move {
303 let mut kms_guard = kms_client_clone.lock().await;
304 if let Some(ref mut kms) = *kms_guard {
305 Self::handle_activity_static(kms, &activity, &event_tx_clone).await;
306 } else {
307 warn!("Received activity but KMS client not initialized");
308 }
309 });
310 }
311 MercuryEvent::Connected => {
312 info!("Mercury reconnected, refreshing device and KMS");
313
314 let tok = token.lock().await.clone();
316 {
317 let reg_guard = registration.lock().await;
318 if reg_guard.is_some() {
319 let dm = device_manager.lock().await;
320 match dm.refresh(&tok).await {
321 Ok(new_reg) => {
322 drop(reg_guard);
323 *registration.lock().await = Some(new_reg);
324 }
325 Err(e) => {
326 warn!("Device refresh on reconnect failed: {e}");
327 }
328 }
329 }
330 }
331
332 {
334 let mut kms_guard = kms_client.lock().await;
335 if let Some(ref mut kms) = *kms_guard {
336 if let Err(e) = kms.initialize().await {
337 warn!("KMS re-init on reconnect failed: {e}");
338 }
339 }
340 }
341
342 *connected.lock().await = true;
343 let _ = event_tx.send(HandlerEvent::Connected);
344 }
345 MercuryEvent::Disconnected(reason) => {
346 *connected.lock().await = false;
347 let _ = event_tx.send(HandlerEvent::Disconnected(reason));
348 }
349 MercuryEvent::Reconnecting(attempt) => {
350 let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
351 }
352 MercuryEvent::Error(msg) => {
353 let _ = event_tx.send(HandlerEvent::Error(msg));
354 }
355 }
356 }
357 });
358 }
359
360 async fn handle_activity_static(
362 kms: &mut KmsClient,
363 activity: &MercuryActivity,
364 event_tx: &mpsc::UnboundedSender<HandlerEvent>,
365 ) {
366 if activity.verb == "post" && activity.object.object_type == "comment" {
368 let mut decryptor = MessageDecryptor::new(kms);
369 match decryptor.decrypt_activity(activity).await {
370 Ok(decrypted) => {
371 let msg = DecryptedMessage {
372 id: decrypted.object.id.clone(),
373 room_id: decrypted.target.id.clone(),
374 person_id: decrypted.actor.id.clone(),
375 person_email: decrypted
376 .actor
377 .email_address
378 .clone()
379 .unwrap_or_default(),
380 text: decrypted.object.display_name.clone().unwrap_or_default(),
381 html: decrypted.object.content.clone(),
382 created: decrypted.published.clone(),
383 room_type: infer_room_type(&decrypted),
384 raw: decrypted,
385 };
386 let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
387 }
388 Err(e) => {
389 error!("Error decrypting activity: {e}");
390 let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
391 }
392 }
393 return;
394 }
395
396 if activity.verb == "delete" && activity.object.object_type == "activity" {
398 let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
399 message_id: activity.object.id.clone(),
400 room_id: activity.target.id.clone(),
401 person_id: activity.actor.id.clone(),
402 }));
403 }
404 }
405
406 pub async fn disconnect(&self) {
408 info!("Disconnecting from Webex...");
409 *self.connected.lock().await = false;
410
411 self.mercury_socket.disconnect().await;
412
413 let token = self.token.lock().await.clone();
414 {
415 let reg = self.registration.lock().await;
416 if reg.is_some() {
417 let mut dm = self.device_manager.lock().await;
418 if let Err(e) = dm.unregister(&token).await {
419 warn!("Failed to unregister device: {e}");
420 } else {
421 info!("Device unregistered");
422 }
423 }
424 }
425
426 *self.registration.lock().await = None;
427 *self.kms_client.lock().await = None;
428 *self.kms_response_handler.lock().await = None;
429 }
430
431 pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
433 if new_token.is_empty() {
434 return Err(WebexError::Internal(
435 "reconnect() requires a non-empty token string".into(),
436 ));
437 }
438
439 info!("Reconnecting with new token...");
440 self.disconnect().await;
441
442 *self.token.lock().await = new_token.to_string();
443 self.connect().await
444 }
445
446 pub async fn connected(&self) -> bool {
448 let conn = *self.connected.lock().await;
449 conn && self.mercury_socket.connected().await
450 }
451
452 pub async fn status(&self) -> HandlerStatus {
454 let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
455 let ws_open = self.mercury_socket.connected().await;
456 let is_connected = *self.connected.lock().await;
457 let is_connecting = *self.connecting.lock().await;
458
459 let status = if is_connected && ws_open {
460 ConnectionStatus::Connected
461 } else if is_connecting {
462 ConnectionStatus::Connecting
463 } else if reconnect_attempt > 0 {
464 ConnectionStatus::Reconnecting
465 } else {
466 ConnectionStatus::Disconnected
467 };
468
469 HandlerStatus {
470 status,
471 web_socket_open: ws_open,
472 kms_initialized: self.kms_client.lock().await.is_some(),
473 device_registered: self.registration.lock().await.is_some(),
474 reconnect_attempt,
475 }
476 }
477}
478
479fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
480 let tags = &activity.target.tags;
481 if tags.contains(&"ONE_ON_ONE".to_string()) {
482 return Some("direct".to_string());
483 }
484 if tags.contains(&"TEAM".to_string())
485 || tags.contains(&"LOCKED".to_string())
486 || tags.contains(&"GROUP".to_string())
487 {
488 return Some("group".to_string());
489 }
490 None
491}