Skip to main content

webex_message_handler/
handler.rs

1//! Main orchestrator: device registration, Mercury WebSocket, KMS, decryption.
2
3use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9    Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10    FetchResponse, HandlerStatus, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
19// Internal adapter type (alias for public type)
20type HttpDoFn = Arc<
21    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
22        + Send
23        + Sync,
24>;
25
26/// Events emitted by WebexMessageHandler.
27#[derive(Debug, Clone)]
28pub enum HandlerEvent {
29    /// A new message was received and decrypted.
30    MessageCreated(DecryptedMessage),
31    /// A message was deleted.
32    MessageDeleted(DeletedMessage),
33    /// Successfully connected (or reconnected).
34    Connected,
35    /// Disconnected with a reason string.
36    Disconnected(String),
37    /// Reconnecting (attempt number).
38    Reconnecting(u32),
39    /// An error occurred.
40    Error(String),
41}
42
43/// Create a native HTTP adapter that wraps reqwest::Client.
44fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
45    Arc::new(move |req: FetchRequest| {
46        let client = client.clone();
47        Box::pin(async move {
48            let mut request_builder = match req.method.as_str() {
49                "GET" => client.get(&req.url),
50                "POST" => client.post(&req.url),
51                "PUT" => client.put(&req.url),
52                "DELETE" => client.delete(&req.url),
53                _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
54            };
55
56            for (key, value) in req.headers {
57                request_builder = request_builder.header(key, value);
58            }
59
60            if let Some(body) = req.body {
61                request_builder = request_builder.body(body);
62            }
63
64            let response = request_builder
65                .send()
66                .await
67                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
68
69            let status = response.status().as_u16();
70            let ok = response.status().is_success();
71            let body_bytes = response
72                .bytes()
73                .await
74                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
75                .to_vec();
76
77            Ok(FetchResponse {
78                status,
79                ok,
80                body: body_bytes,
81            })
82        })
83    })
84}
85
86/// Receives and decrypts Webex messages over Mercury WebSocket.
87pub struct WebexMessageHandler {
88    token: Arc<Mutex<String>>,
89    http_do: HttpDoFn,
90    device_manager: Arc<Mutex<DeviceManager>>,
91    mercury_socket: Arc<MercurySocket>,
92    kms_client: Arc<Mutex<Option<KmsClient>>>,
93    /// Separate handle for resolving KMS responses without locking kms_client.
94    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
95    registration: Arc<Mutex<Option<DeviceRegistration>>>,
96    connected: Arc<Mutex<bool>>,
97    connecting: Arc<Mutex<bool>>,
98
99    #[allow(dead_code)]
100    config: Config,
101    event_tx: mpsc::UnboundedSender<HandlerEvent>,
102    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
103}
104
105impl WebexMessageHandler {
106    /// Create a new WebexMessageHandler.
107    pub fn new(config: Config) -> Result<Self, WebexError> {
108        if config.token.is_empty() {
109            return Err(WebexError::Internal(
110                "WebexMessageHandler requires a non-empty token string".into(),
111            ));
112        }
113
114        // Validate networking mode configuration
115        match config.mode {
116            NetworkMode::Injected => {
117                if config.fetch.is_none() || config.web_socket_factory.is_none() {
118                    return Err(WebexError::Internal(
119                        "Injected mode requires both fetch and web_socket_factory".into(),
120                    ));
121                }
122                if config.client.is_some() {
123                    return Err(WebexError::Internal(
124                        "Cannot use native proxy parameters (client) in injected mode".into(),
125                    ));
126                }
127            }
128            NetworkMode::Native => {
129                if config.fetch.is_some() || config.web_socket_factory.is_some() {
130                    return Err(WebexError::Internal(
131                        "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
132                    ));
133                }
134            }
135        }
136
137        // Create adapters based on mode
138        let (http_do, ws_factory) = match config.mode {
139            NetworkMode::Native => {
140                let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
141                let http_adapter = create_native_http_adapter(client.clone());
142                (http_adapter, None)
143            }
144            NetworkMode::Injected => {
145                let http_adapter = config.fetch.clone().unwrap();
146                let ws_factory = config.web_socket_factory.clone();
147                (http_adapter, ws_factory)
148            }
149        };
150
151        let mercury_socket = MercurySocket::new(
152            ws_factory,
153            Duration::from_secs_f64(config.ping_interval),
154            Duration::from_secs_f64(config.pong_timeout),
155            Duration::from_secs_f64(config.reconnect_backoff_max),
156            config.max_reconnect_attempts,
157        );
158
159        let (event_tx, event_rx) = mpsc::unbounded_channel();
160
161        Ok(Self {
162            token: Arc::new(Mutex::new(config.token.clone())),
163            http_do: http_do.clone(),
164            device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
165            mercury_socket: Arc::new(mercury_socket),
166            kms_client: Arc::new(Mutex::new(None)),
167            kms_response_handler: Arc::new(Mutex::new(None)),
168            registration: Arc::new(Mutex::new(None)),
169            connected: Arc::new(Mutex::new(false)),
170            connecting: Arc::new(Mutex::new(false)),
171            config,
172            event_tx,
173            event_rx: Arc::new(Mutex::new(Some(event_rx))),
174        })
175    }
176
177    /// Take the event receiver. Can only be called once.
178    pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
179        self.event_rx.lock().await.take()
180    }
181
182    /// Connect to Webex (register device, connect Mercury, init KMS).
183    pub async fn connect(&self) -> Result<(), WebexError> {
184        {
185            let connecting = self.connecting.lock().await;
186            if *connecting {
187                return Err(WebexError::Internal("connect() already in progress".into()));
188            }
189        }
190        {
191            let connected = self.connected.lock().await;
192            if *connected {
193                return Err(WebexError::Internal(
194                    "Already connected. Call disconnect() first, or use reconnect().".into(),
195                ));
196            }
197        }
198
199        info!("Connecting to Webex...");
200        *self.connecting.lock().await = true;
201
202        let result = self.connect_internal().await;
203
204        *self.connecting.lock().await = false;
205
206        match result {
207            Ok(()) => {
208                *self.connected.lock().await = true;
209                info!("Connected to Webex");
210                let _ = self.event_tx.send(HandlerEvent::Connected);
211                Ok(())
212            }
213            Err(e) => Err(e),
214        }
215    }
216
217    async fn connect_internal(&self) -> Result<(), WebexError> {
218        let token = self.token.lock().await.clone();
219
220        // Step 1: Register device with WDM
221        let reg = {
222            let mut dm = self.device_manager.lock().await;
223            dm.register(&token).await?
224        };
225        info!("Device registered");
226
227        // Step 2: Create KMS client
228        let kms = KmsClient::new(
229            self.http_do.clone(),
230            &token,
231            &reg.device_url,
232            &reg.user_id,
233            &reg.encryption_service_url,
234        );
235
236        // Get the response handler BEFORE storing the KMS client so the
237        // event loop can resolve pending requests without locking kms_client.
238        let response_handler = kms.response_handler();
239        *self.kms_response_handler.lock().await = Some(response_handler);
240        *self.kms_client.lock().await = Some(kms);
241
242        // Step 3: Connect Mercury WebSocket (KMS responses arrive here)
243        self.mercury_socket
244            .connect(&reg.web_socket_url, &token)
245            .await?;
246        info!("Mercury connected");
247
248        // Step 4: Start Mercury event loop
249        self.start_mercury_event_loop().await;
250
251        // Step 5: Initialize KMS (ECDH handshake — response comes via Mercury)
252        {
253            let mut kms_guard = self.kms_client.lock().await;
254            if let Some(ref mut kms) = *kms_guard {
255                kms.initialize().await?;
256            }
257        }
258        info!("KMS initialized");
259
260        // Store registration
261        *self.registration.lock().await = Some(reg);
262
263        Ok(())
264    }
265
266    /// Start processing Mercury events in a background task.
267    async fn start_mercury_event_loop(&self) {
268        let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
269            Some(rx) => rx,
270            None => {
271                warn!("Mercury event receiver already taken");
272                return;
273            }
274        };
275
276        let kms_client = self.kms_client.clone();
277        let kms_response_handler = self.kms_response_handler.clone();
278        let event_tx = self.event_tx.clone();
279        let connected = self.connected.clone();
280        let registration = self.registration.clone();
281        let device_manager = self.device_manager.clone();
282        let token = self.token.clone();
283
284        tokio::spawn(async move {
285            while let Some(event) = mercury_rx.recv().await {
286                match event {
287                    MercuryEvent::KmsResponse(data) => {
288                        // Use the separate response handler to avoid deadlock
289                        // with kms_client lock (held during initialize/get_key).
290                        let handler_guard = kms_response_handler.lock().await;
291                        if let Some(ref handler) = *handler_guard {
292                            handler.handle_kms_message(&data).await;
293                        }
294                    }
295                    MercuryEvent::Activity(activity) => {
296                        // Spawn in a separate task so the event loop can continue
297                        // processing KMS responses (needed for key retrieval during decryption).
298                        let kms_client_clone = kms_client.clone();
299                        let event_tx_clone = event_tx.clone();
300                        tokio::spawn(async move {
301                            let mut kms_guard = kms_client_clone.lock().await;
302                            if let Some(ref mut kms) = *kms_guard {
303                                Self::handle_activity_static(kms, &activity, &event_tx_clone).await;
304                            } else {
305                                warn!("Received activity but KMS client not initialized");
306                            }
307                        });
308                    }
309                    MercuryEvent::Connected => {
310                        info!("Mercury reconnected, refreshing device and KMS");
311
312                        // Refresh device
313                        let tok = token.lock().await.clone();
314                        {
315                            let reg_guard = registration.lock().await;
316                            if reg_guard.is_some() {
317                                let dm = device_manager.lock().await;
318                                match dm.refresh(&tok).await {
319                                    Ok(new_reg) => {
320                                        drop(reg_guard);
321                                        *registration.lock().await = Some(new_reg);
322                                    }
323                                    Err(e) => {
324                                        warn!("Device refresh on reconnect failed: {e}");
325                                    }
326                                }
327                            }
328                        }
329
330                        // Re-init KMS
331                        {
332                            let mut kms_guard = kms_client.lock().await;
333                            if let Some(ref mut kms) = *kms_guard {
334                                if let Err(e) = kms.initialize().await {
335                                    warn!("KMS re-init on reconnect failed: {e}");
336                                }
337                            }
338                        }
339
340                        *connected.lock().await = true;
341                        let _ = event_tx.send(HandlerEvent::Connected);
342                    }
343                    MercuryEvent::Disconnected(reason) => {
344                        *connected.lock().await = false;
345                        let _ = event_tx.send(HandlerEvent::Disconnected(reason));
346                    }
347                    MercuryEvent::Reconnecting(attempt) => {
348                        let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
349                    }
350                    MercuryEvent::Error(msg) => {
351                        let _ = event_tx.send(HandlerEvent::Error(msg));
352                    }
353                }
354            }
355        });
356    }
357
358    /// Handle a single activity (decrypt and route).
359    async fn handle_activity_static(
360        kms: &mut KmsClient,
361        activity: &MercuryActivity,
362        event_tx: &mpsc::UnboundedSender<HandlerEvent>,
363    ) {
364        // message:created — verb=post + objectType=comment
365        if activity.verb == "post" && activity.object.object_type == "comment" {
366            let mut decryptor = MessageDecryptor::new(kms);
367            match decryptor.decrypt_activity(activity).await {
368                Ok(decrypted) => {
369                    let msg = DecryptedMessage {
370                        id: decrypted.object.id.clone(),
371                        room_id: decrypted.target.id.clone(),
372                        person_id: decrypted.actor.id.clone(),
373                        person_email: decrypted
374                            .actor
375                            .email_address
376                            .clone()
377                            .unwrap_or_default(),
378                        text: decrypted.object.display_name.clone().unwrap_or_default(),
379                        html: decrypted.object.content.clone(),
380                        created: decrypted.published.clone(),
381                        room_type: infer_room_type(&decrypted),
382                        raw: decrypted,
383                    };
384                    let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
385                }
386                Err(e) => {
387                    error!("Error decrypting activity: {e}");
388                    let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
389                }
390            }
391            return;
392        }
393
394        // message:deleted — verb=delete + objectType=activity
395        if activity.verb == "delete" && activity.object.object_type == "activity" {
396            let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
397                message_id: activity.object.id.clone(),
398                room_id: activity.target.id.clone(),
399                person_id: activity.actor.id.clone(),
400            }));
401        }
402    }
403
404    /// Disconnect from Webex.
405    pub async fn disconnect(&self) {
406        info!("Disconnecting from Webex...");
407        *self.connected.lock().await = false;
408
409        self.mercury_socket.disconnect().await;
410
411        let token = self.token.lock().await.clone();
412        {
413            let reg = self.registration.lock().await;
414            if reg.is_some() {
415                let mut dm = self.device_manager.lock().await;
416                if let Err(e) = dm.unregister(&token).await {
417                    warn!("Failed to unregister device: {e}");
418                } else {
419                    info!("Device unregistered");
420                }
421            }
422        }
423
424        *self.registration.lock().await = None;
425        *self.kms_client.lock().await = None;
426        *self.kms_response_handler.lock().await = None;
427    }
428
429    /// Update the access token and re-establish the connection.
430    pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
431        if new_token.is_empty() {
432            return Err(WebexError::Internal(
433                "reconnect() requires a non-empty token string".into(),
434            ));
435        }
436
437        info!("Reconnecting with new token...");
438        self.disconnect().await;
439
440        *self.token.lock().await = new_token.to_string();
441        self.connect().await
442    }
443
444    /// Whether the handler is fully connected.
445    pub async fn connected(&self) -> bool {
446        let conn = *self.connected.lock().await;
447        conn && self.mercury_socket.connected().await
448    }
449
450    /// Returns a structured health check of all connection subsystems.
451    pub async fn status(&self) -> HandlerStatus {
452        let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
453        let ws_open = self.mercury_socket.connected().await;
454        let is_connected = *self.connected.lock().await;
455        let is_connecting = *self.connecting.lock().await;
456
457        let status = if is_connected && ws_open {
458            ConnectionStatus::Connected
459        } else if is_connecting {
460            ConnectionStatus::Connecting
461        } else if reconnect_attempt > 0 {
462            ConnectionStatus::Reconnecting
463        } else {
464            ConnectionStatus::Disconnected
465        };
466
467        HandlerStatus {
468            status,
469            web_socket_open: ws_open,
470            kms_initialized: self.kms_client.lock().await.is_some(),
471            device_registered: self.registration.lock().await.is_some(),
472            reconnect_attempt,
473        }
474    }
475}
476
477fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
478    let tags = &activity.target.tags;
479    if tags.contains(&"ONE_ON_ONE".to_string()) {
480        return Some("direct".to_string());
481    }
482    if tags.contains(&"TEAM".to_string())
483        || tags.contains(&"LOCKED".to_string())
484        || tags.contains(&"GROUP".to_string())
485    {
486        return Some("group".to_string());
487    }
488    None
489}