1use crate::error::Error;
19use serde_json::{json, Value};
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, RwLock};
22use tokio::sync::{mpsc, oneshot};
23use tokio::time::{sleep, timeout, Duration};
24use uuid::Uuid;
25
/// Tunable behaviour of a room connection.
///
/// Every duration is expressed in milliseconds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoomOptions {
    /// Whether the client retries after the connection is lost.
    pub auto_reconnect: bool,
    /// Number of consecutive reconnect attempts allowed before giving up.
    pub max_reconnect_attempts: u32,
    /// Delay before the first retry; doubled per attempt and capped at 30 s.
    pub reconnect_base_delay_ms: u64,
    /// How long `send` waits for the server's reply before timing out.
    pub send_timeout_ms: u64,
    /// How long to wait for the WebSocket connection to be established.
    pub connection_timeout_ms: u64,
}
37
38impl Default for RoomOptions {
39 fn default() -> Self {
40 Self {
41 auto_reconnect: true,
42 max_reconnect_attempts: 10,
43 reconnect_base_delay_ms: 1000,
44 send_timeout_ms: 10_000,
45 connection_timeout_ms: 15_000,
46 }
47 }
48}
49
/// Handle for a registered callback.
///
/// The wrapped closure undoes the registration. It runs at most once: either
/// through [`Subscription::unsubscribe`] or when the handle is dropped,
/// whichever happens first. Dropping the handle immediately therefore removes
/// the registration immediately.
pub struct Subscription {
    /// Removal closure; `None` once it has been run.
    remove_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
}

impl Subscription {
    /// Wraps `remove_fn` so it is run exactly once on unsubscribe or drop.
    fn new(remove_fn: impl FnOnce() + Send + 'static) -> Self {
        Self {
            remove_fn: Mutex::new(Some(Box::new(remove_fn))),
        }
    }

    /// Runs the removal closure now and consumes the handle.
    pub fn unsubscribe(self) {
        self.run_removal();
    }

    /// Takes the removal closure out of its slot and runs it, if still present.
    ///
    /// The closure is extracted in its own statement so the mutex guard is
    /// released *before* the closure runs. A panicking closure therefore can
    /// no longer poison the mutex, and the `Drop` that follows during
    /// unwinding cannot turn that panic into a process abort. Poisoning is
    /// tolerated as well, because the slot only holds an `Option`.
    fn run_removal(&self) {
        let removal = self
            .remove_fn
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        if let Some(f) = removal {
            f();
        }
    }
}

impl Drop for Subscription {
    /// Runs the removal closure unless `unsubscribe` already did.
    fn drop(&mut self) {
        self.run_removal();
    }
}
79
/// Callback for shared/player state updates. On `sync` both arguments are the
/// full state; on `shared_delta` they are the full state and the delta.
type StateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback stored by `on_message`, keyed by message type.
type MessageHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback stored by `on_error`; receives two strings
/// (presumably an error code and a message — confirm against the dispatcher).
type ErrorHandler = Arc<dyn Fn(&str, &str) + Send + Sync>;
/// Callback stored by `on_kicked`; takes no arguments.
type KickedHandler = Arc<dyn Fn() + Send + Sync>;
/// Callback stored by `on_members_sync`.
type MembersSyncHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback stored by `on_member_join`.
type MemberHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback stored by `on_member_leave`; receives a JSON value and a string
/// (presumably the member and the leave reason — confirm against the dispatcher).
type MemberLeaveHandler = Arc<dyn Fn(&Value, &str) + Send + Sync>;
/// Callback stored by `on_member_state_change`.
type MemberStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback stored by `on_signal`, keyed by event name.
type SignalHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback stored by `on_any_signal`; the extra leading string is
/// presumably the event name — confirm against the dispatcher.
type AnySignalHandler = Arc<dyn Fn(&str, &Value, &Value) + Send + Sync>;
/// Callback stored by `on_media_track` and `on_media_track_removed`.
type MediaTrackHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback stored by `on_media_state_change`.
type MediaStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback stored by `on_media_device_change`.
type MediaDeviceHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback stored by `on_reconnect`; receives the pending reconnect info
/// when the next `sync` arrives.
type ReconnectHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback stored by `on_connection_state_change`; receives a string
/// (presumably the new state name — confirm against `set_connection_state`).
type ConnectionStateHandler = Arc<dyn Fn(&str) + Send + Sync>;

/// Shared list of `(handler id, handler)` pairs. The id lets a
/// `Subscription` remove exactly its own entry.
type HandlerList<H> = Arc<Mutex<Vec<(u64, H)>>>;
100
/// Instruction sent over the internal channel to the WebSocket writer task.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so commands captured from the
/// channel can be logged and compared directly in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum RoomWsCommand {
    /// Write the contained string as a text frame.
    Send(String),
    /// Write a close frame and stop the writer task.
    Close,
}
105
/// Pause in `leave` between queueing the `leave` message and queueing the
/// close command.
const ROOM_EXPLICIT_LEAVE_CLOSE_DELAY: Duration = Duration::from_millis(40);
/// Connection state of a freshly created client and after `leave`.
const ROOM_STATE_IDLE: &str = "idle";
/// Connection state set by `join` before the first connection attempt.
const ROOM_STATE_CONNECTING: &str = "connecting";
/// Connection state set once a `sync` message has been processed.
const ROOM_STATE_CONNECTED: &str = "connected";
/// NOTE(review): not referenced in this excerpt; presumably set by
/// `begin_reconnect_attempt` — confirm.
const ROOM_STATE_RECONNECTING: &str = "reconnecting";
/// Connection state set when reconnection is disabled or the attempt budget
/// is exhausted.
const ROOM_STATE_DISCONNECTED: &str = "disconnected";
/// NOTE(review): not referenced in this excerpt; presumably set by
/// `handle_kicked` — confirm.
const ROOM_STATE_KICKED: &str = "kicked";
113
/// Client for a single room: holds the mirrored room state, the registered
/// event handlers, the in-flight requests and the channels used to talk to
/// the background WebSocket tasks.
pub struct RoomClient {
    /// Sent to the room endpoints as the `namespace` query parameter.
    pub namespace: String,
    /// Sent to the room endpoints as the `id` query parameter.
    pub room_id: String,

    /// Shared state mirror: replaced on `sync`, patched on `shared_delta`.
    shared_state: RwLock<Value>,
    /// Version received with the latest shared state update.
    shared_version: RwLock<u64>,
    /// Player state mirror: replaced on `sync`.
    player_state: RwLock<Value>,
    /// Version received with the latest player state update.
    player_version: RwLock<u64>,
    /// Value returned by `list_members`; starts as an empty JSON array.
    members: RwLock<Value>,
    /// Value returned by `list_media_members`; starts as an empty JSON array.
    media_members: RwLock<Value>,
    /// `userId` from the last successful auth response, if any.
    current_user_id: Mutex<Option<String>>,
    /// `connectionId` from the last successful auth response, if any.
    current_connection_id: Mutex<Option<String>>,
    /// Current connection state string; starts as `ROOM_STATE_IDLE`.
    connection_state: RwLock<String>,
    /// Pending reconnect details; taken and handed to the reconnect handlers
    /// when the next `sync` arrives.
    reconnect_info: RwLock<Option<Value>>,

    /// Server base URL with trailing slashes trimmed.
    base_url: String,
    /// Called on every connection attempt to obtain the token for the `auth` message.
    token_fn: Box<dyn Fn() -> String + Send + Sync>,
    /// Connection options; defaults are used when none are supplied to `new`.
    opts: RoomOptions,

    /// Handlers registered through `on_shared_state`.
    shared_state_handlers: HandlerList<StateHandler>,
    /// Handlers registered through `on_player_state`.
    player_state_handlers: HandlerList<StateHandler>,
    /// Handlers registered through `on_message`, keyed by message type.
    message_handlers: Arc<Mutex<HashMap<String, Vec<(u64, MessageHandler)>>>>,
    /// Handlers registered through `on_error`.
    error_handlers: HandlerList<ErrorHandler>,
    /// Handlers registered through `on_kicked`.
    kicked_handlers: HandlerList<KickedHandler>,
    /// Handlers registered through `on_members_sync`.
    member_sync_handlers: HandlerList<MembersSyncHandler>,
    /// Handlers registered through `on_member_join`.
    member_join_handlers: HandlerList<MemberHandler>,
    /// Handlers registered through `on_member_leave`.
    member_leave_handlers: HandlerList<MemberLeaveHandler>,
    /// Handlers registered through `on_member_state_change`.
    member_state_handlers: HandlerList<MemberStateHandler>,
    /// Handlers registered through `on_signal`, keyed by event name.
    signal_handlers: Arc<Mutex<HashMap<String, Vec<(u64, SignalHandler)>>>>,
    /// Handlers registered through `on_any_signal`.
    any_signal_handlers: HandlerList<AnySignalHandler>,
    /// Handlers registered through `on_media_track`.
    media_track_handlers: HandlerList<MediaTrackHandler>,
    /// Handlers registered through `on_media_track_removed`.
    media_track_removed_handlers: HandlerList<MediaTrackHandler>,
    /// Handlers registered through `on_media_state_change`.
    media_state_handlers: HandlerList<MediaStateHandler>,
    /// Handlers registered through `on_media_device_change`.
    media_device_handlers: HandlerList<MediaDeviceHandler>,
    /// Handlers registered through `on_reconnect`.
    reconnect_handlers: HandlerList<ReconnectHandler>,
    /// Handlers registered through `on_connection_state_change`.
    connection_state_handlers: HandlerList<ConnectionStateHandler>,
    /// Counter behind handler ids (presumably advanced by `next_handler_id`,
    /// which is defined outside this excerpt).
    handler_id_counter: Mutex<u64>,

