1use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9 Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10 FetchResponse, HandlerStatus, MembershipActivity, MercuryActivity, NetworkMode,
11};
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{mpsc, Mutex};
18use tracing::{error, info, warn};
19
/// Shared, thread-safe HTTP transport callback.
///
/// Calling it with a [`FetchRequest`] yields a boxed, `Send` future that resolves
/// to a [`FetchResponse`] or a boxed error. The native adapter built by
/// `create_native_http_adapter` and the caller-injected `fetch` adapter are both
/// used through this type.
type HttpDoFn = Arc<
    dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
        + Send
        + Sync,
>;
26
/// Events published by `WebexMessageHandler` on its unbounded event channel.
#[derive(Debug, Clone)]
pub enum HandlerEvent {
    /// A `post`/`comment` activity was decrypted successfully.
    MessageCreated(DecryptedMessage),
    /// A `delete`/`activity` activity was received.
    MessageDeleted(DeletedMessage),
    /// A membership activity (`add`, `leave`, `assignModerator`,
    /// `unassignModerator`) targeting a person was received.
    MembershipCreated(MembershipActivity),
    /// Sent when `connect()` succeeds and whenever Mercury reports `Connected`.
    Connected,
    /// Mercury reported a disconnect; carries the reason it supplied.
    Disconnected(String),
    /// Mercury reported a reconnect attempt; carries the value it supplied
    /// (presumably the attempt number — confirm against `MercurySocket`).
    Reconnecting(u32),
    /// A Mercury error message or a message-decryption failure, as text.
    Error(String),
}
45
46fn extract_person_uuid(id: &str) -> String {
56 use base64::engine::general_purpose::{STANDARD, STANDARD_NO_PAD};
57 use base64::Engine;
58
59 let decoded_bytes = STANDARD
61 .decode(id)
62 .or_else(|_| STANDARD_NO_PAD.decode(id));
63
64 if let Ok(bytes) = decoded_bytes {
65 if let Ok(decoded) = String::from_utf8(bytes) {
66 if decoded.starts_with("ciscospark://") {
67 if let Some(uuid) = decoded.rsplit('/').next() {
68 if !uuid.is_empty() {
69 return uuid.to_string();
70 }
71 }
72 }
73 }
74 }
75
76 id.to_string()
77}
78
79fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
81 Arc::new(move |req: FetchRequest| {
82 let client = client.clone();
83 Box::pin(async move {
84 let mut request_builder = match req.method.as_str() {
85 "GET" => client.get(&req.url),
86 "POST" => client.post(&req.url),
87 "PUT" => client.put(&req.url),
88 "DELETE" => client.delete(&req.url),
89 _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
90 };
91
92 for (key, value) in req.headers {
93 request_builder = request_builder.header(key, value);
94 }
95
96 if let Some(body) = req.body {
97 request_builder = request_builder.body(body);
98 }
99
100 let response = request_builder
101 .send()
102 .await
103 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
104
105 let status = response.status().as_u16();
106 let ok = response.status().is_success();
107 let body_bytes = response
108 .bytes()
109 .await
110 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
111 .to_vec();
112
113 Ok(FetchResponse {
114 status,
115 ok,
116 body: body_bytes,
117 })
118 })
119 })
120}
121
/// Coordinates device registration, the Mercury WebSocket and KMS so that
/// incoming activities are decrypted and re-published as [`HandlerEvent`]s.
pub struct WebexMessageHandler {
    /// Bearer token used for all requests; replaced by `reconnect()`.
    token: Arc<Mutex<String>>,
    /// HTTP transport; also handed to `DeviceManager` and `KmsClient`.
    http_do: HttpDoFn,
    /// Registers, refreshes and unregisters the device.
    device_manager: Arc<Mutex<DeviceManager>>,
    /// Mercury WebSocket connection and source of `MercuryEvent`s.
    mercury_socket: Arc<MercurySocket>,
    /// `None` until `connect_internal` creates the client; cleared by `disconnect()`.
    kms_client: Arc<Mutex<Option<KmsClient>>>,
    /// Receives `MercuryEvent::KmsResponse` payloads; kept behind its own lock,
    /// separate from `kms_client`.
    kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
    /// Current device registration; stored at the end of a successful connect,
    /// replaced on Mercury reconnect and cleared by `disconnect()`.
    registration: Arc<Mutex<Option<DeviceRegistration>>>,
    /// Set after a successful connect and on Mercury `Connected`; cleared on
    /// Mercury `Disconnected` and by `disconnect()`.
    connected: Arc<Mutex<bool>>,
    /// True while `connect()` is running.
    connecting: Arc<Mutex<bool>>,
    /// Copied from `Config::ignore_self_messages`; when set, the bot identity is
    /// fetched during connect and the bot's own messages are dropped.
    ignore_self_messages: bool,
    /// Bot person identifier cached by `fetch_bot_person_id`; cleared by `disconnect()`.
    bot_person_id: Arc<Mutex<Option<String>>>,
    /// Timestamps swept every 30 s by the event loop (entries older than 300 s
    /// are removed). NOTE(review): nothing in the visible code inserts into this
    /// map — presumably keyed by activity id for de-duplication; confirm.
    recent_activity_ids: Arc<Mutex<HashMap<String, Instant>>>,

