1use crate::error::Error;
19use serde_json::{json, Value};
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, RwLock};
22use tokio::sync::{mpsc, oneshot};
23use tokio::time::{sleep, timeout, Duration};
24use uuid::Uuid;
25
/// Tunable behaviour for a room connection.
pub struct RoomOptions {
    /// When `false`, the background task started by `join` gives up as soon as
    /// the connection is lost instead of retrying.
    pub auto_reconnect: bool,
    /// Number of consecutive reconnect attempts allowed before the client
    /// reports the disconnected state.
    pub max_reconnect_attempts: u32,
    /// Base reconnect delay in milliseconds; it is doubled per attempt and
    /// capped at 30 000 ms by `join`.
    pub reconnect_base_delay_ms: u64,
    /// How long `send` waits for an action result, in milliseconds.
    pub send_timeout_ms: u64,
}
35
36impl Default for RoomOptions {
37 fn default() -> Self {
38 Self {
39 auto_reconnect: true,
40 max_reconnect_attempts: 10,
41 reconnect_base_delay_ms: 1000,
42 send_timeout_ms: 10_000,
43 }
44 }
45}
46
/// Handle for a registered callback.
///
/// The stored removal closure runs exactly once: either when
/// [`Subscription::unsubscribe`] is called or when the value is dropped,
/// whichever happens first. Keep the value alive for as long as the callback
/// should stay registered.
pub struct Subscription {
    /// Removal closure; `None` once it has been taken and executed.
    remove_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
}

impl Subscription {
    /// Wraps `remove_fn`, which is invoked at most once to undo the registration.
    fn new(remove_fn: impl FnOnce() + Send + 'static) -> Self {
        Self {
            remove_fn: Mutex::new(Some(Box::new(remove_fn))),
        }
    }

    /// Moves the removal closure out of the slot, releasing the lock before
    /// the caller runs it.
    ///
    /// A poisoned lock is tolerated: the slot only holds an `Option`, so the
    /// data cannot be left half-updated, and panicking here (possibly from
    /// `Drop` during unwinding) would abort the process.
    fn take_remove_fn(&self) -> Option<Box<dyn FnOnce() + Send>> {
        self.remove_fn
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take()
    }

    /// Removes the callback now. The subsequent drop of `self` is a no-op.
    pub fn unsubscribe(self) {
        // The closure is called with the lock already released, so a panic
        // inside it cannot poison the slot that `Drop` inspects next.
        if let Some(f) = self.take_remove_fn() {
            f();
        }
    }
}

impl Drop for Subscription {
    /// Runs the removal closure if `unsubscribe` has not already done so.
    fn drop(&mut self) {
        if let Some(f) = self.take_remove_fn() {
            f();
        }
    }
}
76
// Callback signatures for the `on_*` registration methods. All handlers are
// shared (`Arc`) and must be `Send + Sync`.