    /// Reply channels of in-flight `send` calls, keyed by request id.
    pending_requests: Mutex<HashMap<String, oneshot::Sender<Result<Value, Error>>>>,
    /// In-flight `send_signal` calls; settled by `signal_sent` / `signal_error`.
    pending_signal_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    /// In-flight `send_admin` calls; settled by `admin_result` / `admin_error`.
    pending_admin_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    /// In-flight member state updates/clears; rejected by `member_state_error`.
    pending_member_state_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    /// In-flight `send_media` calls; settled by `media_result` / `media_error`.
    pending_media_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,

    /// Sender feeding the WebSocket writer task. Starts as `None`, is set when
    /// a connection is established and cleared by `leave`.
    send_tx: Mutex<Option<mpsc::Sender<RoomWsCommand>>>,

    /// Sender used by `leave` to tell the task spawned by `join` to exit.
    stop_tx: Mutex<Option<mpsc::Sender<()>>>,
    /// `true` after `leave`, reset by `join`; suppresses reconnection.
    intentionally_left: Mutex<bool>,
}
173
174impl RoomClient {
175 pub fn new(
184 base_url: &str,
185 namespace: &str,
186 room_id: &str,
187 token_fn: impl Fn() -> String + Send + Sync + 'static,
188 opts: Option<RoomOptions>,
189 ) -> Arc<Self> {
190 Arc::new(Self {
191 namespace: namespace.to_string(),
192 room_id: room_id.to_string(),
193 shared_state: RwLock::new(json!({})),
194 shared_version: RwLock::new(0),
195 player_state: RwLock::new(json!({})),
196 player_version: RwLock::new(0),
197 members: RwLock::new(json!([])),
198 media_members: RwLock::new(json!([])),
199 current_user_id: Mutex::new(None),
200 current_connection_id: Mutex::new(None),
201 connection_state: RwLock::new(ROOM_STATE_IDLE.to_string()),
202 reconnect_info: RwLock::new(None),
203 base_url: base_url.trim_end_matches('/').to_string(),
204 token_fn: Box::new(token_fn),
205 opts: opts.unwrap_or_default(),
206 shared_state_handlers: Arc::new(Mutex::new(vec![])),
207 player_state_handlers: Arc::new(Mutex::new(vec![])),
208 message_handlers: Arc::new(Mutex::new(HashMap::new())),
209 error_handlers: Arc::new(Mutex::new(vec![])),
210 kicked_handlers: Arc::new(Mutex::new(vec![])),
211 member_sync_handlers: Arc::new(Mutex::new(vec![])),
212 member_join_handlers: Arc::new(Mutex::new(vec![])),
213 member_leave_handlers: Arc::new(Mutex::new(vec![])),
214 member_state_handlers: Arc::new(Mutex::new(vec![])),
215 signal_handlers: Arc::new(Mutex::new(HashMap::new())),
216 any_signal_handlers: Arc::new(Mutex::new(vec![])),
217 media_track_handlers: Arc::new(Mutex::new(vec![])),
218 media_track_removed_handlers: Arc::new(Mutex::new(vec![])),
219 media_state_handlers: Arc::new(Mutex::new(vec![])),
220 media_device_handlers: Arc::new(Mutex::new(vec![])),
221 reconnect_handlers: Arc::new(Mutex::new(vec![])),
222 connection_state_handlers: Arc::new(Mutex::new(vec![])),
223 handler_id_counter: Mutex::new(0),
224 pending_requests: Mutex::new(HashMap::new()),
225 pending_signal_requests: Mutex::new(HashMap::new()),
226 pending_admin_requests: Mutex::new(HashMap::new()),
227 pending_member_state_requests: Mutex::new(HashMap::new()),
228 pending_media_requests: Mutex::new(HashMap::new()),
229 send_tx: Mutex::new(None),
230 stop_tx: Mutex::new(None),
231 intentionally_left: Mutex::new(false),
232 })
233 }
234
235 pub fn get_shared_state(&self) -> Value {
239 self.shared_state.read().unwrap().clone()
240 }
241
242 pub fn get_player_state(&self) -> Value {
244 self.player_state.read().unwrap().clone()
245 }
246
247 pub fn list_members(&self) -> Value {
249 self.members.read().unwrap().clone()
250 }
251
252 pub fn list_media_members(&self) -> Value {
254 self.media_members.read().unwrap().clone()
255 }
256
257 pub fn connection_state(&self) -> String {
259 self.connection_state.read().unwrap().clone()
260 }
261
262 pub fn state(self: &Arc<Self>) -> RoomStateNamespace {
263 RoomStateNamespace::new(Arc::clone(self))
264 }
265
266 pub fn meta(self: &Arc<Self>) -> RoomMetaNamespace {
267 RoomMetaNamespace::new(Arc::clone(self))
268 }
269
270 pub fn signals(self: &Arc<Self>) -> RoomSignalsNamespace {
271 RoomSignalsNamespace::new(Arc::clone(self))
272 }
273
274 pub fn members(self: &Arc<Self>) -> RoomMembersNamespace {
275 RoomMembersNamespace::new(Arc::clone(self))
276 }
277
278 pub fn admin(self: &Arc<Self>) -> RoomAdminNamespace {
279 RoomAdminNamespace::new(Arc::clone(self))
280 }
281
282 pub fn media(self: &Arc<Self>) -> RoomMediaNamespace {
283 RoomMediaNamespace::new(Arc::clone(self))
284 }
285
286 pub fn session(self: &Arc<Self>) -> RoomSessionNamespace {
287 RoomSessionNamespace::new(Arc::clone(self))
288 }
289
290 pub async fn get_metadata(&self) -> Result<Value, Error> {
295 Self::get_metadata_static(&self.base_url, &self.namespace, &self.room_id).await
296 }
297
298 pub async fn get_metadata_static(
301 base_url: &str,
302 namespace: &str,
303 room_id: &str,
304 ) -> Result<Value, Error> {
305 let url = format!(
306 "{}/api/room/metadata?namespace={}&id={}",
307 base_url.trim_end_matches('/'),
308 urlencoding::encode(namespace),
309 urlencoding::encode(room_id)
310 );
311 let resp = reqwest::get(&url)
312 .await
313 .map_err(|e| Error::Room(format!("Failed to get room metadata: {}", e)))?;
314 if !resp.status().is_success() {
315 return Err(Error::Room(format!(
316 "Failed to get room metadata: {}",
317 resp.status()
318 )));
319 }
320 let body = resp
321 .text()
322 .await
323 .map_err(|e| Error::Room(format!("Failed to read room metadata body: {}", e)))?;
324 serde_json::from_str(&body)
325 .map_err(|e| Error::Room(format!("Failed to parse room metadata: {}", e)))
326 }
327
328 pub async fn join(self: &Arc<Self>) -> Result<(), Error> {
332 *self.intentionally_left.lock().unwrap() = false;
333 self.set_connection_state(ROOM_STATE_CONNECTING);
334
335 let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
336 *self.stop_tx.lock().unwrap() = Some(stop_tx);
337
338 let this = Arc::clone(self);
339 tokio::spawn(async move {
340 let mut attempts = 0u32;
341 loop {
342 match this.establish().await {
343 Ok(mut ws_rx) => {
344 attempts = 0;
345 loop {
346 tokio::select! {
347 _ = stop_rx.recv() => return,
348 msg = ws_rx.recv() => {
349 match msg {
350 Some(raw) => this.handle_message(&raw),
351 None => {
352 if !*this.intentionally_left.lock().unwrap() {
355 this.reject_all_pending(
356 "WebSocket disconnected",
357 );
358 }
359 break;
360 }
361 }
362 }
363 }
364 }
365 }
366 Err(_) => {}
367 }
368 if *this.intentionally_left.lock().unwrap() {
369 return;
370 }
371 if !this.opts.auto_reconnect || attempts >= this.opts.max_reconnect_attempts {
372 this.set_connection_state(ROOM_STATE_DISCONNECTED);
373 return;
374 }
375 let delay = (this.opts.reconnect_base_delay_ms * (1u64 << attempts)).min(30_000);
376 attempts += 1;
377 this.begin_reconnect_attempt(attempts as u64);
378 sleep(Duration::from_millis(delay)).await;
379 }
380 });
381 Ok(())
382 }
383
384 pub async fn leave(&self) {
386 *self.intentionally_left.lock().unwrap() = true;
387
388 let send_tx = self.send_tx.lock().unwrap().clone();
389 if let Some(tx) = send_tx.as_ref() {
390 let _ = tx
391 .send(RoomWsCommand::Send(
392 json!({"type": "leave"}).to_string(),
393 ))
394 .await;
395 sleep(ROOM_EXPLICIT_LEAVE_CLOSE_DELAY).await;
396 let _ = tx.send(RoomWsCommand::Close).await;
397 }
398
399 if let Some(tx) = self.stop_tx.lock().unwrap().take() {
400 let _ = tx.send(()).await;
401 }
402
403 *self.send_tx.lock().unwrap() = None;
404
405 self.reject_all_pending("Room left");
407
408 *self.shared_state.write().unwrap() = json!({});
410 *self.shared_version.write().unwrap() = 0;
411 *self.player_state.write().unwrap() = json!({});
412 *self.player_version.write().unwrap() = 0;
413 *self.members.write().unwrap() = json!([]);
414 *self.media_members.write().unwrap() = json!([]);
415 *self.current_user_id.lock().unwrap() = None;
416 *self.current_connection_id.lock().unwrap() = None;
417 *self.reconnect_info.write().unwrap() = None;
418 self.set_connection_state(ROOM_STATE_IDLE);
419 }
420
421 pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
431 if self.send_tx.lock().unwrap().is_none() {
432 return Err(Error::Room("Not connected to room".to_string()));
433 }
434
435 let request_id = Uuid::new_v4().to_string();
436 let (tx, rx) = oneshot::channel::<Result<Value, Error>>();
437
438 self.pending_requests
439 .lock()
440 .unwrap()
441 .insert(request_id.clone(), tx);
442
443 self.ws_send(json!({
444 "type": "send",
445 "actionType": action_type,
446 "payload": payload.unwrap_or(json!({})),
447 "requestId": request_id,
448 }));
449
450 let timeout_ms = self.opts.send_timeout_ms;
451 let action_type_owned = action_type.to_string();
452 let req_id = request_id.clone();
453
454 match timeout(Duration::from_millis(timeout_ms), rx).await {
455 Ok(Ok(result)) => result,
456 Ok(Err(_)) => {
457 self.pending_requests.lock().unwrap().remove(&req_id);
459 Err(Error::Room(
460 "Room left while waiting for action result".to_string(),
461 ))
462 }
463 Err(_) => {
464 self.pending_requests.lock().unwrap().remove(&req_id);
466 Err(Error::RoomTimeout(format!(
467 "Action '{}' timed out",
468 action_type_owned
469 )))
470 }
471 }
472 }
473
474 pub fn on_shared_state(
479 &self,
480 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
481 ) -> Subscription {
482 let id = self.next_handler_id();
483 let handler = Arc::new(handler) as StateHandler;
484 self.shared_state_handlers.lock().unwrap().push((id, handler));
485
486 let list = Arc::clone(&self.shared_state_handlers);
487 Subscription::new(move || {
488 list.lock().unwrap().retain(|(hid, _)| *hid != id);
489 })
490 }
491
492 pub fn on_player_state(
495 &self,
496 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
497 ) -> Subscription {
498 let id = self.next_handler_id();
499 let handler = Arc::new(handler) as StateHandler;
500 self.player_state_handlers.lock().unwrap().push((id, handler));
501
502 let list = Arc::clone(&self.player_state_handlers);
503 Subscription::new(move || {
504 list.lock().unwrap().retain(|(hid, _)| *hid != id);
505 })
506 }
507
508 pub fn on_message(
515 &self,
516 msg_type: &str,
517 handler: impl Fn(&Value) + Send + Sync + 'static,
518 ) -> Subscription {
519 let id = self.next_handler_id();
520 let handler = Arc::new(handler) as MessageHandler;
521 let msg_type = msg_type.to_string();
522 {
523 let mut map = self.message_handlers.lock().unwrap();
524 map.entry(msg_type.clone())
525 .or_insert_with(Vec::new)
526 .push((id, handler));
527 }
528
529 let map = Arc::clone(&self.message_handlers);
530 Subscription::new(move || {
531 if let Some(list) = map.lock().unwrap().get_mut(&msg_type) {
532 list.retain(|(hid, _)| *hid != id);
533 }
534 })
535 }
536
537 pub fn on_error(
539 &self,
540 handler: impl Fn(&str, &str) + Send + Sync + 'static,
541 ) -> Subscription {
542 let id = self.next_handler_id();
543 let handler = Arc::new(handler) as ErrorHandler;
544 self.error_handlers.lock().unwrap().push((id, handler));
545
546 let list = Arc::clone(&self.error_handlers);
547 Subscription::new(move || {
548 list.lock().unwrap().retain(|(hid, _)| *hid != id);
549 })
550 }
551
552 pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
554 let id = self.next_handler_id();
555 let handler = Arc::new(handler) as KickedHandler;
556 self.kicked_handlers.lock().unwrap().push((id, handler));
557
558 let list = Arc::clone(&self.kicked_handlers);
559 Subscription::new(move || {
560 list.lock().unwrap().retain(|(hid, _)| *hid != id);
561 })
562 }
563
564 pub fn on_members_sync(
565 &self,
566 handler: impl Fn(&Value) + Send + Sync + 'static,
567 ) -> Subscription {
568 let id = self.next_handler_id();
569 let handler = Arc::new(handler) as MembersSyncHandler;
570 self.member_sync_handlers.lock().unwrap().push((id, handler));
571
572 let list = Arc::clone(&self.member_sync_handlers);
573 Subscription::new(move || {
574 list.lock().unwrap().retain(|(hid, _)| *hid != id);
575 })
576 }
577
578 pub fn on_member_join(
579 &self,
580 handler: impl Fn(&Value) + Send + Sync + 'static,
581 ) -> Subscription {
582 let id = self.next_handler_id();
583 let handler = Arc::new(handler) as MemberHandler;
584 self.member_join_handlers.lock().unwrap().push((id, handler));
585
586 let list = Arc::clone(&self.member_join_handlers);
587 Subscription::new(move || {
588 list.lock().unwrap().retain(|(hid, _)| *hid != id);
589 })
590 }
591
592 pub fn on_member_leave(
593 &self,
594 handler: impl Fn(&Value, &str) + Send + Sync + 'static,
595 ) -> Subscription {
596 let id = self.next_handler_id();
597 let handler = Arc::new(handler) as MemberLeaveHandler;
598 self.member_leave_handlers.lock().unwrap().push((id, handler));
599
600 let list = Arc::clone(&self.member_leave_handlers);
601 Subscription::new(move || {
602 list.lock().unwrap().retain(|(hid, _)| *hid != id);
603 })
604 }
605
606 pub fn on_member_state_change(
607 &self,
608 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
609 ) -> Subscription {
610 let id = self.next_handler_id();
611 let handler = Arc::new(handler) as MemberStateHandler;
612 self.member_state_handlers.lock().unwrap().push((id, handler));
613
614 let list = Arc::clone(&self.member_state_handlers);
615 Subscription::new(move || {
616 list.lock().unwrap().retain(|(hid, _)| *hid != id);
617 })
618 }
619
620 pub fn on_signal(
621 &self,
622 event: &str,
623 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
624 ) -> Subscription {
625 let id = self.next_handler_id();
626 let handler = Arc::new(handler) as SignalHandler;
627 let event = event.to_string();
628 {
629 let mut map = self.signal_handlers.lock().unwrap();
630 map.entry(event.clone())
631 .or_insert_with(Vec::new)
632 .push((id, handler));
633 }
634
635 let map = Arc::clone(&self.signal_handlers);
636 Subscription::new(move || {
637 if let Some(list) = map.lock().unwrap().get_mut(&event) {
638 list.retain(|(hid, _)| *hid != id);
639 }
640 })
641 }
642
643 pub fn on_any_signal(
644 &self,
645 handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
646 ) -> Subscription {
647 let id = self.next_handler_id();
648 let handler = Arc::new(handler) as AnySignalHandler;
649 self.any_signal_handlers.lock().unwrap().push((id, handler));
650
651 let list = Arc::clone(&self.any_signal_handlers);
652 Subscription::new(move || {
653 list.lock().unwrap().retain(|(hid, _)| *hid != id);
654 })
655 }
656
657 pub fn on_media_track(
658 &self,
659 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
660 ) -> Subscription {
661 let id = self.next_handler_id();
662 let handler = Arc::new(handler) as MediaTrackHandler;
663 self.media_track_handlers.lock().unwrap().push((id, handler));
664
665 let list = Arc::clone(&self.media_track_handlers);
666 Subscription::new(move || {
667 list.lock().unwrap().retain(|(hid, _)| *hid != id);
668 })
669 }
670
671 pub fn on_media_track_removed(
672 &self,
673 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
674 ) -> Subscription {
675 let id = self.next_handler_id();
676 let handler = Arc::new(handler) as MediaTrackHandler;
677 self.media_track_removed_handlers
678 .lock()
679 .unwrap()
680 .push((id, handler));
681
682 let list = Arc::clone(&self.media_track_removed_handlers);
683 Subscription::new(move || {
684 list.lock().unwrap().retain(|(hid, _)| *hid != id);
685 })
686 }
687
688 pub fn on_media_state_change(
689 &self,
690 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
691 ) -> Subscription {
692 let id = self.next_handler_id();
693 let handler = Arc::new(handler) as MediaStateHandler;
694 self.media_state_handlers.lock().unwrap().push((id, handler));
695
696 let list = Arc::clone(&self.media_state_handlers);
697 Subscription::new(move || {
698 list.lock().unwrap().retain(|(hid, _)| *hid != id);
699 })
700 }
701
702 pub fn on_media_device_change(
703 &self,
704 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
705 ) -> Subscription {
706 let id = self.next_handler_id();
707 let handler = Arc::new(handler) as MediaDeviceHandler;
708 self.media_device_handlers.lock().unwrap().push((id, handler));
709
710 let list = Arc::clone(&self.media_device_handlers);
711 Subscription::new(move || {
712 list.lock().unwrap().retain(|(hid, _)| *hid != id);
713 })
714 }
715
716 pub fn on_reconnect(
717 &self,
718 handler: impl Fn(&Value) + Send + Sync + 'static,
719 ) -> Subscription {
720 let id = self.next_handler_id();
721 let handler = Arc::new(handler) as ReconnectHandler;
722 self.reconnect_handlers.lock().unwrap().push((id, handler));
723
724 let list = Arc::clone(&self.reconnect_handlers);
725 Subscription::new(move || {
726 list.lock().unwrap().retain(|(hid, _)| *hid != id);
727 })
728 }
729
730 pub fn on_connection_state_change(
731 &self,
732 handler: impl Fn(&str) + Send + Sync + 'static,
733 ) -> Subscription {
734 let id = self.next_handler_id();
735 let handler = Arc::new(handler) as ConnectionStateHandler;
736 self.connection_state_handlers
737 .lock()
738 .unwrap()
739 .push((id, handler));
740
741 let list = Arc::clone(&self.connection_state_handlers);
742 Subscription::new(move || {
743 list.lock().unwrap().retain(|(hid, _)| *hid != id);
744 })
745 }
746
747 pub async fn send_signal(
748 &self,
749 event: &str,
750 payload: Option<Value>,
751 options: Option<Value>,
752 ) -> Result<(), Error> {
753 if self.send_tx.lock().unwrap().is_none() {
754 return Err(Error::Room("Not connected to room".to_string()));
755 }
756
757 let request_id = Uuid::new_v4().to_string();
758 let options = options.unwrap_or_else(|| json!({}));
759 let message = json!({
760 "type": "signal",
761 "event": event,
762 "payload": payload.unwrap_or_else(|| json!({})),
763 "requestId": request_id,
764 "memberId": options.get("memberId").cloned().unwrap_or(Value::Null),
765 "includeSelf": options.get("includeSelf").and_then(Value::as_bool).unwrap_or(false),
766 });
767
768 self.send_unit_request(&self.pending_signal_requests, request_id, message, format!("Signal '{}' timed out", event)).await
769 }
770
771 pub async fn send_member_state(&self, state: Value) -> Result<(), Error> {
772 if self.send_tx.lock().unwrap().is_none() {
773 return Err(Error::Room("Not connected to room".to_string()));
774 }
775
776 let request_id = Uuid::new_v4().to_string();
777 let message = json!({
778 "type": "member_state",
779 "state": state,
780 "requestId": request_id,
781 });
782
783 self.send_unit_request(
784 &self.pending_member_state_requests,
785 request_id,
786 message,
787 "Member state update timed out".to_string(),
788 ).await
789 }
790
791 pub async fn clear_member_state(&self) -> Result<(), Error> {
792 if self.send_tx.lock().unwrap().is_none() {
793 return Err(Error::Room("Not connected to room".to_string()));
794 }
795
796 let request_id = Uuid::new_v4().to_string();
797 let message = json!({
798 "type": "member_state_clear",
799 "requestId": request_id,
800 });
801
802 self.send_unit_request(
803 &self.pending_member_state_requests,
804 request_id,
805 message,
806 "Member state clear timed out".to_string(),
807 ).await
808 }
809
810 pub async fn send_admin(
811 &self,
812 operation: &str,
813 member_id: &str,
814 payload: Option<Value>,
815 ) -> Result<(), Error> {
816 if self.send_tx.lock().unwrap().is_none() {
817 return Err(Error::Room("Not connected to room".to_string()));
818 }
819
820 let request_id = Uuid::new_v4().to_string();
821 let message = json!({
822 "type": "admin",
823 "operation": operation,
824 "memberId": member_id,
825 "payload": payload.unwrap_or_else(|| json!({})),
826 "requestId": request_id,
827 });
828
829 self.send_unit_request(
830 &self.pending_admin_requests,
831 request_id,
832 message,
833 format!("Admin '{}' timed out", operation),
834 ).await
835 }
836
837 pub async fn send_media(
838 &self,
839 operation: &str,
840 kind: &str,
841 payload: Option<Value>,
842 ) -> Result<(), Error> {
843 if self.send_tx.lock().unwrap().is_none() {
844 return Err(Error::Room("Not connected to room".to_string()));
845 }
846
847 let request_id = Uuid::new_v4().to_string();
848 let message = json!({
849 "type": "media",
850 "operation": operation,
851 "kind": kind,
852 "payload": payload.unwrap_or_else(|| json!({})),
853 "requestId": request_id,
854 });
855
856 self.send_unit_request(
857 &self.pending_media_requests,
858 request_id,
859 message,
860 format!("Media '{}:{}' timed out", operation, kind),
861 ).await
862 }
863
864 pub async fn switch_media_devices(&self, payload: Value) -> Result<(), Error> {
865 if let Some(device_id) = payload.get("audioInputId").and_then(Value::as_str) {
866 self.send_media("device", "audio", Some(json!({ "deviceId": device_id }))).await?;
867 }
868 if let Some(device_id) = payload.get("videoInputId").and_then(Value::as_str) {
869 self.send_media("device", "video", Some(json!({ "deviceId": device_id }))).await?;
870 }
871 if let Some(device_id) = payload.get("screenInputId").and_then(Value::as_str) {
872 self.send_media("device", "screen", Some(json!({ "deviceId": device_id }))).await?;
873 }
874 Ok(())
875 }
876
877 fn reject_all_pending(&self, message: &str) {
880 let error_msg = message.to_string();
881
882 for (_, tx) in self.pending_requests.lock().unwrap().drain() {
884 let _ = tx.send(Err(Error::Room(error_msg.clone())));
885 }
886
887 for (_, tx) in self.pending_signal_requests.lock().unwrap().drain() {
889 let _ = tx.send(Err(Error::Room(error_msg.clone())));
890 }
891
892 for (_, tx) in self.pending_admin_requests.lock().unwrap().drain() {
894 let _ = tx.send(Err(Error::Room(error_msg.clone())));
895 }
896
897 for (_, tx) in self.pending_member_state_requests.lock().unwrap().drain() {
899 let _ = tx.send(Err(Error::Room(error_msg.clone())));
900 }
901
902 for (_, tx) in self.pending_media_requests.lock().unwrap().drain() {
904 let _ = tx.send(Err(Error::Room(error_msg.clone())));
905 }
906 }
907
908 pub async fn destroy(self: &Arc<Self>) {
911 self.leave().await;
912
913 self.shared_state_handlers.lock().unwrap().clear();
915 self.player_state_handlers.lock().unwrap().clear();
916 self.message_handlers.lock().unwrap().clear();
917 self.error_handlers.lock().unwrap().clear();
918 self.kicked_handlers.lock().unwrap().clear();
919 self.member_sync_handlers.lock().unwrap().clear();
920 self.member_join_handlers.lock().unwrap().clear();
921 self.member_leave_handlers.lock().unwrap().clear();
922 self.member_state_handlers.lock().unwrap().clear();
923 self.signal_handlers.lock().unwrap().clear();
924 self.any_signal_handlers.lock().unwrap().clear();
925 self.media_track_handlers.lock().unwrap().clear();
926 self.media_track_removed_handlers.lock().unwrap().clear();
927 self.media_state_handlers.lock().unwrap().clear();
928 self.media_device_handlers.lock().unwrap().clear();
929 self.reconnect_handlers.lock().unwrap().clear();
930 self.connection_state_handlers.lock().unwrap().clear();
931 }
932
933 fn ws_url(&self) -> String {
936 let u = self
937 .base_url
938 .replace("https://", "wss://")
939 .replace("http://", "ws://");
940 format!(
941 "{}/api/room?namespace={}&id={}",
942 u,
943 urlencoding::encode(&self.namespace),
944 urlencoding::encode(&self.room_id)
945 )
946 }
947
948 fn ws_send(&self, msg: Value) {
950 let s = msg.to_string();
951 if let Some(tx) = self.send_tx.lock().unwrap().as_ref() {
952 let tx = tx.clone();
953 tokio::spawn(async move {
954 let _ = tx.send(RoomWsCommand::Send(s)).await;
955 });
956 }
957 }
958
959 #[cfg(test)]
960 pub(crate) fn attach_send_channel_for_testing(&self, tx: mpsc::Sender<RoomWsCommand>) {
961 *self.send_tx.lock().unwrap() = Some(tx);
962 }
963
964 #[cfg(test)]
965 pub(crate) fn handle_message_for_testing(&self, raw: &str) {
966 self.handle_message(raw);
967 }
968
969 async fn establish(&self) -> anyhow::Result<mpsc::Receiver<String>> {
970 use futures_util::{SinkExt, StreamExt};
971 use tokio_tungstenite::{connect_async, tungstenite::Message};
972
973 let (ws_stream, _response) = tokio::time::timeout(
974 std::time::Duration::from_millis(self.opts.connection_timeout_ms),
975 connect_async(self.ws_url()),
976 )
977 .await
978 .map_err(|_| anyhow::anyhow!(
979 "Room WebSocket connection timed out after {}ms. Is the server running?",
980 self.opts.connection_timeout_ms,
981 ))??;
982 let (mut write, mut read) = ws_stream.split();
983
984 let auth = json!({"type": "auth", "token": (self.token_fn)(), "sdkVersion": "0.2.2"});
986 write.send(Message::Text(auth.to_string().into())).await?;
987
988 let join_raw = if let Some(Ok(Message::Text(raw))) = read.next().await {
990 let raw_str: &str = &raw;
991 let resp: Value = serde_json::from_str(raw_str)?;
992 let t = resp["type"].as_str().unwrap_or("");
993 if t != "auth_success" && t != "auth_refreshed" {
994 anyhow::bail!("Room auth failed: {}", resp["message"]);
995 }
996
997 *self.current_user_id.lock().unwrap() = resp["userId"].as_str().map(|value| value.to_string());
998 *self.current_connection_id.lock().unwrap() = resp["connectionId"].as_str().map(|value| value.to_string());
999
1000 let join_msg = json!({
1002 "type": "join",
1003 "lastSharedState": *self.shared_state.read().unwrap(),
1004 "lastSharedVersion": *self.shared_version.read().unwrap(),
1005 "lastPlayerState": *self.player_state.read().unwrap(),
1006 "lastPlayerVersion": *self.player_version.read().unwrap(),
1007 });
1008 join_msg.to_string()
1009 } else {
1010 anyhow::bail!("Room: no auth response");
1011 };
1012
1013 let (write_tx, mut write_rx) = mpsc::channel::<RoomWsCommand>(128);
1015 *self.send_tx.lock().unwrap() = Some(write_tx.clone());
1016
1017 tokio::spawn(async move {
1019 while let Some(command) = write_rx.recv().await {
1020 match command {
1021 RoomWsCommand::Send(msg) => {
1022 if write.send(Message::Text(msg.into())).await.is_err() {
1023 break;
1024 }
1025 }
1026 RoomWsCommand::Close => {
1027 let _ = write.send(Message::Close(None)).await;
1028 break;
1029 }
1030 }
1031 }
1032 });
1033
1034 let _ = write_tx.send(RoomWsCommand::Send(join_raw)).await;
1036
1037 let htx = write_tx.clone();
1039 tokio::spawn(async move {
1040 loop {
1041 sleep(Duration::from_secs(30)).await;
1042 if htx
1043 .send(RoomWsCommand::Send(json!({"type":"ping"}).to_string()))
1044 .await
1045 .is_err()
1046 {
1047 break;
1048 }
1049 }
1050 });
1051
1052 let (msg_tx, msg_rx) = mpsc::channel::<String>(128);
1054 tokio::spawn(async move {
1055 while let Some(Ok(Message::Text(raw))) = read.next().await {
1056 let raw_str: String = raw.into();
1057 if msg_tx.send(raw_str).await.is_err() {
1058 break;
1059 }
1060 }
1061 });
1062
1063 Ok(msg_rx)
1064 }
1065
1066 fn handle_message(&self, raw: &str) {
1069 let msg: Value = match serde_json::from_str(raw) {
1070 Ok(v) => v,
1071 Err(_) => return,
1072 };
1073 let t = msg["type"].as_str().unwrap_or("");
1074
1075 match t {
1076 "sync" => self.handle_sync(&msg),
1077 "shared_delta" => self.handle_shared_delta(&msg),
1078 "player_delta" => self.handle_player_delta(&msg),
1079 "action_result" => self.handle_action_result(&msg),
1080 "action_error" => self.handle_action_error(&msg),
1081 "message" => self.handle_server_message(&msg),
1082 "signal" => self.handle_signal(&msg),
1083 "signal_sent" => self.resolve_pending_unit_request(&self.pending_signal_requests, &msg),
1084 "signal_error" => self.reject_pending_unit_request(&self.pending_signal_requests, &msg, "Signal error"),
1085 "members_sync" => self.handle_members_sync(&msg),
1086 "member_join" => self.handle_member_join(&msg),
1087 "member_leave" => self.handle_member_leave(&msg),
1088 "member_state" => self.handle_member_state(&msg),
1089 "member_state_error" => self.reject_pending_unit_request(&self.pending_member_state_requests, &msg, "Member state error"),
1090 "admin_result" => self.resolve_pending_unit_request(&self.pending_admin_requests, &msg),
1091 "admin_error" => self.reject_pending_unit_request(&self.pending_admin_requests, &msg, "Admin error"),
1092 "media_sync" => self.handle_media_sync(&msg),
1093 "media_track" => self.handle_media_track(&msg),
1094 "media_track_removed" => self.handle_media_track_removed(&msg),
1095 "media_state" => self.handle_media_state(&msg),
1096 "media_device" => self.handle_media_device(&msg),
1097 "media_result" => self.resolve_pending_unit_request(&self.pending_media_requests, &msg),
1098 "media_error" => self.reject_pending_unit_request(&self.pending_media_requests, &msg, "Media error"),
1099 "kicked" => self.handle_kicked(),
1100 "error" => self.handle_error(&msg),
1101 "pong" => {}
1102 _ => {}
1103 }
1104 }
1105
1106 fn handle_sync(&self, msg: &Value) {
1107 *self.shared_state.write().unwrap() = msg["sharedState"].clone();
1108 *self.shared_version.write().unwrap() = msg["sharedVersion"].as_u64().unwrap_or(0);
1109 *self.player_state.write().unwrap() = msg["playerState"].clone();
1110 *self.player_version.write().unwrap() = msg["playerVersion"].as_u64().unwrap_or(0);
1111 self.set_connection_state(ROOM_STATE_CONNECTED);
1112
1113 if let Some(info) = self.reconnect_info.write().unwrap().take() {
1114 for (_, handler) in self.reconnect_handlers.lock().unwrap().iter() {
1115 handler(&info);
1116 }
1117 }
1118
1119 let shared = self.shared_state.read().unwrap().clone();
1121 for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1122 handler(&shared, &shared);
1123 }
1124 let player = self.player_state.read().unwrap().clone();
1125 for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1126 handler(&player, &player);
1127 }
1128 }
1129
1130 fn handle_shared_delta(&self, msg: &Value) {
1131 let delta = &msg["delta"];
1132 *self.shared_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1133
1134 if let Value::Object(map) = delta {
1135 let mut state = self.shared_state.write().unwrap();
1136 for (path, value) in map {
1137 deep_set(&mut state, path, value.clone());
1138 }
1139 }
1140
1141 let state = self.shared_state.read().unwrap().clone();
1142 for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1143 handler(&state, delta);
1144 }
1145 }
1146
1147 fn handle_player_delta(&self, msg: &Value) {
1148 let delta = &msg["delta"];
1149 *self.player_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1150
1151 if let Value::Object(map) = delta {
1152 let mut state = self.player_state.write().unwrap();
1153 for (path, value) in map {
1154 deep_set(&mut state, path, value.clone());
1155 }
1156 }
1157
1158 let state = self.player_state.read().unwrap().clone();
1159 for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1160 handler(&state, delta);
1161 }
1162 }
1163
1164 fn handle_action_result(&self, msg: &Value) {
1165 let request_id = msg["requestId"].as_str().unwrap_or("");
1166 if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1167 let _ = tx.send(Ok(msg["result"].clone()));
1168 }
1169 }
1170
1171 fn handle_action_error(&self, msg: &Value) {
1172 let request_id = msg["requestId"].as_str().unwrap_or("");
1173 if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1174 let message = msg["message"].as_str().unwrap_or("Unknown error");
1175 let _ = tx.send(Err(Error::Room(message.to_string())));
1176 }
1177 }
1178
1179 fn handle_server_message(&self, msg: &Value) {
1180 let message_type = msg["messageType"].as_str().unwrap_or("");
1181 let data = &msg["data"];
1182
1183 let handlers = self.message_handlers.lock().unwrap();
1184 if let Some(list) = handlers.get(message_type) {
1185 for (_, handler) in list {
1186 handler(data);
1187 }
1188 }
1189 }
1190
1191 fn handle_signal(&self, msg: &Value) {
1192 let event = msg["event"].as_str().unwrap_or("");
1193 let payload = &msg["payload"];
1194 let meta = &msg["meta"];
1195
1196 if let Some(list) = self.signal_handlers.lock().unwrap().get(event) {
1197 for (_, handler) in list {
1198 handler(payload, meta);
1199 }
1200 }
1201
1202 for (_, handler) in self.any_signal_handlers.lock().unwrap().iter() {
1203 handler(event, payload, meta);
1204 }
1205 }
1206
1207 fn handle_members_sync(&self, msg: &Value) {
1208 let members = normalize_members(&msg["members"]);
1209 *self.members.write().unwrap() = members.clone();
1210 self.merge_members_into_media();
1211
1212 for (_, handler) in self.member_sync_handlers.lock().unwrap().iter() {
1213 handler(&members);
1214 }
1215 }
1216
1217 fn handle_member_join(&self, msg: &Value) {
1218 if let Some(member) = normalize_member(&msg["member"]) {
1219 self.upsert_member(member.clone());
1220 for (_, handler) in self.member_join_handlers.lock().unwrap().iter() {
1221 handler(&member);
1222 }
1223 }
1224 }
1225
1226 fn handle_member_leave(&self, msg: &Value) {
1227 if let Some(member) = normalize_member(&msg["member"]) {
1228 let member_id = member["memberId"].as_str().unwrap_or("");
1229 self.remove_member(member_id);
1230 let reason = msg["reason"].as_str().unwrap_or("leave");
1231 for (_, handler) in self.member_leave_handlers.lock().unwrap().iter() {
1232 handler(&member, reason);
1233 }
1234 }
1235 }
1236
1237 fn handle_member_state(&self, msg: &Value) {
1238 if let Some(mut member) = normalize_member(&msg["member"]) {
1239 let state = object_or_empty(&msg["state"]);
1240 member["state"] = state.clone();
1241 self.upsert_member(member.clone());
1242 self.resolve_pending_unit_request(&self.pending_member_state_requests, msg);
1243
1244 for (_, handler) in self.member_state_handlers.lock().unwrap().iter() {
1245 handler(&member, &state);
1246 }
1247 }
1248 }
1249
1250 fn handle_media_sync(&self, msg: &Value) {
1251 let media_members = normalize_media_members(&msg["members"]);
1252 *self.media_members.write().unwrap() = media_members.clone();
1253 self.merge_members_into_media();
1254 }
1255
1256 fn handle_media_track(&self, msg: &Value) {
1257 if let (Some(member), Some(track)) = (
1258 normalize_member(&msg["member"]),
1259 normalize_track(&msg["track"]),
1260 ) {
1261 self.upsert_media_track(&member, &track);
1262 for (_, handler) in self.media_track_handlers.lock().unwrap().iter() {
1263 handler(&track, &member);
1264 }
1265 }
1266 }
1267
1268 fn handle_media_track_removed(&self, msg: &Value) {
1269 if let (Some(member), Some(track)) = (
1270 normalize_member(&msg["member"]),
1271 normalize_track(&msg["track"]),
1272 ) {
1273 self.remove_media_track(&member, &track);
1274 for (_, handler) in self
1275 .media_track_removed_handlers
1276 .lock()
1277 .unwrap()
1278 .iter()
1279 {
1280 handler(&track, &member);
1281 }
1282 }
1283 }
1284
1285 fn handle_media_state(&self, msg: &Value) {
1286 if let Some(member) = normalize_member(&msg["member"]) {
1287 let state = object_or_empty(&msg["state"]);
1288 self.upsert_media_state(&member, state.clone());
1289 for (_, handler) in self.media_state_handlers.lock().unwrap().iter() {
1290 handler(&member, &state);
1291 }
1292 }
1293 }
1294
1295 fn handle_media_device(&self, msg: &Value) {
1296 if let Some(member) = normalize_member(&msg["member"]) {
1297 let change = json!({
1298 "kind": msg["kind"].clone(),
1299 "deviceId": msg["deviceId"].clone(),
1300 });
1301 self.apply_media_device_change(&member, &change);
1302 for (_, handler) in self.media_device_handlers.lock().unwrap().iter() {
1303 handler(&member, &change);
1304 }
1305 }
1306 }
1307
1308 fn handle_kicked(&self) {
1309 self.set_connection_state(ROOM_STATE_KICKED);
1310 *self.intentionally_left.lock().unwrap() = true;
1311 for (_, handler) in self.kicked_handlers.lock().unwrap().iter() {
1312 handler();
1313 }
1314 }
1315
1316 fn handle_error(&self, msg: &Value) {
1317 let code = msg["code"].as_str().unwrap_or("");
1318 let message = msg["message"].as_str().unwrap_or("");
1319 for (_, handler) in self.error_handlers.lock().unwrap().iter() {
1320 handler(code, message);
1321 }
1322 }
1323
1324 async fn send_unit_request(
1327 &self,
1328 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1329 request_id: String,
1330 message: Value,
1331 timeout_message: String,
1332 ) -> Result<(), Error> {
1333 let (tx, rx) = oneshot::channel::<Result<(), Error>>();
1334 pending.lock().unwrap().insert(request_id.clone(), tx);
1335 self.ws_send(message);
1336
1337 match timeout(Duration::from_millis(self.opts.send_timeout_ms), rx).await {
1338 Ok(Ok(result)) => result,
1339 Ok(Err(_)) => {
1340 pending.lock().unwrap().remove(&request_id);
1341 Err(Error::Room(
1342 "Room left while waiting for room control result".to_string(),
1343 ))
1344 }
1345 Err(_) => {
1346 pending.lock().unwrap().remove(&request_id);
1347 Err(Error::RoomTimeout(timeout_message))
1348 }
1349 }
1350 }
1351
1352 fn resolve_pending_unit_request(
1353 &self,
1354 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1355 msg: &Value,
1356 ) {
1357 let request_id = msg["requestId"].as_str().unwrap_or("");
1358 if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1359 let _ = tx.send(Ok(()));
1360 }
1361 }
1362
1363 fn reject_pending_unit_request(
1364 &self,
1365 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1366 msg: &Value,
1367 fallback: &str,
1368 ) {
1369 let request_id = msg["requestId"].as_str().unwrap_or("");
1370 if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1371 let message = msg["message"].as_str().unwrap_or(fallback);
1372 let _ = tx.send(Err(Error::Room(message.to_string())));
1373 }
1374 }
1375
1376 fn set_connection_state(&self, next: &str) {
1377 let mut state = self.connection_state.write().unwrap();
1378 if state.as_str() == next {
1379 return;
1380 }
1381 *state = next.to_string();
1382 drop(state);
1383
1384 for (_, handler) in self.connection_state_handlers.lock().unwrap().iter() {
1385 handler(next);
1386 }
1387 }
1388
1389 fn begin_reconnect_attempt(&self, attempt: u64) {
1390 *self.reconnect_info.write().unwrap() = Some(json!({ "attempt": attempt }));
1391 self.set_connection_state(ROOM_STATE_RECONNECTING);
1392 }
1393
1394 fn upsert_member(&self, member: Value) {
1395 let member_id = member["memberId"].as_str().unwrap_or("").to_string();
1396 if member_id.is_empty() {
1397 return;
1398 }
1399
1400 let mut members = self.members.write().unwrap();
1401 let list = members.as_array_mut().expect("members array");
1402 if let Some(existing) = list
1403 .iter_mut()
1404 .find(|entry| entry["memberId"].as_str() == Some(member_id.as_str()))
1405 {
1406 *existing = member;
1407 } else {
1408 list.push(member);
1409 }
1410 drop(members);
1411 self.merge_members_into_media();
1412 }
1413
1414 fn remove_member(&self, member_id: &str) {
1415 {
1416 let mut members = self.members.write().unwrap();
1417 if let Some(list) = members.as_array_mut() {
1418 list.retain(|entry| entry["memberId"].as_str() != Some(member_id));
1419 }
1420 }
1421 {
1422 let mut media_members = self.media_members.write().unwrap();
1423 if let Some(list) = media_members.as_array_mut() {
1424 list.retain(|entry| entry["member"]["memberId"].as_str() != Some(member_id));
1425 }
1426 }
1427 }
1428
1429 fn merge_members_into_media(&self) {
1430 let members = self.members.read().unwrap().clone();
1431 let mut media_members = self.media_members.write().unwrap();
1432 if let Some(list) = media_members.as_array_mut() {
1433 for media_member in list.iter_mut() {
1434 let member_id = media_member["member"]["memberId"].as_str().unwrap_or("");
1435 if let Some(member) = members
1436 .as_array()
1437 .and_then(|entries| {
1438 entries
1439 .iter()
1440 .find(|entry| entry["memberId"].as_str() == Some(member_id))
1441 })
1442 {
1443 media_member["member"] = member.clone();
1444 }
1445 }
1446 }
1447 }
1448
1449 fn ensure_media_member(&self, member: &Value) -> usize {
1450 self.upsert_member(member.clone());
1451 let member_id = member["memberId"].as_str().unwrap_or("");
1452 let mut media_members = self.media_members.write().unwrap();
1453 let list = media_members.as_array_mut().expect("media members array");
1454 if let Some(index) = list
1455 .iter()
1456 .position(|entry| entry["member"]["memberId"].as_str() == Some(member_id))
1457 {
1458 list[index]["member"] = member.clone();
1459 return index;
1460 }
1461
1462 list.push(json!({
1463 "member": member,
1464 "state": {},
1465 "tracks": [],
1466 }));
1467 list.len() - 1
1468 }
1469
1470 fn upsert_media_track(&self, member: &Value, track: &Value) {
1471 let index = self.ensure_media_member(member);
1472 let mut media_members = self.media_members.write().unwrap();
1473 let list = media_members.as_array_mut().expect("media members array");
1474 let tracks = list[index]["tracks"].as_array_mut().expect("tracks array");
1475 let kind = track["kind"].as_str().unwrap_or("");
1476 if let Some(existing) = tracks
1477 .iter_mut()
1478 .find(|entry| entry["kind"].as_str() == Some(kind))
1479 {
1480 *existing = track.clone();
1481 } else {
1482 tracks.push(track.clone());
1483 }
1484 apply_track_to_state(&mut list[index]["state"], track, true);
1485 }
1486
1487 fn remove_media_track(&self, member: &Value, track: &Value) {
1488 let index = self.ensure_media_member(member);
1489 let mut media_members = self.media_members.write().unwrap();
1490 let list = media_members.as_array_mut().expect("media members array");
1491 let kind = track["kind"].as_str().unwrap_or("");
1492 if let Some(tracks) = list[index]["tracks"].as_array_mut() {
1493 tracks.retain(|entry| entry["kind"].as_str() != Some(kind));
1494 }
1495 apply_track_to_state(&mut list[index]["state"], track, false);
1496 }
1497
1498 fn upsert_media_state(&self, member: &Value, state: Value) {
1499 let index = self.ensure_media_member(member);
1500 let mut media_members = self.media_members.write().unwrap();
1501 let list = media_members.as_array_mut().expect("media members array");
1502 list[index]["state"] = state;
1503 }
1504
1505 fn apply_media_device_change(&self, member: &Value, change: &Value) {
1506 let index = self.ensure_media_member(member);
1507 let mut media_members = self.media_members.write().unwrap();
1508 let list = media_members.as_array_mut().expect("media members array");
1509 let kind = change["kind"].as_str().unwrap_or("");
1510 let device_id = change["deviceId"].clone();
1511 if let Some(state) = list[index]["state"].as_object_mut() {
1512 let kind_state = state.entry(kind.to_string()).or_insert_with(|| json!({}));
1513 if let Some(kind_state_map) = kind_state.as_object_mut() {
1514 kind_state_map.insert("deviceId".to_string(), device_id);
1515 }
1516 }
1517 }
1518
1519 fn next_handler_id(&self) -> u64 {
1520 let mut counter = self.handler_id_counter.lock().unwrap();
1521 *counter += 1;
1522 *counter
1523 }
1524}
1525
1526fn deep_set(obj: &mut Value, path: &str, value: Value) {
1529 if let Some(dot) = path.find('.') {
1530 let head = &path[..dot];
1531 let tail = &path[dot + 1..];
1532 if let Value::Object(map) = obj {
1533 let nested = map.entry(head.to_string()).or_insert(json!({}));
1534 deep_set(nested, tail, value);
1535 }
1536 } else if let Value::Object(map) = obj {
1537 if value.is_null() {
1538 map.remove(path);
1539 } else {
1540 map.insert(path.to_string(), value);
1541 }
1542 }
1543}
1544
1545fn object_or_empty(value: &Value) -> Value {
1546 if value.is_object() {
1547 value.clone()
1548 } else {
1549 json!({})
1550 }
1551}
1552
1553fn normalize_member(value: &Value) -> Option<Value> {
1554 let member_id = value["memberId"].as_str()?;
1555 let user_id = value["userId"].as_str()?;
1556 let mut member = json!({
1557 "memberId": member_id,
1558 "userId": user_id,
1559 "state": object_or_empty(&value["state"]),
1560 });
1561
1562 if let Some(connection_id) = value["connectionId"].as_str() {
1563 member["connectionId"] = Value::String(connection_id.to_string());
1564 }
1565 if let Some(connection_count) = value["connectionCount"].as_u64() {
1566 member["connectionCount"] = Value::from(connection_count);
1567 }
1568 if let Some(role) = value["role"].as_str() {
1569 member["role"] = Value::String(role.to_string());
1570 }
1571 Some(member)
1572}
1573
1574fn normalize_members(value: &Value) -> Value {
1575 Value::Array(
1576 value
1577 .as_array()
1578 .into_iter()
1579 .flatten()
1580 .filter_map(normalize_member)
1581 .collect(),
1582 )
1583}
1584
1585fn normalize_track(value: &Value) -> Option<Value> {
1586 let kind = value["kind"].as_str()?;
1587 let mut track = json!({
1588 "kind": kind,
1589 "muted": value["muted"].as_bool().unwrap_or(false),
1590 });
1591
1592 if let Some(track_id) = value["trackId"].as_str() {
1593 track["trackId"] = Value::String(track_id.to_string());
1594 }
1595 if let Some(device_id) = value["deviceId"].as_str() {
1596 track["deviceId"] = Value::String(device_id.to_string());
1597 }
1598 if let Some(published_at) = value["publishedAt"].as_u64() {
1599 track["publishedAt"] = Value::from(published_at);
1600 }
1601 if let Some(admin_disabled) = value["adminDisabled"].as_bool() {
1602 track["adminDisabled"] = Value::Bool(admin_disabled);
1603 }
1604 Some(track)
1605}
1606
1607fn normalize_tracks(value: &Value) -> Value {
1608 Value::Array(
1609 value
1610 .as_array()
1611 .into_iter()
1612 .flatten()
1613 .filter_map(normalize_track)
1614 .collect(),
1615 )
1616}
1617
1618fn normalize_media_members(value: &Value) -> Value {
1619 Value::Array(
1620 value
1621 .as_array()
1622 .into_iter()
1623 .flatten()
1624 .filter_map(|entry| {
1625 let member = normalize_member(&entry["member"])?;
1626 Some(json!({
1627 "member": member,
1628 "state": object_or_empty(&entry["state"]),
1629 "tracks": normalize_tracks(&entry["tracks"]),
1630 }))
1631 })
1632 .collect(),
1633 )
1634}
1635
1636fn apply_track_to_state(state: &mut Value, track: &Value, published: bool) {
1637 if !state.is_object() {
1638 *state = json!({});
1639 }
1640
1641 let kind = match track["kind"].as_str() {
1642 Some(kind) => kind,
1643 None => return,
1644 };
1645
1646 let state_map = state.as_object_mut().expect("state object");
1647 let kind_state = state_map
1648 .entry(kind.to_string())
1649 .or_insert_with(|| json!({}));
1650 let kind_state_map = kind_state.as_object_mut().expect("kind state object");
1651 kind_state_map.insert(
1652 "published".to_string(),
1653 Value::Bool(published),
1654 );
1655 kind_state_map.insert(
1656 "muted".to_string(),
1657 Value::Bool(track["muted"].as_bool().unwrap_or(false)),
1658 );
1659
1660 if published {
1661 if let Some(track_id) = track.get("trackId") {
1662 kind_state_map.insert("trackId".to_string(), track_id.clone());
1663 }
1664 if let Some(device_id) = track.get("deviceId") {
1665 kind_state_map.insert("deviceId".to_string(), device_id.clone());
1666 }
1667 if let Some(published_at) = track.get("publishedAt") {
1668 kind_state_map.insert("publishedAt".to_string(), published_at.clone());
1669 }
1670 if let Some(admin_disabled) = track.get("adminDisabled") {
1671 kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1672 }
1673 } else {
1674 kind_state_map.remove("trackId");
1675 kind_state_map.remove("publishedAt");
1676 if let Some(admin_disabled) = track.get("adminDisabled") {
1677 kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1678 }
1679 }
1680}
1681
/// Handle grouping the room's shared-state and player-state operations.
///
/// Every method forwards to the wrapped `RoomClient`.
pub struct RoomStateNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomStateNamespace {
    /// Wraps `client` in a state namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Returns the result of `RoomClient::get_shared_state`.
    pub fn get_shared(&self) -> Value {
        self.client.get_shared_state()
    }