    /// The configuration the handler was built from; not read after construction
    /// in the visible code, hence the lint allowance.
    #[allow(dead_code)]
    config: Config,
    /// Sending half of the handler event channel.
    event_tx: mpsc::UnboundedSender<HandlerEvent>,
    /// Receiving half, handed out once by `take_event_rx()`.
    event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
}
144
145impl WebexMessageHandler {
146 pub fn new(config: Config) -> Result<Self, WebexError> {
148 if config.token.is_empty() {
149 return Err(WebexError::Internal(
150 "WebexMessageHandler requires a non-empty token string".into(),
151 ));
152 }
153
154 match config.mode {
156 NetworkMode::Injected => {
157 if config.fetch.is_none() || config.web_socket_factory.is_none() {
158 return Err(WebexError::Internal(
159 "Injected mode requires both fetch and web_socket_factory".into(),
160 ));
161 }
162 if config.client.is_some() {
163 return Err(WebexError::Internal(
164 "Cannot use native proxy parameters (client) in injected mode".into(),
165 ));
166 }
167 }
168 NetworkMode::Native => {
169 if config.fetch.is_some() || config.web_socket_factory.is_some() {
170 return Err(WebexError::Internal(
171 "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
172 ));
173 }
174 }
175 }
176
177 let (http_do, ws_factory) = match config.mode {
179 NetworkMode::Native => {
180 let client = config.client.clone().unwrap_or_default();
181 let http_adapter = create_native_http_adapter(client.clone());
182 (http_adapter, None)
183 }
184 NetworkMode::Injected => {
185 let http_adapter = config.fetch.clone().expect("Injected mode requires fetch adapter");
186 let ws_factory = config.web_socket_factory.clone();
187 (http_adapter, ws_factory)
188 }
189 };
190
191 let mercury_socket = MercurySocket::new(
192 ws_factory,
193 Duration::from_secs_f64(config.ping_interval),
194 Duration::from_secs_f64(config.pong_timeout),
195 Duration::from_secs_f64(config.reconnect_backoff_max),
196 config.max_reconnect_attempts,
197 );
198
199 let (event_tx, event_rx) = mpsc::unbounded_channel();
200
201 let ignore_self_messages = config.ignore_self_messages;
202
203 Ok(Self {
204 token: Arc::new(Mutex::new(config.token.clone())),
205 http_do: http_do.clone(),
206 device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
207 mercury_socket: Arc::new(mercury_socket),
208 kms_client: Arc::new(Mutex::new(None)),
209 kms_response_handler: Arc::new(Mutex::new(None)),
210 registration: Arc::new(Mutex::new(None)),
211 connected: Arc::new(Mutex::new(false)),
212 connecting: Arc::new(Mutex::new(false)),
213 ignore_self_messages,
214 bot_person_id: Arc::new(Mutex::new(None)),
215 recent_activity_ids: Arc::new(Mutex::new(HashMap::new())),
216 config,
217 event_tx,
218 event_rx: Arc::new(Mutex::new(Some(event_rx))),
219 })
220 }
221
222 pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
224 self.event_rx.lock().await.take()
225 }
226
227 pub async fn connect(&self) -> Result<(), WebexError> {
229 {
230 let connecting = self.connecting.lock().await;
231 let connected = self.connected.lock().await;
232 if *connecting {
233 return Err(WebexError::Internal("connect() already in progress".into()));
234 }
235 if *connected {
236 return Err(WebexError::Internal(
237 "Already connected. Call disconnect() first, or use reconnect().".into(),
238 ));
239 }
240 }
241
242 info!("Connecting to Webex...");
243 *self.connecting.lock().await = true;
244
245 let result = self.connect_internal().await;
246
247 *self.connecting.lock().await = false;
248
249 match result {
250 Ok(()) => {
251 *self.connected.lock().await = true;
252 info!("Connected to Webex");
253 if self.event_tx.send(HandlerEvent::Connected).is_err() {
254 warn!("Event receiver dropped, cannot send Connected event");
255 }
256 Ok(())
257 }
258 Err(e) => Err(e),
259 }
260 }
261
262 async fn fetch_bot_person_id(&self) -> Result<(), WebexError> {
263 info!("Fetching bot person info for self-message filtering");
264 let token = self.token.lock().await.clone();
265 let req = FetchRequest {
266 url: "https://webexapis.com/v1/people/me".into(),
267 method: "GET".into(),
268 headers: {
269 let mut h = std::collections::HashMap::new();
270 h.insert("Authorization".into(), format!("Bearer {}", token));
271 h.insert("Content-Type".into(), "application/json".into());
272 h
273 },
274 body: None,
275 };
276
277 let resp = (self.http_do)(req).await.map_err(|e| {
278 WebexError::Internal(format!(
279 "Failed to fetch bot identity for self-message filtering: {e}. \
280 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops)."
281 ))
282 })?;
283
284 if !resp.ok {
285 return Err(WebexError::Internal(format!(
286 "Failed to fetch bot identity for self-message filtering: HTTP {}. \
287 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops).",
288 resp.status
289 )));
290 }
291
292 let data: serde_json::Value = serde_json::from_slice(&resp.body).map_err(|e| {
293 WebexError::Internal(format!("Failed to parse bot identity response: {e}"))
294 })?;
295
296 let id = data
297 .get("id")
298 .and_then(|v| v.as_str())
299 .ok_or_else(|| WebexError::Internal("Bot identity response missing 'id' field".into()))?;
300
301 let uuid = extract_person_uuid(id);
302 info!("Bot person ID cached for self-message filtering: {}", uuid);
303 *self.bot_person_id.lock().await = Some(uuid);
304 Ok(())
305 }
306
307 async fn connect_internal(&self) -> Result<(), WebexError> {
308 let token = self.token.lock().await.clone();
309
310 let reg = {
312 let mut dm = self.device_manager.lock().await;
313 dm.register(&token).await?
314 };
315 info!("Device registered");
316
317 if self.ignore_self_messages {
319 self.fetch_bot_person_id().await?;
320 }
321
322 let kms = KmsClient::new(
324 self.http_do.clone(),
325 &token,
326 ®.device_url,
327 ®.user_id,
328 ®.encryption_service_url,
329 );
330
331 let response_handler = kms.response_handler();
334 *self.kms_response_handler.lock().await = Some(response_handler);
335 *self.kms_client.lock().await = Some(kms);
336
337 self.mercury_socket
339 .connect(®.web_socket_url, &token)
340 .await?;
341 info!("Mercury connected");
342
343 self.start_mercury_event_loop().await;
345
346 {
348 let mut kms_guard = self.kms_client.lock().await;
349 if let Some(ref mut kms) = *kms_guard {
350 kms.initialize().await?;
351 }
352 }
353 info!("KMS initialized");
354
355 *self.registration.lock().await = Some(reg);
357
358 Ok(())
359 }
360
361 async fn start_mercury_event_loop(&self) {
363 let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
364 Some(rx) => rx,
365 None => {
366 warn!("Mercury event receiver already taken");
367 return;
368 }
369 };
370
371 let kms_client = self.kms_client.clone();
372 let kms_response_handler = self.kms_response_handler.clone();
373 let event_tx = self.event_tx.clone();
374 let connected = self.connected.clone();
375 let registration = self.registration.clone();
376 let device_manager = self.device_manager.clone();
377 let token = self.token.clone();
378 let bot_person_id = self.bot_person_id.clone();
379 let recent_activity_ids = self.recent_activity_ids.clone();
380
381 tokio::spawn(async move {
382 let mut sweep_interval = tokio::time::interval(Duration::from_secs(30));
383 loop {
384 tokio::select! {
385 Some(event) = mercury_rx.recv() => {
386 match event {
387 MercuryEvent::KmsResponse(data) => {
388 let handler_guard = kms_response_handler.lock().await;
391 if let Some(ref handler) = *handler_guard {
392 handler.handle_kms_message(&data).await;
393 }
394 }
395 MercuryEvent::Activity(activity) => {
396 let kms_client_clone = kms_client.clone();
399 let event_tx_clone = event_tx.clone();
400 let bot_person_id = bot_person_id.clone();
401 tokio::spawn(async move {
402 let mut kms_guard = kms_client_clone.lock().await;
403 if let Some(ref mut kms) = *kms_guard {
404 let bot_id = bot_person_id.lock().await.clone();
405 Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
406 } else {
407 warn!("Received activity but KMS client not initialized");
408 }
409 });
410 }
411 MercuryEvent::Connected => {
412 info!("Mercury reconnected, refreshing device and KMS");
413
414 let tok = token.lock().await.clone();
416 {
417 let reg_guard = registration.lock().await;
418 if reg_guard.is_some() {
419 let dm = device_manager.lock().await;
420 match dm.refresh(&tok).await {
421 Ok(new_reg) => {
422 drop(reg_guard);
423 *registration.lock().await = Some(new_reg);
424 }
425 Err(e) => {
426 warn!("Device refresh on reconnect failed: {e}");
427 }
428 }
429 }
430 }
431
432 {
434 let mut kms_guard = kms_client.lock().await;
435 if let Some(ref mut kms) = *kms_guard {
436 if let Err(e) = kms.initialize().await {
437 warn!("KMS re-init on reconnect failed: {e}");
438 }
439 }
440 }
441
442 *connected.lock().await = true;
443 if event_tx.send(HandlerEvent::Connected).is_err() {
444 warn!("Event receiver dropped, cannot send Connected event");
445 }
446 }
447 MercuryEvent::Disconnected(reason) => {
448 *connected.lock().await = false;
449 if event_tx.send(HandlerEvent::Disconnected(reason)).is_err() {
450 warn!("Event receiver dropped, cannot send Disconnected event");
451 }
452 }
453 MercuryEvent::Reconnecting(attempt) => {
454 if event_tx.send(HandlerEvent::Reconnecting(attempt)).is_err() {
455 warn!("Event receiver dropped, cannot send Reconnecting event");
456 }
457 }
458 MercuryEvent::Error(msg) => {
459 if event_tx.send(HandlerEvent::Error(msg)).is_err() {
460 warn!("Event receiver dropped, cannot send Error event");
461 }
462 }
463 }
464 }
465 _ = sweep_interval.tick() => {
466 let mut ids = recent_activity_ids.lock().await;
467 let cutoff = Instant::now() - Duration::from_secs(300);
468 ids.retain(|_, &mut t| t > cutoff);
469 }
470 }
471 }
472 });
473 }
474
475 async fn handle_activity_static(
477 kms: &mut KmsClient,
478 activity: &MercuryActivity,
479 event_tx: &mpsc::UnboundedSender<HandlerEvent>,
480 bot_person_id: Option<&str>,
481 ) {
482 if activity.verb == "post" && activity.object.object_type == "comment" {
484 let mut decryptor = MessageDecryptor::new(kms);
485 match decryptor.decrypt_activity(activity).await {
486 Ok(decrypted) => {
487 let msg = DecryptedMessage {
488 id: decrypted.object.id.clone(),
489 room_id: decrypted.target.id.clone(),
490 person_id: decrypted.actor.id.clone(),
491 person_email: decrypted
492 .actor
493 .email_address
494 .clone()
495 .unwrap_or_default(),
496 text: decrypted.object.display_name.clone().unwrap_or_default(),
497 html: decrypted.object.content.clone(),
498 created: decrypted.published.clone(),
499 room_type: infer_room_type(&decrypted),
500 raw: decrypted,
501 };
502
503 if let Some(bot_id) = bot_person_id {
505 if extract_person_uuid(&msg.person_id) == bot_id {
506 info!("Ignoring self-message from bot ({})", bot_id);
507 return;
508 }
509 }
510
511 if event_tx.send(HandlerEvent::MessageCreated(msg)).is_err() {
512 warn!("Event receiver dropped, cannot send MessageCreated event");
513 }
514 }
515 Err(e) => {
516 error!("Error decrypting activity: {e}");
517 if event_tx.send(HandlerEvent::Error(e.to_string())).is_err() {
518 warn!("Event receiver dropped, cannot send Error event");
519 }
520 }
521 }
522 return;
523 }
524
525 if activity.verb == "delete" && activity.object.object_type == "activity" {
527 if event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
528 message_id: activity.object.id.clone(),
529 room_id: activity.target.id.clone(),
530 person_id: activity.actor.id.clone(),
531 })).is_err() {
532 warn!("Event receiver dropped, cannot send MessageDeleted event");
533 }
534 return;
535 }
536
537 let membership_verbs = ["add", "leave", "assignModerator", "unassignModerator"];
539 if membership_verbs.contains(&activity.verb.as_str())
540 && activity.object.object_type == "person"
541 {
542 let event = HandlerEvent::MembershipCreated(MembershipActivity {
543 id: activity.id.clone(),
544 actor_id: activity.actor.id.clone(),
545 person_id: activity.object.id.clone(),
546 room_id: activity.target.id.clone(),
547 action: activity.verb.clone(),
548 created: activity.published.clone(),
549 room_type: infer_room_type(activity),
550 raw: activity.clone(),
551 });
552 if event_tx.send(event).is_err() {
553 warn!("Event receiver dropped, cannot send MembershipCreated event");
554 }
555 }
556 }
557
558 pub async fn disconnect(&self) {
560 info!("Disconnecting from Webex...");
561 *self.connected.lock().await = false;
562
563 self.mercury_socket.disconnect().await;
564
565 let token = self.token.lock().await.clone();
566 {
567 let reg = self.registration.lock().await;
568 if reg.is_some() {
569 let mut dm = self.device_manager.lock().await;
570 if let Err(e) = dm.unregister(&token).await {
571 warn!("Failed to unregister device: {e}");
572 } else {
573 info!("Device unregistered");
574 }
575 }
576 }
577
578 *self.registration.lock().await = None;
579 *self.kms_client.lock().await = None;
580 *self.kms_response_handler.lock().await = None;
581 *self.bot_person_id.lock().await = None;
582 }
583
584 pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
586 if new_token.is_empty() {
587 return Err(WebexError::Internal(
588 "reconnect() requires a non-empty token string".into(),
589 ));
590 }
591
592 info!("Reconnecting with new token...");
593 self.disconnect().await;
594
595 *self.token.lock().await = new_token.to_string();
596 self.connect().await
597 }
598
599 pub async fn connected(&self) -> bool {
601 let conn = *self.connected.lock().await;
602 conn && self.mercury_socket.connected().await
603 }
604
605 pub async fn status(&self) -> HandlerStatus {
607 let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
608 let ws_open = self.mercury_socket.connected().await;
609 let is_connected = *self.connected.lock().await;
610 let is_connecting = *self.connecting.lock().await;
611
612 let status = if is_connected && ws_open {
613 ConnectionStatus::Connected
614 } else if is_connecting {
615 ConnectionStatus::Connecting
616 } else if reconnect_attempt > 0 {
617 ConnectionStatus::Reconnecting
618 } else {
619 ConnectionStatus::Disconnected
620 };
621
622 HandlerStatus {
623 status,
624 web_socket_open: ws_open,
625 kms_initialized: self.kms_client.lock().await.is_some(),
626 device_registered: self.registration.lock().await.is_some(),
627 reconnect_attempt,
628 }
629 }
630}
631
632fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
633 let tags = &activity.target.tags;
634 if tags.contains(&"ONE_ON_ONE".to_string()) {
635 return Some("direct".to_string());
636 }
637 if tags.contains(&"TEAM".to_string())
638 || tags.contains(&"LOCKED".to_string())
639 || tags.contains(&"GROUP".to_string())
640 {
641 return Some("group".to_string());
642 }
643 None
644}