/// State callback; the dispatchers pass the current state first and the
/// delta (or the full state again on a sync) second.
type StateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Server message callback; receives the message's `data` value.
type MessageHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Error callback taking two strings (presumably code and message — confirm
/// against the error dispatcher).
type ErrorHandler = Arc<dyn Fn(&str, &str) + Send + Sync>;
/// Callback without arguments, registered through `on_kicked`.
type KickedHandler = Arc<dyn Fn() + Send + Sync>;
/// Callback receiving the normalized member list after a `members_sync`.
type MembersSyncHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback registered through `on_member_join`.
type MemberHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback registered through `on_member_leave`.
type MemberLeaveHandler = Arc<dyn Fn(&Value, &str) + Send + Sync>;
/// Callback registered through `on_member_state_change`.
type MemberStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Per-event signal callback; receives `(payload, meta)`.
type SignalHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Catch-all signal callback; receives `(event, payload, meta)`.
type AnySignalHandler = Arc<dyn Fn(&str, &Value, &Value) + Send + Sync>;
/// Callback used for both media-track and media-track-removed registrations.
type MediaTrackHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback registered through `on_media_state_change`.
type MediaStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback registered through `on_media_device_change`.
type MediaDeviceHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback receiving the stored reconnect info once a sync arrives after a
/// reconnect.
type ReconnectHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback registered through `on_connection_state_change`.
type ConnectionStateHandler = Arc<dyn Fn(&str) + Send + Sync>;

/// Shared list of `(handler id, handler)` pairs; the id is what a
/// `Subscription` uses to remove its entry.
type HandlerList<H> = Arc<Mutex<Vec<(u64, H)>>>;
97
/// Commands consumed by the socket writer task spawned in `establish`.
pub(crate) enum RoomWsCommand {
    /// Write the string to the socket as a text frame.
    Send(String),
    /// Write a Close frame and stop the writer task.
    Close,
}
102
/// Pause in `leave` between queueing the `leave` message and queueing the
/// Close command.
const ROOM_EXPLICIT_LEAVE_CLOSE_DELAY: Duration = Duration::from_millis(40);
/// Initial connection state; also restored at the end of `leave`.
const ROOM_STATE_IDLE: &str = "idle";
/// Set at the start of `join`.
const ROOM_STATE_CONNECTING: &str = "connecting";
/// Set when a `sync` message has been processed.
const ROOM_STATE_CONNECTED: &str = "connected";
/// Not set in the visible part of the file; presumably used by the reconnect
/// bookkeeping — confirm.
const ROOM_STATE_RECONNECTING: &str = "reconnecting";
/// Set when reconnecting is disabled or the attempt limit is reached.
const ROOM_STATE_DISCONNECTED: &str = "disconnected";
/// Not set in the visible part of the file; presumably used by the `kicked`
/// handling — confirm.
const ROOM_STATE_KICKED: &str = "kicked";
110
/// Client for a single room, identified by `namespace` and `room_id`.
///
/// Constructed through `RoomClient::new`, which returns it inside an `Arc`;
/// all mutable state uses interior mutability.
pub struct RoomClient {
    /// Namespace sent as the `namespace` query parameter.
    pub namespace: String,
    /// Room id sent as the `id` query parameter.
    pub room_id: String,

    /// Latest shared state: replaced on `sync`, patched on `shared_delta`.
    shared_state: RwLock<Value>,
    /// Version reported with the latest shared sync/delta.
    shared_version: RwLock<u64>,
    /// Latest player state: replaced on `sync`, patched on `player_delta`.
    player_state: RwLock<Value>,
    /// Version reported with the latest player sync/delta.
    player_version: RwLock<u64>,
    /// Member list, replaced on `members_sync`.
    members: RwLock<Value>,
    /// Media member list; starts as an empty array and is reset by `leave`.
    media_members: RwLock<Value>,
    /// `userId` from the most recent auth response, if any.
    current_user_id: Mutex<Option<String>>,
    /// `connectionId` from the most recent auth response, if any.
    current_connection_id: Mutex<Option<String>>,
    /// Current connection state string (one of the `ROOM_STATE_*` values).
    connection_state: RwLock<String>,
    /// Pending reconnect details; consumed by the next `sync` and passed to
    /// the reconnect handlers.
    reconnect_info: RwLock<Option<Value>>,

    /// Server base URL with trailing slashes removed.
    base_url: String,
    /// Supplies the auth token; called once per connection attempt.
    token_fn: Box<dyn Fn() -> String + Send + Sync>,
    /// Connection options.
    opts: RoomOptions,

    // Registered callbacks, one list (or map of lists) per event kind.
    shared_state_handlers: HandlerList<StateHandler>,
    player_state_handlers: HandlerList<StateHandler>,
    /// Message handlers keyed by message type.
    message_handlers: Arc<Mutex<HashMap<String, Vec<(u64, MessageHandler)>>>>,
    error_handlers: HandlerList<ErrorHandler>,
    kicked_handlers: HandlerList<KickedHandler>,
    member_sync_handlers: HandlerList<MembersSyncHandler>,
    member_join_handlers: HandlerList<MemberHandler>,
    member_leave_handlers: HandlerList<MemberLeaveHandler>,
    member_state_handlers: HandlerList<MemberStateHandler>,
    /// Signal handlers keyed by event name.
    signal_handlers: Arc<Mutex<HashMap<String, Vec<(u64, SignalHandler)>>>>,
    any_signal_handlers: HandlerList<AnySignalHandler>,
    media_track_handlers: HandlerList<MediaTrackHandler>,
    media_track_removed_handlers: HandlerList<MediaTrackHandler>,
    media_state_handlers: HandlerList<MediaStateHandler>,
    media_device_handlers: HandlerList<MediaDeviceHandler>,
    reconnect_handlers: HandlerList<ReconnectHandler>,
    connection_state_handlers: HandlerList<ConnectionStateHandler>,
    /// Counter behind handler ids (presumably advanced by `next_handler_id`,
    /// which is outside this view).
    handler_id_counter: Mutex<u64>,

    // In-flight requests keyed by request id; the sender resolves the
    // matching waiter.
    /// Waiters for `send` (action result / action error).
    pending_requests: Mutex<HashMap<String, oneshot::Sender<Result<Value, Error>>>>,
    pending_signal_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_admin_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_member_state_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_media_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,

    /// Sender into the writer task of the current socket; `None` before the
    /// first connection and after `leave`.
    send_tx: Mutex<Option<mpsc::Sender<RoomWsCommand>>>,

    /// Stop signal for the background task spawned by `join`.
    stop_tx: Mutex<Option<mpsc::Sender<()>>>,
    /// Set by `leave`, cleared by `join`; suppresses reconnecting.
    intentionally_left: Mutex<bool>,
}
170
171impl RoomClient {
172 pub fn new(
181 base_url: &str,
182 namespace: &str,
183 room_id: &str,
184 token_fn: impl Fn() -> String + Send + Sync + 'static,
185 opts: Option<RoomOptions>,
186 ) -> Arc<Self> {
187 Arc::new(Self {
188 namespace: namespace.to_string(),
189 room_id: room_id.to_string(),
190 shared_state: RwLock::new(json!({})),
191 shared_version: RwLock::new(0),
192 player_state: RwLock::new(json!({})),
193 player_version: RwLock::new(0),
194 members: RwLock::new(json!([])),
195 media_members: RwLock::new(json!([])),
196 current_user_id: Mutex::new(None),
197 current_connection_id: Mutex::new(None),
198 connection_state: RwLock::new(ROOM_STATE_IDLE.to_string()),
199 reconnect_info: RwLock::new(None),
200 base_url: base_url.trim_end_matches('/').to_string(),
201 token_fn: Box::new(token_fn),
202 opts: opts.unwrap_or_default(),
203 shared_state_handlers: Arc::new(Mutex::new(vec![])),
204 player_state_handlers: Arc::new(Mutex::new(vec![])),
205 message_handlers: Arc::new(Mutex::new(HashMap::new())),
206 error_handlers: Arc::new(Mutex::new(vec![])),
207 kicked_handlers: Arc::new(Mutex::new(vec![])),
208 member_sync_handlers: Arc::new(Mutex::new(vec![])),
209 member_join_handlers: Arc::new(Mutex::new(vec![])),
210 member_leave_handlers: Arc::new(Mutex::new(vec![])),
211 member_state_handlers: Arc::new(Mutex::new(vec![])),
212 signal_handlers: Arc::new(Mutex::new(HashMap::new())),
213 any_signal_handlers: Arc::new(Mutex::new(vec![])),
214 media_track_handlers: Arc::new(Mutex::new(vec![])),
215 media_track_removed_handlers: Arc::new(Mutex::new(vec![])),
216 media_state_handlers: Arc::new(Mutex::new(vec![])),
217 media_device_handlers: Arc::new(Mutex::new(vec![])),
218 reconnect_handlers: Arc::new(Mutex::new(vec![])),
219 connection_state_handlers: Arc::new(Mutex::new(vec![])),
220 handler_id_counter: Mutex::new(0),
221 pending_requests: Mutex::new(HashMap::new()),
222 pending_signal_requests: Mutex::new(HashMap::new()),
223 pending_admin_requests: Mutex::new(HashMap::new()),
224 pending_member_state_requests: Mutex::new(HashMap::new()),
225 pending_media_requests: Mutex::new(HashMap::new()),
226 send_tx: Mutex::new(None),
227 stop_tx: Mutex::new(None),
228 intentionally_left: Mutex::new(false),
229 })
230 }
231
232 pub fn get_shared_state(&self) -> Value {
236 self.shared_state.read().unwrap().clone()
237 }
238
239 pub fn get_player_state(&self) -> Value {
241 self.player_state.read().unwrap().clone()
242 }
243
244 pub fn list_members(&self) -> Value {
246 self.members.read().unwrap().clone()
247 }
248
249 pub fn list_media_members(&self) -> Value {
251 self.media_members.read().unwrap().clone()
252 }
253
254 pub fn connection_state(&self) -> String {
256 self.connection_state.read().unwrap().clone()
257 }
258
259 pub fn state(self: &Arc<Self>) -> RoomStateNamespace {
260 RoomStateNamespace::new(Arc::clone(self))
261 }
262
263 pub fn meta(self: &Arc<Self>) -> RoomMetaNamespace {
264 RoomMetaNamespace::new(Arc::clone(self))
265 }
266
267 pub fn signals(self: &Arc<Self>) -> RoomSignalsNamespace {
268 RoomSignalsNamespace::new(Arc::clone(self))
269 }
270
271 pub fn members(self: &Arc<Self>) -> RoomMembersNamespace {
272 RoomMembersNamespace::new(Arc::clone(self))
273 }
274
275 pub fn admin(self: &Arc<Self>) -> RoomAdminNamespace {
276 RoomAdminNamespace::new(Arc::clone(self))
277 }
278
279 pub fn media(self: &Arc<Self>) -> RoomMediaNamespace {
280 RoomMediaNamespace::new(Arc::clone(self))
281 }
282
283 pub fn session(self: &Arc<Self>) -> RoomSessionNamespace {
284 RoomSessionNamespace::new(Arc::clone(self))
285 }
286
287 pub async fn get_metadata(&self) -> Result<Value, Error> {
292 Self::get_metadata_static(&self.base_url, &self.namespace, &self.room_id).await
293 }
294
295 pub async fn get_metadata_static(
298 base_url: &str,
299 namespace: &str,
300 room_id: &str,
301 ) -> Result<Value, Error> {
302 let url = format!(
303 "{}/api/room/metadata?namespace={}&id={}",
304 base_url.trim_end_matches('/'),
305 urlencoding::encode(namespace),
306 urlencoding::encode(room_id)
307 );
308 let resp = reqwest::get(&url)
309 .await
310 .map_err(|e| Error::Room(format!("Failed to get room metadata: {}", e)))?;
311 if !resp.status().is_success() {
312 return Err(Error::Room(format!(
313 "Failed to get room metadata: {}",
314 resp.status()
315 )));
316 }
317 let body = resp
318 .text()
319 .await
320 .map_err(|e| Error::Room(format!("Failed to read room metadata body: {}", e)))?;
321 serde_json::from_str(&body)
322 .map_err(|e| Error::Room(format!("Failed to parse room metadata: {}", e)))
323 }
324
325 pub async fn join(self: &Arc<Self>) -> Result<(), Error> {
329 *self.intentionally_left.lock().unwrap() = false;
330 self.set_connection_state(ROOM_STATE_CONNECTING);
331
332 let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
333 *self.stop_tx.lock().unwrap() = Some(stop_tx);
334
335 let this = Arc::clone(self);
336 tokio::spawn(async move {
337 let mut attempts = 0u32;
338 loop {
339 match this.establish().await {
340 Ok(mut ws_rx) => {
341 attempts = 0;
342 loop {
343 tokio::select! {
344 _ = stop_rx.recv() => return,
345 msg = ws_rx.recv() => {
346 match msg {
347 Some(raw) => this.handle_message(&raw),
348 None => break, }
350 }
351 }
352 }
353 }
354 Err(_) => {}
355 }
356 if *this.intentionally_left.lock().unwrap() {
357 return;
358 }
359 if !this.opts.auto_reconnect || attempts >= this.opts.max_reconnect_attempts {
360 this.set_connection_state(ROOM_STATE_DISCONNECTED);
361 return;
362 }
363 let delay = (this.opts.reconnect_base_delay_ms * (1u64 << attempts)).min(30_000);
364 attempts += 1;
365 this.begin_reconnect_attempt(attempts as u64);
366 sleep(Duration::from_millis(delay)).await;
367 }
368 });
369 Ok(())
370 }
371
372 pub async fn leave(&self) {
374 *self.intentionally_left.lock().unwrap() = true;
375
376 let send_tx = self.send_tx.lock().unwrap().clone();
377 if let Some(tx) = send_tx.as_ref() {
378 let _ = tx
379 .send(RoomWsCommand::Send(
380 json!({"type": "leave"}).to_string(),
381 ))
382 .await;
383 sleep(ROOM_EXPLICIT_LEAVE_CLOSE_DELAY).await;
384 let _ = tx.send(RoomWsCommand::Close).await;
385 }
386
387 if let Some(tx) = self.stop_tx.lock().unwrap().take() {
388 let _ = tx.send(()).await;
389 }
390
391 *self.send_tx.lock().unwrap() = None;
392
393 self.pending_requests.lock().unwrap().clear();
395
396 *self.shared_state.write().unwrap() = json!({});
398 *self.shared_version.write().unwrap() = 0;
399 *self.player_state.write().unwrap() = json!({});
400 *self.player_version.write().unwrap() = 0;
401 *self.members.write().unwrap() = json!([]);
402 *self.media_members.write().unwrap() = json!([]);
403 *self.current_user_id.lock().unwrap() = None;
404 *self.current_connection_id.lock().unwrap() = None;
405 *self.reconnect_info.write().unwrap() = None;
406 self.pending_signal_requests.lock().unwrap().clear();
407 self.pending_admin_requests.lock().unwrap().clear();
408 self.pending_member_state_requests.lock().unwrap().clear();
409 self.pending_media_requests.lock().unwrap().clear();
410 self.set_connection_state(ROOM_STATE_IDLE);
411 }
412
413 pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
423 if self.send_tx.lock().unwrap().is_none() {
424 return Err(Error::Room("Not connected to room".to_string()));
425 }
426
427 let request_id = Uuid::new_v4().to_string();
428 let (tx, rx) = oneshot::channel::<Result<Value, Error>>();
429
430 self.pending_requests
431 .lock()
432 .unwrap()
433 .insert(request_id.clone(), tx);
434
435 self.ws_send(json!({
436 "type": "send",
437 "actionType": action_type,
438 "payload": payload.unwrap_or(json!({})),
439 "requestId": request_id,
440 }));
441
442 let timeout_ms = self.opts.send_timeout_ms;
443 let action_type_owned = action_type.to_string();
444 let req_id = request_id.clone();
445
446 match timeout(Duration::from_millis(timeout_ms), rx).await {
447 Ok(Ok(result)) => result,
448 Ok(Err(_)) => {
449 self.pending_requests.lock().unwrap().remove(&req_id);
451 Err(Error::Room(
452 "Room left while waiting for action result".to_string(),
453 ))
454 }
455 Err(_) => {
456 self.pending_requests.lock().unwrap().remove(&req_id);
458 Err(Error::RoomTimeout(format!(
459 "Action '{}' timed out",
460 action_type_owned
461 )))
462 }
463 }
464 }
465
466 pub fn on_shared_state(
471 &self,
472 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
473 ) -> Subscription {
474 let id = self.next_handler_id();
475 let handler = Arc::new(handler) as StateHandler;
476 self.shared_state_handlers.lock().unwrap().push((id, handler));
477
478 let list = Arc::clone(&self.shared_state_handlers);
479 Subscription::new(move || {
480 list.lock().unwrap().retain(|(hid, _)| *hid != id);
481 })
482 }
483
484 pub fn on_player_state(
487 &self,
488 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
489 ) -> Subscription {
490 let id = self.next_handler_id();
491 let handler = Arc::new(handler) as StateHandler;
492 self.player_state_handlers.lock().unwrap().push((id, handler));
493
494 let list = Arc::clone(&self.player_state_handlers);
495 Subscription::new(move || {
496 list.lock().unwrap().retain(|(hid, _)| *hid != id);
497 })
498 }
499
500 pub fn on_message(
507 &self,
508 msg_type: &str,
509 handler: impl Fn(&Value) + Send + Sync + 'static,
510 ) -> Subscription {
511 let id = self.next_handler_id();
512 let handler = Arc::new(handler) as MessageHandler;
513 let msg_type = msg_type.to_string();
514 {
515 let mut map = self.message_handlers.lock().unwrap();
516 map.entry(msg_type.clone())
517 .or_insert_with(Vec::new)
518 .push((id, handler));
519 }
520
521 let map = Arc::clone(&self.message_handlers);
522 Subscription::new(move || {
523 if let Some(list) = map.lock().unwrap().get_mut(&msg_type) {
524 list.retain(|(hid, _)| *hid != id);
525 }
526 })
527 }
528
529 pub fn on_error(
531 &self,
532 handler: impl Fn(&str, &str) + Send + Sync + 'static,
533 ) -> Subscription {
534 let id = self.next_handler_id();
535 let handler = Arc::new(handler) as ErrorHandler;
536 self.error_handlers.lock().unwrap().push((id, handler));
537
538 let list = Arc::clone(&self.error_handlers);
539 Subscription::new(move || {
540 list.lock().unwrap().retain(|(hid, _)| *hid != id);
541 })
542 }
543
544 pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
546 let id = self.next_handler_id();
547 let handler = Arc::new(handler) as KickedHandler;
548 self.kicked_handlers.lock().unwrap().push((id, handler));
549
550 let list = Arc::clone(&self.kicked_handlers);
551 Subscription::new(move || {
552 list.lock().unwrap().retain(|(hid, _)| *hid != id);
553 })
554 }
555
556 pub fn on_members_sync(
557 &self,
558 handler: impl Fn(&Value) + Send + Sync + 'static,
559 ) -> Subscription {
560 let id = self.next_handler_id();
561 let handler = Arc::new(handler) as MembersSyncHandler;
562 self.member_sync_handlers.lock().unwrap().push((id, handler));
563
564 let list = Arc::clone(&self.member_sync_handlers);
565 Subscription::new(move || {
566 list.lock().unwrap().retain(|(hid, _)| *hid != id);
567 })
568 }
569
570 pub fn on_member_join(
571 &self,
572 handler: impl Fn(&Value) + Send + Sync + 'static,
573 ) -> Subscription {
574 let id = self.next_handler_id();
575 let handler = Arc::new(handler) as MemberHandler;
576 self.member_join_handlers.lock().unwrap().push((id, handler));
577
578 let list = Arc::clone(&self.member_join_handlers);
579 Subscription::new(move || {
580 list.lock().unwrap().retain(|(hid, _)| *hid != id);
581 })
582 }
583
584 pub fn on_member_leave(
585 &self,
586 handler: impl Fn(&Value, &str) + Send + Sync + 'static,
587 ) -> Subscription {
588 let id = self.next_handler_id();
589 let handler = Arc::new(handler) as MemberLeaveHandler;
590 self.member_leave_handlers.lock().unwrap().push((id, handler));
591
592 let list = Arc::clone(&self.member_leave_handlers);
593 Subscription::new(move || {
594 list.lock().unwrap().retain(|(hid, _)| *hid != id);
595 })
596 }
597
598 pub fn on_member_state_change(
599 &self,
600 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
601 ) -> Subscription {
602 let id = self.next_handler_id();
603 let handler = Arc::new(handler) as MemberStateHandler;
604 self.member_state_handlers.lock().unwrap().push((id, handler));
605
606 let list = Arc::clone(&self.member_state_handlers);
607 Subscription::new(move || {
608 list.lock().unwrap().retain(|(hid, _)| *hid != id);
609 })
610 }
611
612 pub fn on_signal(
613 &self,
614 event: &str,
615 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
616 ) -> Subscription {
617 let id = self.next_handler_id();
618 let handler = Arc::new(handler) as SignalHandler;
619 let event = event.to_string();
620 {
621 let mut map = self.signal_handlers.lock().unwrap();
622 map.entry(event.clone())
623 .or_insert_with(Vec::new)
624 .push((id, handler));
625 }
626
627 let map = Arc::clone(&self.signal_handlers);
628 Subscription::new(move || {
629 if let Some(list) = map.lock().unwrap().get_mut(&event) {
630 list.retain(|(hid, _)| *hid != id);
631 }
632 })
633 }
634
635 pub fn on_any_signal(
636 &self,
637 handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
638 ) -> Subscription {
639 let id = self.next_handler_id();
640 let handler = Arc::new(handler) as AnySignalHandler;
641 self.any_signal_handlers.lock().unwrap().push((id, handler));
642
643 let list = Arc::clone(&self.any_signal_handlers);
644 Subscription::new(move || {
645 list.lock().unwrap().retain(|(hid, _)| *hid != id);
646 })
647 }
648
649 pub fn on_media_track(
650 &self,
651 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
652 ) -> Subscription {
653 let id = self.next_handler_id();
654 let handler = Arc::new(handler) as MediaTrackHandler;
655 self.media_track_handlers.lock().unwrap().push((id, handler));
656
657 let list = Arc::clone(&self.media_track_handlers);
658 Subscription::new(move || {
659 list.lock().unwrap().retain(|(hid, _)| *hid != id);
660 })
661 }
662
663 pub fn on_media_track_removed(
664 &self,
665 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
666 ) -> Subscription {
667 let id = self.next_handler_id();
668 let handler = Arc::new(handler) as MediaTrackHandler;
669 self.media_track_removed_handlers
670 .lock()
671 .unwrap()
672 .push((id, handler));
673
674 let list = Arc::clone(&self.media_track_removed_handlers);
675 Subscription::new(move || {
676 list.lock().unwrap().retain(|(hid, _)| *hid != id);
677 })
678 }
679
680 pub fn on_media_state_change(
681 &self,
682 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
683 ) -> Subscription {
684 let id = self.next_handler_id();
685 let handler = Arc::new(handler) as MediaStateHandler;
686 self.media_state_handlers.lock().unwrap().push((id, handler));
687
688 let list = Arc::clone(&self.media_state_handlers);
689 Subscription::new(move || {
690 list.lock().unwrap().retain(|(hid, _)| *hid != id);
691 })
692 }
693
694 pub fn on_media_device_change(
695 &self,
696 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
697 ) -> Subscription {
698 let id = self.next_handler_id();
699 let handler = Arc::new(handler) as MediaDeviceHandler;
700 self.media_device_handlers.lock().unwrap().push((id, handler));
701
702 let list = Arc::clone(&self.media_device_handlers);
703 Subscription::new(move || {
704 list.lock().unwrap().retain(|(hid, _)| *hid != id);
705 })
706 }
707
708 pub fn on_reconnect(
709 &self,
710 handler: impl Fn(&Value) + Send + Sync + 'static,
711 ) -> Subscription {
712 let id = self.next_handler_id();
713 let handler = Arc::new(handler) as ReconnectHandler;
714 self.reconnect_handlers.lock().unwrap().push((id, handler));
715
716 let list = Arc::clone(&self.reconnect_handlers);
717 Subscription::new(move || {
718 list.lock().unwrap().retain(|(hid, _)| *hid != id);
719 })
720 }
721
722 pub fn on_connection_state_change(
723 &self,
724 handler: impl Fn(&str) + Send + Sync + 'static,
725 ) -> Subscription {
726 let id = self.next_handler_id();
727 let handler = Arc::new(handler) as ConnectionStateHandler;
728 self.connection_state_handlers
729 .lock()
730 .unwrap()
731 .push((id, handler));
732
733 let list = Arc::clone(&self.connection_state_handlers);
734 Subscription::new(move || {
735 list.lock().unwrap().retain(|(hid, _)| *hid != id);
736 })
737 }
738
739 pub async fn send_signal(
740 &self,
741 event: &str,
742 payload: Option<Value>,
743 options: Option<Value>,
744 ) -> Result<(), Error> {
745 if self.send_tx.lock().unwrap().is_none() {
746 return Err(Error::Room("Not connected to room".to_string()));
747 }
748
749 let request_id = Uuid::new_v4().to_string();
750 let options = options.unwrap_or_else(|| json!({}));
751 let message = json!({
752 "type": "signal",
753 "event": event,
754 "payload": payload.unwrap_or_else(|| json!({})),
755 "requestId": request_id,
756 "memberId": options.get("memberId").cloned().unwrap_or(Value::Null),
757 "includeSelf": options.get("includeSelf").and_then(Value::as_bool).unwrap_or(false),
758 });
759
760 self.send_unit_request(&self.pending_signal_requests, request_id, message, format!("Signal '{}' timed out", event)).await
761 }
762
763 pub async fn send_member_state(&self, state: Value) -> Result<(), Error> {
764 if self.send_tx.lock().unwrap().is_none() {
765 return Err(Error::Room("Not connected to room".to_string()));
766 }
767
768 let request_id = Uuid::new_v4().to_string();
769 let message = json!({
770 "type": "member_state",
771 "state": state,
772 "requestId": request_id,
773 });
774
775 self.send_unit_request(
776 &self.pending_member_state_requests,
777 request_id,
778 message,
779 "Member state update timed out".to_string(),
780 ).await
781 }
782
783 pub async fn clear_member_state(&self) -> Result<(), Error> {
784 if self.send_tx.lock().unwrap().is_none() {
785 return Err(Error::Room("Not connected to room".to_string()));
786 }
787
788 let request_id = Uuid::new_v4().to_string();
789 let message = json!({
790 "type": "member_state_clear",
791 "requestId": request_id,
792 });
793
794 self.send_unit_request(
795 &self.pending_member_state_requests,
796 request_id,
797 message,
798 "Member state clear timed out".to_string(),
799 ).await
800 }
801
802 pub async fn send_admin(
803 &self,
804 operation: &str,
805 member_id: &str,
806 payload: Option<Value>,
807 ) -> Result<(), Error> {
808 if self.send_tx.lock().unwrap().is_none() {
809 return Err(Error::Room("Not connected to room".to_string()));
810 }
811
812 let request_id = Uuid::new_v4().to_string();
813 let message = json!({
814 "type": "admin",
815 "operation": operation,
816 "memberId": member_id,
817 "payload": payload.unwrap_or_else(|| json!({})),
818 "requestId": request_id,
819 });
820
821 self.send_unit_request(
822 &self.pending_admin_requests,
823 request_id,
824 message,
825 format!("Admin '{}' timed out", operation),
826 ).await
827 }
828
829 pub async fn send_media(
830 &self,
831 operation: &str,
832 kind: &str,
833 payload: Option<Value>,
834 ) -> Result<(), Error> {
835 if self.send_tx.lock().unwrap().is_none() {
836 return Err(Error::Room("Not connected to room".to_string()));
837 }
838
839 let request_id = Uuid::new_v4().to_string();
840 let message = json!({
841 "type": "media",
842 "operation": operation,
843 "kind": kind,
844 "payload": payload.unwrap_or_else(|| json!({})),
845 "requestId": request_id,
846 });
847
848 self.send_unit_request(
849 &self.pending_media_requests,
850 request_id,
851 message,
852 format!("Media '{}:{}' timed out", operation, kind),
853 ).await
854 }
855
856 pub async fn switch_media_devices(&self, payload: Value) -> Result<(), Error> {
857 if let Some(device_id) = payload.get("audioInputId").and_then(Value::as_str) {
858 self.send_media("device", "audio", Some(json!({ "deviceId": device_id }))).await?;
859 }
860 if let Some(device_id) = payload.get("videoInputId").and_then(Value::as_str) {
861 self.send_media("device", "video", Some(json!({ "deviceId": device_id }))).await?;
862 }
863 if let Some(device_id) = payload.get("screenInputId").and_then(Value::as_str) {
864 self.send_media("device", "screen", Some(json!({ "deviceId": device_id }))).await?;
865 }
866 Ok(())
867 }
868
869 fn ws_url(&self) -> String {
872 let u = self
873 .base_url
874 .replace("https://", "wss://")
875 .replace("http://", "ws://");
876 format!(
877 "{}/api/room?namespace={}&id={}",
878 u,
879 urlencoding::encode(&self.namespace),
880 urlencoding::encode(&self.room_id)
881 )
882 }
883
884 fn ws_send(&self, msg: Value) {
886 let s = msg.to_string();
887 if let Some(tx) = self.send_tx.lock().unwrap().as_ref() {
888 let tx = tx.clone();
889 tokio::spawn(async move {
890 let _ = tx.send(RoomWsCommand::Send(s)).await;
891 });
892 }
893 }
894
/// Test hook: installs `tx` as the outgoing command channel, making the
/// client look connected without opening a socket.
#[cfg(test)]
pub(crate) fn attach_send_channel_for_testing(&self, tx: mpsc::Sender<RoomWsCommand>) {
    let mut slot = self.send_tx.lock().unwrap();
    *slot = Some(tx);
}
899
/// Test hook: feeds `raw` through the normal incoming-message dispatcher.
#[cfg(test)]
pub(crate) fn handle_message_for_testing(&self, raw: &str) {
    Self::handle_message(self, raw);
}
904
905 async fn establish(&self) -> anyhow::Result<mpsc::Receiver<String>> {
906 use futures_util::{SinkExt, StreamExt};
907 use tokio_tungstenite::{connect_async, tungstenite::Message};
908
909 let (ws_stream, _response) = connect_async(self.ws_url()).await?;
910 let (mut write, mut read) = ws_stream.split();
911
912 let auth = json!({"type": "auth", "token": (self.token_fn)(), "sdkVersion": "0.1.3"});
914 write.send(Message::Text(auth.to_string().into())).await?;
915
916 let join_raw = if let Some(Ok(Message::Text(raw))) = read.next().await {
918 let raw_str: &str = &raw;
919 let resp: Value = serde_json::from_str(raw_str)?;
920 let t = resp["type"].as_str().unwrap_or("");
921 if t != "auth_success" && t != "auth_refreshed" {
922 anyhow::bail!("Room auth failed: {}", resp["message"]);
923 }
924
925 *self.current_user_id.lock().unwrap() = resp["userId"].as_str().map(|value| value.to_string());
926 *self.current_connection_id.lock().unwrap() = resp["connectionId"].as_str().map(|value| value.to_string());
927
928 let join_msg = json!({
930 "type": "join",
931 "lastSharedState": *self.shared_state.read().unwrap(),
932 "lastSharedVersion": *self.shared_version.read().unwrap(),
933 "lastPlayerState": *self.player_state.read().unwrap(),
934 "lastPlayerVersion": *self.player_version.read().unwrap(),
935 });
936 join_msg.to_string()
937 } else {
938 anyhow::bail!("Room: no auth response");
939 };
940
941 let (write_tx, mut write_rx) = mpsc::channel::<RoomWsCommand>(128);
943 *self.send_tx.lock().unwrap() = Some(write_tx.clone());
944
945 tokio::spawn(async move {
947 while let Some(command) = write_rx.recv().await {
948 match command {
949 RoomWsCommand::Send(msg) => {
950 if write.send(Message::Text(msg.into())).await.is_err() {
951 break;
952 }
953 }
954 RoomWsCommand::Close => {
955 let _ = write.send(Message::Close(None)).await;
956 break;
957 }
958 }
959 }
960 });
961
962 let _ = write_tx.send(RoomWsCommand::Send(join_raw)).await;
964
965 let htx = write_tx.clone();
967 tokio::spawn(async move {
968 loop {
969 sleep(Duration::from_secs(30)).await;
970 if htx
971 .send(RoomWsCommand::Send(json!({"type":"ping"}).to_string()))
972 .await
973 .is_err()
974 {
975 break;
976 }
977 }
978 });
979
980 let (msg_tx, msg_rx) = mpsc::channel::<String>(128);
982 tokio::spawn(async move {
983 while let Some(Ok(Message::Text(raw))) = read.next().await {
984 let raw_str: String = raw.into();
985 if msg_tx.send(raw_str).await.is_err() {
986 break;
987 }
988 }
989 });
990
991 Ok(msg_rx)
992 }
993
994 fn handle_message(&self, raw: &str) {
997 let msg: Value = match serde_json::from_str(raw) {
998 Ok(v) => v,
999 Err(_) => return,
1000 };
1001 let t = msg["type"].as_str().unwrap_or("");
1002
1003 match t {
1004 "sync" => self.handle_sync(&msg),
1005 "shared_delta" => self.handle_shared_delta(&msg),
1006 "player_delta" => self.handle_player_delta(&msg),
1007 "action_result" => self.handle_action_result(&msg),
1008 "action_error" => self.handle_action_error(&msg),
1009 "message" => self.handle_server_message(&msg),
1010 "signal" => self.handle_signal(&msg),
1011 "signal_sent" => self.resolve_pending_unit_request(&self.pending_signal_requests, &msg),
1012 "signal_error" => self.reject_pending_unit_request(&self.pending_signal_requests, &msg, "Signal error"),
1013 "members_sync" => self.handle_members_sync(&msg),
1014 "member_join" => self.handle_member_join(&msg),
1015 "member_leave" => self.handle_member_leave(&msg),
1016 "member_state" => self.handle_member_state(&msg),
1017 "member_state_error" => self.reject_pending_unit_request(&self.pending_member_state_requests, &msg, "Member state error"),
1018 "admin_result" => self.resolve_pending_unit_request(&self.pending_admin_requests, &msg),
1019 "admin_error" => self.reject_pending_unit_request(&self.pending_admin_requests, &msg, "Admin error"),
1020 "media_sync" => self.handle_media_sync(&msg),
1021 "media_track" => self.handle_media_track(&msg),
1022 "media_track_removed" => self.handle_media_track_removed(&msg),
1023 "media_state" => self.handle_media_state(&msg),
1024 "media_device" => self.handle_media_device(&msg),
1025 "media_result" => self.resolve_pending_unit_request(&self.pending_media_requests, &msg),
1026 "media_error" => self.reject_pending_unit_request(&self.pending_media_requests, &msg, "Media error"),
1027 "kicked" => self.handle_kicked(),
1028 "error" => self.handle_error(&msg),
1029 "pong" => {}
1030 _ => {}
1031 }
1032 }
1033
1034 fn handle_sync(&self, msg: &Value) {
1035 *self.shared_state.write().unwrap() = msg["sharedState"].clone();
1036 *self.shared_version.write().unwrap() = msg["sharedVersion"].as_u64().unwrap_or(0);
1037 *self.player_state.write().unwrap() = msg["playerState"].clone();
1038 *self.player_version.write().unwrap() = msg["playerVersion"].as_u64().unwrap_or(0);
1039 self.set_connection_state(ROOM_STATE_CONNECTED);
1040
1041 if let Some(info) = self.reconnect_info.write().unwrap().take() {
1042 for (_, handler) in self.reconnect_handlers.lock().unwrap().iter() {
1043 handler(&info);
1044 }
1045 }
1046
1047 let shared = self.shared_state.read().unwrap().clone();
1049 for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1050 handler(&shared, &shared);
1051 }
1052 let player = self.player_state.read().unwrap().clone();
1053 for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1054 handler(&player, &player);
1055 }
1056 }
1057
1058 fn handle_shared_delta(&self, msg: &Value) {
1059 let delta = &msg["delta"];
1060 *self.shared_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1061
1062 if let Value::Object(map) = delta {
1063 let mut state = self.shared_state.write().unwrap();
1064 for (path, value) in map {
1065 deep_set(&mut state, path, value.clone());
1066 }
1067 }
1068
1069 let state = self.shared_state.read().unwrap().clone();
1070 for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1071 handler(&state, delta);
1072 }
1073 }
1074
1075 fn handle_player_delta(&self, msg: &Value) {
1076 let delta = &msg["delta"];
1077 *self.player_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1078
1079 if let Value::Object(map) = delta {
1080 let mut state = self.player_state.write().unwrap();
1081 for (path, value) in map {
1082 deep_set(&mut state, path, value.clone());
1083 }
1084 }
1085
1086 let state = self.player_state.read().unwrap().clone();
1087 for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1088 handler(&state, delta);
1089 }
1090 }
1091
1092 fn handle_action_result(&self, msg: &Value) {
1093 let request_id = msg["requestId"].as_str().unwrap_or("");
1094 if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1095 let _ = tx.send(Ok(msg["result"].clone()));
1096 }
1097 }
1098
1099 fn handle_action_error(&self, msg: &Value) {
1100 let request_id = msg["requestId"].as_str().unwrap_or("");
1101 if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1102 let message = msg["message"].as_str().unwrap_or("Unknown error");
1103 let _ = tx.send(Err(Error::Room(message.to_string())));
1104 }
1105 }
1106
1107 fn handle_server_message(&self, msg: &Value) {
1108 let message_type = msg["messageType"].as_str().unwrap_or("");
1109 let data = &msg["data"];
1110
1111 let handlers = self.message_handlers.lock().unwrap();
1112 if let Some(list) = handlers.get(message_type) {
1113 for (_, handler) in list {
1114 handler(data);
1115 }
1116 }
1117 }
1118
1119 fn handle_signal(&self, msg: &Value) {
1120 let event = msg["event"].as_str().unwrap_or("");
1121 let payload = &msg["payload"];
1122 let meta = &msg["meta"];
1123
1124 if let Some(list) = self.signal_handlers.lock().unwrap().get(event) {
1125 for (_, handler) in list {
1126 handler(payload, meta);
1127 }
1128 }
1129
1130 for (_, handler) in self.any_signal_handlers.lock().unwrap().iter() {
1131 handler(event, payload, meta);
1132 }
1133 }
1134
1135 fn handle_members_sync(&self, msg: &Value) {
1136 let members = normalize_members(&msg["members"]);
1137 *self.members.write().unwrap() = members.clone();
1138 self.merge_members_into_media();
1139
1140 for (_, handler) in self.member_sync_handlers.lock().unwrap().iter() {
1141 handler(&members);
1142 }
1143 }
1144
1145 fn handle_member_join(&self, msg: &Value) {
1146 if let Some(member) = normalize_member(&msg["member"]) {
1147 self.upsert_member(member.clone());
1148 for (_, handler) in self.member_join_handlers.lock().unwrap().iter() {
1149 handler(&member);
1150 }
1151 }
1152 }
1153
1154 fn handle_member_leave(&self, msg: &Value) {
1155 if let Some(member) = normalize_member(&msg["member"]) {
1156 let member_id = member["memberId"].as_str().unwrap_or("");
1157 self.remove_member(member_id);
1158 let reason = msg["reason"].as_str().unwrap_or("leave");
1159 for (_, handler) in self.member_leave_handlers.lock().unwrap().iter() {
1160 handler(&member, reason);
1161 }
1162 }
1163 }
1164
1165 fn handle_member_state(&self, msg: &Value) {
1166 if let Some(mut member) = normalize_member(&msg["member"]) {
1167 let state = object_or_empty(&msg["state"]);
1168 member["state"] = state.clone();
1169 self.upsert_member(member.clone());
1170 self.resolve_pending_unit_request(&self.pending_member_state_requests, msg);
1171
1172 for (_, handler) in self.member_state_handlers.lock().unwrap().iter() {
1173 handler(&member, &state);
1174 }
1175 }
1176 }
1177
1178 fn handle_media_sync(&self, msg: &Value) {
1179 let media_members = normalize_media_members(&msg["members"]);
1180 *self.media_members.write().unwrap() = media_members.clone();
1181 self.merge_members_into_media();
1182 }
1183
1184 fn handle_media_track(&self, msg: &Value) {
1185 if let (Some(member), Some(track)) = (
1186 normalize_member(&msg["member"]),
1187 normalize_track(&msg["track"]),
1188 ) {
1189 self.upsert_media_track(&member, &track);
1190 for (_, handler) in self.media_track_handlers.lock().unwrap().iter() {
1191 handler(&track, &member);
1192 }
1193 }
1194 }
1195
1196 fn handle_media_track_removed(&self, msg: &Value) {
1197 if let (Some(member), Some(track)) = (
1198 normalize_member(&msg["member"]),
1199 normalize_track(&msg["track"]),
1200 ) {
1201 self.remove_media_track(&member, &track);
1202 for (_, handler) in self
1203 .media_track_removed_handlers
1204 .lock()
1205 .unwrap()
1206 .iter()
1207 {
1208 handler(&track, &member);
1209 }
1210 }
1211 }
1212
1213 fn handle_media_state(&self, msg: &Value) {
1214 if let Some(member) = normalize_member(&msg["member"]) {
1215 let state = object_or_empty(&msg["state"]);
1216 self.upsert_media_state(&member, state.clone());
1217 for (_, handler) in self.media_state_handlers.lock().unwrap().iter() {
1218 handler(&member, &state);
1219 }
1220 }
1221 }
1222
1223 fn handle_media_device(&self, msg: &Value) {
1224 if let Some(member) = normalize_member(&msg["member"]) {
1225 let change = json!({
1226 "kind": msg["kind"].clone(),
1227 "deviceId": msg["deviceId"].clone(),
1228 });
1229 self.apply_media_device_change(&member, &change);
1230 for (_, handler) in self.media_device_handlers.lock().unwrap().iter() {
1231 handler(&member, &change);
1232 }
1233 }
1234 }
1235
1236 fn handle_kicked(&self) {
1237 self.set_connection_state(ROOM_STATE_KICKED);
1238 *self.intentionally_left.lock().unwrap() = true;
1239 for (_, handler) in self.kicked_handlers.lock().unwrap().iter() {
1240 handler();
1241 }
1242 }
1243
1244 fn handle_error(&self, msg: &Value) {
1245 let code = msg["code"].as_str().unwrap_or("");
1246 let message = msg["message"].as_str().unwrap_or("");
1247 for (_, handler) in self.error_handlers.lock().unwrap().iter() {
1248 handler(code, message);
1249 }
1250 }
1251
1252 async fn send_unit_request(
1255 &self,
1256 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1257 request_id: String,
1258 message: Value,
1259 timeout_message: String,
1260 ) -> Result<(), Error> {
1261 let (tx, rx) = oneshot::channel::<Result<(), Error>>();
1262 pending.lock().unwrap().insert(request_id.clone(), tx);
1263 self.ws_send(message);
1264
1265 match timeout(Duration::from_millis(self.opts.send_timeout_ms), rx).await {
1266 Ok(Ok(result)) => result,
1267 Ok(Err(_)) => {
1268 pending.lock().unwrap().remove(&request_id);
1269 Err(Error::Room(
1270 "Room left while waiting for room control result".to_string(),
1271 ))
1272 }
1273 Err(_) => {
1274 pending.lock().unwrap().remove(&request_id);
1275 Err(Error::RoomTimeout(timeout_message))
1276 }
1277 }
1278 }
1279
1280 fn resolve_pending_unit_request(
1281 &self,
1282 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1283 msg: &Value,
1284 ) {
1285 let request_id = msg["requestId"].as_str().unwrap_or("");
1286 if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1287 let _ = tx.send(Ok(()));
1288 }
1289 }
1290
1291 fn reject_pending_unit_request(
1292 &self,
1293 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1294 msg: &Value,
1295 fallback: &str,
1296 ) {
1297 let request_id = msg["requestId"].as_str().unwrap_or("");
1298 if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1299 let message = msg["message"].as_str().unwrap_or(fallback);
1300 let _ = tx.send(Err(Error::Room(message.to_string())));
1301 }
1302 }
1303
1304 fn set_connection_state(&self, next: &str) {
1305 let mut state = self.connection_state.write().unwrap();
1306 if state.as_str() == next {
1307 return;
1308 }
1309 *state = next.to_string();
1310 drop(state);
1311
1312 for (_, handler) in self.connection_state_handlers.lock().unwrap().iter() {
1313 handler(next);
1314 }
1315 }
1316
1317 fn begin_reconnect_attempt(&self, attempt: u64) {
1318 *self.reconnect_info.write().unwrap() = Some(json!({ "attempt": attempt }));
1319 self.set_connection_state(ROOM_STATE_RECONNECTING);
1320 }
1321
1322 fn upsert_member(&self, member: Value) {
1323 let member_id = member["memberId"].as_str().unwrap_or("").to_string();
1324 if member_id.is_empty() {
1325 return;
1326 }
1327
1328 let mut members = self.members.write().unwrap();
1329 let list = members.as_array_mut().expect("members array");
1330 if let Some(existing) = list
1331 .iter_mut()
1332 .find(|entry| entry["memberId"].as_str() == Some(member_id.as_str()))
1333 {
1334 *existing = member;
1335 } else {
1336 list.push(member);
1337 }
1338 drop(members);
1339 self.merge_members_into_media();
1340 }
1341
1342 fn remove_member(&self, member_id: &str) {
1343 {
1344 let mut members = self.members.write().unwrap();
1345 if let Some(list) = members.as_array_mut() {
1346 list.retain(|entry| entry["memberId"].as_str() != Some(member_id));
1347 }
1348 }
1349 {
1350 let mut media_members = self.media_members.write().unwrap();
1351 if let Some(list) = media_members.as_array_mut() {
1352 list.retain(|entry| entry["member"]["memberId"].as_str() != Some(member_id));
1353 }
1354 }
1355 }
1356
1357 fn merge_members_into_media(&self) {
1358 let members = self.members.read().unwrap().clone();
1359 let mut media_members = self.media_members.write().unwrap();
1360 if let Some(list) = media_members.as_array_mut() {
1361 for media_member in list.iter_mut() {
1362 let member_id = media_member["member"]["memberId"].as_str().unwrap_or("");
1363 if let Some(member) = members
1364 .as_array()
1365 .and_then(|entries| {
1366 entries
1367 .iter()
1368 .find(|entry| entry["memberId"].as_str() == Some(member_id))
1369 })
1370 {
1371 media_member["member"] = member.clone();
1372 }
1373 }
1374 }
1375 }
1376
1377 fn ensure_media_member(&self, member: &Value) -> usize {
1378 self.upsert_member(member.clone());
1379 let member_id = member["memberId"].as_str().unwrap_or("");
1380 let mut media_members = self.media_members.write().unwrap();
1381 let list = media_members.as_array_mut().expect("media members array");
1382 if let Some(index) = list
1383 .iter()
1384 .position(|entry| entry["member"]["memberId"].as_str() == Some(member_id))
1385 {
1386 list[index]["member"] = member.clone();
1387 return index;
1388 }
1389
1390 list.push(json!({
1391 "member": member,
1392 "state": {},
1393 "tracks": [],
1394 }));
1395 list.len() - 1
1396 }
1397
1398 fn upsert_media_track(&self, member: &Value, track: &Value) {
1399 let index = self.ensure_media_member(member);
1400 let mut media_members = self.media_members.write().unwrap();
1401 let list = media_members.as_array_mut().expect("media members array");
1402 let tracks = list[index]["tracks"].as_array_mut().expect("tracks array");
1403 let kind = track["kind"].as_str().unwrap_or("");
1404 if let Some(existing) = tracks
1405 .iter_mut()
1406 .find(|entry| entry["kind"].as_str() == Some(kind))
1407 {
1408 *existing = track.clone();
1409 } else {
1410 tracks.push(track.clone());
1411 }
1412 apply_track_to_state(&mut list[index]["state"], track, true);
1413 }
1414
1415 fn remove_media_track(&self, member: &Value, track: &Value) {
1416 let index = self.ensure_media_member(member);
1417 let mut media_members = self.media_members.write().unwrap();
1418 let list = media_members.as_array_mut().expect("media members array");
1419 let kind = track["kind"].as_str().unwrap_or("");
1420 if let Some(tracks) = list[index]["tracks"].as_array_mut() {
1421 tracks.retain(|entry| entry["kind"].as_str() != Some(kind));
1422 }
1423 apply_track_to_state(&mut list[index]["state"], track, false);
1424 }
1425
1426 fn upsert_media_state(&self, member: &Value, state: Value) {
1427 let index = self.ensure_media_member(member);
1428 let mut media_members = self.media_members.write().unwrap();
1429 let list = media_members.as_array_mut().expect("media members array");
1430 list[index]["state"] = state;
1431 }
1432
1433 fn apply_media_device_change(&self, member: &Value, change: &Value) {
1434 let index = self.ensure_media_member(member);
1435 let mut media_members = self.media_members.write().unwrap();
1436 let list = media_members.as_array_mut().expect("media members array");
1437 let kind = change["kind"].as_str().unwrap_or("");
1438 let device_id = change["deviceId"].clone();
1439 if let Some(state) = list[index]["state"].as_object_mut() {
1440 let kind_state = state.entry(kind.to_string()).or_insert_with(|| json!({}));
1441 if let Some(kind_state_map) = kind_state.as_object_mut() {
1442 kind_state_map.insert("deviceId".to_string(), device_id);
1443 }
1444 }
1445 }
1446
1447 fn next_handler_id(&self) -> u64 {
1448 let mut counter = self.handler_id_counter.lock().unwrap();
1449 *counter += 1;
1450 *counter
1451 }
1452}
1453
1454fn deep_set(obj: &mut Value, path: &str, value: Value) {
1457 if let Some(dot) = path.find('.') {
1458 let head = &path[..dot];
1459 let tail = &path[dot + 1..];
1460 if let Value::Object(map) = obj {
1461 let nested = map.entry(head.to_string()).or_insert(json!({}));
1462 deep_set(nested, tail, value);
1463 }
1464 } else if let Value::Object(map) = obj {
1465 if value.is_null() {
1466 map.remove(path);
1467 } else {
1468 map.insert(path.to_string(), value);
1469 }
1470 }
1471}
1472
1473fn object_or_empty(value: &Value) -> Value {
1474 if value.is_object() {
1475 value.clone()
1476 } else {
1477 json!({})
1478 }
1479}
1480
1481fn normalize_member(value: &Value) -> Option<Value> {
1482 let member_id = value["memberId"].as_str()?;
1483 let user_id = value["userId"].as_str()?;
1484 let mut member = json!({
1485 "memberId": member_id,
1486 "userId": user_id,
1487 "state": object_or_empty(&value["state"]),
1488 });
1489
1490 if let Some(connection_id) = value["connectionId"].as_str() {
1491 member["connectionId"] = Value::String(connection_id.to_string());
1492 }
1493 if let Some(connection_count) = value["connectionCount"].as_u64() {
1494 member["connectionCount"] = Value::from(connection_count);
1495 }
1496 if let Some(role) = value["role"].as_str() {
1497 member["role"] = Value::String(role.to_string());
1498 }
1499 Some(member)
1500}
1501
1502fn normalize_members(value: &Value) -> Value {
1503 Value::Array(
1504 value
1505 .as_array()
1506 .into_iter()
1507 .flatten()
1508 .filter_map(normalize_member)
1509 .collect(),
1510 )
1511}
1512
1513fn normalize_track(value: &Value) -> Option<Value> {
1514 let kind = value["kind"].as_str()?;
1515 let mut track = json!({
1516 "kind": kind,
1517 "muted": value["muted"].as_bool().unwrap_or(false),
1518 });
1519
1520 if let Some(track_id) = value["trackId"].as_str() {
1521 track["trackId"] = Value::String(track_id.to_string());
1522 }
1523 if let Some(device_id) = value["deviceId"].as_str() {
1524 track["deviceId"] = Value::String(device_id.to_string());
1525 }
1526 if let Some(published_at) = value["publishedAt"].as_u64() {
1527 track["publishedAt"] = Value::from(published_at);
1528 }
1529 if let Some(admin_disabled) = value["adminDisabled"].as_bool() {
1530 track["adminDisabled"] = Value::Bool(admin_disabled);
1531 }
1532 Some(track)
1533}
1534
1535fn normalize_tracks(value: &Value) -> Value {
1536 Value::Array(
1537 value
1538 .as_array()
1539 .into_iter()
1540 .flatten()
1541 .filter_map(normalize_track)
1542 .collect(),
1543 )
1544}
1545
1546fn normalize_media_members(value: &Value) -> Value {
1547 Value::Array(
1548 value
1549 .as_array()
1550 .into_iter()
1551 .flatten()
1552 .filter_map(|entry| {
1553 let member = normalize_member(&entry["member"])?;
1554 Some(json!({
1555 "member": member,
1556 "state": object_or_empty(&entry["state"]),
1557 "tracks": normalize_tracks(&entry["tracks"]),
1558 }))
1559 })
1560 .collect(),
1561 )
1562}
1563
1564fn apply_track_to_state(state: &mut Value, track: &Value, published: bool) {
1565 if !state.is_object() {
1566 *state = json!({});
1567 }
1568
1569 let kind = match track["kind"].as_str() {
1570 Some(kind) => kind,
1571 None => return,
1572 };
1573
1574 let state_map = state.as_object_mut().expect("state object");
1575 let kind_state = state_map
1576 .entry(kind.to_string())
1577 .or_insert_with(|| json!({}));
1578 let kind_state_map = kind_state.as_object_mut().expect("kind state object");
1579 kind_state_map.insert(
1580 "published".to_string(),
1581 Value::Bool(published),
1582 );
1583 kind_state_map.insert(
1584 "muted".to_string(),
1585 Value::Bool(track["muted"].as_bool().unwrap_or(false)),
1586 );
1587
1588 if published {
1589 if let Some(track_id) = track.get("trackId") {
1590 kind_state_map.insert("trackId".to_string(), track_id.clone());
1591 }
1592 if let Some(device_id) = track.get("deviceId") {
1593 kind_state_map.insert("deviceId".to_string(), device_id.clone());
1594 }
1595 if let Some(published_at) = track.get("publishedAt") {
1596 kind_state_map.insert("publishedAt".to_string(), published_at.clone());
1597 }
1598 if let Some(admin_disabled) = track.get("adminDisabled") {
1599 kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1600 }
1601 } else {
1602 kind_state_map.remove("trackId");
1603 kind_state_map.remove("publishedAt");
1604 if let Some(admin_disabled) = track.get("adminDisabled") {
1605 kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1606 }
1607 }
1608}
1609
1610pub struct RoomStateNamespace {
1611 client: Arc<RoomClient>,
1612}
1613
1614impl RoomStateNamespace {
1615 fn new(client: Arc<RoomClient>) -> Self {
1616 Self { client }
1617 }
1618
1619 pub fn get_shared(&self) -> Value {
1620 self.client.get_shared_state()
1621 }
1622
1623 pub fn get_mine(&self) -> Value {
1624 self.client.get_player_state()
1625 }
1626
1627 pub fn on_shared_change(
1628 &self,
1629 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1630 ) -> Subscription {
1631 self.client.on_shared_state(handler)
1632 }
1633
1634 pub fn on_mine_change(
1635 &self,
1636 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1637 ) -> Subscription {
1638 self.client.on_player_state(handler)
1639 }
1640
1641 pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
1642 self.client.send(action_type, payload).await
1643 }
1644}
1645
1646pub struct RoomMetaNamespace {
1647 client: Arc<RoomClient>,
1648}
1649
1650impl RoomMetaNamespace {
1651 fn new(client: Arc<RoomClient>) -> Self {
1652 Self { client }
1653 }
1654
1655 pub async fn get(&self) -> Result<Value, Error> {
1656 self.client.get_metadata().await
1657 }
1658}
1659
1660pub struct RoomSignalsNamespace {
1661 client: Arc<RoomClient>,
1662}
1663
1664impl RoomSignalsNamespace {
1665 fn new(client: Arc<RoomClient>) -> Self {
1666 Self { client }
1667 }
1668
1669 pub async fn send(
1670 &self,
1671 event: &str,
1672 payload: Option<Value>,
1673 options: Option<Value>,
1674 ) -> Result<(), Error> {
1675 self.client.send_signal(event, payload, options).await
1676 }
1677
1678 pub async fn send_to(
1679 &self,
1680 member_id: &str,
1681 event: &str,
1682 payload: Option<Value>,
1683 ) -> Result<(), Error> {
1684 self.client
1685 .send_signal(event, payload, Some(json!({ "memberId": member_id })))
1686 .await
1687 }
1688
1689 pub fn on(
1690 &self,
1691 event: &str,
1692 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1693 ) -> Subscription {
1694 self.client.on_signal(event, handler)
1695 }
1696
1697 pub fn on_any(
1698 &self,
1699 handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
1700 ) -> Subscription {
1701 self.client.on_any_signal(handler)
1702 }
1703}
1704
1705pub struct RoomMembersNamespace {
1706 client: Arc<RoomClient>,
1707}
1708
1709impl RoomMembersNamespace {
1710 fn new(client: Arc<RoomClient>) -> Self {
1711 Self { client }
1712 }
1713
1714 pub fn list(&self) -> Value {
1715 self.client.list_members()
1716 }
1717
1718 pub fn on_sync(
1719 &self,
1720 handler: impl Fn(&Value) + Send + Sync + 'static,
1721 ) -> Subscription {
1722 self.client.on_members_sync(handler)
1723 }
1724
1725 pub fn on_join(
1726 &self,
1727 handler: impl Fn(&Value) + Send + Sync + 'static,
1728 ) -> Subscription {
1729 self.client.on_member_join(handler)
1730 }
1731
1732 pub fn on_leave(
1733 &self,
1734 handler: impl Fn(&Value, &str) + Send + Sync + 'static,
1735 ) -> Subscription {
1736 self.client.on_member_leave(handler)
1737 }
1738
1739 pub async fn set_state(&self, state: Value) -> Result<(), Error> {
1740 self.client.send_member_state(state).await
1741 }
1742
1743 pub async fn clear_state(&self) -> Result<(), Error> {
1744 self.client.clear_member_state().await
1745 }
1746
1747 pub fn on_state_change(
1748 &self,
1749 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1750 ) -> Subscription {
1751 self.client.on_member_state_change(handler)
1752 }
1753}
1754
1755pub struct RoomAdminNamespace {
1756 client: Arc<RoomClient>,
1757}
1758
1759impl RoomAdminNamespace {
1760 fn new(client: Arc<RoomClient>) -> Self {
1761 Self { client }
1762 }
1763
1764 pub async fn kick(&self, member_id: &str) -> Result<(), Error> {
1765 self.client.send_admin("kick", member_id, None).await
1766 }
1767
1768 pub async fn mute(&self, member_id: &str) -> Result<(), Error> {
1769 self.client.send_admin("mute", member_id, None).await
1770 }
1771
1772 pub async fn block(&self, member_id: &str) -> Result<(), Error> {
1773 self.client.send_admin("block", member_id, None).await
1774 }
1775
1776 pub async fn set_role(&self, member_id: &str, role: &str) -> Result<(), Error> {
1777 self.client
1778 .send_admin("setRole", member_id, Some(json!({ "role": role })))
1779 .await
1780 }
1781
1782 pub async fn disable_video(&self, member_id: &str) -> Result<(), Error> {
1783 self.client.send_admin("disableVideo", member_id, None).await
1784 }
1785
1786 pub async fn stop_screen_share(&self, member_id: &str) -> Result<(), Error> {
1787 self.client
1788 .send_admin("stopScreenShare", member_id, None)
1789 .await
1790 }
1791}
1792
1793pub struct RoomMediaKindNamespace {
1794 client: Arc<RoomClient>,
1795 kind: &'static str,
1796}
1797
1798impl RoomMediaKindNamespace {
1799 fn new(client: Arc<RoomClient>, kind: &'static str) -> Self {
1800 Self { client, kind }
1801 }
1802
1803 pub async fn enable(&self, payload: Option<Value>) -> Result<(), Error> {
1804 self.client.send_media("publish", self.kind, payload).await
1805 }
1806
1807 pub async fn disable(&self) -> Result<(), Error> {
1808 self.client.send_media("unpublish", self.kind, None).await
1809 }
1810
1811 pub async fn set_muted(&self, muted: bool) -> Result<(), Error> {
1812 self.client
1813 .send_media("mute", self.kind, Some(json!({ "muted": muted })))
1814 .await
1815 }
1816}
1817
1818pub struct RoomScreenMediaNamespace {
1819 client: Arc<RoomClient>,
1820}
1821
1822impl RoomScreenMediaNamespace {
1823 fn new(client: Arc<RoomClient>) -> Self {
1824 Self { client }
1825 }
1826
1827 pub async fn start(&self, payload: Option<Value>) -> Result<(), Error> {
1828 self.client.send_media("publish", "screen", payload).await
1829 }
1830
1831 pub async fn stop(&self) -> Result<(), Error> {
1832 self.client.send_media("unpublish", "screen", None).await
1833 }
1834}
1835
1836pub struct RoomMediaDevicesNamespace {
1837 client: Arc<RoomClient>,
1838}
1839
1840impl RoomMediaDevicesNamespace {
1841 fn new(client: Arc<RoomClient>) -> Self {
1842 Self { client }
1843 }
1844
1845 pub async fn switch_inputs(&self, payload: Value) -> Result<(), Error> {
1846 self.client.switch_media_devices(payload).await
1847 }
1848}
1849
1850pub struct RoomMediaNamespace {
1851 client: Arc<RoomClient>,
1852}
1853
1854impl RoomMediaNamespace {
1855 fn new(client: Arc<RoomClient>) -> Self {
1856 Self { client }
1857 }
1858
1859 pub fn list(&self) -> Value {
1860 self.client.list_media_members()
1861 }
1862
1863 pub fn audio(&self) -> RoomMediaKindNamespace {
1864 RoomMediaKindNamespace::new(Arc::clone(&self.client), "audio")
1865 }
1866
1867 pub fn video(&self) -> RoomMediaKindNamespace {
1868 RoomMediaKindNamespace::new(Arc::clone(&self.client), "video")
1869 }
1870
1871 pub fn screen(&self) -> RoomScreenMediaNamespace {
1872 RoomScreenMediaNamespace::new(Arc::clone(&self.client))
1873 }
1874
1875 pub fn devices(&self) -> RoomMediaDevicesNamespace {
1876 RoomMediaDevicesNamespace::new(Arc::clone(&self.client))
1877 }
1878
1879 pub fn on_track(
1880 &self,
1881 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1882 ) -> Subscription {
1883 self.client.on_media_track(handler)
1884 }
1885
1886 pub fn on_track_removed(
1887 &self,
1888 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1889 ) -> Subscription {
1890 self.client.on_media_track_removed(handler)
1891 }
1892
1893 pub fn on_state_change(
1894 &self,
1895 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1896 ) -> Subscription {
1897 self.client.on_media_state_change(handler)
1898 }
1899
1900 pub fn on_device_change(
1901 &self,
1902 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1903 ) -> Subscription {
1904 self.client.on_media_device_change(handler)
1905 }
1906}
1907
1908pub struct RoomSessionNamespace {
1909 client: Arc<RoomClient>,
1910}
1911
1912impl RoomSessionNamespace {
1913 fn new(client: Arc<RoomClient>) -> Self {
1914 Self { client }
1915 }
1916
1917 pub fn on_error(
1918 &self,
1919 handler: impl Fn(&str, &str) + Send + Sync + 'static,
1920 ) -> Subscription {
1921 self.client.on_error(handler)
1922 }
1923
1924 pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
1925 self.client.on_kicked(handler)
1926 }
1927
1928 pub fn on_reconnect(
1929 &self,
1930 handler: impl Fn(&Value) + Send + Sync + 'static,
1931 ) -> Subscription {
1932 self.client.on_reconnect(handler)
1933 }
1934
1935 pub fn on_connection_state_change(
1936 &self,
1937 handler: impl Fn(&str) + Send + Sync + 'static,
1938 ) -> Subscription {
1939 self.client.on_connection_state_change(handler)
1940 }
1941
1942 pub fn connection_state(&self) -> String {
1943 self.client.connection_state()
1944 }
1945
1946 pub fn user_id(&self) -> Option<String> {
1947 self.client.current_user_id.lock().unwrap().clone()
1948 }
1949
1950 pub fn connection_id(&self) -> Option<String> {
1951 self.client.current_connection_id.lock().unwrap().clone()
1952 }
1953}