Skip to main content

webex_message_handler/
handler.rs

1//! Main orchestrator: device registration, Mercury WebSocket, KMS, decryption.
2
3use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9    Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, HandlerStatus,
10    MercuryActivity,
11};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::{mpsc, Mutex};
15use tracing::{error, info, warn};
16
17/// Events emitted by WebexMessageHandler.
18#[derive(Debug, Clone)]
19pub enum HandlerEvent {
20    /// A new message was received and decrypted.
21    MessageCreated(DecryptedMessage),
22    /// A message was deleted.
23    MessageDeleted(DeletedMessage),
24    /// Successfully connected (or reconnected).
25    Connected,
26    /// Disconnected with a reason string.
27    Disconnected(String),
28    /// Reconnecting (attempt number).
29    Reconnecting(u32),
30    /// An error occurred.
31    Error(String),
32}
33
34/// Receives and decrypts Webex messages over Mercury WebSocket.
35pub struct WebexMessageHandler {
36    token: Arc<Mutex<String>>,
37    client: reqwest::Client,
38    device_manager: Arc<Mutex<DeviceManager>>,
39    mercury_socket: Arc<MercurySocket>,
40    kms_client: Arc<Mutex<Option<KmsClient>>>,
41    /// Separate handle for resolving KMS responses without locking kms_client.
42    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
43    registration: Arc<Mutex<Option<DeviceRegistration>>>,
44    connected: Arc<Mutex<bool>>,
45    connecting: Arc<Mutex<bool>>,
46
47    #[allow(dead_code)]
48    config: Config,
49    event_tx: mpsc::UnboundedSender<HandlerEvent>,
50    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
51}
52
53impl WebexMessageHandler {
54    /// Create a new WebexMessageHandler.
55    pub fn new(config: Config) -> Result<Self, WebexError> {
56        if config.token.is_empty() {
57            return Err(WebexError::Internal(
58                "WebexMessageHandler requires a non-empty token string".into(),
59            ));
60        }
61
62        let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
63
64        let mercury_socket = MercurySocket::new(
65            client.clone(),
66            Duration::from_secs_f64(config.ping_interval),
67            Duration::from_secs_f64(config.pong_timeout),
68            Duration::from_secs_f64(config.reconnect_backoff_max),
69            config.max_reconnect_attempts,
70        );
71
72        let (event_tx, event_rx) = mpsc::unbounded_channel();
73
74        Ok(Self {
75            token: Arc::new(Mutex::new(config.token.clone())),
76            client: client.clone(),
77            device_manager: Arc::new(Mutex::new(DeviceManager::new(client.clone()))),
78            mercury_socket: Arc::new(mercury_socket),
79            kms_client: Arc::new(Mutex::new(None)),
80            kms_response_handler: Arc::new(Mutex::new(None)),
81            registration: Arc::new(Mutex::new(None)),
82            connected: Arc::new(Mutex::new(false)),
83            connecting: Arc::new(Mutex::new(false)),
84            config,
85            event_tx,
86            event_rx: Arc::new(Mutex::new(Some(event_rx))),
87        })
88    }
89
90    /// Take the event receiver. Can only be called once.
91    pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
92        self.event_rx.lock().await.take()
93    }
94
95    /// Connect to Webex (register device, connect Mercury, init KMS).
96    pub async fn connect(&self) -> Result<(), WebexError> {
97        {
98            let connecting = self.connecting.lock().await;
99            if *connecting {
100                return Err(WebexError::Internal("connect() already in progress".into()));
101            }
102        }
103        {
104            let connected = self.connected.lock().await;
105            if *connected {
106                return Err(WebexError::Internal(
107                    "Already connected. Call disconnect() first, or use reconnect().".into(),
108                ));
109            }
110        }
111
112        info!("Connecting to Webex...");
113        *self.connecting.lock().await = true;
114
115        let result = self.connect_internal().await;
116
117        *self.connecting.lock().await = false;
118
119        match result {
120            Ok(()) => {
121                *self.connected.lock().await = true;
122                info!("Connected to Webex");
123                let _ = self.event_tx.send(HandlerEvent::Connected);
124                Ok(())
125            }
126            Err(e) => Err(e),
127        }
128    }
129
130    async fn connect_internal(&self) -> Result<(), WebexError> {
131        let token = self.token.lock().await.clone();
132
133        // Step 1: Register device with WDM
134        let reg = {
135            let mut dm = self.device_manager.lock().await;
136            dm.register(&token).await?
137        };
138        info!("Device registered");
139
140        // Step 2: Create KMS client
141        let kms = KmsClient::new(
142            self.client.clone(),
143            &token,
144            &reg.device_url,
145            &reg.user_id,
146            &reg.encryption_service_url,
147        );
148
149        // Get the response handler BEFORE storing the KMS client so the
150        // event loop can resolve pending requests without locking kms_client.
151        let response_handler = kms.response_handler();
152        *self.kms_response_handler.lock().await = Some(response_handler);
153        *self.kms_client.lock().await = Some(kms);
154
155        // Step 3: Connect Mercury WebSocket (KMS responses arrive here)
156        self.mercury_socket
157            .connect(&reg.web_socket_url, &token)
158            .await?;
159        info!("Mercury connected");
160
161        // Step 4: Start Mercury event loop
162        self.start_mercury_event_loop().await;
163
164        // Step 5: Initialize KMS (ECDH handshake — response comes via Mercury)
165        {
166            let mut kms_guard = self.kms_client.lock().await;
167            if let Some(ref mut kms) = *kms_guard {
168                kms.initialize().await?;
169            }
170        }
171        info!("KMS initialized");
172
173        // Store registration
174        *self.registration.lock().await = Some(reg);
175
176        Ok(())
177    }
178
179    /// Start processing Mercury events in a background task.
180    async fn start_mercury_event_loop(&self) {
181        let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
182            Some(rx) => rx,
183            None => {
184                warn!("Mercury event receiver already taken");
185                return;
186            }
187        };
188
189        let kms_client = self.kms_client.clone();
190        let kms_response_handler = self.kms_response_handler.clone();
191        let event_tx = self.event_tx.clone();
192        let connected = self.connected.clone();
193        let registration = self.registration.clone();
194        let device_manager = self.device_manager.clone();
195        let token = self.token.clone();
196
197        tokio::spawn(async move {
198            while let Some(event) = mercury_rx.recv().await {
199                match event {
200                    MercuryEvent::KmsResponse(data) => {
201                        // Use the separate response handler to avoid deadlock
202                        // with kms_client lock (held during initialize/get_key).
203                        let handler_guard = kms_response_handler.lock().await;
204                        if let Some(ref handler) = *handler_guard {
205                            handler.handle_kms_message(&data).await;
206                        }
207                    }
208                    MercuryEvent::Activity(activity) => {
209                        // Spawn in a separate task so the event loop can continue
210                        // processing KMS responses (needed for key retrieval during decryption).
211                        let kms_client_clone = kms_client.clone();
212                        let event_tx_clone = event_tx.clone();
213                        tokio::spawn(async move {
214                            let mut kms_guard = kms_client_clone.lock().await;
215                            if let Some(ref mut kms) = *kms_guard {
216                                Self::handle_activity_static(kms, &activity, &event_tx_clone).await;
217                            } else {
218                                warn!("Received activity but KMS client not initialized");
219                            }
220                        });
221                    }
222                    MercuryEvent::Connected => {
223                        info!("Mercury reconnected, refreshing device and KMS");
224
225                        // Refresh device
226                        let tok = token.lock().await.clone();
227                        {
228                            let reg_guard = registration.lock().await;
229                            if reg_guard.is_some() {
230                                let dm = device_manager.lock().await;
231                                match dm.refresh(&tok).await {
232                                    Ok(new_reg) => {
233                                        drop(reg_guard);
234                                        *registration.lock().await = Some(new_reg);
235                                    }
236                                    Err(e) => {
237                                        warn!("Device refresh on reconnect failed: {e}");
238                                    }
239                                }
240                            }
241                        }
242
243                        // Re-init KMS
244                        {
245                            let mut kms_guard = kms_client.lock().await;
246                            if let Some(ref mut kms) = *kms_guard {
247                                if let Err(e) = kms.initialize().await {
248                                    warn!("KMS re-init on reconnect failed: {e}");
249                                }
250                            }
251                        }
252
253                        *connected.lock().await = true;
254                        let _ = event_tx.send(HandlerEvent::Connected);
255                    }
256                    MercuryEvent::Disconnected(reason) => {
257                        *connected.lock().await = false;
258                        let _ = event_tx.send(HandlerEvent::Disconnected(reason));
259                    }
260                    MercuryEvent::Reconnecting(attempt) => {
261                        let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
262                    }
263                    MercuryEvent::Error(msg) => {
264                        let _ = event_tx.send(HandlerEvent::Error(msg));
265                    }
266                }
267            }
268        });
269    }
270
271    /// Handle a single activity (decrypt and route).
272    async fn handle_activity_static(
273        kms: &mut KmsClient,
274        activity: &MercuryActivity,
275        event_tx: &mpsc::UnboundedSender<HandlerEvent>,
276    ) {
277        // message:created — verb=post + objectType=comment
278        if activity.verb == "post" && activity.object.object_type == "comment" {
279            let mut decryptor = MessageDecryptor::new(kms);
280            match decryptor.decrypt_activity(activity).await {
281                Ok(decrypted) => {
282                    let msg = DecryptedMessage {
283                        id: decrypted.object.id.clone(),
284                        room_id: decrypted.target.id.clone(),
285                        person_id: decrypted.actor.id.clone(),
286                        person_email: decrypted
287                            .actor
288                            .email_address
289                            .clone()
290                            .unwrap_or_default(),
291                        text: decrypted.object.display_name.clone().unwrap_or_default(),
292                        html: decrypted.object.content.clone(),
293                        created: decrypted.published.clone(),
294                        room_type: infer_room_type(&decrypted),
295                        raw: decrypted,
296                    };
297                    let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
298                }
299                Err(e) => {
300                    error!("Error decrypting activity: {e}");
301                    let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
302                }
303            }
304            return;
305        }
306
307        // message:deleted — verb=delete + objectType=activity
308        if activity.verb == "delete" && activity.object.object_type == "activity" {
309            let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
310                message_id: activity.object.id.clone(),
311                room_id: activity.target.id.clone(),
312                person_id: activity.actor.id.clone(),
313            }));
314        }
315    }
316
317    /// Disconnect from Webex.
318    pub async fn disconnect(&self) {
319        info!("Disconnecting from Webex...");
320        *self.connected.lock().await = false;
321
322        self.mercury_socket.disconnect().await;
323
324        let token = self.token.lock().await.clone();
325        {
326            let reg = self.registration.lock().await;
327            if reg.is_some() {
328                let mut dm = self.device_manager.lock().await;
329                if let Err(e) = dm.unregister(&token).await {
330                    warn!("Failed to unregister device: {e}");
331                } else {
332                    info!("Device unregistered");
333                }
334            }
335        }
336
337        *self.registration.lock().await = None;
338        *self.kms_client.lock().await = None;
339        *self.kms_response_handler.lock().await = None;
340    }
341
342    /// Update the access token and re-establish the connection.
343    pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
344        if new_token.is_empty() {
345            return Err(WebexError::Internal(
346                "reconnect() requires a non-empty token string".into(),
347            ));
348        }
349
350        info!("Reconnecting with new token...");
351        self.disconnect().await;
352
353        *self.token.lock().await = new_token.to_string();
354        self.connect().await
355    }
356
357    /// Whether the handler is fully connected.
358    pub async fn connected(&self) -> bool {
359        let conn = *self.connected.lock().await;
360        conn && self.mercury_socket.connected().await
361    }
362
363    /// Returns a structured health check of all connection subsystems.
364    pub async fn status(&self) -> HandlerStatus {
365        let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
366        let ws_open = self.mercury_socket.connected().await;
367        let is_connected = *self.connected.lock().await;
368        let is_connecting = *self.connecting.lock().await;
369
370        let status = if is_connected && ws_open {
371            ConnectionStatus::Connected
372        } else if is_connecting {
373            ConnectionStatus::Connecting
374        } else if reconnect_attempt > 0 {
375            ConnectionStatus::Reconnecting
376        } else {
377            ConnectionStatus::Disconnected
378        };
379
380        HandlerStatus {
381            status,
382            web_socket_open: ws_open,
383            kms_initialized: self.kms_client.lock().await.is_some(),
384            device_registered: self.registration.lock().await.is_some(),
385            reconnect_attempt,
386        }
387    }
388}
389
390fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
391    let tags = &activity.target.tags;
392    if tags.contains(&"ONE_ON_ONE".to_string()) {
393        return Some("direct".to_string());
394    }
395    if tags.contains(&"TEAM".to_string())
396        || tags.contains(&"LOCKED".to_string())
397        || tags.contains(&"GROUP".to_string())
398    {
399        return Some("group".to_string());
400    }
401    None
402}