1use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9 Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10 FetchResponse, HandlerStatus, MembershipActivity, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
19type HttpDoFn = Arc<
21 dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
22 + Send
23 + Sync,
24>;
25
26#[derive(Debug, Clone)]
28pub enum HandlerEvent {
29 MessageCreated(DecryptedMessage),
31 MessageDeleted(DeletedMessage),
33 MembershipCreated(MembershipActivity),
35 Connected,
37 Disconnected(String),
39 Reconnecting(u32),
41 Error(String),
43}
44
45fn extract_person_uuid(id: &str) -> String {
55 use base64::engine::general_purpose::{STANDARD, STANDARD_NO_PAD};
56 use base64::Engine;
57
58 let decoded_bytes = STANDARD
60 .decode(id)
61 .or_else(|_| STANDARD_NO_PAD.decode(id));
62
63 if let Ok(bytes) = decoded_bytes {
64 if let Ok(decoded) = String::from_utf8(bytes) {
65 if decoded.starts_with("ciscospark://") {
66 if let Some(uuid) = decoded.rsplit('/').next() {
67 if !uuid.is_empty() {
68 return uuid.to_string();
69 }
70 }
71 }
72 }
73 }
74
75 id.to_string()
76}
77
78fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
80 Arc::new(move |req: FetchRequest| {
81 let client = client.clone();
82 Box::pin(async move {
83 let mut request_builder = match req.method.as_str() {
84 "GET" => client.get(&req.url),
85 "POST" => client.post(&req.url),
86 "PUT" => client.put(&req.url),
87 "DELETE" => client.delete(&req.url),
88 _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
89 };
90
91 for (key, value) in req.headers {
92 request_builder = request_builder.header(key, value);
93 }
94
95 if let Some(body) = req.body {
96 request_builder = request_builder.body(body);
97 }
98
99 let response = request_builder
100 .send()
101 .await
102 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
103
104 let status = response.status().as_u16();
105 let ok = response.status().is_success();
106 let body_bytes = response
107 .bytes()
108 .await
109 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
110 .to_vec();
111
112 Ok(FetchResponse {
113 status,
114 ok,
115 body: body_bytes,
116 })
117 })
118 })
119}
120
121pub struct WebexMessageHandler {
123 token: Arc<Mutex<String>>,
124 http_do: HttpDoFn,
125 device_manager: Arc<Mutex<DeviceManager>>,
126 mercury_socket: Arc<MercurySocket>,
127 kms_client: Arc<Mutex<Option<KmsClient>>>,
128 kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
130 registration: Arc<Mutex<Option<DeviceRegistration>>>,
131 connected: Arc<Mutex<bool>>,
132 connecting: Arc<Mutex<bool>>,
133 ignore_self_messages: bool,
134 bot_person_id: Arc<Mutex<Option<String>>>,
135
136 #[allow(dead_code)]
137 config: Config,
138 event_tx: mpsc::UnboundedSender<HandlerEvent>,
139 event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
140}
141
142impl WebexMessageHandler {
143 pub fn new(config: Config) -> Result<Self, WebexError> {
145 if config.token.is_empty() {
146 return Err(WebexError::Internal(
147 "WebexMessageHandler requires a non-empty token string".into(),
148 ));
149 }
150
151 match config.mode {
153 NetworkMode::Injected => {
154 if config.fetch.is_none() || config.web_socket_factory.is_none() {
155 return Err(WebexError::Internal(
156 "Injected mode requires both fetch and web_socket_factory".into(),
157 ));
158 }
159 if config.client.is_some() {
160 return Err(WebexError::Internal(
161 "Cannot use native proxy parameters (client) in injected mode".into(),
162 ));
163 }
164 }
165 NetworkMode::Native => {
166 if config.fetch.is_some() || config.web_socket_factory.is_some() {
167 return Err(WebexError::Internal(
168 "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
169 ));
170 }
171 }
172 }
173
174 let (http_do, ws_factory) = match config.mode {
176 NetworkMode::Native => {
177 let client = config.client.clone().unwrap_or_default();
178 let http_adapter = create_native_http_adapter(client.clone());
179 (http_adapter, None)
180 }
181 NetworkMode::Injected => {
182 let http_adapter = config.fetch.clone().expect("Injected mode requires fetch adapter");
183 let ws_factory = config.web_socket_factory.clone();
184 (http_adapter, ws_factory)
185 }
186 };
187
188 let mercury_socket = MercurySocket::new(
189 ws_factory,
190 Duration::from_secs_f64(config.ping_interval),
191 Duration::from_secs_f64(config.pong_timeout),
192 Duration::from_secs_f64(config.reconnect_backoff_max),
193 config.max_reconnect_attempts,
194 );
195
196 let (event_tx, event_rx) = mpsc::unbounded_channel();
197
198 let ignore_self_messages = config.ignore_self_messages;
199
200 Ok(Self {
201 token: Arc::new(Mutex::new(config.token.clone())),
202 http_do: http_do.clone(),
203 device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
204 mercury_socket: Arc::new(mercury_socket),
205 kms_client: Arc::new(Mutex::new(None)),
206 kms_response_handler: Arc::new(Mutex::new(None)),
207 registration: Arc::new(Mutex::new(None)),
208 connected: Arc::new(Mutex::new(false)),
209 connecting: Arc::new(Mutex::new(false)),
210 ignore_self_messages,
211 bot_person_id: Arc::new(Mutex::new(None)),
212 config,
213 event_tx,
214 event_rx: Arc::new(Mutex::new(Some(event_rx))),
215 })
216 }
217
218 pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
220 self.event_rx.lock().await.take()
221 }
222
223 pub async fn connect(&self) -> Result<(), WebexError> {
225 {
226 let connecting = self.connecting.lock().await;
227 let connected = self.connected.lock().await;
228 if *connecting {
229 return Err(WebexError::Internal("connect() already in progress".into()));
230 }
231 if *connected {
232 return Err(WebexError::Internal(
233 "Already connected. Call disconnect() first, or use reconnect().".into(),
234 ));
235 }
236 }
237
238 info!("Connecting to Webex...");
239 *self.connecting.lock().await = true;
240
241 let result = self.connect_internal().await;
242
243 *self.connecting.lock().await = false;
244
245 match result {
246 Ok(()) => {
247 *self.connected.lock().await = true;
248 info!("Connected to Webex");
249 if self.event_tx.send(HandlerEvent::Connected).is_err() {
250 warn!("Event receiver dropped, cannot send Connected event");
251 }
252 Ok(())
253 }
254 Err(e) => Err(e),
255 }
256 }
257
258 async fn fetch_bot_person_id(&self) -> Result<(), WebexError> {
259 info!("Fetching bot person info for self-message filtering");
260 let token = self.token.lock().await.clone();
261 let req = FetchRequest {
262 url: "https://webexapis.com/v1/people/me".into(),
263 method: "GET".into(),
264 headers: {
265 let mut h = std::collections::HashMap::new();
266 h.insert("Authorization".into(), format!("Bearer {}", token));
267 h.insert("Content-Type".into(), "application/json".into());
268 h
269 },
270 body: None,
271 };
272
273 let resp = (self.http_do)(req).await.map_err(|e| {
274 WebexError::Internal(format!(
275 "Failed to fetch bot identity for self-message filtering: {e}. \
276 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops)."
277 ))
278 })?;
279
280 if !resp.ok {
281 return Err(WebexError::Internal(format!(
282 "Failed to fetch bot identity for self-message filtering: HTTP {}. \
283 Set ignore_self_messages to false to skip this check (not recommended — may cause message loops).",
284 resp.status
285 )));
286 }
287
288 let data: serde_json::Value = serde_json::from_slice(&resp.body).map_err(|e| {
289 WebexError::Internal(format!("Failed to parse bot identity response: {e}"))
290 })?;
291
292 let id = data
293 .get("id")
294 .and_then(|v| v.as_str())
295 .ok_or_else(|| WebexError::Internal("Bot identity response missing 'id' field".into()))?;
296
297 let uuid = extract_person_uuid(id);
298 info!("Bot person ID cached for self-message filtering: {}", uuid);
299 *self.bot_person_id.lock().await = Some(uuid);
300 Ok(())
301 }
302
303 async fn connect_internal(&self) -> Result<(), WebexError> {
304 let token = self.token.lock().await.clone();
305
306 let reg = {
308 let mut dm = self.device_manager.lock().await;
309 dm.register(&token).await?
310 };
311 info!("Device registered");
312
313 if self.ignore_self_messages {
315 self.fetch_bot_person_id().await?;
316 }
317
318 let kms = KmsClient::new(
320 self.http_do.clone(),
321 &token,
322 ®.device_url,
323 ®.user_id,
324 ®.encryption_service_url,
325 );
326
327 let response_handler = kms.response_handler();
330 *self.kms_response_handler.lock().await = Some(response_handler);
331 *self.kms_client.lock().await = Some(kms);
332
333 self.mercury_socket
335 .connect(®.web_socket_url, &token)
336 .await?;
337 info!("Mercury connected");
338
339 self.start_mercury_event_loop().await;
341
342 {
344 let mut kms_guard = self.kms_client.lock().await;
345 if let Some(ref mut kms) = *kms_guard {
346 kms.initialize().await?;
347 }
348 }
349 info!("KMS initialized");
350
351 *self.registration.lock().await = Some(reg);
353
354 Ok(())
355 }
356
357 async fn start_mercury_event_loop(&self) {
359 let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
360 Some(rx) => rx,
361 None => {
362 warn!("Mercury event receiver already taken");
363 return;
364 }
365 };
366
367 let kms_client = self.kms_client.clone();
368 let kms_response_handler = self.kms_response_handler.clone();
369 let event_tx = self.event_tx.clone();
370 let connected = self.connected.clone();
371 let registration = self.registration.clone();
372 let device_manager = self.device_manager.clone();
373 let token = self.token.clone();
374 let bot_person_id = self.bot_person_id.clone();
375
376 tokio::spawn(async move {
377 while let Some(event) = mercury_rx.recv().await {
378 match event {
379 MercuryEvent::KmsResponse(data) => {
380 let handler_guard = kms_response_handler.lock().await;
383 if let Some(ref handler) = *handler_guard {
384 handler.handle_kms_message(&data).await;
385 }
386 }
387 MercuryEvent::Activity(activity) => {
388 let kms_client_clone = kms_client.clone();
391 let event_tx_clone = event_tx.clone();
392 let bot_person_id = bot_person_id.clone();
393 tokio::spawn(async move {
394 let mut kms_guard = kms_client_clone.lock().await;
395 if let Some(ref mut kms) = *kms_guard {
396 let bot_id = bot_person_id.lock().await.clone();
397 Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
398 } else {
399 warn!("Received activity but KMS client not initialized");
400 }
401 });
402 }
403 MercuryEvent::Connected => {
404 info!("Mercury reconnected, refreshing device and KMS");
405
406 let tok = token.lock().await.clone();
408 {
409 let reg_guard = registration.lock().await;
410 if reg_guard.is_some() {
411 let dm = device_manager.lock().await;
412 match dm.refresh(&tok).await {
413 Ok(new_reg) => {
414 drop(reg_guard);
415 *registration.lock().await = Some(new_reg);
416 }
417 Err(e) => {
418 warn!("Device refresh on reconnect failed: {e}");
419 }
420 }
421 }
422 }
423
424 {
426 let mut kms_guard = kms_client.lock().await;
427 if let Some(ref mut kms) = *kms_guard {
428 if let Err(e) = kms.initialize().await {
429 warn!("KMS re-init on reconnect failed: {e}");
430 }
431 }
432 }
433
434 *connected.lock().await = true;
435 if event_tx.send(HandlerEvent::Connected).is_err() {
436 warn!("Event receiver dropped, cannot send Connected event");
437 }
438 }
439 MercuryEvent::Disconnected(reason) => {
440 *connected.lock().await = false;
441 if event_tx.send(HandlerEvent::Disconnected(reason)).is_err() {
442 warn!("Event receiver dropped, cannot send Disconnected event");
443 }
444 }
445 MercuryEvent::Reconnecting(attempt) => {
446 if event_tx.send(HandlerEvent::Reconnecting(attempt)).is_err() {
447 warn!("Event receiver dropped, cannot send Reconnecting event");
448 }
449 }
450 MercuryEvent::Error(msg) => {
451 if event_tx.send(HandlerEvent::Error(msg)).is_err() {
452 warn!("Event receiver dropped, cannot send Error event");
453 }
454 }
455 }
456 }
457 });
458 }
459
460 async fn handle_activity_static(
462 kms: &mut KmsClient,
463 activity: &MercuryActivity,
464 event_tx: &mpsc::UnboundedSender<HandlerEvent>,
465 bot_person_id: Option<&str>,
466 ) {
467 if activity.verb == "post" && activity.object.object_type == "comment" {
469 let mut decryptor = MessageDecryptor::new(kms);
470 match decryptor.decrypt_activity(activity).await {
471 Ok(decrypted) => {
472 let msg = DecryptedMessage {
473 id: decrypted.object.id.clone(),
474 room_id: decrypted.target.id.clone(),
475 person_id: decrypted.actor.id.clone(),
476 person_email: decrypted
477 .actor
478 .email_address
479 .clone()
480 .unwrap_or_default(),
481 text: decrypted.object.display_name.clone().unwrap_or_default(),
482 html: decrypted.object.content.clone(),
483 created: decrypted.published.clone(),
484 room_type: infer_room_type(&decrypted),
485 raw: decrypted,
486 };
487
488 if let Some(bot_id) = bot_person_id {
490 if extract_person_uuid(&msg.person_id) == bot_id {
491 info!("Ignoring self-message from bot ({})", bot_id);
492 return;
493 }
494 }
495
496 if event_tx.send(HandlerEvent::MessageCreated(msg)).is_err() {
497 warn!("Event receiver dropped, cannot send MessageCreated event");
498 }
499 }
500 Err(e) => {
501 error!("Error decrypting activity: {e}");
502 if event_tx.send(HandlerEvent::Error(e.to_string())).is_err() {
503 warn!("Event receiver dropped, cannot send Error event");
504 }
505 }
506 }
507 return;
508 }
509
510 if activity.verb == "delete" && activity.object.object_type == "activity" {
512 if event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
513 message_id: activity.object.id.clone(),
514 room_id: activity.target.id.clone(),
515 person_id: activity.actor.id.clone(),
516 })).is_err() {
517 warn!("Event receiver dropped, cannot send MessageDeleted event");
518 }
519 return;
520 }
521
522 let membership_verbs = ["add", "leave", "assignModerator", "unassignModerator"];
524 if membership_verbs.contains(&activity.verb.as_str())
525 && activity.object.object_type == "person"
526 {
527 let event = HandlerEvent::MembershipCreated(MembershipActivity {
528 id: activity.id.clone(),
529 actor_id: activity.actor.id.clone(),
530 person_id: activity.object.id.clone(),
531 room_id: activity.target.id.clone(),
532 action: activity.verb.clone(),
533 created: activity.published.clone(),
534 room_type: infer_room_type(activity),
535 raw: activity.clone(),
536 });
537 if event_tx.send(event).is_err() {
538 warn!("Event receiver dropped, cannot send MembershipCreated event");
539 }
540 }
541 }
542
543 pub async fn disconnect(&self) {
545 info!("Disconnecting from Webex...");
546 *self.connected.lock().await = false;
547
548 self.mercury_socket.disconnect().await;
549
550 let token = self.token.lock().await.clone();
551 {
552 let reg = self.registration.lock().await;
553 if reg.is_some() {
554 let mut dm = self.device_manager.lock().await;
555 if let Err(e) = dm.unregister(&token).await {
556 warn!("Failed to unregister device: {e}");
557 } else {
558 info!("Device unregistered");
559 }
560 }
561 }
562
563 *self.registration.lock().await = None;
564 *self.kms_client.lock().await = None;
565 *self.kms_response_handler.lock().await = None;
566 *self.bot_person_id.lock().await = None;
567 }
568
569 pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
571 if new_token.is_empty() {
572 return Err(WebexError::Internal(
573 "reconnect() requires a non-empty token string".into(),
574 ));
575 }
576
577 info!("Reconnecting with new token...");
578 self.disconnect().await;
579
580 *self.token.lock().await = new_token.to_string();
581 self.connect().await
582 }
583
584 pub async fn connected(&self) -> bool {
586 let conn = *self.connected.lock().await;
587 conn && self.mercury_socket.connected().await
588 }
589
590 pub async fn status(&self) -> HandlerStatus {
592 let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
593 let ws_open = self.mercury_socket.connected().await;
594 let is_connected = *self.connected.lock().await;
595 let is_connecting = *self.connecting.lock().await;
596
597 let status = if is_connected && ws_open {
598 ConnectionStatus::Connected
599 } else if is_connecting {
600 ConnectionStatus::Connecting
601 } else if reconnect_attempt > 0 {
602 ConnectionStatus::Reconnecting
603 } else {
604 ConnectionStatus::Disconnected
605 };
606
607 HandlerStatus {
608 status,
609 web_socket_open: ws_open,
610 kms_initialized: self.kms_client.lock().await.is_some(),
611 device_registered: self.registration.lock().await.is_some(),
612 reconnect_attempt,
613 }
614 }
615}
616
617fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
618 let tags = &activity.target.tags;
619 if tags.contains(&"ONE_ON_ONE".to_string()) {
620 return Some("direct".to_string());
621 }
622 if tags.contains(&"TEAM".to_string())
623 || tags.contains(&"LOCKED".to_string())
624 || tags.contains(&"GROUP".to_string())
625 {
626 return Some("group".to_string());
627 }
628 None
629}