Skip to main content

webex_message_handler/
handler.rs

1//! Main orchestrator: device registration, Mercury WebSocket, KMS, decryption.
2
3use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9    Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10    FetchResponse, HandlerStatus, MembershipActivity, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
19// Internal adapter type (alias for public type)
20type HttpDoFn = Arc<
21    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
22        + Send
23        + Sync,
24>;
25
26/// Events emitted by WebexMessageHandler.
27#[derive(Debug, Clone)]
28pub enum HandlerEvent {
29    /// A new message was received and decrypted.
30    MessageCreated(DecryptedMessage),
31    /// A message was deleted.
32    MessageDeleted(DeletedMessage),
33    /// A membership event occurred (add, leave, assignModerator, unassignModerator).
34    MembershipCreated(MembershipActivity),
35    /// Successfully connected (or reconnected).
36    Connected,
37    /// Disconnected with a reason string.
38    Disconnected(String),
39    /// Reconnecting (attempt number).
40    Reconnecting(u32),
41    /// An error occurred.
42    Error(String),
43}
44
45/// Extract the raw UUID from a Webex person ID.
46///
47/// The Webex REST API returns base64-encoded IDs like:
48///   `"Y2lzY29zcGFyazovL3VzL1BFT1BMRS9mYjUx..."` → `"ciscospark://us/PEOPLE/fb51254f-..."`
49///
50/// Mercury wire format uses raw UUIDs:
51///   `"fb51254f-3b37-4e50-aa04-45744c2effc7"`
52///
53/// This function normalizes both formats to the raw UUID for comparison.
54fn extract_person_uuid(id: &str) -> String {
55    use base64::engine::general_purpose::{STANDARD, STANDARD_NO_PAD};
56    use base64::Engine;
57
58    // Try standard base64, then no-padding variant
59    let decoded_bytes = STANDARD
60        .decode(id)
61        .or_else(|_| STANDARD_NO_PAD.decode(id));
62
63    if let Ok(bytes) = decoded_bytes {
64        if let Ok(decoded) = String::from_utf8(bytes) {
65            if decoded.starts_with("ciscospark://") {
66                if let Some(uuid) = decoded.rsplit('/').next() {
67                    if !uuid.is_empty() {
68                        return uuid.to_string();
69                    }
70                }
71            }
72        }
73    }
74
75    id.to_string()
76}
77
78/// Create a native HTTP adapter that wraps reqwest::Client.
79fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
80    Arc::new(move |req: FetchRequest| {
81        let client = client.clone();
82        Box::pin(async move {
83            let mut request_builder = match req.method.as_str() {
84                "GET" => client.get(&req.url),
85                "POST" => client.post(&req.url),
86                "PUT" => client.put(&req.url),
87                "DELETE" => client.delete(&req.url),
88                _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
89            };
90
91            for (key, value) in req.headers {
92                request_builder = request_builder.header(key, value);
93            }
94
95            if let Some(body) = req.body {
96                request_builder = request_builder.body(body);
97            }
98
99            let response = request_builder
100                .send()
101                .await
102                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
103
104            let status = response.status().as_u16();
105            let ok = response.status().is_success();
106            let body_bytes = response
107                .bytes()
108                .await
109                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
110                .to_vec();
111
112            Ok(FetchResponse {
113                status,
114                ok,
115                body: body_bytes,
116            })
117        })
118    })
119}
120
121/// Receives and decrypts Webex messages over Mercury WebSocket.
122pub struct WebexMessageHandler {
123    token: Arc<Mutex<String>>,
124    http_do: HttpDoFn,
125    device_manager: Arc<Mutex<DeviceManager>>,
126    mercury_socket: Arc<MercurySocket>,
127    kms_client: Arc<Mutex<Option<KmsClient>>>,
128    /// Separate handle for resolving KMS responses without locking kms_client.
129    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
130    registration: Arc<Mutex<Option<DeviceRegistration>>>,
131    connected: Arc<Mutex<bool>>,
132    connecting: Arc<Mutex<bool>>,
133    ignore_self_messages: bool,
134    bot_person_id: Arc<Mutex<Option<String>>>,
135
136    #[allow(dead_code)]
137    config: Config,
138    event_tx: mpsc::UnboundedSender<HandlerEvent>,
139    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
140}
141
142impl WebexMessageHandler {
143    /// Create a new WebexMessageHandler.
144    pub fn new(config: Config) -> Result<Self, WebexError> {
145        if config.token.is_empty() {
146            return Err(WebexError::Internal(
147                "WebexMessageHandler requires a non-empty token string".into(),
148            ));
149        }
150
151        // Validate networking mode configuration
152        match config.mode {
153            NetworkMode::Injected => {
154                if config.fetch.is_none() || config.web_socket_factory.is_none() {
155                    return Err(WebexError::Internal(
156                        "Injected mode requires both fetch and web_socket_factory".into(),
157                    ));
158                }
159                if config.client.is_some() {
160                    return Err(WebexError::Internal(
161                        "Cannot use native proxy parameters (client) in injected mode".into(),
162                    ));
163                }
164            }
165            NetworkMode::Native => {
166                if config.fetch.is_some() || config.web_socket_factory.is_some() {
167                    return Err(WebexError::Internal(
168                        "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
169                    ));
170                }
171            }
172        }
173
174        // Create adapters based on mode
175        let (http_do, ws_factory) = match config.mode {
176            NetworkMode::Native => {
177                let client = config.client.clone().unwrap_or_default();
178                let http_adapter = create_native_http_adapter(client.clone());
179                (http_adapter, None)
180            }
181            NetworkMode::Injected => {
182                let http_adapter = config.fetch.clone().expect("Injected mode requires fetch adapter");
183                let ws_factory = config.web_socket_factory.clone();
184                (http_adapter, ws_factory)
185            }
186        };
187
188        let mercury_socket = MercurySocket::new(
189            ws_factory,
190            Duration::from_secs_f64(config.ping_interval),
191            Duration::from_secs_f64(config.pong_timeout),
192            Duration::from_secs_f64(config.reconnect_backoff_max),
193            config.max_reconnect_attempts,
194        );
195
196        let (event_tx, event_rx) = mpsc::unbounded_channel();
197
198        let ignore_self_messages = config.ignore_self_messages;
199
200        Ok(Self {
201            token: Arc::new(Mutex::new(config.token.clone())),
202            http_do: http_do.clone(),
203            device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
204            mercury_socket: Arc::new(mercury_socket),
205            kms_client: Arc::new(Mutex::new(None)),
206            kms_response_handler: Arc::new(Mutex::new(None)),
207            registration: Arc::new(Mutex::new(None)),
208            connected: Arc::new(Mutex::new(false)),
209            connecting: Arc::new(Mutex::new(false)),
210            ignore_self_messages,
211            bot_person_id: Arc::new(Mutex::new(None)),
212            config,
213            event_tx,
214            event_rx: Arc::new(Mutex::new(Some(event_rx))),
215        })
216    }
217
218    /// Take the event receiver. Can only be called once.
219    pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
220        self.event_rx.lock().await.take()
221    }
222
223    /// Connect to Webex (register device, connect Mercury, init KMS).
224    pub async fn connect(&self) -> Result<(), WebexError> {
225        {
226            let connecting = self.connecting.lock().await;
227            let connected = self.connected.lock().await;
228            if *connecting {
229                return Err(WebexError::Internal("connect() already in progress".into()));
230            }
231            if *connected {
232                return Err(WebexError::Internal(
233                    "Already connected. Call disconnect() first, or use reconnect().".into(),
234                ));
235            }
236        }
237
238        info!("Connecting to Webex...");
239        *self.connecting.lock().await = true;
240
241        let result = self.connect_internal().await;
242
243        *self.connecting.lock().await = false;
244
245        match result {
246            Ok(()) => {
247                *self.connected.lock().await = true;
248                info!("Connected to Webex");
249                if self.event_tx.send(HandlerEvent::Connected).is_err() {
250                    warn!("Event receiver dropped, cannot send Connected event");
251                }
252                Ok(())
253            }
254            Err(e) => Err(e),
255        }
256    }
257
258    async fn fetch_bot_person_id(&self) -> Result<(), WebexError> {
259        info!("Fetching bot person info for self-message filtering");
260        let token = self.token.lock().await.clone();
261        let req = FetchRequest {
262            url: "https://webexapis.com/v1/people/me".into(),
263            method: "GET".into(),
264            headers: {
265                let mut h = std::collections::HashMap::new();
266                h.insert("Authorization".into(), format!("Bearer {}", token));
267                h.insert("Content-Type".into(), "application/json".into());
268                h
269            },
270            body: None,
271        };
272
273        let resp = (self.http_do)(req).await.map_err(|e| {
274            WebexError::Internal(format!(
275                "Failed to fetch bot identity for self-message filtering: {e}. \
276                 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops)."
277            ))
278        })?;
279
280        if !resp.ok {
281            return Err(WebexError::Internal(format!(
282                "Failed to fetch bot identity for self-message filtering: HTTP {}. \
283                 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops).",
284                resp.status
285            )));
286        }
287
288        let data: serde_json::Value = serde_json::from_slice(&resp.body).map_err(|e| {
289            WebexError::Internal(format!("Failed to parse bot identity response: {e}"))
290        })?;
291
292        let id = data
293            .get("id")
294            .and_then(|v| v.as_str())
295            .ok_or_else(|| WebexError::Internal("Bot identity response missing 'id' field".into()))?;
296
297        let uuid = extract_person_uuid(id);
298        info!("Bot person ID cached for self-message filtering: {}", uuid);
299        *self.bot_person_id.lock().await = Some(uuid);
300        Ok(())
301    }
302
303    async fn connect_internal(&self) -> Result<(), WebexError> {
304        let token = self.token.lock().await.clone();
305
306        // Step 1: Register device with WDM
307        let reg = {
308            let mut dm = self.device_manager.lock().await;
309            dm.register(&token).await?
310        };
311        info!("Device registered");
312
313        // Step 1.5: Fetch bot person info if self-message filtering is enabled
314        if self.ignore_self_messages {
315            self.fetch_bot_person_id().await?;
316        }
317
318        // Step 2: Create KMS client
319        let kms = KmsClient::new(
320            self.http_do.clone(),
321            &token,
322            &reg.device_url,
323            &reg.user_id,
324            &reg.encryption_service_url,
325        );
326
327        // Get the response handler BEFORE storing the KMS client so the
328        // event loop can resolve pending requests without locking kms_client.
329        let response_handler = kms.response_handler();
330        *self.kms_response_handler.lock().await = Some(response_handler);
331        *self.kms_client.lock().await = Some(kms);
332
333        // Step 3: Connect Mercury WebSocket (KMS responses arrive here)
334        self.mercury_socket
335            .connect(&reg.web_socket_url, &token)
336            .await?;
337        info!("Mercury connected");
338
339        // Step 4: Start Mercury event loop
340        self.start_mercury_event_loop().await;
341
342        // Step 5: Initialize KMS (ECDH handshake — response comes via Mercury)
343        {
344            let mut kms_guard = self.kms_client.lock().await;
345            if let Some(ref mut kms) = *kms_guard {
346                kms.initialize().await?;
347            }
348        }
349        info!("KMS initialized");
350
351        // Store registration
352        *self.registration.lock().await = Some(reg);
353
354        Ok(())
355    }
356
357    /// Start processing Mercury events in a background task.
358    async fn start_mercury_event_loop(&self) {
359        let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
360            Some(rx) => rx,
361            None => {
362                warn!("Mercury event receiver already taken");
363                return;
364            }
365        };
366
367        let kms_client = self.kms_client.clone();
368        let kms_response_handler = self.kms_response_handler.clone();
369        let event_tx = self.event_tx.clone();
370        let connected = self.connected.clone();
371        let registration = self.registration.clone();
372        let device_manager = self.device_manager.clone();
373        let token = self.token.clone();
374        let bot_person_id = self.bot_person_id.clone();
375
376        tokio::spawn(async move {
377            while let Some(event) = mercury_rx.recv().await {
378                match event {
379                    MercuryEvent::KmsResponse(data) => {
380                        // Use the separate response handler to avoid deadlock
381                        // with kms_client lock (held during initialize/get_key).
382                        let handler_guard = kms_response_handler.lock().await;
383                        if let Some(ref handler) = *handler_guard {
384                            handler.handle_kms_message(&data).await;
385                        }
386                    }
387                    MercuryEvent::Activity(activity) => {
388                        // Spawn in a separate task so the event loop can continue
389                        // processing KMS responses (needed for key retrieval during decryption).
390                        let kms_client_clone = kms_client.clone();
391                        let event_tx_clone = event_tx.clone();
392                        let bot_person_id = bot_person_id.clone();
393                        tokio::spawn(async move {
394                            let mut kms_guard = kms_client_clone.lock().await;
395                            if let Some(ref mut kms) = *kms_guard {
396                                let bot_id = bot_person_id.lock().await.clone();
397                                Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
398                            } else {
399                                warn!("Received activity but KMS client not initialized");
400                            }
401                        });
402                    }
403                    MercuryEvent::Connected => {
404                        info!("Mercury reconnected, refreshing device and KMS");
405
406                        // Refresh device
407                        let tok = token.lock().await.clone();
408                        {
409                            let reg_guard = registration.lock().await;
410                            if reg_guard.is_some() {
411                                let dm = device_manager.lock().await;
412                                match dm.refresh(&tok).await {
413                                    Ok(new_reg) => {
414                                        drop(reg_guard);
415                                        *registration.lock().await = Some(new_reg);
416                                    }
417                                    Err(e) => {
418                                        warn!("Device refresh on reconnect failed: {e}");
419                                    }
420                                }
421                            }
422                        }
423
424                        // Re-init KMS
425                        {
426                            let mut kms_guard = kms_client.lock().await;
427                            if let Some(ref mut kms) = *kms_guard {
428                                if let Err(e) = kms.initialize().await {
429                                    warn!("KMS re-init on reconnect failed: {e}");
430                                }
431                            }
432                        }
433
434                        *connected.lock().await = true;
435                        if event_tx.send(HandlerEvent::Connected).is_err() {
436                            warn!("Event receiver dropped, cannot send Connected event");
437                        }
438                    }
439                    MercuryEvent::Disconnected(reason) => {
440                        *connected.lock().await = false;
441                        if event_tx.send(HandlerEvent::Disconnected(reason)).is_err() {
442                            warn!("Event receiver dropped, cannot send Disconnected event");
443                        }
444                    }
445                    MercuryEvent::Reconnecting(attempt) => {
446                        if event_tx.send(HandlerEvent::Reconnecting(attempt)).is_err() {
447                            warn!("Event receiver dropped, cannot send Reconnecting event");
448                        }
449                    }
450                    MercuryEvent::Error(msg) => {
451                        if event_tx.send(HandlerEvent::Error(msg)).is_err() {
452                            warn!("Event receiver dropped, cannot send Error event");
453                        }
454                    }
455                }
456            }
457        });
458    }
459
460    /// Handle a single activity (decrypt and route).
461    async fn handle_activity_static(
462        kms: &mut KmsClient,
463        activity: &MercuryActivity,
464        event_tx: &mpsc::UnboundedSender<HandlerEvent>,
465        bot_person_id: Option<&str>,
466    ) {
467        // message:created — verb=post + objectType=comment
468        if activity.verb == "post" && activity.object.object_type == "comment" {
469            let mut decryptor = MessageDecryptor::new(kms);
470            match decryptor.decrypt_activity(activity).await {
471                Ok(decrypted) => {
472                    let msg = DecryptedMessage {
473                        id: decrypted.object.id.clone(),
474                        room_id: decrypted.target.id.clone(),
475                        person_id: decrypted.actor.id.clone(),
476                        person_email: decrypted
477                            .actor
478                            .email_address
479                            .clone()
480                            .unwrap_or_default(),
481                        text: decrypted.object.display_name.clone().unwrap_or_default(),
482                        html: decrypted.object.content.clone(),
483                        created: decrypted.published.clone(),
484                        room_type: infer_room_type(&decrypted),
485                        raw: decrypted,
486                    };
487
488                    // Filter self-messages if enabled
489                    if let Some(bot_id) = bot_person_id {
490                        if extract_person_uuid(&msg.person_id) == bot_id {
491                            info!("Ignoring self-message from bot ({})", bot_id);
492                            return;
493                        }
494                    }
495
496                    if event_tx.send(HandlerEvent::MessageCreated(msg)).is_err() {
497                        warn!("Event receiver dropped, cannot send MessageCreated event");
498                    }
499                }
500                Err(e) => {
501                    error!("Error decrypting activity: {e}");
502                    if event_tx.send(HandlerEvent::Error(e.to_string())).is_err() {
503                        warn!("Event receiver dropped, cannot send Error event");
504                    }
505                }
506            }
507            return;
508        }
509
510        // message:deleted — verb=delete + objectType=activity
511        if activity.verb == "delete" && activity.object.object_type == "activity" {
512            if event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
513                message_id: activity.object.id.clone(),
514                room_id: activity.target.id.clone(),
515                person_id: activity.actor.id.clone(),
516            })).is_err() {
517                warn!("Event receiver dropped, cannot send MessageDeleted event");
518            }
519            return;
520        }
521
522        // membership:created — membership verbs + objectType=person
523        let membership_verbs = ["add", "leave", "assignModerator", "unassignModerator"];
524        if membership_verbs.contains(&activity.verb.as_str())
525            && activity.object.object_type == "person"
526        {
527            let event = HandlerEvent::MembershipCreated(MembershipActivity {
528                id: activity.id.clone(),
529                actor_id: activity.actor.id.clone(),
530                person_id: activity.object.id.clone(),
531                room_id: activity.target.id.clone(),
532                action: activity.verb.clone(),
533                created: activity.published.clone(),
534                room_type: infer_room_type(activity),
535                raw: activity.clone(),
536            });
537            if event_tx.send(event).is_err() {
538                warn!("Event receiver dropped, cannot send MembershipCreated event");
539            }
540        }
541    }
542
543    /// Disconnect from Webex.
544    pub async fn disconnect(&self) {
545        info!("Disconnecting from Webex...");
546        *self.connected.lock().await = false;
547
548        self.mercury_socket.disconnect().await;
549
550        let token = self.token.lock().await.clone();
551        {
552            let reg = self.registration.lock().await;
553            if reg.is_some() {
554                let mut dm = self.device_manager.lock().await;
555                if let Err(e) = dm.unregister(&token).await {
556                    warn!("Failed to unregister device: {e}");
557                } else {
558                    info!("Device unregistered");
559                }
560            }
561        }
562
563        *self.registration.lock().await = None;
564        *self.kms_client.lock().await = None;
565        *self.kms_response_handler.lock().await = None;
566        *self.bot_person_id.lock().await = None;
567    }
568
569    /// Update the access token and re-establish the connection.
570    pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
571        if new_token.is_empty() {
572            return Err(WebexError::Internal(
573                "reconnect() requires a non-empty token string".into(),
574            ));
575        }
576
577        info!("Reconnecting with new token...");
578        self.disconnect().await;
579
580        *self.token.lock().await = new_token.to_string();
581        self.connect().await
582    }
583
584    /// Whether the handler is fully connected.
585    pub async fn connected(&self) -> bool {
586        let conn = *self.connected.lock().await;
587        conn && self.mercury_socket.connected().await
588    }
589
590    /// Returns a structured health check of all connection subsystems.
591    pub async fn status(&self) -> HandlerStatus {
592        let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
593        let ws_open = self.mercury_socket.connected().await;
594        let is_connected = *self.connected.lock().await;
595        let is_connecting = *self.connecting.lock().await;
596
597        let status = if is_connected && ws_open {
598            ConnectionStatus::Connected
599        } else if is_connecting {
600            ConnectionStatus::Connecting
601        } else if reconnect_attempt > 0 {
602            ConnectionStatus::Reconnecting
603        } else {
604            ConnectionStatus::Disconnected
605        };
606
607        HandlerStatus {
608            status,
609            web_socket_open: ws_open,
610            kms_initialized: self.kms_client.lock().await.is_some(),
611            device_registered: self.registration.lock().await.is_some(),
612            reconnect_attempt,
613        }
614    }
615}
616
617fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
618    let tags = &activity.target.tags;
619    if tags.contains(&"ONE_ON_ONE".to_string()) {
620        return Some("direct".to_string());
621    }
622    if tags.contains(&"TEAM".to_string())
623        || tags.contains(&"LOCKED".to_string())
624        || tags.contains(&"GROUP".to_string())
625    {
626        return Some("group".to_string());
627    }
628    None
629}