    /// Returns the result of `RoomClient::get_player_state`.
    pub fn get_mine(&self) -> Value {
        self.client.get_player_state()
    }

    /// Registers `handler` through `RoomClient::on_shared_state` and returns
    /// the resulting `Subscription`.
    pub fn on_shared_change(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_shared_state(handler)
    }

    /// Registers `handler` through `RoomClient::on_player_state` and returns
    /// the resulting `Subscription`.
    pub fn on_mine_change(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_player_state(handler)
    }

    /// Forwards `action_type` and `payload` to `RoomClient::send` and returns
    /// its result.
    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
        self.client.send(action_type, payload).await
    }
}
1717
/// Handle for reading room metadata through the wrapped `RoomClient`.
pub struct RoomMetaNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomMetaNamespace {
    /// Wraps `client` in a metadata namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Returns the result of `RoomClient::get_metadata`.
    pub async fn get(&self) -> Result<Value, Error> {
        self.client.get_metadata().await
    }
}
1731
/// Handle grouping signal sending and subscription on the wrapped
/// `RoomClient`.
pub struct RoomSignalsNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomSignalsNamespace {
    /// Wraps `client` in a signals namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Forwards `event`, `payload` and `options` to `RoomClient::send_signal`
    /// and returns its result.
    pub async fn send(
        &self,
        event: &str,
        payload: Option<Value>,
        options: Option<Value>,
    ) -> Result<(), Error> {
        self.client.send_signal(event, payload, options).await
    }

