1use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9 Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10 FetchResponse, HandlerStatus, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
/// Shared, thread-safe asynchronous HTTP callback.
///
/// Given a [`FetchRequest`] it returns a boxed, `Send` future that resolves to a
/// [`FetchResponse`] or a boxed error. The handler obtains one either from
/// `create_native_http_adapter` (native mode) or from `Config::fetch`
/// (injected mode) and hands clones of it to the device manager and KMS client.
type HttpDoFn = Arc<
    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
        + Send
        + Sync,
>;
25
/// Events published by `WebexMessageHandler` on its unbounded event channel
/// (the receiving half is obtained with `take_event_rx`).
#[derive(Debug, Clone)]
pub enum HandlerEvent {
    /// A `post` activity whose object type is `comment` was decrypted successfully.
    MessageCreated(DecryptedMessage),
    /// A `delete` activity whose object type is `activity` was received; the
    /// payload carries the ids copied from that activity.
    MessageDeleted(DeletedMessage),
    /// Sent when `connect()` succeeds and whenever the Mercury socket reports
    /// `MercuryEvent::Connected`.
    Connected,
    /// The Mercury socket reported a disconnect; the payload is the reason it supplied.
    Disconnected(String),
    /// The Mercury socket reported `Reconnecting`; the payload is forwarded
    /// unchanged (presumably the attempt number — confirm against `MercurySocket`).
    Reconnecting(u32),
    /// An error message forwarded from the Mercury socket, or the text of a
    /// message-decryption failure.
    Error(String),
}
42
43fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
45 Arc::new(move |req: FetchRequest| {
46 let client = client.clone();
47 Box::pin(async move {
48 let mut request_builder = match req.method.as_str() {
49 "GET" => client.get(&req.url),
50 "POST" => client.post(&req.url),
51 "PUT" => client.put(&req.url),
52 "DELETE" => client.delete(&req.url),
53 _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
54 };
55
56 for (key, value) in req.headers {
57 request_builder = request_builder.header(key, value);
58 }
59
60 if let Some(body) = req.body {
61 request_builder = request_builder.body(body);
62 }
63
64 let response = request_builder
65 .send()
66 .await
67 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
68
69 let status = response.status().as_u16();
70 let ok = response.status().is_success();
71 let body_bytes = response
72 .bytes()
73 .await
74 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
75 .to_vec();
76
77 Ok(FetchResponse {
78 status,
79 ok,
80 body: body_bytes,
81 })
82 })
83 })
84}
85
/// Coordinates device registration, the Mercury WebSocket and the KMS client,
/// and publishes [`HandlerEvent`]s to the application.
///
/// All mutable state sits behind `Arc<Mutex<..>>` so it can be shared with the
/// background task that consumes Mercury events.
pub struct WebexMessageHandler {
    /// Token used for registration, KMS and Mercury; replaced by `reconnect()`.
    token: Arc<Mutex<String>>,
    /// HTTP callback; a clone is handed to each `KmsClient` that gets created.
    http_do: HttpDoFn,
    /// Registers, refreshes and unregisters the device.
    device_manager: Arc<Mutex<DeviceManager>>,
    /// Mercury WebSocket connection and the source of `MercuryEvent`s.
    mercury_socket: Arc<MercurySocket>,
    /// `Some` once `connect()` has created the client; reset to `None` by `disconnect()`.
    kms_client: Arc<Mutex<Option<KmsClient>>>,
    /// Handler that receives `MercuryEvent::KmsResponse` payloads; set and
    /// cleared together with `kms_client`.
    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
    /// Current device registration; `None` while disconnected.
    registration: Arc<Mutex<Option<DeviceRegistration>>>,
    /// Handler-level "connected" flag, also updated from Mercury connect/disconnect events.
    connected: Arc<Mutex<bool>>,
    /// `true` while a `connect()` call is in flight.
    connecting: Arc<Mutex<bool>>,

