Skip to main content

webex_message_handler/
handler.rs

//! Main orchestrator: device registration, Mercury WebSocket, KMS, decryption.
2
3use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mention_parser::parse_mentions;
7use crate::mercury_socket::{MercuryEvent, MercurySocket};
8use crate::message_decryptor::MessageDecryptor;
9use crate::types::{
10    AttachmentAction, Config, ConnectionStatus, DecryptedMessage, DeletedMessage,
11    DeviceRegistration, FetchRequest, FetchResponse, HandlerStatus, MembershipActivity,
12    MercuryActivity, NetworkMode, RoomActivity,
13};
14use std::collections::HashMap;
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::{mpsc, Mutex};
20use tracing::{error, info, warn};
21
22// Internal adapter type (alias for public type)
23type HttpDoFn = Arc<
24    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
25        + Send
26        + Sync,
27>;
28
29/// Events emitted by WebexMessageHandler.
30#[derive(Debug, Clone)]
31pub enum HandlerEvent {
32    /// A new message was received and decrypted.
33    MessageCreated(DecryptedMessage),
34    /// A message was edited.
35    MessageUpdated(DecryptedMessage),
36    /// A message was deleted.
37    MessageDeleted(DeletedMessage),
38    /// A membership event occurred (add, leave, assignModerator, unassignModerator).
39    MembershipCreated(MembershipActivity),
40    /// An adaptive card was submitted.
41    AttachmentActionCreated(AttachmentAction),
42    /// A room was created.
43    RoomCreated(RoomActivity),
44    /// A room was updated.
45    RoomUpdated(RoomActivity),
46    /// Successfully connected (or reconnected).
47    Connected,
48    /// Disconnected with a reason string.
49    Disconnected(String),
50    /// Reconnecting (attempt number).
51    Reconnecting(u32),
52    /// An error occurred.
53    Error(String),
54}
55
56/// Extract the raw UUID from a Webex person ID.
57///
58/// The Webex REST API returns base64-encoded IDs like:
59///   `"Y2lzY29zcGFyazovL3VzL1BFT1BMRS9mYjUx..."` → `"ciscospark://us/PEOPLE/fb51254f-..."`
60///
61/// Mercury wire format uses raw UUIDs:
62///   `"fb51254f-3b37-4e50-aa04-45744c2effc7"`
63///
64/// This function normalizes both formats to the raw UUID for comparison.
65fn extract_person_uuid(id: &str) -> String {
66    use base64::engine::general_purpose::{STANDARD, STANDARD_NO_PAD};
67    use base64::Engine;
68
69    // Try standard base64, then no-padding variant
70    let decoded_bytes = STANDARD
71        .decode(id)
72        .or_else(|_| STANDARD_NO_PAD.decode(id));
73
74    if let Ok(bytes) = decoded_bytes {
75        if let Ok(decoded) = String::from_utf8(bytes) {
76            if decoded.starts_with("ciscospark://") {
77                if let Some(uuid) = decoded.rsplit('/').next() {
78                    if !uuid.is_empty() {
79                        return uuid.to_string();
80                    }
81                }
82            }
83        }
84    }
85
86    id.to_string()
87}
88
89/// Create a native HTTP adapter that wraps reqwest::Client.
90fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
91    Arc::new(move |req: FetchRequest| {
92        let client = client.clone();
93        Box::pin(async move {
94            let mut request_builder = match req.method.as_str() {
95                "GET" => client.get(&req.url),
96                "POST" => client.post(&req.url),
97                "PUT" => client.put(&req.url),
98                "DELETE" => client.delete(&req.url),
99                _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
100            };
101
102            for (key, value) in req.headers {
103                request_builder = request_builder.header(key, value);
104            }
105
106            if let Some(body) = req.body {
107                request_builder = request_builder.body(body);
108            }
109
110            let response = request_builder
111                .send()
112                .await
113                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
114
115            let status = response.status().as_u16();
116            let ok = response.status().is_success();
117            let body_bytes = response
118                .bytes()
119                .await
120                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
121                .to_vec();
122
123            Ok(FetchResponse {
124                status,
125                ok,
126                body: body_bytes,
127            })
128        })
129    })
130}
131
132/// Receives and decrypts Webex messages over Mercury WebSocket.
133pub struct WebexMessageHandler {
134    token: Arc<Mutex<String>>,
135    http_do: HttpDoFn,
136    device_manager: Arc<Mutex<DeviceManager>>,
137    mercury_socket: Arc<MercurySocket>,
138    kms_client: Arc<Mutex<Option<KmsClient>>>,
139    /// Separate handle for resolving KMS responses without locking kms_client.
140    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
141    registration: Arc<Mutex<Option<DeviceRegistration>>>,
142    connected: Arc<Mutex<bool>>,
143    connecting: Arc<Mutex<bool>>,
144    ignore_self_messages: bool,
145    bot_person_id: Arc<Mutex<Option<String>>>,
146    /// Track recent activity IDs to prevent replay attacks (activity_id -> Instant)
147    recent_activity_ids: Arc<Mutex<HashMap<String, Instant>>>,
148
149    #[allow(dead_code)]
150    config: Config,
151    event_tx: mpsc::UnboundedSender<HandlerEvent>,
152    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
153}
154
155impl WebexMessageHandler {
156    /// Create a new WebexMessageHandler.
157    pub fn new(config: Config) -> Result<Self, WebexError> {
158        if config.token.is_empty() {
159            return Err(WebexError::Internal(
160                "WebexMessageHandler requires a non-empty token string".into(),
161            ));
162        }
163
164        // Validate networking mode configuration
165        match config.mode {
166            NetworkMode::Injected => {
167                if config.fetch.is_none() || config.web_socket_factory.is_none() {
168                    return Err(WebexError::Internal(
169                        "Injected mode requires both fetch and web_socket_factory".into(),
170                    ));
171                }
172                if config.client.is_some() {
173                    return Err(WebexError::Internal(
174                        "Cannot use native proxy parameters (client) in injected mode".into(),
175                    ));
176                }
177            }
178            NetworkMode::Native => {
179                if config.fetch.is_some() || config.web_socket_factory.is_some() {
180                    return Err(WebexError::Internal(
181                        "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
182                    ));
183                }
184            }
185        }
186
187        // Create adapters based on mode
188        let (http_do, ws_factory) = match config.mode {
189            NetworkMode::Native => {
190                let client = config.client.clone().unwrap_or_default();
191                let http_adapter = create_native_http_adapter(client.clone());
192                (http_adapter, None)
193            }
194            NetworkMode::Injected => {
195                let http_adapter = config.fetch.clone().expect("Injected mode requires fetch adapter");
196                let ws_factory = config.web_socket_factory.clone();
197                (http_adapter, ws_factory)
198            }
199        };
200
201        let mercury_socket = MercurySocket::new(
202            ws_factory,
203            Duration::from_secs_f64(config.ping_interval),
204            Duration::from_secs_f64(config.pong_timeout),
205            Duration::from_secs_f64(config.reconnect_backoff_max),
206            config.max_reconnect_attempts,
207        );
208
209        let (event_tx, event_rx) = mpsc::unbounded_channel();
210
211        let ignore_self_messages = config.ignore_self_messages;
212
213        Ok(Self {
214            token: Arc::new(Mutex::new(config.token.clone())),
215            http_do: http_do.clone(),
216            device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
217            mercury_socket: Arc::new(mercury_socket),
218            kms_client: Arc::new(Mutex::new(None)),
219            kms_response_handler: Arc::new(Mutex::new(None)),
220            registration: Arc::new(Mutex::new(None)),
221            connected: Arc::new(Mutex::new(false)),
222            connecting: Arc::new(Mutex::new(false)),
223            ignore_self_messages,
224            bot_person_id: Arc::new(Mutex::new(None)),
225            recent_activity_ids: Arc::new(Mutex::new(HashMap::new())),
226            config,
227            event_tx,
228            event_rx: Arc::new(Mutex::new(Some(event_rx))),
229        })
230    }
231
232    /// Take the event receiver. Can only be called once.
233    pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
234        self.event_rx.lock().await.take()
235    }
236
237    /// Connect to Webex (register device, connect Mercury, init KMS).
238    pub async fn connect(&self) -> Result<(), WebexError> {
239        {
240            let connecting = self.connecting.lock().await;
241            let connected = self.connected.lock().await;
242            if *connecting {
243                return Err(WebexError::Internal("connect() already in progress".into()));
244            }
245            if *connected {
246                return Err(WebexError::Internal(
247                    "Already connected. Call disconnect() first, or use reconnect().".into(),
248                ));
249            }
250        }
251
252        info!("Connecting to Webex...");
253        *self.connecting.lock().await = true;
254
255        let result = self.connect_internal().await;
256
257        *self.connecting.lock().await = false;
258
259        match result {
260            Ok(()) => {
261                *self.connected.lock().await = true;
262                info!("Connected to Webex");
263                if self.event_tx.send(HandlerEvent::Connected).is_err() {
264                    warn!("Event receiver dropped, cannot send Connected event");
265                }
266                Ok(())
267            }
268            Err(e) => Err(e),
269        }
270    }
271
272    async fn fetch_bot_person_id(&self) -> Result<(), WebexError> {
273        info!("Fetching bot person info for self-message filtering");
274        let token = self.token.lock().await.clone();
275        let req = FetchRequest {
276            url: "https://webexapis.com/v1/people/me".into(),
277            method: "GET".into(),
278            headers: {
279                let mut h = std::collections::HashMap::new();
280                h.insert("Authorization".into(), format!("Bearer {}", token));
281                h.insert("Content-Type".into(), "application/json".into());
282                h
283            },
284            body: None,
285        };
286
287        let resp = (self.http_do)(req).await.map_err(|e| {
288            WebexError::Internal(format!(
289                "Failed to fetch bot identity for self-message filtering: {e}. \
290                 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops)."
291            ))
292        })?;
293
294        if !resp.ok {
295            return Err(WebexError::Internal(format!(
296                "Failed to fetch bot identity for self-message filtering: HTTP {}. \
297                 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops).",
298                resp.status
299            )));
300        }
301
302        let data: serde_json::Value = serde_json::from_slice(&resp.body).map_err(|e| {
303            WebexError::Internal(format!("Failed to parse bot identity response: {e}"))
304        })?;
305
306        let id = data
307            .get("id")
308            .and_then(|v| v.as_str())
309            .ok_or_else(|| WebexError::Internal("Bot identity response missing 'id' field".into()))?;
310
311        let uuid = extract_person_uuid(id);
312        info!("Bot person ID cached for self-message filtering: {}", uuid);
313        *self.bot_person_id.lock().await = Some(uuid);
314        Ok(())
315    }
316
317    async fn connect_internal(&self) -> Result<(), WebexError> {
318        let token = self.token.lock().await.clone();
319
320        // Step 1: Register device with WDM
321        let reg = {
322            let mut dm = self.device_manager.lock().await;
323            dm.register(&token).await?
324        };
325        info!("Device registered");
326
327        // Step 1.5: Fetch bot person info if self-message filtering is enabled
328        if self.ignore_self_messages {
329            self.fetch_bot_person_id().await?;
330        }
331
332        // Step 2: Create KMS client
333        let kms = KmsClient::new(
334            self.http_do.clone(),
335            &token,
336            &reg.device_url,
337            &reg.user_id,
338            &reg.encryption_service_url,
339        );
340
341        // Get the response handler BEFORE storing the KMS client so the
342        // event loop can resolve pending requests without locking kms_client.
343        let response_handler = kms.response_handler();
344        *self.kms_response_handler.lock().await = Some(response_handler);
345        *self.kms_client.lock().await = Some(kms);
346
347        // Step 3: Connect Mercury WebSocket (KMS responses arrive here)
348        self.mercury_socket
349            .connect(&reg.web_socket_url, &token)
350            .await?;
351        info!("Mercury connected");
352
353        // Step 4: Start Mercury event loop
354        self.start_mercury_event_loop().await;
355
356        // Step 5: Initialize KMS (ECDH handshake — response comes via Mercury)
357        {
358            let mut kms_guard = self.kms_client.lock().await;
359            if let Some(ref mut kms) = *kms_guard {
360                kms.initialize().await?;
361            }
362        }
363        info!("KMS initialized");
364
365        // Store registration
366        *self.registration.lock().await = Some(reg);
367
368        Ok(())
369    }
370
371    /// Start processing Mercury events in a background task.
372    async fn start_mercury_event_loop(&self) {
373        let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
374            Some(rx) => rx,
375            None => {
376                warn!("Mercury event receiver already taken");
377                return;
378            }
379        };
380
381        let kms_client = self.kms_client.clone();
382        let kms_response_handler = self.kms_response_handler.clone();
383        let event_tx = self.event_tx.clone();
384        let connected = self.connected.clone();
385        let registration = self.registration.clone();
386        let device_manager = self.device_manager.clone();
387        let token = self.token.clone();
388        let bot_person_id = self.bot_person_id.clone();
389        let recent_activity_ids = self.recent_activity_ids.clone();
390
391        tokio::spawn(async move {
392            let mut sweep_interval = tokio::time::interval(Duration::from_secs(30));
393            loop {
394                tokio::select! {
395                    Some(event) = mercury_rx.recv() => {
396                match event {
397                    MercuryEvent::KmsResponse(data) => {
398                        // Use the separate response handler to avoid deadlock
399                        // with kms_client lock (held during initialize/get_key).
400                        let handler_guard = kms_response_handler.lock().await;
401                        if let Some(ref handler) = *handler_guard {
402                            handler.handle_kms_message(&data).await;
403                        }
404                    }
405                    MercuryEvent::Activity(activity) => {
406                        // Spawn in a separate task so the event loop can continue
407                        // processing KMS responses (needed for key retrieval during decryption).
408                        let kms_client_clone = kms_client.clone();
409                        let event_tx_clone = event_tx.clone();
410                        let bot_person_id = bot_person_id.clone();
411                        tokio::spawn(async move {
412                            let mut kms_guard = kms_client_clone.lock().await;
413                            if let Some(ref mut kms) = *kms_guard {
414                                let bot_id = bot_person_id.lock().await.clone();
415                                Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
416                            } else {
417                                warn!("Received activity but KMS client not initialized");
418                            }
419                        });
420                    }
421                    MercuryEvent::Connected => {
422                        info!("Mercury reconnected, refreshing device and KMS");
423
424                        // Refresh device
425                        let tok = token.lock().await.clone();
426                        {
427                            let reg_guard = registration.lock().await;
428                            if reg_guard.is_some() {
429                                let dm = device_manager.lock().await;
430                                match dm.refresh(&tok).await {
431                                    Ok(new_reg) => {
432                                        drop(reg_guard);
433                                        *registration.lock().await = Some(new_reg);
434                                    }
435                                    Err(e) => {
436                                        warn!("Device refresh on reconnect failed: {e}");
437                                    }
438                                }
439                            }
440                        }
441
442                        // Re-init KMS
443                        {
444                            let mut kms_guard = kms_client.lock().await;
445                            if let Some(ref mut kms) = *kms_guard {
446                                if let Err(e) = kms.initialize().await {
447                                    warn!("KMS re-init on reconnect failed: {e}");
448                                }
449                            }
450                        }
451
452                        *connected.lock().await = true;
453                        if event_tx.send(HandlerEvent::Connected).is_err() {
454                            warn!("Event receiver dropped, cannot send Connected event");
455                        }
456                    }
457                    MercuryEvent::Disconnected(reason) => {
458                        *connected.lock().await = false;
459                        if event_tx.send(HandlerEvent::Disconnected(reason)).is_err() {
460                            warn!("Event receiver dropped, cannot send Disconnected event");
461                        }
462                    }
463                    MercuryEvent::Reconnecting(attempt) => {
464                        if event_tx.send(HandlerEvent::Reconnecting(attempt)).is_err() {
465                            warn!("Event receiver dropped, cannot send Reconnecting event");
466                        }
467                    }
468                    MercuryEvent::Error(msg) => {
469                        if event_tx.send(HandlerEvent::Error(msg)).is_err() {
470                            warn!("Event receiver dropped, cannot send Error event");
471                        }
472                    }
473                }
474                    }
475                    _ = sweep_interval.tick() => {
476                        let mut ids = recent_activity_ids.lock().await;
477                        let cutoff = Instant::now() - Duration::from_secs(300);
478                        ids.retain(|_, &mut t| t > cutoff);
479                    }
480                }
481            }
482        });
483    }
484
485    /// Handle a single activity (decrypt and route).
486    async fn handle_activity_static(
487        kms: &mut KmsClient,
488        activity: &MercuryActivity,
489        event_tx: &mpsc::UnboundedSender<HandlerEvent>,
490        bot_person_id: Option<&str>,
491    ) {
492        // message:created or message:updated — verb=post/update + objectType=comment
493        if (activity.verb == "post" || activity.verb == "update") && activity.object.object_type == "comment" {
494            let mut decryptor = MessageDecryptor::new(kms);
495            match decryptor.decrypt_activity(activity).await {
496                Ok(decrypted) => {
497                    let mentions = parse_mentions(decrypted.object.content.as_deref());
498                    let msg = DecryptedMessage {
499                        id: decrypted.id.clone(),
500                        parent_id: decrypted.parent.as_ref().map(|p| p.id.clone()),
501                        mentioned_people: mentions.mentioned_people,
502                        mentioned_groups: mentions.mentioned_groups,
503                        room_id: decrypted.target.id.clone(),
504                        person_id: decrypted.actor.id.clone(),
505                        person_email: decrypted
506                            .actor
507                            .email_address
508                            .clone()
509                            .unwrap_or_default(),
510                        text: decrypted.object.display_name.clone().unwrap_or_default(),
511                        html: decrypted.object.content.clone(),
512                        created: decrypted.published.clone(),
513                        room_type: infer_room_type(&decrypted),
514                        files: decrypted.object.files.clone().unwrap_or_default(),
515                        raw: decrypted,
516                    };
517
518                    // Filter self-messages if enabled
519                    if let Some(bot_id) = bot_person_id {
520                        if extract_person_uuid(&msg.person_id) == bot_id {
521                            info!("Ignoring self-message from bot ({})", bot_id);
522                            return;
523                        }
524                    }
525
526                    let event = if activity.verb == "update" {
527                        HandlerEvent::MessageUpdated(msg)
528                    } else {
529                        HandlerEvent::MessageCreated(msg)
530                    };
531                    if event_tx.send(event).is_err() {
532                        warn!("Event receiver dropped, cannot send message event");
533                    }
534                }
535                Err(e) => {
536                    error!("Error decrypting activity: {e}");
537                    if event_tx.send(HandlerEvent::Error(e.to_string())).is_err() {
538                        warn!("Event receiver dropped, cannot send Error event");
539                    }
540                }
541            }
542            return;
543        }
544
545        // message:deleted — verb=delete + objectType=activity
546        if activity.verb == "delete" && activity.object.object_type == "activity" {
547            if event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
548                message_id: activity.object.id.clone(),
549                room_id: activity.target.id.clone(),
550                person_id: activity.actor.id.clone(),
551            })).is_err() {
552                warn!("Event receiver dropped, cannot send MessageDeleted event");
553            }
554            return;
555        }
556
557        // membership:created — membership verbs + objectType=person
558        let membership_verbs = ["add", "leave", "assignModerator", "unassignModerator"];
559        if membership_verbs.contains(&activity.verb.as_str())
560            && activity.object.object_type == "person"
561        {
562            let event = HandlerEvent::MembershipCreated(MembershipActivity {
563                id: activity.id.clone(),
564                actor_id: activity.actor.id.clone(),
565                person_id: activity.object.id.clone(),
566                room_id: activity.target.id.clone(),
567                action: activity.verb.clone(),
568                created: activity.published.clone(),
569                room_type: infer_room_type(activity),
570                raw: activity.clone(),
571            });
572            if event_tx.send(event).is_err() {
573                warn!("Event receiver dropped, cannot send MembershipCreated event");
574            }
575            return;
576        }
577
578        // attachmentAction:created — verb=cardAction + objectType=submit
579        if activity.verb == "cardAction" && activity.object.object_type == "submit" {
580            let event = HandlerEvent::AttachmentActionCreated(AttachmentAction {
581                id: activity.id.clone(),
582                message_id: activity.parent.as_ref().map(|p| p.id.clone()).unwrap_or_default(),
583                person_id: activity.actor.id.clone(),
584                person_email: activity.actor.email_address.clone().unwrap_or_default(),
585                room_id: activity.target.id.clone(),
586                inputs: activity.object.inputs.clone().unwrap_or(serde_json::Value::Object(Default::default())),
587                created: activity.published.clone(),
588                raw: activity.clone(),
589            });
590            if event_tx.send(event).is_err() {
591                warn!("Event receiver dropped, cannot send AttachmentActionCreated event");
592            }
593            return;
594        }
595
596        // room:created or room:updated — verb=create/update + object.objectType=conversation
597        if (activity.verb == "create" || activity.verb == "update")
598            && activity.object.object_type == "conversation"
599        {
600            let action = if activity.verb == "create" { "created" } else { "updated" };
601            let ra = RoomActivity {
602                id: activity.id.clone(),
603                room_id: activity.target.id.clone(),
604                actor_id: activity.actor.id.clone(),
605                action: action.to_string(),
606                created: activity.published.clone(),
607                raw: activity.clone(),
608            };
609            let event = if activity.verb == "create" {
610                HandlerEvent::RoomCreated(ra)
611            } else {
612                HandlerEvent::RoomUpdated(ra)
613            };
614            if event_tx.send(event).is_err() {
615                warn!("Event receiver dropped, cannot send room event");
616            }
617        }
618    }
619
620    /// Disconnect from Webex.
621    pub async fn disconnect(&self) {
622        info!("Disconnecting from Webex...");
623        *self.connected.lock().await = false;
624
625        self.mercury_socket.disconnect().await;
626
627        let token = self.token.lock().await.clone();
628        {
629            let reg = self.registration.lock().await;
630            if reg.is_some() {
631                let mut dm = self.device_manager.lock().await;
632                if let Err(e) = dm.unregister(&token).await {
633                    warn!("Failed to unregister device: {e}");
634                } else {
635                    info!("Device unregistered");
636                }
637            }
638        }
639
640        *self.registration.lock().await = None;
641        *self.kms_client.lock().await = None;
642        *self.kms_response_handler.lock().await = None;
643        *self.bot_person_id.lock().await = None;
644    }
645
646    /// Update the access token and re-establish the connection.
647    pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
648        if new_token.is_empty() {
649            return Err(WebexError::Internal(
650                "reconnect() requires a non-empty token string".into(),
651            ));
652        }
653
654        info!("Reconnecting with new token...");
655        self.disconnect().await;
656
657        *self.token.lock().await = new_token.to_string();
658        self.connect().await
659    }
660
661    /// Whether the handler is fully connected.
662    pub async fn connected(&self) -> bool {
663        let conn = *self.connected.lock().await;
664        conn && self.mercury_socket.connected().await
665    }
666
667    /// Returns a structured health check of all connection subsystems.
668    pub async fn status(&self) -> HandlerStatus {
669        let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
670        let ws_open = self.mercury_socket.connected().await;
671        let is_connected = *self.connected.lock().await;
672        let is_connecting = *self.connecting.lock().await;
673
674        let status = if is_connected && ws_open {
675            ConnectionStatus::Connected
676        } else if is_connecting {
677            ConnectionStatus::Connecting
678        } else if reconnect_attempt > 0 {
679            ConnectionStatus::Reconnecting
680        } else {
681            ConnectionStatus::Disconnected
682        };
683
684        HandlerStatus {
685            status,
686            web_socket_open: ws_open,
687            kms_initialized: self.kms_client.lock().await.is_some(),
688            device_registered: self.registration.lock().await.is_some(),
689            reconnect_attempt,
690        }
691    }
692}
693
694fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
695    let tags = &activity.target.tags;
696    if tags.contains(&"ONE_ON_ONE".to_string()) {
697        return Some("direct".to_string());
698    }
699    if tags.contains(&"TEAM".to_string())
700        || tags.contains(&"LOCKED".to_string())
701        || tags.contains(&"GROUP".to_string())
702    {
703        return Some("group".to_string());
704    }
705    None
706}