    /// Sends `event` with `payload` through `RoomClient::send_signal`, passing
    /// `{ "memberId": member_id }` as the options.
    pub async fn send_to(
        &self,
        member_id: &str,
        event: &str,
        payload: Option<Value>,
    ) -> Result<(), Error> {
        self.client
            .send_signal(event, payload, Some(json!({ "memberId": member_id })))
            .await
    }

    /// Registers `handler` for `event` through `RoomClient::on_signal` and
    /// returns the resulting `Subscription`.
    pub fn on(
        &self,
        event: &str,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_signal(event, handler)
    }

    /// Registers `handler` through `RoomClient::on_any_signal` and returns the
    /// resulting `Subscription`.
    pub fn on_any(
        &self,
        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_any_signal(handler)
    }
}
1776
/// Handle grouping member listing, member events and member state on the
/// wrapped `RoomClient`.
pub struct RoomMembersNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomMembersNamespace {
    /// Wraps `client` in a members namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Returns the result of `RoomClient::list_members`.
    pub fn list(&self) -> Value {
        self.client.list_members()
    }

    /// Registers `handler` through `RoomClient::on_members_sync` and returns
    /// the resulting `Subscription`.
    pub fn on_sync(
        &self,
        handler: impl Fn(&Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_members_sync(handler)
    }

    /// Registers `handler` through `RoomClient::on_member_join` and returns
    /// the resulting `Subscription`.
    pub fn on_join(
        &self,
        handler: impl Fn(&Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_member_join(handler)
    }

    /// Registers `handler` through `RoomClient::on_member_leave` and returns
    /// the resulting `Subscription`.
    pub fn on_leave(
        &self,
        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_member_leave(handler)
    }

    /// Forwards `state` to `RoomClient::send_member_state` and returns its
    /// result.
    pub async fn set_state(&self, state: Value) -> Result<(), Error> {
        self.client.send_member_state(state).await
    }

    /// Returns the result of `RoomClient::clear_member_state`.
    pub async fn clear_state(&self) -> Result<(), Error> {
        self.client.clear_member_state().await
    }

    /// Registers `handler` through `RoomClient::on_member_state_change` and
    /// returns the resulting `Subscription`.
    pub fn on_state_change(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_member_state_change(handler)
    }
}
1826
/// Handle grouping admin operations, each sent through
/// `RoomClient::send_admin` with a fixed operation name.
pub struct RoomAdminNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomAdminNamespace {
    /// Wraps `client` in an admin namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Sends the `kick` admin operation for `member_id` with no extra payload.
    pub async fn kick(&self, member_id: &str) -> Result<(), Error> {
        self.client.send_admin("kick", member_id, None).await
    }

    /// Sends the `mute` admin operation for `member_id` with no extra payload.
    pub async fn mute(&self, member_id: &str) -> Result<(), Error> {
        self.client.send_admin("mute", member_id, None).await
    }

    /// Sends the `block` admin operation for `member_id` with no extra payload.
    pub async fn block(&self, member_id: &str) -> Result<(), Error> {
        self.client.send_admin("block", member_id, None).await
    }

    /// Sends the `setRole` admin operation for `member_id` with the payload
    /// `{ "role": role }`.
    pub async fn set_role(&self, member_id: &str, role: &str) -> Result<(), Error> {
        self.client
            .send_admin("setRole", member_id, Some(json!({ "role": role })))
            .await
    }

    /// Sends the `disableVideo` admin operation for `member_id` with no extra
    /// payload.
    pub async fn disable_video(&self, member_id: &str) -> Result<(), Error> {
        self.client.send_admin("disableVideo", member_id, None).await
    }

    /// Sends the `stopScreenShare` admin operation for `member_id` with no
    /// extra payload.
    pub async fn stop_screen_share(&self, member_id: &str) -> Result<(), Error> {
        self.client
            .send_admin("stopScreenShare", member_id, None)
            .await
    }
}
1864
/// Handle for media operations on one fixed media kind, sent through
/// `RoomClient::send_media`.
pub struct RoomMediaKindNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
    // Media kind passed to every `send_media` call made by this handle.
    kind: &'static str,
}

impl RoomMediaKindNamespace {
    /// Wraps `client` in a handle bound to `kind`.
    fn new(client: Arc<RoomClient>, kind: &'static str) -> Self {
        Self { client, kind }
    }

