Skip to main content

webex_message_handler/
handler.rs

1//! Main orchestrator: device registration, Mercury WebSocket, KMS, decryption.
2
3use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9    Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10    FetchResponse, HandlerStatus, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
19// Internal adapter type (alias for public type)
20type HttpDoFn = Arc<
21    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
22        + Send
23        + Sync,
24>;
25
26/// Events emitted by WebexMessageHandler.
27#[derive(Debug, Clone)]
28pub enum HandlerEvent {
29    /// A new message was received and decrypted.
30    MessageCreated(DecryptedMessage),
31    /// A message was deleted.
32    MessageDeleted(DeletedMessage),
33    /// Successfully connected (or reconnected).
34    Connected,
35    /// Disconnected with a reason string.
36    Disconnected(String),
37    /// Reconnecting (attempt number).
38    Reconnecting(u32),
39    /// An error occurred.
40    Error(String),
41}
42
43/// Create a native HTTP adapter that wraps reqwest::Client.
44fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
45    Arc::new(move |req: FetchRequest| {
46        let client = client.clone();
47        Box::pin(async move {
48            let mut request_builder = match req.method.as_str() {
49                "GET" => client.get(&req.url),
50                "POST" => client.post(&req.url),
51                "PUT" => client.put(&req.url),
52                "DELETE" => client.delete(&req.url),
53                _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
54            };
55
56            for (key, value) in req.headers {
57                request_builder = request_builder.header(key, value);
58            }
59
60            if let Some(body) = req.body {
61                request_builder = request_builder.body(body);
62            }
63
64            let response = request_builder
65                .send()
66                .await
67                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
68
69            let status = response.status().as_u16();
70            let ok = response.status().is_success();
71            let body_bytes = response
72                .bytes()
73                .await
74                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
75                .to_vec();
76
77            Ok(FetchResponse {
78                status,
79                ok,
80                body: body_bytes,
81            })
82        })
83    })
84}
85
86/// Receives and decrypts Webex messages over Mercury WebSocket.
87pub struct WebexMessageHandler {
88    token: Arc<Mutex<String>>,
89    http_do: HttpDoFn,
90    device_manager: Arc<Mutex<DeviceManager>>,
91    mercury_socket: Arc<MercurySocket>,
92    kms_client: Arc<Mutex<Option<KmsClient>>>,
93    /// Separate handle for resolving KMS responses without locking kms_client.
94    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
95    registration: Arc<Mutex<Option<DeviceRegistration>>>,
96    connected: Arc<Mutex<bool>>,
97    connecting: Arc<Mutex<bool>>,
98
99    #[allow(dead_code)]
100    config: Config,
101    event_tx: mpsc::UnboundedSender<HandlerEvent>,
102    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
103}
104
105impl WebexMessageHandler {
106    /// Create a new WebexMessageHandler.
107    pub fn new(config: Config) -> Result<Self, WebexError> {
108        if config.token.is_empty() {
109            return Err(WebexError::Internal(
110                "WebexMessageHandler requires a non-empty token string".into(),
111            ));
112        }
113
114        // Validate networking mode configuration
115        match config.mode {
116            NetworkMode::Injected => {
117                if config.fetch.is_none() || config.web_socket_factory.is_none() {
118                    return Err(WebexError::Internal(
119                        "Injected mode requires both fetch and web_socket_factory".into(),
120                    ));
121                }
122                if config.client.is_some() {
123                    return Err(WebexError::Internal(
124                        "Cannot use native proxy parameters (client) in injected mode".into(),
125                    ));
126                }
127            }
128            NetworkMode::Native => {
129                if config.fetch.is_some() || config.web_socket_factory.is_some() {
130                    return Err(WebexError::Internal(
131                        "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
132                    ));
133                }
134            }
135        }
136
137        // Create adapters based on mode
138        let (http_do, ws_factory) = match config.mode {
139            NetworkMode::Native => {
140                let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
141                let http_adapter = create_native_http_adapter(client.clone());
142                (http_adapter, None)
143            }
144            NetworkMode::Injected => {
145                let http_adapter = config.fetch.clone().unwrap();
146                let ws_factory = config.web_socket_factory.clone();
147                (http_adapter, ws_factory)
148            }
149        };
150
151        let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
152
153        let mercury_socket = MercurySocket::new(
154            ws_factory,
155            Duration::from_secs_f64(config.ping_interval),
156            Duration::from_secs_f64(config.pong_timeout),
157            Duration::from_secs_f64(config.reconnect_backoff_max),
158            config.max_reconnect_attempts,
159        );
160
161        let (event_tx, event_rx) = mpsc::unbounded_channel();
162
163        Ok(Self {
164            token: Arc::new(Mutex::new(config.token.clone())),
165            http_do: http_do.clone(),
166            device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
167            mercury_socket: Arc::new(mercury_socket),
168            kms_client: Arc::new(Mutex::new(None)),
169            kms_response_handler: Arc::new(Mutex::new(None)),
170            registration: Arc::new(Mutex::new(None)),
171            connected: Arc::new(Mutex::new(false)),
172            connecting: Arc::new(Mutex::new(false)),
173            config,
174            event_tx,
175            event_rx: Arc::new(Mutex::new(Some(event_rx))),
176        })
177    }
178
179    /// Take the event receiver. Can only be called once.
180    pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
181        self.event_rx.lock().await.take()
182    }
183
184    /// Connect to Webex (register device, connect Mercury, init KMS).
185    pub async fn connect(&self) -> Result<(), WebexError> {
186        {
187            let connecting = self.connecting.lock().await;
188            if *connecting {
189                return Err(WebexError::Internal("connect() already in progress".into()));
190            }
191        }
192        {
193            let connected = self.connected.lock().await;
194            if *connected {
195                return Err(WebexError::Internal(
196                    "Already connected. Call disconnect() first, or use reconnect().".into(),
197                ));
198            }
199        }
200
201        info!("Connecting to Webex...");
202        *self.connecting.lock().await = true;
203
204        let result = self.connect_internal().await;
205
206        *self.connecting.lock().await = false;
207
208        match result {
209            Ok(()) => {
210                *self.connected.lock().await = true;
211                info!("Connected to Webex");
212                let _ = self.event_tx.send(HandlerEvent::Connected);
213                Ok(())
214            }
215            Err(e) => Err(e),
216        }
217    }
218
219    async fn connect_internal(&self) -> Result<(), WebexError> {
220        let token = self.token.lock().await.clone();
221
222        // Step 1: Register device with WDM
223        let reg = {
224            let mut dm = self.device_manager.lock().await;
225            dm.register(&token).await?
226        };
227        info!("Device registered");
228
229        // Step 2: Create KMS client
230        let kms = KmsClient::new(
231            self.http_do.clone(),
232            &token,
233            &reg.device_url,
234            &reg.user_id,
235            &reg.encryption_service_url,
236        );
237
238        // Get the response handler BEFORE storing the KMS client so the
239        // event loop can resolve pending requests without locking kms_client.
240        let response_handler = kms.response_handler();
241        *self.kms_response_handler.lock().await = Some(response_handler);
242        *self.kms_client.lock().await = Some(kms);
243
244        // Step 3: Connect Mercury WebSocket (KMS responses arrive here)
245        self.mercury_socket
246            .connect(&reg.web_socket_url, &token)
247            .await?;
248        info!("Mercury connected");
249
250        // Step 4: Start Mercury event loop
251        self.start_mercury_event_loop().await;
252
253        // Step 5: Initialize KMS (ECDH handshake — response comes via Mercury)
254        {
255            let mut kms_guard = self.kms_client.lock().await;
256            if let Some(ref mut kms) = *kms_guard {
257                kms.initialize().await?;
258            }
259        }
260        info!("KMS initialized");
261
262        // Store registration
263        *self.registration.lock().await = Some(reg);
264
265        Ok(())
266    }
267
268    /// Start processing Mercury events in a background task.
269    async fn start_mercury_event_loop(&self) {
270        let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
271            Some(rx) => rx,
272            None => {
273                warn!("Mercury event receiver already taken");
274                return;
275            }
276        };
277
278        let kms_client = self.kms_client.clone();
279        let kms_response_handler = self.kms_response_handler.clone();
280        let event_tx = self.event_tx.clone();
281        let connected = self.connected.clone();
282        let registration = self.registration.clone();
283        let device_manager = self.device_manager.clone();
284        let token = self.token.clone();
285
286        tokio::spawn(async move {
287            while let Some(event) = mercury_rx.recv().await {
288                match event {
289                    MercuryEvent::KmsResponse(data) => {
290                        // Use the separate response handler to avoid deadlock
291                        // with kms_client lock (held during initialize/get_key).
292                        let handler_guard = kms_response_handler.lock().await;
293                        if let Some(ref handler) = *handler_guard {
294                            handler.handle_kms_message(&data).await;
295                        }
296                    }
297                    MercuryEvent::Activity(activity) => {
298                        // Spawn in a separate task so the event loop can continue
299                        // processing KMS responses (needed for key retrieval during decryption).
300                        let kms_client_clone = kms_client.clone();
301                        let event_tx_clone = event_tx.clone();
302                        tokio::spawn(async move {
303                            let mut kms_guard = kms_client_clone.lock().await;
304                            if let Some(ref mut kms) = *kms_guard {
305                                Self::handle_activity_static(kms, &activity, &event_tx_clone).await;
306                            } else {
307                                warn!("Received activity but KMS client not initialized");
308                            }
309                        });
310                    }
311                    MercuryEvent::Connected => {
312                        info!("Mercury reconnected, refreshing device and KMS");
313
314                        // Refresh device
315                        let tok = token.lock().await.clone();
316                        {
317                            let reg_guard = registration.lock().await;
318                            if reg_guard.is_some() {
319                                let dm = device_manager.lock().await;
320                                match dm.refresh(&tok).await {
321                                    Ok(new_reg) => {
322                                        drop(reg_guard);
323                                        *registration.lock().await = Some(new_reg);
324                                    }
325                                    Err(e) => {
326                                        warn!("Device refresh on reconnect failed: {e}");
327                                    }
328                                }
329                            }
330                        }
331
332                        // Re-init KMS
333                        {
334                            let mut kms_guard = kms_client.lock().await;
335                            if let Some(ref mut kms) = *kms_guard {
336                                if let Err(e) = kms.initialize().await {
337                                    warn!("KMS re-init on reconnect failed: {e}");
338                                }
339                            }
340                        }
341
342                        *connected.lock().await = true;
343                        let _ = event_tx.send(HandlerEvent::Connected);
344                    }
345                    MercuryEvent::Disconnected(reason) => {
346                        *connected.lock().await = false;
347                        let _ = event_tx.send(HandlerEvent::Disconnected(reason));
348                    }
349                    MercuryEvent::Reconnecting(attempt) => {
350                        let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
351                    }
352                    MercuryEvent::Error(msg) => {
353                        let _ = event_tx.send(HandlerEvent::Error(msg));
354                    }
355                }
356            }
357        });
358    }
359
360    /// Handle a single activity (decrypt and route).
361    async fn handle_activity_static(
362        kms: &mut KmsClient,
363        activity: &MercuryActivity,
364        event_tx: &mpsc::UnboundedSender<HandlerEvent>,
365    ) {
366        // message:created — verb=post + objectType=comment
367        if activity.verb == "post" && activity.object.object_type == "comment" {
368            let mut decryptor = MessageDecryptor::new(kms);
369            match decryptor.decrypt_activity(activity).await {
370                Ok(decrypted) => {
371                    let msg = DecryptedMessage {
372                        id: decrypted.object.id.clone(),
373                        room_id: decrypted.target.id.clone(),
374                        person_id: decrypted.actor.id.clone(),
375                        person_email: decrypted
376                            .actor
377                            .email_address
378                            .clone()
379                            .unwrap_or_default(),
380                        text: decrypted.object.display_name.clone().unwrap_or_default(),
381                        html: decrypted.object.content.clone(),
382                        created: decrypted.published.clone(),
383                        room_type: infer_room_type(&decrypted),
384                        raw: decrypted,
385                    };
386                    let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
387                }
388                Err(e) => {
389                    error!("Error decrypting activity: {e}");
390                    let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
391                }
392            }
393            return;
394        }
395
396        // message:deleted — verb=delete + objectType=activity
397        if activity.verb == "delete" && activity.object.object_type == "activity" {
398            let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
399                message_id: activity.object.id.clone(),
400                room_id: activity.target.id.clone(),
401                person_id: activity.actor.id.clone(),
402            }));
403        }
404    }
405
406    /// Disconnect from Webex.
407    pub async fn disconnect(&self) {
408        info!("Disconnecting from Webex...");
409        *self.connected.lock().await = false;
410
411        self.mercury_socket.disconnect().await;
412
413        let token = self.token.lock().await.clone();
414        {
415            let reg = self.registration.lock().await;
416            if reg.is_some() {
417                let mut dm = self.device_manager.lock().await;
418                if let Err(e) = dm.unregister(&token).await {
419                    warn!("Failed to unregister device: {e}");
420                } else {
421                    info!("Device unregistered");
422                }
423            }
424        }
425
426        *self.registration.lock().await = None;
427        *self.kms_client.lock().await = None;
428        *self.kms_response_handler.lock().await = None;
429    }
430
431    /// Update the access token and re-establish the connection.
432    pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
433        if new_token.is_empty() {
434            return Err(WebexError::Internal(
435                "reconnect() requires a non-empty token string".into(),
436            ));
437        }
438
439        info!("Reconnecting with new token...");
440        self.disconnect().await;
441
442        *self.token.lock().await = new_token.to_string();
443        self.connect().await
444    }
445
446    /// Whether the handler is fully connected.
447    pub async fn connected(&self) -> bool {
448        let conn = *self.connected.lock().await;
449        conn && self.mercury_socket.connected().await
450    }
451
452    /// Returns a structured health check of all connection subsystems.
453    pub async fn status(&self) -> HandlerStatus {
454        let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
455        let ws_open = self.mercury_socket.connected().await;
456        let is_connected = *self.connected.lock().await;
457        let is_connecting = *self.connecting.lock().await;
458
459        let status = if is_connected && ws_open {
460            ConnectionStatus::Connected
461        } else if is_connecting {
462            ConnectionStatus::Connecting
463        } else if reconnect_attempt > 0 {
464            ConnectionStatus::Reconnecting
465        } else {
466            ConnectionStatus::Disconnected
467        };
468
469        HandlerStatus {
470            status,
471            web_socket_open: ws_open,
472            kms_initialized: self.kms_client.lock().await.is_some(),
473            device_registered: self.registration.lock().await.is_some(),
474            reconnect_attempt,
475        }
476    }
477}
478
479fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
480    let tags = &activity.target.tags;
481    if tags.contains(&"ONE_ON_ONE".to_string()) {
482        return Some("direct".to_string());
483    }
484    if tags.contains(&"TEAM".to_string())
485        || tags.contains(&"LOCKED".to_string())
486        || tags.contains(&"GROUP".to_string())
487    {
488        return Some("group".to_string());
489    }
490    None
491}