Skip to main content

webex_message_handler/
handler.rs

1//! Main orchestrator: device registration, Mercury WebSocket, KMS, decryption.
2
3use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9    Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10    FetchResponse, HandlerStatus, MembershipActivity, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
// Internal adapter type (alias for public type).
//
// A shared, thread-safe callable that takes a `FetchRequest` and returns a boxed
// `Send` future resolving to either a `FetchResponse` or a boxed error. The same
// value is handed to the device manager and the KMS client, and is used directly
// for the bot-identity lookup.
type HttpDoFn = Arc<
    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
        + Send
        + Sync,
>;
25
/// Events emitted by WebexMessageHandler.
///
/// Events are delivered through the unbounded channel whose receiving half is
/// obtained from `WebexMessageHandler::take_event_rx`.
#[derive(Debug, Clone)]
pub enum HandlerEvent {
    /// A new message was received and decrypted.
    MessageCreated(DecryptedMessage),
    /// A message was deleted.
    MessageDeleted(DeletedMessage),
    /// A membership event occurred (add, leave, assignModerator, unassignModerator).
    MembershipCreated(MembershipActivity),
    /// Successfully connected (or reconnected).
    ///
    /// Sent once by `connect()` on success and again each time Mercury reports
    /// that it has (re)connected.
    Connected,
    /// Disconnected with a reason string.
    ///
    /// The reason is forwarded unchanged from the Mercury socket.
    Disconnected(String),
    /// Reconnecting (attempt number).
    Reconnecting(u32),
    /// An error occurred.
    ///
    /// Carries either an error message reported by the Mercury socket or the
    /// stringified error from a failed activity decryption.
    Error(String),
}
44
45/// Extract the raw UUID from a Webex person ID.
46///
47/// The Webex REST API returns base64-encoded IDs like:
48///   `"Y2lzY29zcGFyazovL3VzL1BFT1BMRS9mYjUx..."` → `"ciscospark://us/PEOPLE/fb51254f-..."`
49///
50/// Mercury wire format uses raw UUIDs:
51///   `"fb51254f-3b37-4e50-aa04-45744c2effc7"`
52///
53/// This function normalizes both formats to the raw UUID for comparison.
54fn extract_person_uuid(id: &str) -> String {
55    use base64::engine::general_purpose::{STANDARD, STANDARD_NO_PAD};
56    use base64::Engine;
57
58    // Try standard base64, then no-padding variant
59    let decoded_bytes = STANDARD
60        .decode(id)
61        .or_else(|_| STANDARD_NO_PAD.decode(id));
62
63    if let Ok(bytes) = decoded_bytes {
64        if let Ok(decoded) = String::from_utf8(bytes) {
65            if decoded.starts_with("ciscospark://") {
66                if let Some(uuid) = decoded.rsplit('/').next() {
67                    if !uuid.is_empty() {
68                        return uuid.to_string();
69                    }
70                }
71            }
72        }
73    }
74
75    id.to_string()
76}
77
78/// Create a native HTTP adapter that wraps reqwest::Client.
79fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
80    Arc::new(move |req: FetchRequest| {
81        let client = client.clone();
82        Box::pin(async move {
83            let mut request_builder = match req.method.as_str() {
84                "GET" => client.get(&req.url),
85                "POST" => client.post(&req.url),
86                "PUT" => client.put(&req.url),
87                "DELETE" => client.delete(&req.url),
88                _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
89            };
90
91            for (key, value) in req.headers {
92                request_builder = request_builder.header(key, value);
93            }
94
95            if let Some(body) = req.body {
96                request_builder = request_builder.body(body);
97            }
98
99            let response = request_builder
100                .send()
101                .await
102                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
103
104            let status = response.status().as_u16();
105            let ok = response.status().is_success();
106            let body_bytes = response
107                .bytes()
108                .await
109                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
110                .to_vec();
111
112            Ok(FetchResponse {
113                status,
114                ok,
115                body: body_bytes,
116            })
117        })
118    })
119}
120
/// Receives and decrypts Webex messages over Mercury WebSocket.
///
/// State that background tasks need is wrapped in `Arc<Mutex<..>>` so the
/// Mercury event loop can share it with the handler's own methods.
pub struct WebexMessageHandler {
    /// Current access token; replaced by `reconnect()`.
    token: Arc<Mutex<String>>,
    /// HTTP adapter used for the bot-identity lookup and handed to the KMS client.
    http_do: HttpDoFn,
    /// Performs device register / refresh / unregister calls.
    device_manager: Arc<Mutex<DeviceManager>>,
    /// Mercury WebSocket connection and its event source.
    mercury_socket: Arc<MercurySocket>,
    /// KMS client; `None` until a connect attempt creates it, cleared by `disconnect()`.
    kms_client: Arc<Mutex<Option<KmsClient>>>,
    /// Separate handle for resolving KMS responses without locking kms_client.
    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
    /// Device registration stored after a successful connect; cleared by `disconnect()`.
    registration: Arc<Mutex<Option<DeviceRegistration>>>,
    /// Set once connected; cleared on disconnect and on Mercury disconnect events.
    connected: Arc<Mutex<bool>>,
    /// True while a `connect()` call is in progress.
    connecting: Arc<Mutex<bool>>,
    /// When true, the bot's own identity is fetched on connect and its messages are dropped.
    ignore_self_messages: bool,
    /// Raw UUID of the bot, cached for self-message filtering.
    bot_person_id: Arc<Mutex<Option<String>>>,