    /// Sends the `publish` media operation for this kind with `payload`.
    pub async fn enable(&self, payload: Option<Value>) -> Result<(), Error> {
        self.client.send_media("publish", self.kind, payload).await
    }

    /// Sends the `unpublish` media operation for this kind with no payload.
    pub async fn disable(&self) -> Result<(), Error> {
        self.client.send_media("unpublish", self.kind, None).await
    }

    /// Sends the `mute` media operation for this kind with the payload
    /// `{ "muted": muted }`.
    pub async fn set_muted(&self, muted: bool) -> Result<(), Error> {
        self.client
            .send_media("mute", self.kind, Some(json!({ "muted": muted })))
            .await
    }
}
1889
/// Handle for media operations on the `screen` kind, sent through
/// `RoomClient::send_media`.
pub struct RoomScreenMediaNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomScreenMediaNamespace {
    /// Wraps `client` in a screen-media namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Sends the `publish` media operation for the `screen` kind with
    /// `payload`.
    pub async fn start(&self, payload: Option<Value>) -> Result<(), Error> {
        self.client.send_media("publish", "screen", payload).await
    }

    /// Sends the `unpublish` media operation for the `screen` kind with no
    /// payload.
    pub async fn stop(&self) -> Result<(), Error> {
        self.client.send_media("unpublish", "screen", None).await
    }
}
1907
/// Handle for media device operations on the wrapped `RoomClient`.
pub struct RoomMediaDevicesNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomMediaDevicesNamespace {
    /// Wraps `client` in a media-devices namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Forwards `payload` to `RoomClient::switch_media_devices` and returns
    /// its result.
    pub async fn switch_inputs(&self, payload: Value) -> Result<(), Error> {
        self.client.switch_media_devices(payload).await
    }
}
1921
/// Entry point for media operations: lists the media roster, hands out
/// per-kind sub-handles, and registers media event handlers on the wrapped
/// `RoomClient`.
pub struct RoomMediaNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomMediaNamespace {
    /// Wraps `client` in a media namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Returns the result of `RoomClient::list_media_members`.
    pub fn list(&self) -> Value {
        self.client.list_media_members()
    }

