Skip to main content

webex_message_handler/
handler.rs

1//! Main orchestrator: device registration, Mercury WebSocket, KMS, decryption.
2
3use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9    Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10    FetchResponse, HandlerStatus, MembershipActivity, MercuryActivity, NetworkMode,
11};
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{mpsc, Mutex};
18use tracing::{error, info, warn};
19
20// Internal adapter type (alias for public type)
21type HttpDoFn = Arc<
22    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
23        + Send
24        + Sync,
25>;
26
27/// Events emitted by WebexMessageHandler.
28#[derive(Debug, Clone)]
29pub enum HandlerEvent {
30    /// A new message was received and decrypted.
31    MessageCreated(DecryptedMessage),
32    /// A message was deleted.
33    MessageDeleted(DeletedMessage),
34    /// A membership event occurred (add, leave, assignModerator, unassignModerator).
35    MembershipCreated(MembershipActivity),
36    /// Successfully connected (or reconnected).
37    Connected,
38    /// Disconnected with a reason string.
39    Disconnected(String),
40    /// Reconnecting (attempt number).
41    Reconnecting(u32),
42    /// An error occurred.
43    Error(String),
44}
45
46/// Extract the raw UUID from a Webex person ID.
47///
48/// The Webex REST API returns base64-encoded IDs like:
49///   `"Y2lzY29zcGFyazovL3VzL1BFT1BMRS9mYjUx..."` → `"ciscospark://us/PEOPLE/fb51254f-..."`
50///
51/// Mercury wire format uses raw UUIDs:
52///   `"fb51254f-3b37-4e50-aa04-45744c2effc7"`
53///
54/// This function normalizes both formats to the raw UUID for comparison.
55fn extract_person_uuid(id: &str) -> String {
56    use base64::engine::general_purpose::{STANDARD, STANDARD_NO_PAD};
57    use base64::Engine;
58
59    // Try standard base64, then no-padding variant
60    let decoded_bytes = STANDARD
61        .decode(id)
62        .or_else(|_| STANDARD_NO_PAD.decode(id));
63
64    if let Ok(bytes) = decoded_bytes {
65        if let Ok(decoded) = String::from_utf8(bytes) {
66            if decoded.starts_with("ciscospark://") {
67                if let Some(uuid) = decoded.rsplit('/').next() {
68                    if !uuid.is_empty() {
69                        return uuid.to_string();
70                    }
71                }
72            }
73        }
74    }
75
76    id.to_string()
77}
78
79/// Create a native HTTP adapter that wraps reqwest::Client.
80fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
81    Arc::new(move |req: FetchRequest| {
82        let client = client.clone();
83        Box::pin(async move {
84            let mut request_builder = match req.method.as_str() {
85                "GET" => client.get(&req.url),
86                "POST" => client.post(&req.url),
87                "PUT" => client.put(&req.url),
88                "DELETE" => client.delete(&req.url),
89                _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
90            };
91
92            for (key, value) in req.headers {
93                request_builder = request_builder.header(key, value);
94            }
95
96            if let Some(body) = req.body {
97                request_builder = request_builder.body(body);
98            }
99
100            let response = request_builder
101                .send()
102                .await
103                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
104
105            let status = response.status().as_u16();
106            let ok = response.status().is_success();
107            let body_bytes = response
108                .bytes()
109                .await
110                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
111                .to_vec();
112
113            Ok(FetchResponse {
114                status,
115                ok,
116                body: body_bytes,
117            })
118        })
119    })
120}
121
122/// Receives and decrypts Webex messages over Mercury WebSocket.
123pub struct WebexMessageHandler {
124    token: Arc<Mutex<String>>,
125    http_do: HttpDoFn,
126    device_manager: Arc<Mutex<DeviceManager>>,
127    mercury_socket: Arc<MercurySocket>,
128    kms_client: Arc<Mutex<Option<KmsClient>>>,
129    /// Separate handle for resolving KMS responses without locking kms_client.
130    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
131    registration: Arc<Mutex<Option<DeviceRegistration>>>,
132    connected: Arc<Mutex<bool>>,
133    connecting: Arc<Mutex<bool>>,
134    ignore_self_messages: bool,
135    bot_person_id: Arc<Mutex<Option<String>>>,
136    /// Track recent activity IDs to prevent replay attacks (activity_id -> Instant)
137    recent_activity_ids: Arc<Mutex<HashMap<String, Instant>>>,
138
139    #[allow(dead_code)]
140    config: Config,
141    event_tx: mpsc::UnboundedSender<HandlerEvent>,
142    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
143}
144
145impl WebexMessageHandler {
146    /// Create a new WebexMessageHandler.
147    pub fn new(config: Config) -> Result<Self, WebexError> {
148        if config.token.is_empty() {
149            return Err(WebexError::Internal(
150                "WebexMessageHandler requires a non-empty token string".into(),
151            ));
152        }
153
154        // Validate networking mode configuration
155        match config.mode {
156            NetworkMode::Injected => {
157                if config.fetch.is_none() || config.web_socket_factory.is_none() {
158                    return Err(WebexError::Internal(
159                        "Injected mode requires both fetch and web_socket_factory".into(),
160                    ));
161                }
162                if config.client.is_some() {
163                    return Err(WebexError::Internal(
164                        "Cannot use native proxy parameters (client) in injected mode".into(),
165                    ));
166                }
167            }
168            NetworkMode::Native => {
169                if config.fetch.is_some() || config.web_socket_factory.is_some() {
170                    return Err(WebexError::Internal(
171                        "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
172                    ));
173                }
174            }
175        }
176
177        // Create adapters based on mode
178        let (http_do, ws_factory) = match config.mode {
179            NetworkMode::Native => {
180                let client = config.client.clone().unwrap_or_default();
181                let http_adapter = create_native_http_adapter(client.clone());
182                (http_adapter, None)
183            }
184            NetworkMode::Injected => {
185                let http_adapter = config.fetch.clone().expect("Injected mode requires fetch adapter");
186                let ws_factory = config.web_socket_factory.clone();
187                (http_adapter, ws_factory)
188            }
189        };
190
191        let mercury_socket = MercurySocket::new(
192            ws_factory,
193            Duration::from_secs_f64(config.ping_interval),
194            Duration::from_secs_f64(config.pong_timeout),
195            Duration::from_secs_f64(config.reconnect_backoff_max),
196            config.max_reconnect_attempts,
197        );
198
199        let (event_tx, event_rx) = mpsc::unbounded_channel();
200
201        let ignore_self_messages = config.ignore_self_messages;
202
203        Ok(Self {
204            token: Arc::new(Mutex::new(config.token.clone())),
205            http_do: http_do.clone(),
206            device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
207            mercury_socket: Arc::new(mercury_socket),
208            kms_client: Arc::new(Mutex::new(None)),
209            kms_response_handler: Arc::new(Mutex::new(None)),
210            registration: Arc::new(Mutex::new(None)),
211            connected: Arc::new(Mutex::new(false)),
212            connecting: Arc::new(Mutex::new(false)),
213            ignore_self_messages,
214            bot_person_id: Arc::new(Mutex::new(None)),
215            recent_activity_ids: Arc::new(Mutex::new(HashMap::new())),
216            config,
217            event_tx,
218            event_rx: Arc::new(Mutex::new(Some(event_rx))),
219        })
220    }
221
222    /// Take the event receiver. Can only be called once.
223    pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
224        self.event_rx.lock().await.take()
225    }
226
227    /// Connect to Webex (register device, connect Mercury, init KMS).
228    pub async fn connect(&self) -> Result<(), WebexError> {
229        {
230            let connecting = self.connecting.lock().await;
231            let connected = self.connected.lock().await;
232            if *connecting {
233                return Err(WebexError::Internal("connect() already in progress".into()));
234            }
235            if *connected {
236                return Err(WebexError::Internal(
237                    "Already connected. Call disconnect() first, or use reconnect().".into(),
238                ));
239            }
240        }
241
242        info!("Connecting to Webex...");
243        *self.connecting.lock().await = true;
244
245        let result = self.connect_internal().await;
246
247        *self.connecting.lock().await = false;
248
249        match result {
250            Ok(()) => {
251                *self.connected.lock().await = true;
252                info!("Connected to Webex");
253                if self.event_tx.send(HandlerEvent::Connected).is_err() {
254                    warn!("Event receiver dropped, cannot send Connected event");
255                }
256                Ok(())
257            }
258            Err(e) => Err(e),
259        }
260    }
261
262    async fn fetch_bot_person_id(&self) -> Result<(), WebexError> {
263        info!("Fetching bot person info for self-message filtering");
264        let token = self.token.lock().await.clone();
265        let req = FetchRequest {
266            url: "https://webexapis.com/v1/people/me".into(),
267            method: "GET".into(),
268            headers: {
269                let mut h = std::collections::HashMap::new();
270                h.insert("Authorization".into(), format!("Bearer {}", token));
271                h.insert("Content-Type".into(), "application/json".into());
272                h
273            },
274            body: None,
275        };
276
277        let resp = (self.http_do)(req).await.map_err(|e| {
278            WebexError::Internal(format!(
279                "Failed to fetch bot identity for self-message filtering: {e}. \
280                 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops)."
281            ))
282        })?;
283
284        if !resp.ok {
285            return Err(WebexError::Internal(format!(
286                "Failed to fetch bot identity for self-message filtering: HTTP {}. \
287                 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops).",
288                resp.status
289            )));
290        }
291
292        let data: serde_json::Value = serde_json::from_slice(&resp.body).map_err(|e| {
293            WebexError::Internal(format!("Failed to parse bot identity response: {e}"))
294        })?;
295
296        let id = data
297            .get("id")
298            .and_then(|v| v.as_str())
299            .ok_or_else(|| WebexError::Internal("Bot identity response missing 'id' field".into()))?;
300
301        let uuid = extract_person_uuid(id);
302        info!("Bot person ID cached for self-message filtering: {}", uuid);
303        *self.bot_person_id.lock().await = Some(uuid);
304        Ok(())
305    }
306
307    async fn connect_internal(&self) -> Result<(), WebexError> {
308        let token = self.token.lock().await.clone();
309
310        // Step 1: Register device with WDM
311        let reg = {
312            let mut dm = self.device_manager.lock().await;
313            dm.register(&token).await?
314        };
315        info!("Device registered");
316
317        // Step 1.5: Fetch bot person info if self-message filtering is enabled
318        if self.ignore_self_messages {
319            self.fetch_bot_person_id().await?;
320        }
321
322        // Step 2: Create KMS client
323        let kms = KmsClient::new(
324            self.http_do.clone(),
325            &token,
326            &reg.device_url,
327            &reg.user_id,
328            &reg.encryption_service_url,
329        );
330
331        // Get the response handler BEFORE storing the KMS client so the
332        // event loop can resolve pending requests without locking kms_client.
333        let response_handler = kms.response_handler();
334        *self.kms_response_handler.lock().await = Some(response_handler);
335        *self.kms_client.lock().await = Some(kms);
336
337        // Step 3: Connect Mercury WebSocket (KMS responses arrive here)
338        self.mercury_socket
339            .connect(&reg.web_socket_url, &token)
340            .await?;
341        info!("Mercury connected");
342
343        // Step 4: Start Mercury event loop
344        self.start_mercury_event_loop().await;
345
346        // Step 5: Initialize KMS (ECDH handshake — response comes via Mercury)
347        {
348            let mut kms_guard = self.kms_client.lock().await;
349            if let Some(ref mut kms) = *kms_guard {
350                kms.initialize().await?;
351            }
352        }
353        info!("KMS initialized");
354
355        // Store registration
356        *self.registration.lock().await = Some(reg);
357
358        Ok(())
359    }
360
361    /// Start processing Mercury events in a background task.
362    async fn start_mercury_event_loop(&self) {
363        let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
364            Some(rx) => rx,
365            None => {
366                warn!("Mercury event receiver already taken");
367                return;
368            }
369        };
370
371        let kms_client = self.kms_client.clone();
372        let kms_response_handler = self.kms_response_handler.clone();
373        let event_tx = self.event_tx.clone();
374        let connected = self.connected.clone();
375        let registration = self.registration.clone();
376        let device_manager = self.device_manager.clone();
377        let token = self.token.clone();
378        let bot_person_id = self.bot_person_id.clone();
379        let recent_activity_ids = self.recent_activity_ids.clone();
380
381        tokio::spawn(async move {
382            let mut sweep_interval = tokio::time::interval(Duration::from_secs(30));
383            loop {
384                tokio::select! {
385                    Some(event) = mercury_rx.recv() => {
386                match event {
387                    MercuryEvent::KmsResponse(data) => {
388                        // Use the separate response handler to avoid deadlock
389                        // with kms_client lock (held during initialize/get_key).
390                        let handler_guard = kms_response_handler.lock().await;
391                        if let Some(ref handler) = *handler_guard {
392                            handler.handle_kms_message(&data).await;
393                        }
394                    }
395                    MercuryEvent::Activity(activity) => {
396                        // Spawn in a separate task so the event loop can continue
397                        // processing KMS responses (needed for key retrieval during decryption).
398                        let kms_client_clone = kms_client.clone();
399                        let event_tx_clone = event_tx.clone();
400                        let bot_person_id = bot_person_id.clone();
401                        tokio::spawn(async move {
402                            let mut kms_guard = kms_client_clone.lock().await;
403                            if let Some(ref mut kms) = *kms_guard {
404                                let bot_id = bot_person_id.lock().await.clone();
405                                Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
406                            } else {
407                                warn!("Received activity but KMS client not initialized");
408                            }
409                        });
410                    }
411                    MercuryEvent::Connected => {
412                        info!("Mercury reconnected, refreshing device and KMS");
413
414                        // Refresh device
415                        let tok = token.lock().await.clone();
416                        {
417                            let reg_guard = registration.lock().await;
418                            if reg_guard.is_some() {
419                                let dm = device_manager.lock().await;
420                                match dm.refresh(&tok).await {
421                                    Ok(new_reg) => {
422                                        drop(reg_guard);
423                                        *registration.lock().await = Some(new_reg);
424                                    }
425                                    Err(e) => {
426                                        warn!("Device refresh on reconnect failed: {e}");
427                                    }
428                                }
429                            }
430                        }
431
432                        // Re-init KMS
433                        {
434                            let mut kms_guard = kms_client.lock().await;
435                            if let Some(ref mut kms) = *kms_guard {
436                                if let Err(e) = kms.initialize().await {
437                                    warn!("KMS re-init on reconnect failed: {e}");
438                                }
439                            }
440                        }
441
442                        *connected.lock().await = true;
443                        if event_tx.send(HandlerEvent::Connected).is_err() {
444                            warn!("Event receiver dropped, cannot send Connected event");
445                        }
446                    }
447                    MercuryEvent::Disconnected(reason) => {
448                        *connected.lock().await = false;
449                        if event_tx.send(HandlerEvent::Disconnected(reason)).is_err() {
450                            warn!("Event receiver dropped, cannot send Disconnected event");
451                        }
452                    }
453                    MercuryEvent::Reconnecting(attempt) => {
454                        if event_tx.send(HandlerEvent::Reconnecting(attempt)).is_err() {
455                            warn!("Event receiver dropped, cannot send Reconnecting event");
456                        }
457                    }
458                    MercuryEvent::Error(msg) => {
459                        if event_tx.send(HandlerEvent::Error(msg)).is_err() {
460                            warn!("Event receiver dropped, cannot send Error event");
461                        }
462                    }
463                }
464                    }
465                    _ = sweep_interval.tick() => {
466                        let mut ids = recent_activity_ids.lock().await;
467                        let cutoff = Instant::now() - Duration::from_secs(300);
468                        ids.retain(|_, &mut t| t > cutoff);
469                    }
470                }
471            }
472        });
473    }
474
475    /// Handle a single activity (decrypt and route).
476    async fn handle_activity_static(
477        kms: &mut KmsClient,
478        activity: &MercuryActivity,
479        event_tx: &mpsc::UnboundedSender<HandlerEvent>,
480        bot_person_id: Option<&str>,
481    ) {
482        // message:created — verb=post + objectType=comment
483        if activity.verb == "post" && activity.object.object_type == "comment" {
484            let mut decryptor = MessageDecryptor::new(kms);
485            match decryptor.decrypt_activity(activity).await {
486                Ok(decrypted) => {
487                    let msg = DecryptedMessage {
488                        id: decrypted.object.id.clone(),
489                        room_id: decrypted.target.id.clone(),
490                        person_id: decrypted.actor.id.clone(),
491                        person_email: decrypted
492                            .actor
493                            .email_address
494                            .clone()
495                            .unwrap_or_default(),
496                        text: decrypted.object.display_name.clone().unwrap_or_default(),
497                        html: decrypted.object.content.clone(),
498                        created: decrypted.published.clone(),
499                        room_type: infer_room_type(&decrypted),
500                        raw: decrypted,
501                    };
502
503                    // Filter self-messages if enabled
504                    if let Some(bot_id) = bot_person_id {
505                        if extract_person_uuid(&msg.person_id) == bot_id {
506                            info!("Ignoring self-message from bot ({})", bot_id);
507                            return;
508                        }
509                    }
510
511                    if event_tx.send(HandlerEvent::MessageCreated(msg)).is_err() {
512                        warn!("Event receiver dropped, cannot send MessageCreated event");
513                    }
514                }
515                Err(e) => {
516                    error!("Error decrypting activity: {e}");
517                    if event_tx.send(HandlerEvent::Error(e.to_string())).is_err() {
518                        warn!("Event receiver dropped, cannot send Error event");
519                    }
520                }
521            }
522            return;
523        }
524
525        // message:deleted — verb=delete + objectType=activity
526        if activity.verb == "delete" && activity.object.object_type == "activity" {
527            if event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
528                message_id: activity.object.id.clone(),
529                room_id: activity.target.id.clone(),
530                person_id: activity.actor.id.clone(),
531            })).is_err() {
532                warn!("Event receiver dropped, cannot send MessageDeleted event");
533            }
534            return;
535        }
536
537        // membership:created — membership verbs + objectType=person
538        let membership_verbs = ["add", "leave", "assignModerator", "unassignModerator"];
539        if membership_verbs.contains(&activity.verb.as_str())
540            && activity.object.object_type == "person"
541        {
542            let event = HandlerEvent::MembershipCreated(MembershipActivity {
543                id: activity.id.clone(),
544                actor_id: activity.actor.id.clone(),
545                person_id: activity.object.id.clone(),
546                room_id: activity.target.id.clone(),
547                action: activity.verb.clone(),
548                created: activity.published.clone(),
549                room_type: infer_room_type(activity),
550                raw: activity.clone(),
551            });
552            if event_tx.send(event).is_err() {
553                warn!("Event receiver dropped, cannot send MembershipCreated event");
554            }
555        }
556    }
557
558    /// Disconnect from Webex.
559    pub async fn disconnect(&self) {
560        info!("Disconnecting from Webex...");
561        *self.connected.lock().await = false;
562
563        self.mercury_socket.disconnect().await;
564
565        let token = self.token.lock().await.clone();
566        {
567            let reg = self.registration.lock().await;
568            if reg.is_some() {
569                let mut dm = self.device_manager.lock().await;
570                if let Err(e) = dm.unregister(&token).await {
571                    warn!("Failed to unregister device: {e}");
572                } else {
573                    info!("Device unregistered");
574                }
575            }
576        }
577
578        *self.registration.lock().await = None;
579        *self.kms_client.lock().await = None;
580        *self.kms_response_handler.lock().await = None;
581        *self.bot_person_id.lock().await = None;
582    }
583
584    /// Update the access token and re-establish the connection.
585    pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
586        if new_token.is_empty() {
587            return Err(WebexError::Internal(
588                "reconnect() requires a non-empty token string".into(),
589            ));
590        }
591
592        info!("Reconnecting with new token...");
593        self.disconnect().await;
594
595        *self.token.lock().await = new_token.to_string();
596        self.connect().await
597    }
598
599    /// Whether the handler is fully connected.
600    pub async fn connected(&self) -> bool {
601        let conn = *self.connected.lock().await;
602        conn && self.mercury_socket.connected().await
603    }
604
605    /// Returns a structured health check of all connection subsystems.
606    pub async fn status(&self) -> HandlerStatus {
607        let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
608        let ws_open = self.mercury_socket.connected().await;
609        let is_connected = *self.connected.lock().await;
610        let is_connecting = *self.connecting.lock().await;
611
612        let status = if is_connected && ws_open {
613            ConnectionStatus::Connected
614        } else if is_connecting {
615            ConnectionStatus::Connecting
616        } else if reconnect_attempt > 0 {
617            ConnectionStatus::Reconnecting
618        } else {
619            ConnectionStatus::Disconnected
620        };
621
622        HandlerStatus {
623            status,
624            web_socket_open: ws_open,
625            kms_initialized: self.kms_client.lock().await.is_some(),
626            device_registered: self.registration.lock().await.is_some(),
627            reconnect_attempt,
628        }
629    }
630}
631
632fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
633    let tags = &activity.target.tags;
634    if tags.contains(&"ONE_ON_ONE".to_string()) {
635        return Some("direct".to_string());
636    }
637    if tags.contains(&"TEAM".to_string())
638        || tags.contains(&"LOCKED".to_string())
639        || tags.contains(&"GROUP".to_string())
640    {
641        return Some("group".to_string());
642    }
643    None
644}