1use crate::device_manager::DeviceManager;
4use crate::errors::WebexError;
5use crate::kms_client::{KmsClient, KmsResponseHandler};
6use crate::mercury_socket::{MercuryEvent, MercurySocket};
7use crate::message_decryptor::MessageDecryptor;
8use crate::types::{
9 Config, ConnectionStatus, DecryptedMessage, DeletedMessage, DeviceRegistration, FetchRequest,
10 FetchResponse, HandlerStatus, MercuryActivity, NetworkMode,
11};
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{mpsc, Mutex};
17use tracing::{error, info, warn};
18
19type HttpDoFn = Arc<
21 dyn Fn(FetchRequest) -> Pin<Box<dyn Future<Output = Result<FetchResponse, Box<dyn std::error::Error + Send + Sync>>> + Send>>
22 + Send
23 + Sync,
24>;
25
26#[derive(Debug, Clone)]
28pub enum HandlerEvent {
29 MessageCreated(DecryptedMessage),
31 MessageDeleted(DeletedMessage),
33 Connected,
35 Disconnected(String),
37 Reconnecting(u32),
39 Error(String),
41}
42
43fn create_native_http_adapter(client: reqwest::Client) -> HttpDoFn {
45 Arc::new(move |req: FetchRequest| {
46 let client = client.clone();
47 Box::pin(async move {
48 let mut request_builder = match req.method.as_str() {
49 "GET" => client.get(&req.url),
50 "POST" => client.post(&req.url),
51 "PUT" => client.put(&req.url),
52 "DELETE" => client.delete(&req.url),
53 _ => return Err(format!("Unsupported HTTP method: {}", req.method).into()),
54 };
55
56 for (key, value) in req.headers {
57 request_builder = request_builder.header(key, value);
58 }
59
60 if let Some(body) = req.body {
61 request_builder = request_builder.body(body);
62 }
63
64 let response = request_builder
65 .send()
66 .await
67 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
68
69 let status = response.status().as_u16();
70 let ok = response.status().is_success();
71 let body_bytes = response
72 .bytes()
73 .await
74 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
75 .to_vec();
76
77 Ok(FetchResponse {
78 status,
79 ok,
80 body: body_bytes,
81 })
82 })
83 })
84}
85
86pub struct WebexMessageHandler {
88 token: Arc<Mutex<String>>,
89 http_do: HttpDoFn,
90 device_manager: Arc<Mutex<DeviceManager>>,
91 mercury_socket: Arc<MercurySocket>,
92 kms_client: Arc<Mutex<Option<KmsClient>>>,
93 kms_response_handler: Arc<Mutex<Option<KmsResponseHandler>>>,
95 registration: Arc<Mutex<Option<DeviceRegistration>>>,
96 connected: Arc<Mutex<bool>>,
97 connecting: Arc<Mutex<bool>>,
98 ignore_self_messages: bool,
99 bot_person_id: Arc<Mutex<Option<String>>>,
100
101 #[allow(dead_code)]
102 config: Config,
103 event_tx: mpsc::UnboundedSender<HandlerEvent>,
104 event_rx: Arc<Mutex<Option<mpsc::UnboundedReceiver<HandlerEvent>>>>,
105}
106
107impl WebexMessageHandler {
108 pub fn new(config: Config) -> Result<Self, WebexError> {
110 if config.token.is_empty() {
111 return Err(WebexError::Internal(
112 "WebexMessageHandler requires a non-empty token string".into(),
113 ));
114 }
115
116 match config.mode {
118 NetworkMode::Injected => {
119 if config.fetch.is_none() || config.web_socket_factory.is_none() {
120 return Err(WebexError::Internal(
121 "Injected mode requires both fetch and web_socket_factory".into(),
122 ));
123 }
124 if config.client.is_some() {
125 return Err(WebexError::Internal(
126 "Cannot use native proxy parameters (client) in injected mode".into(),
127 ));
128 }
129 }
130 NetworkMode::Native => {
131 if config.fetch.is_some() || config.web_socket_factory.is_some() {
132 return Err(WebexError::Internal(
133 "Cannot provide fetch/web_socket_factory in native mode — set mode to Injected".into(),
134 ));
135 }
136 }
137 }
138
139 let (http_do, ws_factory) = match config.mode {
141 NetworkMode::Native => {
142 let client = config.client.clone().unwrap_or_else(|| reqwest::Client::new());
143 let http_adapter = create_native_http_adapter(client.clone());
144 (http_adapter, None)
145 }
146 NetworkMode::Injected => {
147 let http_adapter = config.fetch.clone().unwrap();
148 let ws_factory = config.web_socket_factory.clone();
149 (http_adapter, ws_factory)
150 }
151 };
152
153 let mercury_socket = MercurySocket::new(
154 ws_factory,
155 Duration::from_secs_f64(config.ping_interval),
156 Duration::from_secs_f64(config.pong_timeout),
157 Duration::from_secs_f64(config.reconnect_backoff_max),
158 config.max_reconnect_attempts,
159 );
160
161 let (event_tx, event_rx) = mpsc::unbounded_channel();
162
163 let ignore_self_messages = config.ignore_self_messages;
164
165 Ok(Self {
166 token: Arc::new(Mutex::new(config.token.clone())),
167 http_do: http_do.clone(),
168 device_manager: Arc::new(Mutex::new(DeviceManager::new(http_do.clone()))),
169 mercury_socket: Arc::new(mercury_socket),
170 kms_client: Arc::new(Mutex::new(None)),
171 kms_response_handler: Arc::new(Mutex::new(None)),
172 registration: Arc::new(Mutex::new(None)),
173 connected: Arc::new(Mutex::new(false)),
174 connecting: Arc::new(Mutex::new(false)),
175 ignore_self_messages,
176 bot_person_id: Arc::new(Mutex::new(None)),
177 config,
178 event_tx,
179 event_rx: Arc::new(Mutex::new(Some(event_rx))),
180 })
181 }
182
183 pub async fn take_event_rx(&self) -> Option<mpsc::UnboundedReceiver<HandlerEvent>> {
185 self.event_rx.lock().await.take()
186 }
187
188 pub async fn connect(&self) -> Result<(), WebexError> {
190 {
191 let connecting = self.connecting.lock().await;
192 if *connecting {
193 return Err(WebexError::Internal("connect() already in progress".into()));
194 }
195 }
196 {
197 let connected = self.connected.lock().await;
198 if *connected {
199 return Err(WebexError::Internal(
200 "Already connected. Call disconnect() first, or use reconnect().".into(),
201 ));
202 }
203 }
204
205 info!("Connecting to Webex...");
206 *self.connecting.lock().await = true;
207
208 let result = self.connect_internal().await;
209
210 *self.connecting.lock().await = false;
211
212 match result {
213 Ok(()) => {
214 *self.connected.lock().await = true;
215 info!("Connected to Webex");
216 let _ = self.event_tx.send(HandlerEvent::Connected);
217 Ok(())
218 }
219 Err(e) => Err(e),
220 }
221 }
222
223 async fn fetch_bot_person_id(&self) {
224 info!("Fetching bot person info for self-message filtering");
225 let token = self.token.lock().await.clone();
226 let req = FetchRequest {
227 url: "https://webexapis.com/v1/people/me".into(),
228 method: "GET".into(),
229 headers: {
230 let mut h = std::collections::HashMap::new();
231 h.insert("Authorization".into(), format!("Bearer {}", token));
232 h.insert("Content-Type".into(), "application/json".into());
233 h
234 },
235 body: None,
236 };
237
238 match (self.http_do)(req).await {
239 Ok(resp) => {
240 if !resp.ok {
241 warn!("Failed to fetch bot person info: HTTP {}", resp.status);
242 return;
243 }
244 match serde_json::from_slice::<serde_json::Value>(&resp.body) {
245 Ok(data) => {
246 if let Some(id) = data.get("id").and_then(|v| v.as_str()) {
247 info!("Bot person ID cached for self-message filtering: {}", id);
248 *self.bot_person_id.lock().await = Some(id.to_string());
249 }
250 }
251 Err(e) => {
252 warn!("Error parsing bot person info: {}", e);
253 }
254 }
255 }
256 Err(e) => {
257 warn!("Error fetching bot person info: {}", e);
258 }
259 }
260 }
261
262 async fn connect_internal(&self) -> Result<(), WebexError> {
263 let token = self.token.lock().await.clone();
264
265 let reg = {
267 let mut dm = self.device_manager.lock().await;
268 dm.register(&token).await?
269 };
270 info!("Device registered");
271
272 if self.ignore_self_messages {
274 self.fetch_bot_person_id().await;
275 }
276
277 let kms = KmsClient::new(
279 self.http_do.clone(),
280 &token,
281 ®.device_url,
282 ®.user_id,
283 ®.encryption_service_url,
284 );
285
286 let response_handler = kms.response_handler();
289 *self.kms_response_handler.lock().await = Some(response_handler);
290 *self.kms_client.lock().await = Some(kms);
291
292 self.mercury_socket
294 .connect(®.web_socket_url, &token)
295 .await?;
296 info!("Mercury connected");
297
298 self.start_mercury_event_loop().await;
300
301 {
303 let mut kms_guard = self.kms_client.lock().await;
304 if let Some(ref mut kms) = *kms_guard {
305 kms.initialize().await?;
306 }
307 }
308 info!("KMS initialized");
309
310 *self.registration.lock().await = Some(reg);
312
313 Ok(())
314 }
315
316 async fn start_mercury_event_loop(&self) {
318 let mut mercury_rx = match self.mercury_socket.take_event_rx().await {
319 Some(rx) => rx,
320 None => {
321 warn!("Mercury event receiver already taken");
322 return;
323 }
324 };
325
326 let kms_client = self.kms_client.clone();
327 let kms_response_handler = self.kms_response_handler.clone();
328 let event_tx = self.event_tx.clone();
329 let connected = self.connected.clone();
330 let registration = self.registration.clone();
331 let device_manager = self.device_manager.clone();
332 let token = self.token.clone();
333 let bot_person_id = self.bot_person_id.clone();
334
335 tokio::spawn(async move {
336 while let Some(event) = mercury_rx.recv().await {
337 match event {
338 MercuryEvent::KmsResponse(data) => {
339 let handler_guard = kms_response_handler.lock().await;
342 if let Some(ref handler) = *handler_guard {
343 handler.handle_kms_message(&data).await;
344 }
345 }
346 MercuryEvent::Activity(activity) => {
347 let kms_client_clone = kms_client.clone();
350 let event_tx_clone = event_tx.clone();
351 let bot_person_id = bot_person_id.clone();
352 tokio::spawn(async move {
353 let mut kms_guard = kms_client_clone.lock().await;
354 if let Some(ref mut kms) = *kms_guard {
355 let bot_id = bot_person_id.lock().await.clone();
356 Self::handle_activity_static(kms, &activity, &event_tx_clone, bot_id.as_deref()).await;
357 } else {
358 warn!("Received activity but KMS client not initialized");
359 }
360 });
361 }
362 MercuryEvent::Connected => {
363 info!("Mercury reconnected, refreshing device and KMS");
364
365 let tok = token.lock().await.clone();
367 {
368 let reg_guard = registration.lock().await;
369 if reg_guard.is_some() {
370 let dm = device_manager.lock().await;
371 match dm.refresh(&tok).await {
372 Ok(new_reg) => {
373 drop(reg_guard);
374 *registration.lock().await = Some(new_reg);
375 }
376 Err(e) => {
377 warn!("Device refresh on reconnect failed: {e}");
378 }
379 }
380 }
381 }
382
383 {
385 let mut kms_guard = kms_client.lock().await;
386 if let Some(ref mut kms) = *kms_guard {
387 if let Err(e) = kms.initialize().await {
388 warn!("KMS re-init on reconnect failed: {e}");
389 }
390 }
391 }
392
393 *connected.lock().await = true;
394 let _ = event_tx.send(HandlerEvent::Connected);
395 }
396 MercuryEvent::Disconnected(reason) => {
397 *connected.lock().await = false;
398 let _ = event_tx.send(HandlerEvent::Disconnected(reason));
399 }
400 MercuryEvent::Reconnecting(attempt) => {
401 let _ = event_tx.send(HandlerEvent::Reconnecting(attempt));
402 }
403 MercuryEvent::Error(msg) => {
404 let _ = event_tx.send(HandlerEvent::Error(msg));
405 }
406 }
407 }
408 });
409 }
410
411 async fn handle_activity_static(
413 kms: &mut KmsClient,
414 activity: &MercuryActivity,
415 event_tx: &mpsc::UnboundedSender<HandlerEvent>,
416 bot_person_id: Option<&str>,
417 ) {
418 if activity.verb == "post" && activity.object.object_type == "comment" {
420 let mut decryptor = MessageDecryptor::new(kms);
421 match decryptor.decrypt_activity(activity).await {
422 Ok(decrypted) => {
423 let msg = DecryptedMessage {
424 id: decrypted.object.id.clone(),
425 room_id: decrypted.target.id.clone(),
426 person_id: decrypted.actor.id.clone(),
427 person_email: decrypted
428 .actor
429 .email_address
430 .clone()
431 .unwrap_or_default(),
432 text: decrypted.object.display_name.clone().unwrap_or_default(),
433 html: decrypted.object.content.clone(),
434 created: decrypted.published.clone(),
435 room_type: infer_room_type(&decrypted),
436 raw: decrypted,
437 };
438
439 if let Some(bot_id) = bot_person_id {
441 if msg.person_id == bot_id {
442 info!("Ignoring self-message from bot ({})", bot_id);
443 return;
444 }
445 }
446
447 let _ = event_tx.send(HandlerEvent::MessageCreated(msg));
448 }
449 Err(e) => {
450 error!("Error decrypting activity: {e}");
451 let _ = event_tx.send(HandlerEvent::Error(e.to_string()));
452 }
453 }
454 return;
455 }
456
457 if activity.verb == "delete" && activity.object.object_type == "activity" {
459 let _ = event_tx.send(HandlerEvent::MessageDeleted(DeletedMessage {
460 message_id: activity.object.id.clone(),
461 room_id: activity.target.id.clone(),
462 person_id: activity.actor.id.clone(),
463 }));
464 }
465 }
466
467 pub async fn disconnect(&self) {
469 info!("Disconnecting from Webex...");
470 *self.connected.lock().await = false;
471
472 self.mercury_socket.disconnect().await;
473
474 let token = self.token.lock().await.clone();
475 {
476 let reg = self.registration.lock().await;
477 if reg.is_some() {
478 let mut dm = self.device_manager.lock().await;
479 if let Err(e) = dm.unregister(&token).await {
480 warn!("Failed to unregister device: {e}");
481 } else {
482 info!("Device unregistered");
483 }
484 }
485 }
486
487 *self.registration.lock().await = None;
488 *self.kms_client.lock().await = None;
489 *self.kms_response_handler.lock().await = None;
490 *self.bot_person_id.lock().await = None;
491 }
492
493 pub async fn reconnect(&self, new_token: &str) -> Result<(), WebexError> {
495 if new_token.is_empty() {
496 return Err(WebexError::Internal(
497 "reconnect() requires a non-empty token string".into(),
498 ));
499 }
500
501 info!("Reconnecting with new token...");
502 self.disconnect().await;
503
504 *self.token.lock().await = new_token.to_string();
505 self.connect().await
506 }
507
508 pub async fn connected(&self) -> bool {
510 let conn = *self.connected.lock().await;
511 conn && self.mercury_socket.connected().await
512 }
513
514 pub async fn status(&self) -> HandlerStatus {
516 let reconnect_attempt = self.mercury_socket.current_reconnect_attempts().await;
517 let ws_open = self.mercury_socket.connected().await;
518 let is_connected = *self.connected.lock().await;
519 let is_connecting = *self.connecting.lock().await;
520
521 let status = if is_connected && ws_open {
522 ConnectionStatus::Connected
523 } else if is_connecting {
524 ConnectionStatus::Connecting
525 } else if reconnect_attempt > 0 {
526 ConnectionStatus::Reconnecting
527 } else {
528 ConnectionStatus::Disconnected
529 };
530
531 HandlerStatus {
532 status,
533 web_socket_open: ws_open,
534 kms_initialized: self.kms_client.lock().await.is_some(),
535 device_registered: self.registration.lock().await.is_some(),
536 reconnect_attempt,
537 }
538 }
539}
540
541fn infer_room_type(activity: &MercuryActivity) -> Option<String> {
542 let tags = &activity.target.tags;
543 if tags.contains(&"ONE_ON_ONE".to_string()) {
544 return Some("direct".to_string());
545 }
546 if tags.contains(&"TEAM".to_string())
547 || tags.contains(&"LOCKED".to_string())
548 || tags.contains(&"GROUP".to_string())
549 {
550 return Some("group".to_string());
551 }
552 None
553}