    /// Returns a handle bound to the `audio` kind, sharing this client.
    pub fn audio(&self) -> RoomMediaKindNamespace {
        RoomMediaKindNamespace::new(Arc::clone(&self.client), "audio")
    }

    /// Returns a handle bound to the `video` kind, sharing this client.
    pub fn video(&self) -> RoomMediaKindNamespace {
        RoomMediaKindNamespace::new(Arc::clone(&self.client), "video")
    }

    /// Returns a screen-media handle sharing this client.
    pub fn screen(&self) -> RoomScreenMediaNamespace {
        RoomScreenMediaNamespace::new(Arc::clone(&self.client))
    }

    /// Returns a media-devices handle sharing this client.
    pub fn devices(&self) -> RoomMediaDevicesNamespace {
        RoomMediaDevicesNamespace::new(Arc::clone(&self.client))
    }

    /// Registers `handler` through `RoomClient::on_media_track` and returns
    /// the resulting `Subscription`.
    pub fn on_track(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_media_track(handler)
    }

    /// Registers `handler` through `RoomClient::on_media_track_removed` and
    /// returns the resulting `Subscription`.
    pub fn on_track_removed(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_media_track_removed(handler)
    }

    /// Registers `handler` through `RoomClient::on_media_state_change` and
    /// returns the resulting `Subscription`.
    pub fn on_state_change(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_media_state_change(handler)
    }

