1use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mention_parser::parse_mentions;
7use crate::mercury_socket::{MercuryEvent, MercurySocket};
8use crate::message_decryptor::MessageDecryptor;
9use crate::types::{
10 AttachmentAction, Config, ConnectionStatus, DecryptedMessage, DeletedMessage,
11 DeviceRegistration, FetchRequest, FetchResponse, HandlerStatus, MembershipActivity,
12 MercuryActivity, NetworkMode, RoomActivity,
13};
14use std::collections::HashMap;
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::{mpsc, Mutex};
20use tracing::{error, info, warn};
21
22type HttpDoFn = Arc<
24 dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
25 + Send
26 + Sync,
27>;
28
29#[derive(Debug, Clone)]
31pub enum HandlerEvent {
32 MessageCreated(DecryptedMessage),
34 MessageUpdated(DecryptedMessage),
36 MessageDeleted(DeletedMessage),
38 MembershipCreated(MembershipActivity),
40 AttachmentActionCreated(AttachmentAction),
42 RoomCreated(RoomActivity),
44 RoomUpdated(RoomActivity),
46 Connected,
48 Disconnected(String),
50 Reconnecting(u32),
52 Error(String),
54}
55
56fn extract_person_uuid(id: &str) -> String {
66 use base64::engine::general_purpose::{STANDARD, STANDARD_NO_PAD};
67 use base64::Engine;
68
69 let decoded_bytes = STANDARD
71 .decode(id)
72 .or_else(|_| STANDARD_NO_PAD.decode(id));
73
74 if let Ok(bytes) = decoded_bytes {
75 if let Ok(decoded) = String::from_utf8(bytes) {
76 if decoded.starts_with("ciscospark://") {
77 if let Some(uuid) = decoded.rsplit('/').next() {
78 if !uuid.is_empty() {
79 return uuid.to_string();
80 }
81 }
82 }
83 }
84 }
85
86 id.to_string()
87}
88
89fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
91 Arc::new(move |req: FetchRequest| {
92 let client = client.clone();
93 Box::pin(async move {
94 let mut request_builder = match req.method.as_str() {
95 "GET" => client.get(&req.url),
96 "POST" => client.post(&req.url),
97 "PUT" => client.put(&req.url),
98 "DELETE" => client.delete(&req.url),
99 _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
100 };
101
102 for (key, value) in req.headers {
103 request_builder = request_builder.header(key, value);
104 }
105
106 if let Some(body) = req.body {
107 request_builder = request_builder.body(body);
108 }
109
110 let response = request_builder
111 .send()
112 .await
113 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
114
115 let status = response.status().as_u16();
116 let ok = response.status().is_success();
117 let body_bytes = response
118 .bytes()
119 .await
120 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
121 .to_vec();
122
123 Ok(FetchResponse {
124 status,
125 ok,
126 body: body_bytes,
127 })
128 })
129 })
130}
131
132pub struct WebexMessageHandler {
134 token: Arc<Mutex<String>>,
135 http_do: HttpDoFn,
136 device_manager: Arc<Mutex<DeviceManager>>,
137 mercury_socket: Arc<MercurySocket>,
138 kms_client: Arc<Mutex<Option<KmsClient>>>,
139 kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
141 registration: Arc<Mutex<Option<DeviceRegistration>>>,
142 connected: Arc<Mutex<bool>>,
143 connecting: Arc<Mutex<bool>>,
144 ignore_self_messages: bool,
145 bot_person_id: Arc<Mutex<Option<String>>>,
146 recent_activity_ids: Arc<Mutex<HashMap<String, Instant>>>,
148
149 #[allow(dead_code)]
150 config: Config,
151 event_tx: mpsc::UnboundedSender<HandlerEvent>,
152 event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
153}
154
155impl WebexMessageHandler {
156 pub fn new(config: Config) -> Result<Self, WebexError> {
158 if config.token.is_empty() {
159 return Err(WebexError::Internal(
160 "WebexMessageHandler requires a non-empty token string".into(),
161 ));
162 }
163
164 match config.mode {
166 NetworkMode::Injected => {
167 if config.fetch.is_none() || config.web_socket_factory.is_none() {
168 return Err(WebexError::Internal(
169 "Injected mode requires both fetch and web_socket_factory".into(),
170 ));
171 }
172 if config.client.is_some() {
173 return Err(WebexError::Internal(
174 "Cannot use native proxy parameters (client) in injected mode".into(),
175 ));
176 }
177 }
178 NetworkMode::Native => {
179 if config.fetch.is_some() || config.web_socket_factory.is_some() {
180 return Err(WebexError::Internal(
181 "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
182 ));
183 }
184 }
185 }
186
187 let (http_do, ws_factory) = match config.mode {
189 NetworkMode::Native => {
190 let client = config.client.clone().unwrap_or_default();
191 let http_adapter = create_native_http_adapter(client.clone());
192 (http_adapter, None)
193 }
194 NetworkMode::Injected => {
195 let http_adapter = config.fetch.clone().expect("Injected mode requires fetch adapter");
196 let ws_factory = config.web_socket_factory.clone();
197 (http_adapter, ws_factory)
198 }
199 };
200
201 let mercury_socket = MercurySocket::new(
202 ws_factory,
203 Duration::from_secs_f64(config.ping_interval),
204 Duration::from_secs_f64(config.pong_timeout),
205 Duration::from_secs_f64(config.reconnect_backoff_max),
206 config.max_reconnect_attempts,
207 );
208
209 let (event_tx, event_rx) = mpsc::unbounded_channel();
210
211 let ignore_self_messages = config.ignore_self_messages;
212
213 Ok(Self {
214 token: Arc::new(Mutex::new(config.token.clone())),
215 http_do: http_do.clone(),
216 device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
217 mercury_socket: Arc::new(mercury_socket),
218 kms_client: Arc::new(Mutex::new(None)),
219 kms_response_handler: Arc::new(Mutex::new(None)),
220 registration: Arc::new(Mutex::new(None)),
221 connected: Arc::new(Mutex::new(false)),
222 connecting: Arc::new(Mutex::new(false)),
223 ignore_self_messages,
224 bot_person_id: Arc::new(Mutex::new(None)),
225 recent_activity_ids: Arc::new(Mutex::new(HashMap::new())),
226 config,
227 event_tx,
228 event_rx: Arc::new(Mutex::new(Some(event_rx))),
229 })
230 }
231
232 pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
234 self.event_rx.lock().await.take()
235 }
236
237 pub async fn connect(&self) -> Result<(), WebexError> {
239 {
240 let connecting = self.connecting.lock().await;
241 let connected = self.connected.lock().await;
242 if *connecting {
243 return Err(WebexError::Internal("connect() already in progress".into()));
244 }
245 if *connected {
246 return Err(WebexError::Internal(
247 "Already connected. Call disconnect() first, or use reconnect().".into(),
248 ));
249 }
250 }
251
252 info!("Connecting to Webex...");
253 *self.connecting.lock().await = true;
254
255 let result = self.connect_internal().await;
256
257 *self.connecting.lock().await = false;
258
259 match result {
260 Ok(()) => {
261 *self.connected.lock().await = true;
262 info!("Connected to Webex");
263 if self.event_tx.send(HandlerEvent::Connected).is_err() {
264 warn!("Event receiver dropped, cannot send Connected event");
265 }
266 Ok(())
267 }
268 Err(e) => Err(e),
269 }
270 }
271
272 async fn fetch_bot_person_id(&self) -> Result<(), WebexError> {
273 info!("Fetching bot person info for self-message filtering");
274 let token = self.token.lock().await.clone();
275 let req = FetchRequest {
276 url: "https://webexapis.com/v1/people/me".into(),
277 method: "GET".into(),
278 headers: {
279 let mut h = std::collections::HashMap::new();
280 h.insert("Authorization".into(), format!("Bearer {}", token));
281 h.insert("Content-Type".into(), "application/json".into());
282 h
283 },
284 body: None,
285 };
286
287 let resp = (self.http_do)(req).await.map_err(|e| {
288 WebexError::Internal(format!(
289 "Failed to fetch bot identity for self-message filtering: {e}. \
290 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops)."
291 ))
292 })?;
293
294 if !resp.ok {
295 return Err(WebexError::Internal(format!(
296 "Failed to fetch bot identity for self-message filtering: HTTP {}. \
297 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops).",
298 resp.status
299 )));
300 }
301
302 let data: serde_json::Value = serde_json::from_slice(&resp.body).map_err(|e| {
303 WebexError::Internal(format!("Failed to parse bot identity response: {e}"))
304 })?;
305
306 let id = data
307 .get("id")
308 .and_then(|v| v.as_str())
309 .ok_or_else(|| WebexError::Internal("Bot identity response missing 'id' field".into()))?;
310
311 let uuid = extract_person_uuid(id);
312 info!("Bot person ID cached for self-message filtering: {}", uuid);
313 *self.bot_person_id.lock().await = Some(uuid);
314 Ok(())
315 }
316
317 async fn connect_internal(&self) -> Result<(), WebexError> {
318 let token = self.token.lock().await.clone();
319
320 let reg = {
322 let mut dm = self.device_manager.lock().await;
323 dm.register(&token).await?
324 };
325 info!("Device registered");
326
327 if self.ignore_self_messages {
329 self.fetch_bot_person_id().await?;
330 }
331
332 let kms = KmsClient::new(
334 self.http_do.clone(),
335 &token,
336 ®.device_url,
337 ®.user_id,
338 ®.encryption_service_url,
339 );
340
341 let response_handler = kms.response_handler();
344 *self.kms_response_handler.lock().await = Some(response_handler);
345 *self.kms_client.lock().await = Some(kms);
346
347 self.mercury_socket
349 .connect(®.web_socket_url, &token)
350 .await?;
351 info!("Mercury connected");
352
353 self.start_mercury_event_loop().await;
355
356 {
358 let mut kms_guard = self.kms_client.lock().await;
359 if let Some(ref mut kms) = *kms_guard {
360 kms.initialize().await?;
361 }
362 }
363 info!("KMS initialized");
364
365 *self.registration.lock().await = Some(reg);
367
368 Ok(())
369 }
370
371 async fn start_mercury_event_loop(&self) {
373 let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
374 Some(rx) => rx,
375 None => {
376 warn!("Mercury event receiver already taken");
377 return;
378 }
379 };
380
381 let kms_client = self.kms_client.clone();
382 let kms_response_handler = self.kms_response_handler.clone();
383 let event_tx = self.event_tx.clone();
384 let connected = self.connected.clone();
385 let registration = self.registration.clone();
386 let device_manager = self.device_manager.clone();
387 let token = self.token.clone();
388 let bot_person_id = self.bot_person_id.clone();
389 let recent_activity_ids = self.recent_activity_ids.clone();
390
391 tokio::spawn(async move {
392 let mut sweep_interval = tokio::time::interval(Duration::from_secs(30));
393 loop {
394 tokio::select! {
395 Some(event) = mercury_rx.recv() => {
396 match event {
397 MercuryEvent::KmsResponse(data) => {
398 let handler_guard = kms_response_handler.lock().await;
401 if let Some(ref handler) = *handler_guard {
402 handler.handle_kms_message(&data).await;
403 }
404 }
405 MercuryEvent::Activity(activity) => {
406 let kms_client_clone = kms_client.clone();
409 let event_tx_clone = event_tx.clone();
410 let bot_person_id = bot_person_id.clone();
411 tokio::spawn(async move {
412 let mut kms_guard = kms_client_clone.lock().await;
413 if let Some(ref mut kms) = *kms_guard {
414 let bot_id = bot_person_id.lock().await.clone();
415 Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
416 } else {
417 warn!("Received activity but KMS client not initialized");
418 }
419 });
420 }
421 MercuryEvent::Connected => {
422 info!("Mercury reconnected, refreshing device and KMS");
423
424 let tok = token.lock().await.clone();
426 {
427 let reg_guard = registration.lock().await;
428 if reg_guard.is_some() {
429 let dm = device_manager.lock().await;
430 match dm.refresh(&tok).await {
431 Ok(new_reg) => {
432 drop(reg_guard);
433 *registration.lock().await = Some(new_reg);
434 }
435 Err(e) => {
436 warn!("Device refresh on reconnect failed: {e}");
437 }
438 }
439 }
440 }
441
442 {
444 let mut kms_guard = kms_client.lock().await;
445 if let Some(ref mut kms) = *kms_guard {
446 if let Err(e) = kms.initialize().await {
447 warn!("KMS re-init on reconnect failed: {e}");
448 }
449 }
450 }
451
452 *connected.lock().await = true;
453 if event_tx.send(HandlerEvent::Connected).is_err() {
454 warn!("Event receiver dropped, cannot send Connected event");
455 }
456 }
457 MercuryEvent::Disconnected(reason) => {
458 *connected.lock().await = false;
459 if event_tx.send(HandlerEvent::Disconnected(reason)).is_err() {
460 warn!("Event receiver dropped, cannot send Disconnected event");
461 }
462 }
463 MercuryEvent::Reconnecting(attempt) => {
464 if event_tx.send(HandlerEvent::Reconnecting(attempt)).is_err() {
465 warn!("Event receiver dropped, cannot send Reconnecting event");
466 }
467 }
468 MercuryEvent::Error(msg) => {
469 if event_tx.send(HandlerEvent::Error(msg)).is_err() {
470 warn!("Event receiver dropped, cannot send Error event");
471 }
472 }
473 }
474 }
475 _ = sweep_interval.tick() => {
476 let mut ids = recent_activity_ids.lock().await;
477 let cutoff = Instant::now() - Duration::from_secs(300);
478 ids.retain(|_, &mut t| t > cutoff);
479 }
480 }
481 }
482 });
483 }
484
485 async fn handle_activity_static(
487 kms: &mut KmsClient,
488 activity: &MercuryActivity,
489 event_tx: &mpsc::UnboundedSender<HandlerEvent>,
490 bot_person_id: Option<&str>,
491 ) {
492 if (activity.verb == "post" || activity.verb == "update") && activity.object.object_type == "comment" {
494 let mut decryptor = MessageDecryptor::new(kms);
495 match decryptor.decrypt_activity(activity).await {
496 Ok(decrypted) => {
497 let mentions = parse_mentions(decrypted.object.content.as_deref());
498 let msg = DecryptedMessage {
499 id: decrypted.id.clone(),
500 parent_id: decrypted.parent.as_ref().map(|p| p.id.clone()),
501 mentioned_people: mentions.mentioned_people,
502 mentioned_groups: mentions.mentioned_groups,
503 room_id: decrypted.target.id.clone(),
504 person_id: decrypted.actor.id.clone(),
505 person_email: decrypted
506 .actor
507 .email_address
508 .clone()
509 .unwrap_or_default(),
510 text: decrypted.object.display_name.clone().unwrap_or_default(),
511 html: decrypted.object.content.clone(),
512 created: decrypted.published.clone(),
513 room_type: infer_room_type(&decrypted),
514 files: decrypted.object.files.clone().unwrap_or_default(),
515 raw: decrypted,
516 };
517
518 if let Some(bot_id) = bot_person_id {
520 if extract_person_uuid(&msg.person_id) == bot_id {
521 info!("Ignoring self-message from bot ({})", bot_id);
522 return;
523 }
524 }
525
526 let event = if activity.verb == "update" {
527 HandlerEvent::MessageUpdated(msg)
528 } else {
529 HandlerEvent::MessageCreated(msg)
530 };
531 if event_tx.send(event).is_err() {
532 warn!("Event receiver dropped, cannot send message event");
533 }
534 }
535 Err(e) => {
536 error!("Error decrypting activity: {e}");
537 if event_tx.send(HandlerEvent::Error(e.to_string())).is_err() {
538 warn!("Event receiver dropped, cannot send Error event");
539 }
540 }
541 }
542 return;
543 }
544
545 if activity.verb == "delete" && activity.object.object_type == "activity" {
547 if event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
548 message_id: activity.object.id.clone(),
549 room_id: activity.target.id.clone(),
550 person_id: activity.actor.id.clone(),
551 })).is_err() {
552 warn!("Event receiver dropped, cannot send MessageDeleted event");
553 }
554 return;
555 }
556
557 let membership_verbs = ["add", "leave", "assignModerator", "unassignModerator"];
559 if membership_verbs.contains(&activity.verb.as_str())
560 && activity.object.object_type == "person"
561 {
562 let event = HandlerEvent::MembershipCreated(MembershipActivity {
563 id: activity.id.clone(),
564 actor_id: activity.actor.id.clone(),
565 person_id: activity.object.id.clone(),
566 room_id: activity.target.id.clone(),
567 action: activity.verb.clone(),
568 created: activity.published.clone(),
569 room_type: infer_room_type(activity),
570 raw: activity.clone(),
571 });
572 if event_tx.send(event).is_err() {
573 warn!("Event receiver dropped, cannot send MembershipCreated event");
574 }
575 return;
576 }
577
578 if activity.verb == "cardAction" && activity.object.object_type == "submit" {
580 let event = HandlerEvent::AttachmentActionCreated(AttachmentAction {
581 id: activity.id.clone(),
582 message_id: activity.parent.as_ref().map(|p| p.id.clone()).unwrap_or_default(),
583 person_id: activity.actor.id.clone(),
584 person_email: activity.actor.email_address.clone().unwrap_or_default(),
585 room_id: activity.target.id.clone(),
586 inputs: activity.object.inputs.clone().unwrap_or(serde_json::Value::Object(Default::default())),
587 created: activity.published.clone(),
588 raw: activity.clone(),
589 });
590 if event_tx.send(event).is_err() {
591 warn!("Event receiver dropped, cannot send AttachmentActionCreated event");
592 }
593 return;
594 }
595
596 if (activity.verb == "create" || activity.verb == "update")
598 && activity.object.object_type == "conversation"
599 {
600 let action = if activity.verb == "create" { "created" } else { "updated" };
601 let ra = RoomActivity {
602 id: activity.id.clone(),
603 room_id: activity.target.id.clone(),
604 actor_id: activity.actor.id.clone(),
605 action: action.to_string(),
606 created: activity.published.clone(),
607 raw: activity.clone(),
608 };
609 let event = if activity.verb == "create" {
610 HandlerEvent::RoomCreated(ra)
611 } else {
612 HandlerEvent::RoomUpdated(ra)
613 };
614 if event_tx.send(event).is_err() {
615 warn!("Event receiver dropped, cannot send room event");
616 }
617 }
618 }
619
620 pub async fn disconnect(&self) {
622 info!("Disconnecting from Webex...");
623 *self.connected.lock().await = false;
624
625 self.mercury_socket.disconnect().await;
626
627 let token = self.token.lock().await.clone();
628 {
629 let reg = self.registration.lock().await;
630 if reg.is_some() {
631 let mut dm = self.device_manager.lock().await;
632 if let Err(e) = dm.unregister(&token).await {
633 warn!("Failed to unregister device: {e}");
634 } else {
635 info!("Device unregistered");
636 }
637 }
638 }
639
640 *self.registration.lock().await = None;
641 *self.kms_client.lock().await = None;
642 *self.kms_response_handler.lock().await = None;
643 *self.bot_person_id.lock().await = None;
644 }
645
646 pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
648 if new_token.is_empty() {
649 return Err(WebexError::Internal(
650 "reconnect() requires a non-empty token string".into(),
651 ));
652 }
653
654 info!("Reconnecting with new token...");
655 self.disconnect().await;
656
657 *self.token.lock().await = new_token.to_string();
658 self.connect().await
659 }
660
661 pub async fn connected(&self) -> bool {
663 let conn = *self.connected.lock().await;
664 conn && self.mercury_socket.connected().await
665 }
666
667 pub async fn status(&self) -> HandlerStatus {
669 let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
670 let ws_open = self.mercury_socket.connected().await;
671 let is_connected = *self.connected.lock().await;
672 let is_connecting = *self.connecting.lock().await;
673
674 let status = if is_connected && ws_open {
675 ConnectionStatus::Connected
676 } else if is_connecting {
677 ConnectionStatus::Connecting
678 } else if reconnect_attempt > 0 {
679 ConnectionStatus::Reconnecting
680 } else {
681 ConnectionStatus::Disconnected
682 };
683
684 HandlerStatus {
685 status,
686 web_socket_open: ws_open,
687 kms_initialized: self.kms_client.lock().await.is_some(),
688 device_registered: self.registration.lock().await.is_some(),
689 reconnect_attempt,
690 }
691 }
692}
693
694fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
695 let tags = &activity.target.tags;
696 if tags.contains(&"ONE_ON_ONE".to_string()) {
697 return Some("direct".to_string());
698 }
699 if tags.contains(&"TEAM".to_string())
700 || tags.contains(&"LOCKED".to_string())
701 || tags.contains(&"GROUP".to_string())
702 {
703 return Some("group".to_string());
704 }
705 None
706}