Skip to main content

webex_message_handler/
handler.rs

1//! Main orchestrator: device registration, Mercury WebSocket, KMS, decryption.
2
3use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9    Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10    FetchResponse, HandlerStatus, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
19// Internal adapter type (alias for public type)
20type HttpDoFn = Arc<
21    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
22        + Send
23        + Sync,
24>;
25
26/// Events emitted by WebexMessageHandler.
27#[derive(Debug, Clone)]
28pub enum HandlerEvent {
29    /// A new message was received and decrypted.
30    MessageCreated(DecryptedMessage),
31    /// A message was deleted.
32    MessageDeleted(DeletedMessage),
33    /// Successfully connected (or reconnected).
34    Connected,
35    /// Disconnected with a reason string.
36    Disconnected(String),
37    /// Reconnecting (attempt number).
38    Reconnecting(u32),
39    /// An error occurred.
40    Error(String),
41}
42
43/// Create a native HTTP adapter that wraps reqwest::Client.
44fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
45    Arc::new(move |req: FetchRequest| {
46        let client = client.clone();
47        Box::pin(async move {
48            let mut request_builder = match req.method.as_str() {
49                "GET" => client.get(&req.url),
50                "POST" => client.post(&req.url),
51                "PUT" => client.put(&req.url),
52                "DELETE" => client.delete(&req.url),
53                _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
54            };
55
56            for (key, value) in req.headers {
57                request_builder = request_builder.header(key, value);
58            }
59
60            if let Some(body) = req.body {
61                request_builder = request_builder.body(body);
62            }
63
64            let response = request_builder
65                .send()
66                .await
67                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
68
69            let status = response.status().as_u16();
70            let ok = response.status().is_success();
71            let body_bytes = response
72                .bytes()
73                .await
74                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
75                .to_vec();
76
77            Ok(FetchResponse {
78                status,
79                ok,
80                body: body_bytes,
81            })
82        })
83    })
84}
85
86/// Receives and decrypts Webex messages over Mercury WebSocket.
87pub struct WebexMessageHandler {
88    token: Arc<Mutex<String>>,
89    http_do: HttpDoFn,
90    device_manager: Arc<Mutex<DeviceManager>>,
91    mercury_socket: Arc<MercurySocket>,
92    kms_client: Arc<Mutex<Option<KmsClient>>>,
93    /// Separate handle for resolving KMS responses without locking kms_client.
94    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
95    registration: Arc<Mutex<Option<DeviceRegistration>>>,
96    connected: Arc<Mutex<bool>>,
97    connecting: Arc<Mutex<bool>>,
98    ignore_self_messages: bool,
99    bot_person_id: Arc<Mutex<Option<String>>>,
100
101    #[allow(dead_code)]
102    config: Config,
103    event_tx: mpsc::UnboundedSender<HandlerEvent>,
104    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
105}
106
107impl WebexMessageHandler {
108    /// Create a new WebexMessageHandler.
109    pub fn new(config: Config) -> Result<Self, WebexError> {
110        if config.token.is_empty() {
111            return Err(WebexError::Internal(
112                "WebexMessageHandler requires a non-empty token string".into(),
113            ));
114        }
115
116        // Validate networking mode configuration
117        match config.mode {
118            NetworkMode::Injected => {
119                if config.fetch.is_none() || config.web_socket_factory.is_none() {
120                    return Err(WebexError::Internal(
121                        "Injected mode requires both fetch and web_socket_factory".into(),
122                    ));
123                }
124                if config.client.is_some() {
125                    return Err(WebexError::Internal(
126                        "Cannot use native proxy parameters (client) in injected mode".into(),
127                    ));
128                }
129            }
130            NetworkMode::Native => {
131                if config.fetch.is_some() || config.web_socket_factory.is_some() {
132                    return Err(WebexError::Internal(
133                        "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
134                    ));
135                }
136            }
137        }
138
139        // Create adapters based on mode
140        let (http_do, ws_factory) = match config.mode {
141            NetworkMode::Native => {
142                let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
143                let http_adapter = create_native_http_adapter(client.clone());
144                (http_adapter, None)
145            }
146            NetworkMode::Injected => {
147                let http_adapter = config.fetch.clone().unwrap();
148                let ws_factory = config.web_socket_factory.clone();
149                (http_adapter, ws_factory)
150            }
151        };
152
153        let mercury_socket = MercurySocket::new(
154            ws_factory,
155            Duration::from_secs_f64(config.ping_interval),
156            Duration::from_secs_f64(config.pong_timeout),
157            Duration::from_secs_f64(config.reconnect_backoff_max),
158            config.max_reconnect_attempts,
159        );
160
161        let (event_tx, event_rx) = mpsc::unbounded_channel();
162
163        let ignore_self_messages = config.ignore_self_messages;
164
165        Ok(Self {
166            token: Arc::new(Mutex::new(config.token.clone())),
167            http_do: http_do.clone(),
168            device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
169            mercury_socket: Arc::new(mercury_socket),
170            kms_client: Arc::new(Mutex::new(None)),
171            kms_response_handler: Arc::new(Mutex::new(None)),
172            registration: Arc::new(Mutex::new(None)),
173            connected: Arc::new(Mutex::new(false)),
174            connecting: Arc::new(Mutex::new(false)),
175            ignore_self_messages,
176            bot_person_id: Arc::new(Mutex::new(None)),
177            config,
178            event_tx,
179            event_rx: Arc::new(Mutex::new(Some(event_rx))),
180        })
181    }
182
183    /// Take the event receiver. Can only be called once.
184    pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
185        self.event_rx.lock().await.take()
186    }
187
188    /// Connect to Webex (register device, connect Mercury, init KMS).
189    pub async fn connect(&self) -> Result<(), WebexError> {
190        {
191            let connecting = self.connecting.lock().await;
192            if *connecting {
193                return Err(WebexError::Internal("connect() already in progress".into()));
194            }
195        }
196        {
197            let connected = self.connected.lock().await;
198            if *connected {
199                return Err(WebexError::Internal(
200                    "Already connected. Call disconnect() first, or use reconnect().".into(),
201                ));
202            }
203        }
204
205        info!("Connecting to Webex...");
206        *self.connecting.lock().await = true;
207
208        let result = self.connect_internal().await;
209
210        *self.connecting.lock().await = false;
211
212        match result {
213            Ok(()) => {
214                *self.connected.lock().await = true;
215                info!("Connected to Webex");
216                let _ = self.event_tx.send(HandlerEvent::Connected);
217                Ok(())
218            }
219            Err(e) => Err(e),
220        }
221    }
222
223    async fn fetch_bot_person_id(&self) {
224        info!("Fetching bot person info for self-message filtering");
225        let token = self.token.lock().await.clone();
226        let req = FetchRequest {
227            url: "https://webexapis.com/v1/people/me".into(),
228            method: "GET".into(),
229            headers: {
230                let mut h = std::collections::HashMap::new();
231                h.insert("Authorization".into(), format!("Bearer {}", token));
232                h.insert("Content-Type".into(), "application/json".into());
233                h
234            },
235            body: None,
236        };
237
238        match (self.http_do)(req).await {
239            Ok(resp) => {
240                if !resp.ok {
241                    warn!("Failed to fetch bot person info: HTTP {}", resp.status);
242                    return;
243                }
244                match serde_json::from_slice::<serde_json::Value>(&resp.body) {
245                    Ok(data) => {
246                        if let Some(id) = data.get("id").and_then(|v| v.as_str()) {
247                            info!("Bot person ID cached for self-message filtering: {}", id);
248                            *self.bot_person_id.lock().await = Some(id.to_string());
249                        }
250                    }
251                    Err(e) => {
252                        warn!("Error parsing bot person info: {}", e);
253                    }
254                }
255            }
256            Err(e) => {
257                warn!("Error fetching bot person info: {}", e);
258            }
259        }
260    }
261
262    async fn connect_internal(&self) -> Result<(), WebexError> {
263        let token = self.token.lock().await.clone();
264
265        // Step 1: Register device with WDM
266        let reg = {
267            let mut dm = self.device_manager.lock().await;
268            dm.register(&token).await?
269        };
270        info!("Device registered");
271
272        // Step 1.5: Fetch bot person info if self-message filtering is enabled
273        if self.ignore_self_messages {
274            self.fetch_bot_person_id().await;
275        }
276
277        // Step 2: Create KMS client
278        let kms = KmsClient::new(
279            self.http_do.clone(),
280            &token,
281            &reg.device_url,
282            &reg.user_id,
283            &reg.encryption_service_url,
284        );
285
286        // Get the response handler BEFORE storing the KMS client so the
287        // event loop can resolve pending requests without locking kms_client.
288        let response_handler = kms.response_handler();
289        *self.kms_response_handler.lock().await = Some(response_handler);
290        *self.kms_client.lock().await = Some(kms);
291
292        // Step 3: Connect Mercury WebSocket (KMS responses arrive here)
293        self.mercury_socket
294            .connect(&reg.web_socket_url, &token)
295            .await?;
296        info!("Mercury connected");
297
298        // Step 4: Start Mercury event loop
299        self.start_mercury_event_loop().await;
300
301        // Step 5: Initialize KMS (ECDH handshake — response comes via Mercury)
302        {
303            let mut kms_guard = self.kms_client.lock().await;
304            if let Some(ref mut kms) = *kms_guard {
305                kms.initialize().await?;
306            }
307        }
308        info!("KMS initialized");
309
310        // Store registration
311        *self.registration.lock().await = Some(reg);
312
313        Ok(())
314    }
315
316    /// Start processing Mercury events in a background task.
317    async fn start_mercury_event_loop(&self) {
318        let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
319            Some(rx) => rx,
320            None => {
321                warn!("Mercury event receiver already taken");
322                return;
323            }
324        };
325
326        let kms_client = self.kms_client.clone();
327        let kms_response_handler = self.kms_response_handler.clone();
328        let event_tx = self.event_tx.clone();
329        let connected = self.connected.clone();
330        let registration = self.registration.clone();
331        let device_manager = self.device_manager.clone();
332        let token = self.token.clone();
333        let bot_person_id = self.bot_person_id.clone();
334
335        tokio::spawn(async move {
336            while let Some(event) = mercury_rx.recv().await {
337                match event {
338                    MercuryEvent::KmsResponse(data) => {
339                        // Use the separate response handler to avoid deadlock
340                        // with kms_client lock (held during initialize/get_key).
341                        let handler_guard = kms_response_handler.lock().await;
342                        if let Some(ref handler) = *handler_guard {
343                            handler.handle_kms_message(&data).await;
344                        }
345                    }
346                    MercuryEvent::Activity(activity) => {
347                        // Spawn in a separate task so the event loop can continue
348                        // processing KMS responses (needed for key retrieval during decryption).
349                        let kms_client_clone = kms_client.clone();
350                        let event_tx_clone = event_tx.clone();
351                        let bot_person_id = bot_person_id.clone();
352                        tokio::spawn(async move {
353                            let mut kms_guard = kms_client_clone.lock().await;
354                            if let Some(ref mut kms) = *kms_guard {
355                                let bot_id = bot_person_id.lock().await.clone();
356                                Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
357                            } else {
358                                warn!("Received activity but KMS client not initialized");
359                            }
360                        });
361                    }
362                    MercuryEvent::Connected => {
363                        info!("Mercury reconnected, refreshing device and KMS");
364
365                        // Refresh device
366                        let tok = token.lock().await.clone();
367                        {
368                            let reg_guard = registration.lock().await;
369                            if reg_guard.is_some() {
370                                let dm = device_manager.lock().await;
371                                match dm.refresh(&tok).await {
372                                    Ok(new_reg) => {
373                                        drop(reg_guard);
374                                        *registration.lock().await = Some(new_reg);
375                                    }
376                                    Err(e) => {
377                                        warn!("Device refresh on reconnect failed: {e}");
378                                    }
379                                }
380                            }
381                        }
382
383                        // Re-init KMS
384                        {
385                            let mut kms_guard = kms_client.lock().await;
386                            if let Some(ref mut kms) = *kms_guard {
387                                if let Err(e) = kms.initialize().await {
388                                    warn!("KMS re-init on reconnect failed: {e}");
389                                }
390                            }
391                        }
392
393                        *connected.lock().await = true;
394                        let _ = event_tx.send(HandlerEvent::Connected);
395                    }
396                    MercuryEvent::Disconnected(reason) => {
397                        *connected.lock().await = false;
398                        let _ = event_tx.send(HandlerEvent::Disconnected(reason));
399                    }
400                    MercuryEvent::Reconnecting(attempt) => {
401                        let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
402                    }
403                    MercuryEvent::Error(msg) => {
404                        let _ = event_tx.send(HandlerEvent::Error(msg));
405                    }
406                }
407            }
408        });
409    }
410
411    /// Handle a single activity (decrypt and route).
412    async fn handle_activity_static(
413        kms: &mut KmsClient,
414        activity: &MercuryActivity,
415        event_tx: &mpsc::UnboundedSender<HandlerEvent>,
416        bot_person_id: Option<&str>,
417    ) {
418        // message:created — verb=post + objectType=comment
419        if activity.verb == "post" && activity.object.object_type == "comment" {
420            let mut decryptor = MessageDecryptor::new(kms);
421            match decryptor.decrypt_activity(activity).await {
422                Ok(decrypted) => {
423                    let msg = DecryptedMessage {
424                        id: decrypted.object.id.clone(),
425                        room_id: decrypted.target.id.clone(),
426                        person_id: decrypted.actor.id.clone(),
427                        person_email: decrypted
428                            .actor
429                            .email_address
430                            .clone()
431                            .unwrap_or_default(),
432                        text: decrypted.object.display_name.clone().unwrap_or_default(),
433                        html: decrypted.object.content.clone(),
434                        created: decrypted.published.clone(),
435                        room_type: infer_room_type(&decrypted),
436                        raw: decrypted,
437                    };
438
439                    // Filter self-messages if enabled
440                    if let Some(bot_id) = bot_person_id {
441                        if msg.person_id == bot_id {
442                            info!("Ignoring self-message from bot ({})", bot_id);
443                            return;
444                        }
445                    }
446
447                    let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
448                }
449                Err(e) => {
450                    error!("Error decrypting activity: {e}");
451                    let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
452                }
453            }
454            return;
455        }
456
457        // message:deleted — verb=delete + objectType=activity
458        if activity.verb == "delete" && activity.object.object_type == "activity" {
459            let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
460                message_id: activity.object.id.clone(),
461                room_id: activity.target.id.clone(),
462                person_id: activity.actor.id.clone(),
463            }));
464        }
465    }
466
467    /// Disconnect from Webex.
468    pub async fn disconnect(&self) {
469        info!("Disconnecting from Webex...");
470        *self.connected.lock().await = false;
471
472        self.mercury_socket.disconnect().await;
473
474        let token = self.token.lock().await.clone();
475        {
476            let reg = self.registration.lock().await;
477            if reg.is_some() {
478                let mut dm = self.device_manager.lock().await;
479                if let Err(e) = dm.unregister(&token).await {
480                    warn!("Failed to unregister device: {e}");
481                } else {
482                    info!("Device unregistered");
483                }
484            }
485        }
486
487        *self.registration.lock().await = None;
488        *self.kms_client.lock().await = None;
489        *self.kms_response_handler.lock().await = None;
490        *self.bot_person_id.lock().await = None;
491    }
492
493    /// Update the access token and re-establish the connection.
494    pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
495        if new_token.is_empty() {
496            return Err(WebexError::Internal(
497                "reconnect() requires a non-empty token string".into(),
498            ));
499        }
500
501        info!("Reconnecting with new token...");
502        self.disconnect().await;
503
504        *self.token.lock().await = new_token.to_string();
505        self.connect().await
506    }
507
508    /// Whether the handler is fully connected.
509    pub async fn connected(&self) -> bool {
510        let conn = *self.connected.lock().await;
511        conn && self.mercury_socket.connected().await
512    }
513
514    /// Returns a structured health check of all connection subsystems.
515    pub async fn status(&self) -> HandlerStatus {
516        let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
517        let ws_open = self.mercury_socket.connected().await;
518        let is_connected = *self.connected.lock().await;
519        let is_connecting = *self.connecting.lock().await;
520
521        let status = if is_connected && ws_open {
522            ConnectionStatus::Connected
523        } else if is_connecting {
524            ConnectionStatus::Connecting
525        } else if reconnect_attempt > 0 {
526            ConnectionStatus::Reconnecting
527        } else {
528            ConnectionStatus::Disconnected
529        };
530
531        HandlerStatus {
532            status,
533            web_socket_open: ws_open,
534            kms_initialized: self.kms_client.lock().await.is_some(),
535            device_registered: self.registration.lock().await.is_some(),
536            reconnect_attempt,
537        }
538    }
539}
540
541fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
542    let tags = &activity.target.tags;
543    if tags.contains(&"ONE_ON_ONE".to_string()) {
544        return Some("direct".to_string());
545    }
546    if tags.contains(&"TEAM".to_string())
547        || tags.contains(&"LOCKED".to_string())
548        || tags.contains(&"GROUP".to_string())
549    {
550        return Some("group".to_string());
551    }
552    None
553}