    /// Registers `handler` through `RoomClient::on_media_device_change` and
    /// returns the resulting `Subscription`.
    pub fn on_device_change(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_media_device_change(handler)
    }
}
1979
/// Handle grouping session-level events and connection details of the wrapped
/// `RoomClient`.
pub struct RoomSessionNamespace {
    // Shared handle to the client that does the actual work.
    client: Arc<RoomClient>,
}

impl RoomSessionNamespace {
    /// Wraps `client` in a session namespace handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Registers `handler` through `RoomClient::on_error` and returns the
    /// resulting `Subscription`.
    pub fn on_error(
        &self,
        handler: impl Fn(&str, &str) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_error(handler)
    }

    /// Registers `handler` through `RoomClient::on_kicked` and returns the
    /// resulting `Subscription`.
    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
        self.client.on_kicked(handler)
    }

    /// Registers `handler` through `RoomClient::on_reconnect` and returns the
    /// resulting `Subscription`.
    pub fn on_reconnect(
        &self,
        handler: impl Fn(&Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_reconnect(handler)
    }

    /// Registers `handler` through `RoomClient::on_connection_state_change`
    /// and returns the resulting `Subscription`.
    pub fn on_connection_state_change(
        &self,
        handler: impl Fn(&str) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_connection_state_change(handler)
    }

    /// Returns the result of `RoomClient::connection_state`.
    pub fn connection_state(&self) -> String {
        self.client.connection_state()
    }

    /// Returns a clone of the client's `current_user_id`.
    ///
    /// Panics if the mutex guarding it is poisoned.
    pub fn user_id(&self) -> Option<String> {
        self.client.current_user_id.lock().unwrap().clone()
    }

    /// Returns a clone of the client's `current_connection_id`.
    ///
    /// Panics if the mutex guarding it is poisoned.
    pub fn connection_id(&self) -> Option<String> {
        self.client.current_connection_id.lock().unwrap().clone()
    }
}