    /// Original configuration, retained but not read by any method in this file.
    #[allow(dead_code)]
    config: Config,
    /// Sending half of the handler event channel.
    event_tx: mpsc::UnboundedSender<HandlerEvent>,
    /// Receiving half of the handler event channel; handed out once by `take_event_rx()`.
    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
}
141
142impl WebexMessageHandler {
143    /// Create a new WebexMessageHandler.
144    pub fn new(config: Config) -> Result<Self, WebexError> {
145        if config.token.is_empty() {
146            return Err(WebexError::Internal(
147                "WebexMessageHandler requires a non-empty token string".into(),
148            ));
149        }
150
151        // Validate networking mode configuration
152        match config.mode {
153            NetworkMode::Injected => {
154                if config.fetch.is_none() || config.web_socket_factory.is_none() {
155                    return Err(WebexError::Internal(
156                        "Injected mode requires both fetch and web_socket_factory".into(),
157                    ));
158                }
159                if config.client.is_some() {
160                    return Err(WebexError::Internal(
161                        "Cannot use native proxy parameters (client) in injected mode".into(),
162                    ));
163                }
164            }
165            NetworkMode::Native => {
166                if config.fetch.is_some() || config.web_socket_factory.is_some() {
167                    return Err(WebexError::Internal(
168                        "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
169                    ));
170                }
171            }
172        }
173
174        // Create adapters based on mode
175        let (http_do, ws_factory) = match config.mode {
176            NetworkMode::Native => {
177                let client = config.client.clone().unwrap_or_default();
178                let http_adapter = create_native_http_adapter(client.clone());
179                (http_adapter, None)
180            }
181            NetworkMode::Injected => {
182                let http_adapter = config.fetch.clone().unwrap();
183                let ws_factory = config.web_socket_factory.clone();
184                (http_adapter, ws_factory)
185            }
186        };
187
188        let mercury_socket = MercurySocket::new(
189            ws_factory,
190            Duration::from_secs_f64(config.ping_interval),
191            Duration::from_secs_f64(config.pong_timeout),
192            Duration::from_secs_f64(config.reconnect_backoff_max),
193            config.max_reconnect_attempts,
194        );
195
196        let (event_tx, event_rx) = mpsc::unbounded_channel();
197
198        let ignore_self_messages = config.ignore_self_messages;
199
200        Ok(Self {
201            token: Arc::new(Mutex::new(config.token.clone())),
202            http_do: http_do.clone(),
203            device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
204            mercury_socket: Arc::new(mercury_socket),
205            kms_client: Arc::new(Mutex::new(None)),
206            kms_response_handler: Arc::new(Mutex::new(None)),
207            registration: Arc::new(Mutex::new(None)),
208            connected: Arc::new(Mutex::new(false)),
209            connecting: Arc::new(Mutex::new(false)),
210            ignore_self_messages,
211            bot_person_id: Arc::new(Mutex::new(None)),
212            config,
213            event_tx,
214            event_rx: Arc::new(Mutex::new(Some(event_rx))),
215        })
216    }
217
218    /// Take the event receiver. Can only be called once.
219    pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
220        self.event_rx.lock().await.take()
221    }
222
223    /// Connect to Webex (register device, connect Mercury, init KMS).
224    pub async fn connect(&self) -> Result<(), WebexError> {
225        {
226            let connecting = self.connecting.lock().await;
227            if *connecting {
228                return Err(WebexError::Internal("connect() already in progress".into()));
229            }
230        }
231        {
232            let connected = self.connected.lock().await;
233            if *connected {
234                return Err(WebexError::Internal(
235                    "Already connected. Call disconnect() first, or use reconnect().".into(),
236                ));
237            }
238        }
239
240        info!("Connecting to Webex...");
241        *self.connecting.lock().await = true;
242
243        let result = self.connect_internal().await;
244
245        *self.connecting.lock().await = false;
246
247        match result {
248            Ok(()) => {
249                *self.connected.lock().await = true;
250                info!("Connected to Webex");
251                let _ = self.event_tx.send(HandlerEvent::Connected);
252                Ok(())
253            }
254            Err(e) => Err(e),
255        }
256    }
257
258    async fn fetch_bot_person_id(&self) -> Result<(), WebexError> {
259        info!("Fetching bot person info for self-message filtering");
260        let token = self.token.lock().await.clone();
261        let req = FetchRequest {
262            url: "https://webexapis.com/v1/people/me".into(),
263            method: "GET".into(),
264            headers: {
265                let mut h = std::collections::HashMap::new();
266                h.insert("Authorization".into(), format!("Bearer {}", token));
267                h.insert("Content-Type".into(), "application/json".into());
268                h
269            },
270            body: None,
271        };
272
273        let resp = (self.http_do)(req).await.map_err(|e| {
274            WebexError::Internal(format!(
275                "Failed to fetch bot identity for self-message filtering: {e}. \
276                 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops)."
277            ))
278        })?;
279
280        if !resp.ok {
281            return Err(WebexError::Internal(format!(
282                "Failed to fetch bot identity for self-message filtering: HTTP {}. \
283                 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops).",
284                resp.status
285            )));
286        }
287
288        let data: serde_json::Value = serde_json::from_slice(&resp.body).map_err(|e| {
289            WebexError::Internal(format!("Failed to parse bot identity response: {e}"))
290        })?;
291
292        let id = data
293            .get("id")
294            .and_then(|v| v.as_str())
295            .ok_or_else(|| WebexError::Internal("Bot identity response missing 'id' field".into()))?;
296
297        let uuid = extract_person_uuid(id);
298        info!("Bot person ID cached for self-message filtering: {}", uuid);
299        *self.bot_person_id.lock().await = Some(uuid);
300        Ok(())
301    }
302
303    async fn connect_internal(&self) -> Result<(), WebexError> {
304        let token = self.token.lock().await.clone();
305
306        // Step 1: Register device with WDM
307        let reg = {
308            let mut dm = self.device_manager.lock().await;
309            dm.register(&token).await?
310        };
311        info!("Device registered");
312
313        // Step 1.5: Fetch bot person info if self-message filtering is enabled
314        if self.ignore_self_messages {
315            self.fetch_bot_person_id().await?;
316        }
317
318        // Step 2: Create KMS client
319        let kms = KmsClient::new(
320            self.http_do.clone(),
321            &token,
322            &reg.device_url,
323            &reg.user_id,
324            &reg.encryption_service_url,
325        );
326
327        // Get the response handler BEFORE storing the KMS client so the
328        // event loop can resolve pending requests without locking kms_client.
329        let response_handler = kms.response_handler();
330        *self.kms_response_handler.lock().await = Some(response_handler);
331        *self.kms_client.lock().await = Some(kms);
332
333        // Step 3: Connect Mercury WebSocket (KMS responses arrive here)
334        self.mercury_socket
335            .connect(&reg.web_socket_url, &token)
336            .await?;
337        info!("Mercury connected");
338
339        // Step 4: Start Mercury event loop
340        self.start_mercury_event_loop().await;
341
342        // Step 5: Initialize KMS (ECDH handshake — response comes via Mercury)
343        {
344            let mut kms_guard = self.kms_client.lock().await;
345            if let Some(ref mut kms) = *kms_guard {
346                kms.initialize().await?;
347            }
348        }
349        info!("KMS initialized");
350
351        // Store registration
352        *self.registration.lock().await = Some(reg);
353
354        Ok(())
355    }
356
357    /// Start processing Mercury events in a background task.
358    async fn start_mercury_event_loop(&self) {
359        let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
360            Some(rx) => rx,
361            None => {
362                warn!("Mercury event receiver already taken");
363                return;
364            }
365        };
366
367        let kms_client = self.kms_client.clone();
368        let kms_response_handler = self.kms_response_handler.clone();
369        let event_tx = self.event_tx.clone();
370        let connected = self.connected.clone();
371        let registration = self.registration.clone();
372        let device_manager = self.device_manager.clone();
373        let token = self.token.clone();
374        let bot_person_id = self.bot_person_id.clone();
375
376        tokio::spawn(async move {
377            while let Some(event) = mercury_rx.recv().await {
378                match event {
379                    MercuryEvent::KmsResponse(data) => {
380                        // Use the separate response handler to avoid deadlock
381                        // with kms_client lock (held during initialize/get_key).
382                        let handler_guard = kms_response_handler.lock().await;
383                        if let Some(ref handler) = *handler_guard {
384                            handler.handle_kms_message(&data).await;
385                        }
386                    }
387                    MercuryEvent::Activity(activity) => {
388                        // Spawn in a separate task so the event loop can continue
389                        // processing KMS responses (needed for key retrieval during decryption).
390                        let kms_client_clone = kms_client.clone();
391                        let event_tx_clone = event_tx.clone();
392                        let bot_person_id = bot_person_id.clone();
393                        tokio::spawn(async move {
394                            let mut kms_guard = kms_client_clone.lock().await;
395                            if let Some(ref mut kms) = *kms_guard {
396                                let bot_id = bot_person_id.lock().await.clone();
397                                Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
398                            } else {
399                                warn!("Received activity but KMS client not initialized");
400                            }
401                        });
402                    }
403                    MercuryEvent::Connected => {
404                        info!("Mercury reconnected, refreshing device and KMS");
405
406                        // Refresh device
407                        let tok = token.lock().await.clone();
408                        {
409                            let reg_guard = registration.lock().await;
410                            if reg_guard.is_some() {
411                                let dm = device_manager.lock().await;
412                                match dm.refresh(&tok).await {
413                                    Ok(new_reg) => {
414                                        drop(reg_guard);
415                                        *registration.lock().await = Some(new_reg);
416                                    }
417                                    Err(e) => {
418                                        warn!("Device refresh on reconnect failed: {e}");
419                                    }
420                                }
421                            }
422                        }
423
424                        // Re-init KMS
425                        {
426                            let mut kms_guard = kms_client.lock().await;
427                            if let Some(ref mut kms) = *kms_guard {
428                                if let Err(e) = kms.initialize().await {
429                                    warn!("KMS re-init on reconnect failed: {e}");
430                                }
431                            }
432                        }
433
434                        *connected.lock().await = true;
435                        let _ = event_tx.send(HandlerEvent::Connected);
436                    }
437                    MercuryEvent::Disconnected(reason) => {
438                        *connected.lock().await = false;
439                        let _ = event_tx.send(HandlerEvent::Disconnected(reason));
440                    }
441                    MercuryEvent::Reconnecting(attempt) => {
442                        let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
443                    }
444                    MercuryEvent::Error(msg) => {
445                        let _ = event_tx.send(HandlerEvent::Error(msg));
446                    }
447                }
448            }
449        });
450    }
451
452    /// Handle a single activity (decrypt and route).
453    async fn handle_activity_static(
454        kms: &mut KmsClient,
455        activity: &MercuryActivity,
456        event_tx: &mpsc::UnboundedSender<HandlerEvent>,
457        bot_person_id: Option<&str>,
458    ) {
459        // message:created — verb=post + objectType=comment
460        if activity.verb == "post" && activity.object.object_type == "comment" {
461            let mut decryptor = MessageDecryptor::new(kms);
462            match decryptor.decrypt_activity(activity).await {
463                Ok(decrypted) => {
464                    let msg = DecryptedMessage {
465                        id: decrypted.object.id.clone(),
466                        room_id: decrypted.target.id.clone(),
467                        person_id: decrypted.actor.id.clone(),
468                        person_email: decrypted
469                            .actor
470                            .email_address
471                            .clone()
472                            .unwrap_or_default(),
473                        text: decrypted.object.display_name.clone().unwrap_or_default(),
474                        html: decrypted.object.content.clone(),
475                        created: decrypted.published.clone(),
476                        room_type: infer_room_type(&decrypted),
477                        raw: decrypted,
478                    };
479
480                    // Filter self-messages if enabled
481                    if let Some(bot_id) = bot_person_id {
482                        if extract_person_uuid(&msg.person_id) == bot_id {
483                            info!("Ignoring self-message from bot ({})", bot_id);
484                            return;
485                        }
486                    }
487
488                    let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
489                }
490                Err(e) => {
491                    error!("Error decrypting activity: {e}");
492                    let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
493                }
494            }
495            return;
496        }
497
498        // message:deleted — verb=delete + objectType=activity
499        if activity.verb == "delete" && activity.object.object_type == "activity" {
500            let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
501                message_id: activity.object.id.clone(),
502                room_id: activity.target.id.clone(),
503                person_id: activity.actor.id.clone(),
504            }));
505            return;
506        }
507
508        // membership:created — membership verbs + objectType=person
509        let membership_verbs = ["add", "leave", "assignModerator", "unassignModerator"];
510        if membership_verbs.contains(&activity.verb.as_str())
511            && activity.object.object_type == "person"
512        {
513            let _ = event_tx.send(HandlerEvent::MembershipCreated(MembershipActivity {
514                id: activity.id.clone(),
515                actor_id: activity.actor.id.clone(),
516                person_id: activity.object.id.clone(),
517                room_id: activity.target.id.clone(),
518                action: activity.verb.clone(),
519                created: activity.published.clone(),
520                room_type: infer_room_type(activity),
521                raw: activity.clone(),
522            }));
523        }
524    }
525
526    /// Disconnect from Webex.
527    pub async fn disconnect(&self) {
528        info!("Disconnecting from Webex...");
529        *self.connected.lock().await = false;
530
531        self.mercury_socket.disconnect().await;
532
533        let token = self.token.lock().await.clone();
534        {
535            let reg = self.registration.lock().await;
536            if reg.is_some() {
537                let mut dm = self.device_manager.lock().await;
538                if let Err(e) = dm.unregister(&token).await {
539                    warn!("Failed to unregister device: {e}");
540                } else {
541                    info!("Device unregistered");
542                }
543            }
544        }
545
546        *self.registration.lock().await = None;
547        *self.kms_client.lock().await = None;
548        *self.kms_response_handler.lock().await = None;
549        *self.bot_person_id.lock().await = None;
550    }
551
552    /// Update the access token and re-establish the connection.
553    pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
554        if new_token.is_empty() {
555            return Err(WebexError::Internal(
556                "reconnect() requires a non-empty token string".into(),
557            ));
558        }
559
560        info!("Reconnecting with new token...");
561        self.disconnect().await;
562
563        *self.token.lock().await = new_token.to_string();
564        self.connect().await
565    }
566
567    /// Whether the handler is fully connected.
568    pub async fn connected(&self) -> bool {
569        let conn = *self.connected.lock().await;
570        conn && self.mercury_socket.connected().await
571    }
572
573    /// Returns a structured health check of all connection subsystems.
574    pub async fn status(&self) -> HandlerStatus {
575        let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
576        let ws_open = self.mercury_socket.connected().await;
577        let is_connected = *self.connected.lock().await;
578        let is_connecting = *self.connecting.lock().await;
579
580        let status = if is_connected && ws_open {
581            ConnectionStatus::Connected
582        } else if is_connecting {
583            ConnectionStatus::Connecting
584        } else if reconnect_attempt > 0 {
585            ConnectionStatus::Reconnecting
586        } else {
587            ConnectionStatus::Disconnected
588        };
589
590        HandlerStatus {
591            status,
592            web_socket_open: ws_open,
593            kms_initialized: self.kms_client.lock().await.is_some(),
594            device_registered: self.registration.lock().await.is_some(),
595            reconnect_attempt,
596        }
597    }
598}
599
600fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
601    let tags = &activity.target.tags;
602    if tags.contains(&"ONE_ON_ONE".to_string()) {
603        return Some("direct".to_string());
604    }
605    if tags.contains(&"TEAM".to_string())
606        || tags.contains(&"LOCKED".to_string())
607        || tags.contains(&"GROUP".to_string())
608    {
609        return Some("group".to_string());
610    }
611    None
612}