    // Kept for the lifetime of the handler; nothing in the visible code reads it
    // after construction, hence the lint override.
    #[allow(dead_code)]
    config: Config,
    /// Sending half of the application event channel.
    event_tx: mpsc::UnboundedSender<HandlerEvent>,
    /// Receiving half, handed out at most once through `take_event_rx()`.
    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
}
104
105impl WebexMessageHandler {
106 pub fn new(config: Config) -> Result<Self, WebexError> {
108 if config.token.is_empty() {
109 return Err(WebexError::Internal(
110 "WebexMessageHandler requires a non-empty token string".into(),
111 ));
112 }
113
114 match config.mode {
116 NetworkMode::Injected => {
117 if config.fetch.is_none() || config.web_socket_factory.is_none() {
118 return Err(WebexError::Internal(
119 "Injected mode requires both fetch and web_socket_factory".into(),
120 ));
121 }
122 if config.client.is_some() {
123 return Err(WebexError::Internal(
124 "Cannot use native proxy parameters (client) in injected mode".into(),
125 ));
126 }
127 }
128 NetworkMode::Native => {
129 if config.fetch.is_some() || config.web_socket_factory.is_some() {
130 return Err(WebexError::Internal(
131 "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
132 ));
133 }
134 }
135 }
136
137 let (http_do, ws_factory) = match config.mode {
139 NetworkMode::Native => {
140 let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
141 let http_adapter = create_native_http_adapter(client.clone());
142 (http_adapter, None)
143 }
144 NetworkMode::Injected => {
145 let http_adapter = config.fetch.clone().unwrap();
146 let ws_factory = config.web_socket_factory.clone();
147 (http_adapter, ws_factory)
148 }
149 };
150
151 let mercury_socket = MercurySocket::new(
152 ws_factory,
153 Duration::from_secs_f64(config.ping_interval),
154 Duration::from_secs_f64(config.pong_timeout),
155 Duration::from_secs_f64(config.reconnect_backoff_max),
156 config.max_reconnect_attempts,
157 );
158
159 let (event_tx, event_rx) = mpsc::unbounded_channel();
160
161 Ok(Self {
162 token: Arc::new(Mutex::new(config.token.clone())),
163 http_do: http_do.clone(),
164 device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
165 mercury_socket: Arc::new(mercury_socket),
166 kms_client: Arc::new(Mutex::new(None)),
167 kms_response_handler: Arc::new(Mutex::new(None)),
168 registration: Arc::new(Mutex::new(None)),
169 connected: Arc::new(Mutex::new(false)),
170 connecting: Arc::new(Mutex::new(false)),
171 config,
172 event_tx,
173 event_rx: Arc::new(Mutex::new(Some(event_rx))),
174 })
175 }
176
177 pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
179 self.event_rx.lock().await.take()
180 }
181
182 pub async fn connect(&self) -> Result<(), WebexError> {
184 {
185 let connecting = self.connecting.lock().await;
186 if *connecting {
187 return Err(WebexError::Internal("connect() already in progress".into()));
188 }
189 }
190 {
191 let connected = self.connected.lock().await;
192 if *connected {
193 return Err(WebexError::Internal(
194 "Already connected. Call disconnect() first, or use reconnect().".into(),
195 ));
196 }
197 }
198
199 info!("Connecting to Webex...");
200 *self.connecting.lock().await = true;
201
202 let result = self.connect_internal().await;
203
204 *self.connecting.lock().await = false;
205
206 match result {
207 Ok(()) => {
208 *self.connected.lock().await = true;
209 info!("Connected to Webex");
210 let _ = self.event_tx.send(HandlerEvent::Connected);
211 Ok(())
212 }
213 Err(e) => Err(e),
214 }
215 }
216
217 async fn connect_internal(&self) -> Result<(), WebexError> {
218 let token = self.token.lock().await.clone();
219
220 let reg = {
222 let mut dm = self.device_manager.lock().await;
223 dm.register(&token).await?
224 };
225 info!("Device registered");
226
227 let kms = KmsClient::new(
229 self.http_do.clone(),
230 &token,
231 ®.device_url,
232 ®.user_id,
233 ®.encryption_service_url,
234 );
235
236 let response_handler = kms.response_handler();
239 *self.kms_response_handler.lock().await = Some(response_handler);
240 *self.kms_client.lock().await = Some(kms);
241
242 self.mercury_socket
244 .connect(®.web_socket_url, &token)
245 .await?;
246 info!("Mercury connected");
247
248 self.start_mercury_event_loop().await;
250
251 {
253 let mut kms_guard = self.kms_client.lock().await;
254 if let Some(ref mut kms) = *kms_guard {
255 kms.initialize().await?;
256 }
257 }
258 info!("KMS initialized");
259
260 *self.registration.lock().await = Some(reg);
262
263 Ok(())
264 }
265
266 async fn start_mercury_event_loop(&self) {
268 let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
269 Some(rx) => rx,
270 None => {
271 warn!("Mercury event receiver already taken");
272 return;
273 }
274 };
275
276 let kms_client = self.kms_client.clone();
277 let kms_response_handler = self.kms_response_handler.clone();
278 let event_tx = self.event_tx.clone();
279 let connected = self.connected.clone();
280 let registration = self.registration.clone();
281 let device_manager = self.device_manager.clone();
282 let token = self.token.clone();
283
284 tokio::spawn(async move {
285 while let Some(event) = mercury_rx.recv().await {
286 match event {
287 MercuryEvent::KmsResponse(data) => {
288 let handler_guard = kms_response_handler.lock().await;
291 if let Some(ref handler) = *handler_guard {
292 handler.handle_kms_message(&data).await;
293 }
294 }
295 MercuryEvent::Activity(activity) => {
296 let kms_client_clone = kms_client.clone();
299 let event_tx_clone = event_tx.clone();
300 tokio::spawn(async move {
301 let mut kms_guard = kms_client_clone.lock().await;
302 if let Some(ref mut kms) = *kms_guard {
303 Self::handle_activity_static(kms, &activity, &event_tx_clone).await;
304 } else {
305 warn!("Received activity but KMS client not initialized");
306 }
307 });
308 }
309 MercuryEvent::Connected => {
310 info!("Mercury reconnected, refreshing device and KMS");
311
312 let tok = token.lock().await.clone();
314 {
315 let reg_guard = registration.lock().await;
316 if reg_guard.is_some() {
317 let dm = device_manager.lock().await;
318 match dm.refresh(&tok).await {
319 Ok(new_reg) => {
320 drop(reg_guard);
321 *registration.lock().await = Some(new_reg);
322 }
323 Err(e) => {
324 warn!("Device refresh on reconnect failed: {e}");
325 }
326 }
327 }
328 }
329
330 {
332 let mut kms_guard = kms_client.lock().await;
333 if let Some(ref mut kms) = *kms_guard {
334 if let Err(e) = kms.initialize().await {
335 warn!("KMS re-init on reconnect failed: {e}");
336 }
337 }
338 }
339
340 *connected.lock().await = true;
341 let _ = event_tx.send(HandlerEvent::Connected);
342 }
343 MercuryEvent::Disconnected(reason) => {
344 *connected.lock().await = false;
345 let _ = event_tx.send(HandlerEvent::Disconnected(reason));
346 }
347 MercuryEvent::Reconnecting(attempt) => {
348 let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
349 }
350 MercuryEvent::Error(msg) => {
351 let _ = event_tx.send(HandlerEvent::Error(msg));
352 }
353 }
354 }
355 });
356 }
357
358 async fn handle_activity_static(
360 kms: &mut KmsClient,
361 activity: &MercuryActivity,
362 event_tx: &mpsc::UnboundedSender<HandlerEvent>,
363 ) {
364 if activity.verb == "post" && activity.object.object_type == "comment" {
366 let mut decryptor = MessageDecryptor::new(kms);
367 match decryptor.decrypt_activity(activity).await {
368 Ok(decrypted) => {
369 let msg = DecryptedMessage {
370 id: decrypted.object.id.clone(),
371 room_id: decrypted.target.id.clone(),
372 person_id: decrypted.actor.id.clone(),
373 person_email: decrypted
374 .actor
375 .email_address
376 .clone()
377 .unwrap_or_default(),
378 text: decrypted.object.display_name.clone().unwrap_or_default(),
379 html: decrypted.object.content.clone(),
380 created: decrypted.published.clone(),
381 room_type: infer_room_type(&decrypted),
382 raw: decrypted,
383 };
384 let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
385 }
386 Err(e) => {
387 error!("Error decrypting activity: {e}");
388 let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
389 }
390 }
391 return;
392 }
393
394 if activity.verb == "delete" && activity.object.object_type == "activity" {
396 let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
397 message_id: activity.object.id.clone(),
398 room_id: activity.target.id.clone(),
399 person_id: activity.actor.id.clone(),
400 }));
401 }
402 }
403
404 pub async fn disconnect(&self) {
406 info!("Disconnecting from Webex...");
407 *self.connected.lock().await = false;
408
409 self.mercury_socket.disconnect().await;
410
411 let token = self.token.lock().await.clone();
412 {
413 let reg = self.registration.lock().await;
414 if reg.is_some() {
415 let mut dm = self.device_manager.lock().await;
416 if let Err(e) = dm.unregister(&token).await {
417 warn!("Failed to unregister device: {e}");
418 } else {
419 info!("Device unregistered");
420 }
421 }
422 }
423
424 *self.registration.lock().await = None;
425 *self.kms_client.lock().await = None;
426 *self.kms_response_handler.lock().await = None;
427 }
428
429 pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
431 if new_token.is_empty() {
432 return Err(WebexError::Internal(
433 "reconnect() requires a non-empty token string".into(),
434 ));
435 }
436
437 info!("Reconnecting with new token...");
438 self.disconnect().await;
439
440 *self.token.lock().await = new_token.to_string();
441 self.connect().await
442 }
443
444 pub async fn connected(&self) -> bool {
446 let conn = *self.connected.lock().await;
447 conn && self.mercury_socket.connected().await
448 }
449
450 pub async fn status(&self) -> HandlerStatus {
452 let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
453 let ws_open = self.mercury_socket.connected().await;
454 let is_connected = *self.connected.lock().await;
455 let is_connecting = *self.connecting.lock().await;
456
457 let status = if is_connected && ws_open {
458 ConnectionStatus::Connected
459 } else if is_connecting {
460 ConnectionStatus::Connecting
461 } else if reconnect_attempt > 0 {
462 ConnectionStatus::Reconnecting
463 } else {
464 ConnectionStatus::Disconnected
465 };
466
467 HandlerStatus {
468 status,
469 web_socket_open: ws_open,
470 kms_initialized: self.kms_client.lock().await.is_some(),
471 device_registered: self.registration.lock().await.is_some(),
472 reconnect_attempt,
473 }
474 }
475}
476
477fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
478 let tags = &activity.target.tags;
479 if tags.contains(&"ONE_ON_ONE".to_string()) {
480 return Some("direct".to_string());
481 }
482 if tags.contains(&"TEAM".to_string())
483 || tags.contains(&"LOCKED".to_string())
484 || tags.contains(&"GROUP".to_string())
485 {
486 return Some("group".to_string());
487 }
488 None
489}