1use crate::error::Error;
19use serde_json::{json, Value};
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, RwLock};
22use tokio::sync::{mpsc, oneshot};
23use tokio::time::{sleep, timeout, Duration};
24use uuid::Uuid;
25
26pub struct RoomOptions {
29 pub auto_reconnect: bool,
30 pub max_reconnect_attempts: u32,
31 pub reconnect_base_delay_ms: u64,
32 pub send_timeout_ms: u64,
34 pub connection_timeout_ms: u64,
36}
37
38impl Default for RoomOptions {
39 fn default() -> Self {
40 Self {
41 auto_reconnect: true,
42 max_reconnect_attempts: 10,
43 reconnect_base_delay_ms: 1000,
44 send_timeout_ms: 10_000,
45 connection_timeout_ms: 15_000,
46 }
47 }
48}
49
50pub struct Subscription {
54 remove_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
55}
56
57impl Subscription {
58 fn new(remove_fn: impl FnOnce() + Send + 'static) -> Self {
59 Self {
60 remove_fn: Mutex::new(Some(Box::new(remove_fn))),
61 }
62 }
63
64 pub fn unsubscribe(self) {
66 if let Some(f) = self.remove_fn.lock().unwrap().take() {
67 f();
68 }
69 }
70}
71
72impl Drop for Subscription {
73 fn drop(&mut self) {
74 if let Some(f) = self.remove_fn.lock().unwrap().take() {
75 f();
76 }
77 }
78}
79
80type StateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
83type MessageHandler = Arc<dyn Fn(&Value) + Send + Sync>;
84type ErrorHandler = Arc<dyn Fn(&str, &str) + Send + Sync>;
85type KickedHandler = Arc<dyn Fn() + Send + Sync>;
86type MembersSyncHandler = Arc<dyn Fn(&Value) + Send + Sync>;
87type MemberHandler = Arc<dyn Fn(&Value) + Send + Sync>;
88type MemberLeaveHandler = Arc<dyn Fn(&Value, &str) + Send + Sync>;
89type MemberStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
90type SignalHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
91type AnySignalHandler = Arc<dyn Fn(&str, &Value, &Value) + Send + Sync>;
92type MediaTrackHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
93type MediaStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
94type MediaDeviceHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
95type ReconnectHandler = Arc<dyn Fn(&Value) + Send + Sync>;
96type ConnectionStateHandler = Arc<dyn Fn(&str) + Send + Sync>;
97
98type HandlerList<H> = Arc<Mutex<Vec<(u64, H)>>>;
100
101pub(crate) enum RoomWsCommand {
102 Send(String),
103 Close,
104}
105
106const ROOM_EXPLICIT_LEAVE_CLOSE_DELAY: Duration = Duration::from_millis(40);
107const ROOM_STATE_IDLE: &str = "idle";
108const ROOM_STATE_CONNECTING: &str = "connecting";
109const ROOM_STATE_CONNECTED: &str = "connected";
110const ROOM_STATE_RECONNECTING: &str = "reconnecting";
111const ROOM_STATE_DISCONNECTED: &str = "disconnected";
112const ROOM_STATE_KICKED: &str = "kicked";
113
114pub struct RoomClient {
117 pub namespace: String,
119 pub room_id: String,
121
122 shared_state: RwLock<Value>,
124 shared_version: RwLock<u64>,
125 player_state: RwLock<Value>,
126 player_version: RwLock<u64>,
127 members: RwLock<Value>,
128 media_members: RwLock<Value>,
129 current_user_id: Mutex<Option<String>>,
130 current_connection_id: Mutex<Option<String>>,
131 connection_state: RwLock<String>,
132 reconnect_info: RwLock<Option<Value>>,
133
134 base_url: String,
136 token_fn: Box<dyn Fn() -> String + Send + Sync>,
137 opts: RoomOptions,
138
139 shared_state_handlers: HandlerList<StateHandler>,
141 player_state_handlers: HandlerList<StateHandler>,
142 message_handlers: Arc<Mutex<HashMap<String, Vec<(u64, MessageHandler)>>>>,
143 error_handlers: HandlerList<ErrorHandler>,
144 kicked_handlers: HandlerList<KickedHandler>,
145 member_sync_handlers: HandlerList<MembersSyncHandler>,
146 member_join_handlers: HandlerList<MemberHandler>,
147 member_leave_handlers: HandlerList<MemberLeaveHandler>,
148 member_state_handlers: HandlerList<MemberStateHandler>,
149 signal_handlers: Arc<Mutex<HashMap<String, Vec<(u64, SignalHandler)>>>>,
150 any_signal_handlers: HandlerList<AnySignalHandler>,
151 media_track_handlers: HandlerList<MediaTrackHandler>,
152 media_track_removed_handlers: HandlerList<MediaTrackHandler>,
153 media_state_handlers: HandlerList<MediaStateHandler>,
154 media_device_handlers: HandlerList<MediaDeviceHandler>,
155 reconnect_handlers: HandlerList<ReconnectHandler>,
156 connection_state_handlers: HandlerList<ConnectionStateHandler>,
157 handler_id_counter: Mutex<u64>,
158
159 pending_requests: Mutex<HashMap<String, oneshot::Sender<Result<Value, Error>>>>,
161 pending_signal_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
162 pending_admin_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
163 pending_member_state_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
164 pending_media_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
165
166 send_tx: Mutex<Option<mpsc::Sender<RoomWsCommand>>>,
168
169 stop_tx: Mutex<Option<mpsc::Sender<()>>>,
171 intentionally_left: Mutex<bool>,
172}
173
174impl RoomClient {
175 pub fn new(
184 base_url: &str,
185 namespace: &str,
186 room_id: &str,
187 token_fn: impl Fn() -> String + Send + Sync + 'static,
188 opts: Option<RoomOptions>,
189 ) -> Arc<Self> {
190 Arc::new(Self {
191 namespace: namespace.to_string(),
192 room_id: room_id.to_string(),
193 shared_state: RwLock::new(json!({})),
194 shared_version: RwLock::new(0),
195 player_state: RwLock::new(json!({})),
196 player_version: RwLock::new(0),
197 members: RwLock::new(json!([])),
198 media_members: RwLock::new(json!([])),
199 current_user_id: Mutex::new(None),
200 current_connection_id: Mutex::new(None),
201 connection_state: RwLock::new(ROOM_STATE_IDLE.to_string()),
202 reconnect_info: RwLock::new(None),
203 base_url: base_url.trim_end_matches('/').to_string(),
204 token_fn: Box::new(token_fn),
205 opts: opts.unwrap_or_default(),
206 shared_state_handlers: Arc::new(Mutex::new(vec![])),
207 player_state_handlers: Arc::new(Mutex::new(vec![])),
208 message_handlers: Arc::new(Mutex::new(HashMap::new())),
209 error_handlers: Arc::new(Mutex::new(vec![])),
210 kicked_handlers: Arc::new(Mutex::new(vec![])),
211 member_sync_handlers: Arc::new(Mutex::new(vec![])),
212 member_join_handlers: Arc::new(Mutex::new(vec![])),
213 member_leave_handlers: Arc::new(Mutex::new(vec![])),
214 member_state_handlers: Arc::new(Mutex::new(vec![])),
215 signal_handlers: Arc::new(Mutex::new(HashMap::new())),
216 any_signal_handlers: Arc::new(Mutex::new(vec![])),
217 media_track_handlers: Arc::new(Mutex::new(vec![])),
218 media_track_removed_handlers: Arc::new(Mutex::new(vec![])),
219 media_state_handlers: Arc::new(Mutex::new(vec![])),
220 media_device_handlers: Arc::new(Mutex::new(vec![])),
221 reconnect_handlers: Arc::new(Mutex::new(vec![])),
222 connection_state_handlers: Arc::new(Mutex::new(vec![])),
223 handler_id_counter: Mutex::new(0),
224 pending_requests: Mutex::new(HashMap::new()),
225 pending_signal_requests: Mutex::new(HashMap::new()),
226 pending_admin_requests: Mutex::new(HashMap::new()),
227 pending_member_state_requests: Mutex::new(HashMap::new()),
228 pending_media_requests: Mutex::new(HashMap::new()),
229 send_tx: Mutex::new(None),
230 stop_tx: Mutex::new(None),
231 intentionally_left: Mutex::new(false),
232 })
233 }
234
235 pub fn get_shared_state(&self) -> Value {
239 self.shared_state.read().unwrap().clone()
240 }
241
242 pub fn get_player_state(&self) -> Value {
244 self.player_state.read().unwrap().clone()
245 }
246
247 pub fn list_members(&self) -> Value {
249 self.members.read().unwrap().clone()
250 }
251
252 pub fn list_media_members(&self) -> Value {
254 self.media_members.read().unwrap().clone()
255 }
256
257 pub fn connection_state(&self) -> String {
259 self.connection_state.read().unwrap().clone()
260 }
261
262 pub fn state(self: &Arc<Self>) -> RoomStateNamespace {
263 RoomStateNamespace::new(Arc::clone(self))
264 }
265
266 pub fn meta(self: &Arc<Self>) -> RoomMetaNamespace {
267 RoomMetaNamespace::new(Arc::clone(self))
268 }
269
270 pub fn signals(self: &Arc<Self>) -> RoomSignalsNamespace {
271 RoomSignalsNamespace::new(Arc::clone(self))
272 }
273
274 pub fn members(self: &Arc<Self>) -> RoomMembersNamespace {
275 RoomMembersNamespace::new(Arc::clone(self))
276 }
277
278 pub fn admin(self: &Arc<Self>) -> RoomAdminNamespace {
279 RoomAdminNamespace::new(Arc::clone(self))
280 }
281
282 pub fn media(self: &Arc<Self>) -> RoomMediaNamespace {
283 RoomMediaNamespace::new(Arc::clone(self))
284 }
285
286 pub fn session(self: &Arc<Self>) -> RoomSessionNamespace {
287 RoomSessionNamespace::new(Arc::clone(self))
288 }
289
290 pub async fn get_metadata(&self) -> Result<Value, Error> {
295 Self::get_metadata_static(&self.base_url, &self.namespace, &self.room_id).await
296 }
297
298 pub async fn get_metadata_static(
301 base_url: &str,
302 namespace: &str,
303 room_id: &str,
304 ) -> Result<Value, Error> {
305 let url = format!(
306 "{}/api/room/metadata?namespace={}&id={}",
307 base_url.trim_end_matches('/'),
308 urlencoding::encode(namespace),
309 urlencoding::encode(room_id)
310 );
311 let resp = reqwest::get(&url)
312 .await
313 .map_err(|e| Error::Room(format!(
314 "Room metadata request could not reach {}. Make sure the EdgeBase server is running and reachable. Cause: {}",
315 url, e
316 )))?;
317 let status = resp.status();
318 let body = resp
319 .text()
320 .await
321 .map_err(|e| Error::Room(format!("Failed to read room metadata body from {}: {}", url, e)))?;
322 if !status.is_success() {
323 if let Ok(json) = serde_json::from_str::<Value>(&body) {
324 for key in ["message", "error", "detail"] {
325 if let Some(message) = json.get(key).and_then(|value| value.as_str()) {
326 let trimmed = message.trim();
327 if !trimmed.is_empty() {
328 return Err(Error::Room(trimmed.to_string()));
329 }
330 }
331 }
332 }
333 return Err(Error::Room(format!(
334 "Failed to load room metadata for '{}' in namespace '{}' (HTTP {}).",
335 room_id, namespace, status
336 )));
337 }
338 serde_json::from_str(&body)
339 .map_err(|e| Error::Room(format!("Failed to parse room metadata: {}", e)))
340 }
341
342 pub async fn join(self: &Arc<Self>) -> Result<(), Error> {
346 *self.intentionally_left.lock().unwrap() = false;
347 self.set_connection_state(ROOM_STATE_CONNECTING);
348
349 let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
350 *self.stop_tx.lock().unwrap() = Some(stop_tx);
351
352 let this = Arc::clone(self);
353 tokio::spawn(async move {
354 let mut attempts = 0u32;
355 loop {
356 match this.establish().await {
357 Ok(mut ws_rx) => {
358 attempts = 0;
359 loop {
360 tokio::select! {
361 _ = stop_rx.recv() => return,
362 msg = ws_rx.recv() => {
363 match msg {
364 Some(raw) => this.handle_message(&raw),
365 None => {
366 if !*this.intentionally_left.lock().unwrap() {
369 this.reject_all_pending(
370 "WebSocket disconnected",
371 );
372 }
373 break;
374 }
375 }
376 }
377 }
378 }
379 }
380 Err(_) => {}
381 }
382 if *this.intentionally_left.lock().unwrap() {
383 return;
384 }
385 if !this.opts.auto_reconnect || attempts >= this.opts.max_reconnect_attempts {
386 this.set_connection_state(ROOM_STATE_DISCONNECTED);
387 return;
388 }
389 let delay = (this.opts.reconnect_base_delay_ms * (1u64 << attempts)).min(30_000);
390 attempts += 1;
391 this.begin_reconnect_attempt(attempts as u64);
392 sleep(Duration::from_millis(delay)).await;
393 }
394 });
395 Ok(())
396 }
397
398 pub async fn leave(&self) {
400 *self.intentionally_left.lock().unwrap() = true;
401
402 let send_tx = self.send_tx.lock().unwrap().clone();
403 if let Some(tx) = send_tx.as_ref() {
404 let _ = tx
405 .send(RoomWsCommand::Send(
406 json!({"type": "leave"}).to_string(),
407 ))
408 .await;
409 sleep(ROOM_EXPLICIT_LEAVE_CLOSE_DELAY).await;
410 let _ = tx.send(RoomWsCommand::Close).await;
411 }
412
413 if let Some(tx) = self.stop_tx.lock().unwrap().take() {
414 let _ = tx.send(()).await;
415 }
416
417 *self.send_tx.lock().unwrap() = None;
418
419 self.reject_all_pending("Room left");
421
422 *self.shared_state.write().unwrap() = json!({});
424 *self.shared_version.write().unwrap() = 0;
425 *self.player_state.write().unwrap() = json!({});
426 *self.player_version.write().unwrap() = 0;
427 *self.members.write().unwrap() = json!([]);
428 *self.media_members.write().unwrap() = json!([]);
429 *self.current_user_id.lock().unwrap() = None;
430 *self.current_connection_id.lock().unwrap() = None;
431 *self.reconnect_info.write().unwrap() = None;
432 self.set_connection_state(ROOM_STATE_IDLE);
433 }
434
435 pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
445 if self.send_tx.lock().unwrap().is_none() {
446 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
447 }
448
449 let request_id = Uuid::new_v4().to_string();
450 let (tx, rx) = oneshot::channel::<Result<Value, Error>>();
451
452 self.pending_requests
453 .lock()
454 .unwrap()
455 .insert(request_id.clone(), tx);
456
457 self.ws_send(json!({
458 "type": "send",
459 "actionType": action_type,
460 "payload": payload.unwrap_or(json!({})),
461 "requestId": request_id,
462 }));
463
464 let timeout_ms = self.opts.send_timeout_ms;
465 let action_type_owned = action_type.to_string();
466 let req_id = request_id.clone();
467
468 match timeout(Duration::from_millis(timeout_ms), rx).await {
469 Ok(Ok(result)) => result,
470 Ok(Err(_)) => {
471 self.pending_requests.lock().unwrap().remove(&req_id);
473 Err(Error::Room(
474 "Room left while waiting for action result".to_string(),
475 ))
476 }
477 Err(_) => {
478 self.pending_requests.lock().unwrap().remove(&req_id);
480 Err(Error::RoomTimeout(format!(
481 "Action '{}' timed out",
482 action_type_owned
483 )))
484 }
485 }
486 }
487
488 pub fn on_shared_state(
493 &self,
494 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
495 ) -> Subscription {
496 let id = self.next_handler_id();
497 let handler = Arc::new(handler) as StateHandler;
498 self.shared_state_handlers.lock().unwrap().push((id, handler));
499
500 let list = Arc::clone(&self.shared_state_handlers);
501 Subscription::new(move || {
502 list.lock().unwrap().retain(|(hid, _)| *hid != id);
503 })
504 }
505
506 pub fn on_player_state(
509 &self,
510 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
511 ) -> Subscription {
512 let id = self.next_handler_id();
513 let handler = Arc::new(handler) as StateHandler;
514 self.player_state_handlers.lock().unwrap().push((id, handler));
515
516 let list = Arc::clone(&self.player_state_handlers);
517 Subscription::new(move || {
518 list.lock().unwrap().retain(|(hid, _)| *hid != id);
519 })
520 }
521
522 pub fn on_message(
529 &self,
530 msg_type: &str,
531 handler: impl Fn(&Value) + Send + Sync + 'static,
532 ) -> Subscription {
533 let id = self.next_handler_id();
534 let handler = Arc::new(handler) as MessageHandler;
535 let msg_type = msg_type.to_string();
536 {
537 let mut map = self.message_handlers.lock().unwrap();
538 map.entry(msg_type.clone())
539 .or_insert_with(Vec::new)
540 .push((id, handler));
541 }
542
543 let map = Arc::clone(&self.message_handlers);
544 Subscription::new(move || {
545 if let Some(list) = map.lock().unwrap().get_mut(&msg_type) {
546 list.retain(|(hid, _)| *hid != id);
547 }
548 })
549 }
550
551 pub fn on_error(
553 &self,
554 handler: impl Fn(&str, &str) + Send + Sync + 'static,
555 ) -> Subscription {
556 let id = self.next_handler_id();
557 let handler = Arc::new(handler) as ErrorHandler;
558 self.error_handlers.lock().unwrap().push((id, handler));
559
560 let list = Arc::clone(&self.error_handlers);
561 Subscription::new(move || {
562 list.lock().unwrap().retain(|(hid, _)| *hid != id);
563 })
564 }
565
566 pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
568 let id = self.next_handler_id();
569 let handler = Arc::new(handler) as KickedHandler;
570 self.kicked_handlers.lock().unwrap().push((id, handler));
571
572 let list = Arc::clone(&self.kicked_handlers);
573 Subscription::new(move || {
574 list.lock().unwrap().retain(|(hid, _)| *hid != id);
575 })
576 }
577
578 pub fn on_members_sync(
579 &self,
580 handler: impl Fn(&Value) + Send + Sync + 'static,
581 ) -> Subscription {
582 let id = self.next_handler_id();
583 let handler = Arc::new(handler) as MembersSyncHandler;
584 self.member_sync_handlers.lock().unwrap().push((id, handler));
585
586 let list = Arc::clone(&self.member_sync_handlers);
587 Subscription::new(move || {
588 list.lock().unwrap().retain(|(hid, _)| *hid != id);
589 })
590 }
591
592 pub fn on_member_join(
593 &self,
594 handler: impl Fn(&Value) + Send + Sync + 'static,
595 ) -> Subscription {
596 let id = self.next_handler_id();
597 let handler = Arc::new(handler) as MemberHandler;
598 self.member_join_handlers.lock().unwrap().push((id, handler));
599
600 let list = Arc::clone(&self.member_join_handlers);
601 Subscription::new(move || {
602 list.lock().unwrap().retain(|(hid, _)| *hid != id);
603 })
604 }
605
606 pub fn on_member_leave(
607 &self,
608 handler: impl Fn(&Value, &str) + Send + Sync + 'static,
609 ) -> Subscription {
610 let id = self.next_handler_id();
611 let handler = Arc::new(handler) as MemberLeaveHandler;
612 self.member_leave_handlers.lock().unwrap().push((id, handler));
613
614 let list = Arc::clone(&self.member_leave_handlers);
615 Subscription::new(move || {
616 list.lock().unwrap().retain(|(hid, _)| *hid != id);
617 })
618 }
619
620 pub fn on_member_state_change(
621 &self,
622 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
623 ) -> Subscription {
624 let id = self.next_handler_id();
625 let handler = Arc::new(handler) as MemberStateHandler;
626 self.member_state_handlers.lock().unwrap().push((id, handler));
627
628 let list = Arc::clone(&self.member_state_handlers);
629 Subscription::new(move || {
630 list.lock().unwrap().retain(|(hid, _)| *hid != id);
631 })
632 }
633
634 pub fn on_signal(
635 &self,
636 event: &str,
637 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
638 ) -> Subscription {
639 let id = self.next_handler_id();
640 let handler = Arc::new(handler) as SignalHandler;
641 let event = event.to_string();
642 {
643 let mut map = self.signal_handlers.lock().unwrap();
644 map.entry(event.clone())
645 .or_insert_with(Vec::new)
646 .push((id, handler));
647 }
648
649 let map = Arc::clone(&self.signal_handlers);
650 Subscription::new(move || {
651 if let Some(list) = map.lock().unwrap().get_mut(&event) {
652 list.retain(|(hid, _)| *hid != id);
653 }
654 })
655 }
656
657 pub fn on_any_signal(
658 &self,
659 handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
660 ) -> Subscription {
661 let id = self.next_handler_id();
662 let handler = Arc::new(handler) as AnySignalHandler;
663 self.any_signal_handlers.lock().unwrap().push((id, handler));
664
665 let list = Arc::clone(&self.any_signal_handlers);
666 Subscription::new(move || {
667 list.lock().unwrap().retain(|(hid, _)| *hid != id);
668 })
669 }
670
671 pub fn on_media_track(
672 &self,
673 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
674 ) -> Subscription {
675 let id = self.next_handler_id();
676 let handler = Arc::new(handler) as MediaTrackHandler;
677 self.media_track_handlers.lock().unwrap().push((id, handler));
678
679 let list = Arc::clone(&self.media_track_handlers);
680 Subscription::new(move || {
681 list.lock().unwrap().retain(|(hid, _)| *hid != id);
682 })
683 }
684
685 pub fn on_media_track_removed(
686 &self,
687 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
688 ) -> Subscription {
689 let id = self.next_handler_id();
690 let handler = Arc::new(handler) as MediaTrackHandler;
691 self.media_track_removed_handlers
692 .lock()
693 .unwrap()
694 .push((id, handler));
695
696 let list = Arc::clone(&self.media_track_removed_handlers);
697 Subscription::new(move || {
698 list.lock().unwrap().retain(|(hid, _)| *hid != id);
699 })
700 }
701
702 pub fn on_media_state_change(
703 &self,
704 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
705 ) -> Subscription {
706 let id = self.next_handler_id();
707 let handler = Arc::new(handler) as MediaStateHandler;
708 self.media_state_handlers.lock().unwrap().push((id, handler));
709
710 let list = Arc::clone(&self.media_state_handlers);
711 Subscription::new(move || {
712 list.lock().unwrap().retain(|(hid, _)| *hid != id);
713 })
714 }
715
716 pub fn on_media_device_change(
717 &self,
718 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
719 ) -> Subscription {
720 let id = self.next_handler_id();
721 let handler = Arc::new(handler) as MediaDeviceHandler;
722 self.media_device_handlers.lock().unwrap().push((id, handler));
723
724 let list = Arc::clone(&self.media_device_handlers);
725 Subscription::new(move || {
726 list.lock().unwrap().retain(|(hid, _)| *hid != id);
727 })
728 }
729
730 pub fn on_reconnect(
731 &self,
732 handler: impl Fn(&Value) + Send + Sync + 'static,
733 ) -> Subscription {
734 let id = self.next_handler_id();
735 let handler = Arc::new(handler) as ReconnectHandler;
736 self.reconnect_handlers.lock().unwrap().push((id, handler));
737
738 let list = Arc::clone(&self.reconnect_handlers);
739 Subscription::new(move || {
740 list.lock().unwrap().retain(|(hid, _)| *hid != id);
741 })
742 }
743
744 pub fn on_connection_state_change(
745 &self,
746 handler: impl Fn(&str) + Send + Sync + 'static,
747 ) -> Subscription {
748 let id = self.next_handler_id();
749 let handler = Arc::new(handler) as ConnectionStateHandler;
750 self.connection_state_handlers
751 .lock()
752 .unwrap()
753 .push((id, handler));
754
755 let list = Arc::clone(&self.connection_state_handlers);
756 Subscription::new(move || {
757 list.lock().unwrap().retain(|(hid, _)| *hid != id);
758 })
759 }
760
761 pub async fn send_signal(
762 &self,
763 event: &str,
764 payload: Option<Value>,
765 options: Option<Value>,
766 ) -> Result<(), Error> {
767 if self.send_tx.lock().unwrap().is_none() {
768 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
769 }
770
771 let request_id = Uuid::new_v4().to_string();
772 let options = options.unwrap_or_else(|| json!({}));
773 let message = json!({
774 "type": "signal",
775 "event": event,
776 "payload": payload.unwrap_or_else(|| json!({})),
777 "requestId": request_id,
778 "memberId": options.get("memberId").cloned().unwrap_or(Value::Null),
779 "includeSelf": options.get("includeSelf").and_then(Value::as_bool).unwrap_or(false),
780 });
781
782 self.send_unit_request(&self.pending_signal_requests, request_id, message, format!("Signal '{}' timed out", event)).await
783 }
784
785 pub async fn send_member_state(&self, state: Value) -> Result<(), Error> {
786 if self.send_tx.lock().unwrap().is_none() {
787 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
788 }
789
790 let request_id = Uuid::new_v4().to_string();
791 let message = json!({
792 "type": "member_state",
793 "state": state,
794 "requestId": request_id,
795 });
796
797 self.send_unit_request(
798 &self.pending_member_state_requests,
799 request_id,
800 message,
801 "Member state update timed out".to_string(),
802 ).await
803 }
804
805 pub async fn clear_member_state(&self) -> Result<(), Error> {
806 if self.send_tx.lock().unwrap().is_none() {
807 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
808 }
809
810 let request_id = Uuid::new_v4().to_string();
811 let message = json!({
812 "type": "member_state_clear",
813 "requestId": request_id,
814 });
815
816 self.send_unit_request(
817 &self.pending_member_state_requests,
818 request_id,
819 message,
820 "Member state clear timed out".to_string(),
821 ).await
822 }
823
824 pub async fn send_admin(
825 &self,
826 operation: &str,
827 member_id: &str,
828 payload: Option<Value>,
829 ) -> Result<(), Error> {
830 if self.send_tx.lock().unwrap().is_none() {
831 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
832 }
833
834 let request_id = Uuid::new_v4().to_string();
835 let message = json!({
836 "type": "admin",
837 "operation": operation,
838 "memberId": member_id,
839 "payload": payload.unwrap_or_else(|| json!({})),
840 "requestId": request_id,
841 });
842
843 self.send_unit_request(
844 &self.pending_admin_requests,
845 request_id,
846 message,
847 format!("Admin '{}' timed out", operation),
848 ).await
849 }
850
851 pub async fn send_media(
852 &self,
853 operation: &str,
854 kind: &str,
855 payload: Option<Value>,
856 ) -> Result<(), Error> {
857 if self.send_tx.lock().unwrap().is_none() {
858 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
859 }
860
861 let request_id = Uuid::new_v4().to_string();
862 let message = json!({
863 "type": "media",
864 "operation": operation,
865 "kind": kind,
866 "payload": payload.unwrap_or_else(|| json!({})),
867 "requestId": request_id,
868 });
869
870 self.send_unit_request(
871 &self.pending_media_requests,
872 request_id,
873 message,
874 format!("Media '{}:{}' timed out", operation, kind),
875 ).await
876 }
877
878 pub async fn switch_media_devices(&self, payload: Value) -> Result<(), Error> {
879 if let Some(device_id) = payload.get("audioInputId").and_then(Value::as_str) {
880 self.send_media("device", "audio", Some(json!({ "deviceId": device_id }))).await?;
881 }
882 if let Some(device_id) = payload.get("videoInputId").and_then(Value::as_str) {
883 self.send_media("device", "video", Some(json!({ "deviceId": device_id }))).await?;
884 }
885 if let Some(device_id) = payload.get("screenInputId").and_then(Value::as_str) {
886 self.send_media("device", "screen", Some(json!({ "deviceId": device_id }))).await?;
887 }
888 Ok(())
889 }
890
891 fn reject_all_pending(&self, message: &str) {
894 let error_msg = message.to_string();
895
896 for (_, tx) in self.pending_requests.lock().unwrap().drain() {
898 let _ = tx.send(Err(Error::Room(error_msg.clone())));
899 }
900
901 for (_, tx) in self.pending_signal_requests.lock().unwrap().drain() {
903 let _ = tx.send(Err(Error::Room(error_msg.clone())));
904 }
905
906 for (_, tx) in self.pending_admin_requests.lock().unwrap().drain() {
908 let _ = tx.send(Err(Error::Room(error_msg.clone())));
909 }
910
911 for (_, tx) in self.pending_member_state_requests.lock().unwrap().drain() {
913 let _ = tx.send(Err(Error::Room(error_msg.clone())));
914 }
915
916 for (_, tx) in self.pending_media_requests.lock().unwrap().drain() {
918 let _ = tx.send(Err(Error::Room(error_msg.clone())));
919 }
920 }
921
922 pub async fn destroy(self: &Arc<Self>) {
925 self.leave().await;
926
927 self.shared_state_handlers.lock().unwrap().clear();
929 self.player_state_handlers.lock().unwrap().clear();
930 self.message_handlers.lock().unwrap().clear();
931 self.error_handlers.lock().unwrap().clear();
932 self.kicked_handlers.lock().unwrap().clear();
933 self.member_sync_handlers.lock().unwrap().clear();
934 self.member_join_handlers.lock().unwrap().clear();
935 self.member_leave_handlers.lock().unwrap().clear();
936 self.member_state_handlers.lock().unwrap().clear();
937 self.signal_handlers.lock().unwrap().clear();
938 self.any_signal_handlers.lock().unwrap().clear();
939 self.media_track_handlers.lock().unwrap().clear();
940 self.media_track_removed_handlers.lock().unwrap().clear();
941 self.media_state_handlers.lock().unwrap().clear();
942 self.media_device_handlers.lock().unwrap().clear();
943 self.reconnect_handlers.lock().unwrap().clear();
944 self.connection_state_handlers.lock().unwrap().clear();
945 }
946
947 fn ws_url(&self) -> String {
950 let u = self
951 .base_url
952 .replace("https://", "wss://")
953 .replace("http://", "ws://");
954 format!(
955 "{}/api/room?namespace={}&id={}",
956 u,
957 urlencoding::encode(&self.namespace),
958 urlencoding::encode(&self.room_id)
959 )
960 }
961
962 fn ws_send(&self, msg: Value) {
964 let s = msg.to_string();
965 if let Some(tx) = self.send_tx.lock().unwrap().as_ref() {
966 let tx = tx.clone();
967 tokio::spawn(async move {
968 let _ = tx.send(RoomWsCommand::Send(s)).await;
969 });
970 }
971 }
972
973 #[cfg(test)]
974 pub(crate) fn attach_send_channel_for_testing(&self, tx: mpsc::Sender<RoomWsCommand>) {
975 *self.send_tx.lock().unwrap() = Some(tx);
976 }
977
978 #[cfg(test)]
979 pub(crate) fn handle_message_for_testing(&self, raw: &str) {
980 self.handle_message(raw);
981 }
982
983 async fn establish(&self) -> anyhow::Result<mpsc::Receiver<String>> {
984 use futures_util::{SinkExt, StreamExt};
985 use tokio_tungstenite::{connect_async, tungstenite::Message};
986
987 let (ws_stream, _response) = tokio::time::timeout(
988 std::time::Duration::from_millis(self.opts.connection_timeout_ms),
989 connect_async(self.ws_url()),
990 )
991 .await
992 .map_err(|_| anyhow::anyhow!(
993 "Room WebSocket connection timed out after {}ms. Is the server running?",
994 self.opts.connection_timeout_ms,
995 ))??;
996 let (mut write, mut read) = ws_stream.split();
997
998 let auth = json!({"type": "auth", "token": (self.token_fn)(), "sdkVersion": "0.2.6"});
1000 write.send(Message::Text(auth.to_string().into())).await?;
1001
1002 let join_raw = if let Some(Ok(Message::Text(raw))) = read.next().await {
1004 let raw_str: &str = &raw;
1005 let resp: Value = serde_json::from_str(raw_str)?;
1006 let t = resp["type"].as_str().unwrap_or("");
1007 if t != "auth_success" && t != "auth_refreshed" {
1008 anyhow::bail!("Room auth failed: {}", resp["message"]);
1009 }
1010
1011 *self.current_user_id.lock().unwrap() = resp["userId"].as_str().map(|value| value.to_string());
1012 *self.current_connection_id.lock().unwrap() = resp["connectionId"].as_str().map(|value| value.to_string());
1013
1014 let join_msg = json!({
1016 "type": "join",
1017 "lastSharedState": *self.shared_state.read().unwrap(),
1018 "lastSharedVersion": *self.shared_version.read().unwrap(),
1019 "lastPlayerState": *self.player_state.read().unwrap(),
1020 "lastPlayerVersion": *self.player_version.read().unwrap(),
1021 });
1022 join_msg.to_string()
1023 } else {
1024 anyhow::bail!("Room: no auth response");
1025 };
1026
1027 let (write_tx, mut write_rx) = mpsc::channel::<RoomWsCommand>(128);
1029 *self.send_tx.lock().unwrap() = Some(write_tx.clone());
1030
1031 tokio::spawn(async move {
1033 while let Some(command) = write_rx.recv().await {
1034 match command {
1035 RoomWsCommand::Send(msg) => {
1036 if write.send(Message::Text(msg.into())).await.is_err() {
1037 break;
1038 }
1039 }
1040 RoomWsCommand::Close => {
1041 let _ = write.send(Message::Close(None)).await;
1042 break;
1043 }
1044 }
1045 }
1046 });
1047
1048 let _ = write_tx.send(RoomWsCommand::Send(join_raw)).await;
1050
1051 let htx = write_tx.clone();
1053 tokio::spawn(async move {
1054 loop {
1055 sleep(Duration::from_secs(30)).await;
1056 if htx
1057 .send(RoomWsCommand::Send(json!({"type":"ping"}).to_string()))
1058 .await
1059 .is_err()
1060 {
1061 break;
1062 }
1063 }
1064 });
1065
1066 let (msg_tx, msg_rx) = mpsc::channel::<String>(128);
1068 tokio::spawn(async move {
1069 while let Some(Ok(Message::Text(raw))) = read.next().await {
1070 let raw_str: String = raw.into();
1071 if msg_tx.send(raw_str).await.is_err() {
1072 break;
1073 }
1074 }
1075 });
1076
1077 Ok(msg_rx)
1078 }
1079
1080 fn handle_message(&self, raw: &str) {
1083 let msg: Value = match serde_json::from_str(raw) {
1084 Ok(v) => v,
1085 Err(_) => return,
1086 };
1087 let t = msg["type"].as_str().unwrap_or("");
1088
1089 match t {
1090 "sync" => self.handle_sync(&msg),
1091 "shared_delta" => self.handle_shared_delta(&msg),
1092 "player_delta" => self.handle_player_delta(&msg),
1093 "action_result" => self.handle_action_result(&msg),
1094 "action_error" => self.handle_action_error(&msg),
1095 "message" => self.handle_server_message(&msg),
1096 "signal" => self.handle_signal(&msg),
1097 "signal_sent" => self.resolve_pending_unit_request(&self.pending_signal_requests, &msg),
1098 "signal_error" => self.reject_pending_unit_request(&self.pending_signal_requests, &msg, "Signal error"),
1099 "members_sync" => self.handle_members_sync(&msg),
1100 "member_join" => self.handle_member_join(&msg),
1101 "member_leave" => self.handle_member_leave(&msg),
1102 "member_state" => self.handle_member_state(&msg),
1103 "member_state_error" => self.reject_pending_unit_request(&self.pending_member_state_requests, &msg, "Member state error"),
1104 "admin_result" => self.resolve_pending_unit_request(&self.pending_admin_requests, &msg),
1105 "admin_error" => self.reject_pending_unit_request(&self.pending_admin_requests, &msg, "Admin error"),
1106 "media_sync" => self.handle_media_sync(&msg),
1107 "media_track" => self.handle_media_track(&msg),
1108 "media_track_removed" => self.handle_media_track_removed(&msg),
1109 "media_state" => self.handle_media_state(&msg),
1110 "media_device" => self.handle_media_device(&msg),
1111 "media_result" => self.resolve_pending_unit_request(&self.pending_media_requests, &msg),
1112 "media_error" => self.reject_pending_unit_request(&self.pending_media_requests, &msg, "Media error"),
1113 "kicked" => self.handle_kicked(),
1114 "error" => self.handle_error(&msg),
1115 "pong" => {}
1116 _ => {}
1117 }
1118 }
1119
1120 fn handle_sync(&self, msg: &Value) {
1121 *self.shared_state.write().unwrap() = msg["sharedState"].clone();
1122 *self.shared_version.write().unwrap() = msg["sharedVersion"].as_u64().unwrap_or(0);
1123 *self.player_state.write().unwrap() = msg["playerState"].clone();
1124 *self.player_version.write().unwrap() = msg["playerVersion"].as_u64().unwrap_or(0);
1125 self.set_connection_state(ROOM_STATE_CONNECTED);
1126
1127 if let Some(info) = self.reconnect_info.write().unwrap().take() {
1128 for (_, handler) in self.reconnect_handlers.lock().unwrap().iter() {
1129 handler(&info);
1130 }
1131 }
1132
1133 let shared = self.shared_state.read().unwrap().clone();
1135 for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1136 handler(&shared, &shared);
1137 }
1138 let player = self.player_state.read().unwrap().clone();
1139 for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1140 handler(&player, &player);
1141 }
1142 }
1143
1144 fn handle_shared_delta(&self, msg: &Value) {
1145 let delta = &msg["delta"];
1146 *self.shared_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1147
1148 if let Value::Object(map) = delta {
1149 let mut state = self.shared_state.write().unwrap();
1150 for (path, value) in map {
1151 deep_set(&mut state, path, value.clone());
1152 }
1153 }
1154
1155 let state = self.shared_state.read().unwrap().clone();
1156 for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1157 handler(&state, delta);
1158 }
1159 }
1160
1161 fn handle_player_delta(&self, msg: &Value) {
1162 let delta = &msg["delta"];
1163 *self.player_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1164
1165 if let Value::Object(map) = delta {
1166 let mut state = self.player_state.write().unwrap();
1167 for (path, value) in map {
1168 deep_set(&mut state, path, value.clone());
1169 }
1170 }
1171
1172 let state = self.player_state.read().unwrap().clone();
1173 for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1174 handler(&state, delta);
1175 }
1176 }
1177
1178 fn handle_action_result(&self, msg: &Value) {
1179 let request_id = msg["requestId"].as_str().unwrap_or("");
1180 if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1181 let _ = tx.send(Ok(msg["result"].clone()));
1182 }
1183 }
1184
1185 fn handle_action_error(&self, msg: &Value) {
1186 let request_id = msg["requestId"].as_str().unwrap_or("");
1187 if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1188 let message = msg["message"].as_str().unwrap_or("Unknown EdgeBase error. Check the server response or logs for details.");
1189 let _ = tx.send(Err(Error::Room(message.to_string())));
1190 }
1191 }
1192
1193 fn handle_server_message(&self, msg: &Value) {
1194 let message_type = msg["messageType"].as_str().unwrap_or("");
1195 let data = &msg["data"];
1196
1197 let handlers = self.message_handlers.lock().unwrap();
1198 if let Some(list) = handlers.get(message_type) {
1199 for (_, handler) in list {
1200 handler(data);
1201 }
1202 }
1203 }
1204
1205 fn handle_signal(&self, msg: &Value) {
1206 let event = msg["event"].as_str().unwrap_or("");
1207 let payload = &msg["payload"];
1208 let meta = &msg["meta"];
1209
1210 if let Some(list) = self.signal_handlers.lock().unwrap().get(event) {
1211 for (_, handler) in list {
1212 handler(payload, meta);
1213 }
1214 }
1215
1216 for (_, handler) in self.any_signal_handlers.lock().unwrap().iter() {
1217 handler(event, payload, meta);
1218 }
1219 }
1220
1221 fn handle_members_sync(&self, msg: &Value) {
1222 let members = normalize_members(&msg["members"]);
1223 *self.members.write().unwrap() = members.clone();
1224 self.merge_members_into_media();
1225
1226 for (_, handler) in self.member_sync_handlers.lock().unwrap().iter() {
1227 handler(&members);
1228 }
1229 }
1230
1231 fn handle_member_join(&self, msg: &Value) {
1232 if let Some(member) = normalize_member(&msg["member"]) {
1233 self.upsert_member(member.clone());
1234 for (_, handler) in self.member_join_handlers.lock().unwrap().iter() {
1235 handler(&member);
1236 }
1237 }
1238 }
1239
1240 fn handle_member_leave(&self, msg: &Value) {
1241 if let Some(member) = normalize_member(&msg["member"]) {
1242 let member_id = member["memberId"].as_str().unwrap_or("");
1243 self.remove_member(member_id);
1244 let reason = msg["reason"].as_str().unwrap_or("leave");
1245 for (_, handler) in self.member_leave_handlers.lock().unwrap().iter() {
1246 handler(&member, reason);
1247 }
1248 }
1249 }
1250
1251 fn handle_member_state(&self, msg: &Value) {
1252 if let Some(mut member) = normalize_member(&msg["member"]) {
1253 let state = object_or_empty(&msg["state"]);
1254 member["state"] = state.clone();
1255 self.upsert_member(member.clone());
1256 self.resolve_pending_unit_request(&self.pending_member_state_requests, msg);
1257
1258 for (_, handler) in self.member_state_handlers.lock().unwrap().iter() {
1259 handler(&member, &state);
1260 }
1261 }
1262 }
1263
1264 fn handle_media_sync(&self, msg: &Value) {
1265 let media_members = normalize_media_members(&msg["members"]);
1266 *self.media_members.write().unwrap() = media_members.clone();
1267 self.merge_members_into_media();
1268 }
1269
1270 fn handle_media_track(&self, msg: &Value) {
1271 if let (Some(member), Some(track)) = (
1272 normalize_member(&msg["member"]),
1273 normalize_track(&msg["track"]),
1274 ) {
1275 self.upsert_media_track(&member, &track);
1276 for (_, handler) in self.media_track_handlers.lock().unwrap().iter() {
1277 handler(&track, &member);
1278 }
1279 }
1280 }
1281
1282 fn handle_media_track_removed(&self, msg: &Value) {
1283 if let (Some(member), Some(track)) = (
1284 normalize_member(&msg["member"]),
1285 normalize_track(&msg["track"]),
1286 ) {
1287 self.remove_media_track(&member, &track);
1288 for (_, handler) in self
1289 .media_track_removed_handlers
1290 .lock()
1291 .unwrap()
1292 .iter()
1293 {
1294 handler(&track, &member);
1295 }
1296 }
1297 }
1298
1299 fn handle_media_state(&self, msg: &Value) {
1300 if let Some(member) = normalize_member(&msg["member"]) {
1301 let state = object_or_empty(&msg["state"]);
1302 self.upsert_media_state(&member, state.clone());
1303 for (_, handler) in self.media_state_handlers.lock().unwrap().iter() {
1304 handler(&member, &state);
1305 }
1306 }
1307 }
1308
1309 fn handle_media_device(&self, msg: &Value) {
1310 if let Some(member) = normalize_member(&msg["member"]) {
1311 let change = json!({
1312 "kind": msg["kind"].clone(),
1313 "deviceId": msg["deviceId"].clone(),
1314 });
1315 self.apply_media_device_change(&member, &change);
1316 for (_, handler) in self.media_device_handlers.lock().unwrap().iter() {
1317 handler(&member, &change);
1318 }
1319 }
1320 }
1321
1322 fn handle_kicked(&self) {
1323 self.set_connection_state(ROOM_STATE_KICKED);
1324 *self.intentionally_left.lock().unwrap() = true;
1325 for (_, handler) in self.kicked_handlers.lock().unwrap().iter() {
1326 handler();
1327 }
1328 }
1329
1330 fn handle_error(&self, msg: &Value) {
1331 let code = msg["code"].as_str().unwrap_or("");
1332 let message = msg["message"].as_str().unwrap_or("");
1333 for (_, handler) in self.error_handlers.lock().unwrap().iter() {
1334 handler(code, message);
1335 }
1336 }
1337
1338 async fn send_unit_request(
1341 &self,
1342 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1343 request_id: String,
1344 message: Value,
1345 timeout_message: String,
1346 ) -> Result<(), Error> {
1347 let (tx, rx) = oneshot::channel::<Result<(), Error>>();
1348 pending.lock().unwrap().insert(request_id.clone(), tx);
1349 self.ws_send(message);
1350
1351 match timeout(Duration::from_millis(self.opts.send_timeout_ms), rx).await {
1352 Ok(Ok(result)) => result,
1353 Ok(Err(_)) => {
1354 pending.lock().unwrap().remove(&request_id);
1355 Err(Error::Room(
1356 "Room left while waiting for room control result".to_string(),
1357 ))
1358 }
1359 Err(_) => {
1360 pending.lock().unwrap().remove(&request_id);
1361 Err(Error::RoomTimeout(timeout_message))
1362 }
1363 }
1364 }
1365
1366 fn resolve_pending_unit_request(
1367 &self,
1368 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1369 msg: &Value,
1370 ) {
1371 let request_id = msg["requestId"].as_str().unwrap_or("");
1372 if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1373 let _ = tx.send(Ok(()));
1374 }
1375 }
1376
1377 fn reject_pending_unit_request(
1378 &self,
1379 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1380 msg: &Value,
1381 fallback: &str,
1382 ) {
1383 let request_id = msg["requestId"].as_str().unwrap_or("");
1384 if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1385 let message = msg["message"].as_str().unwrap_or(fallback);
1386 let _ = tx.send(Err(Error::Room(message.to_string())));
1387 }
1388 }
1389
1390 fn set_connection_state(&self, next: &str) {
1391 let mut state = self.connection_state.write().unwrap();
1392 if state.as_str() == next {
1393 return;
1394 }
1395 *state = next.to_string();
1396 drop(state);
1397
1398 for (_, handler) in self.connection_state_handlers.lock().unwrap().iter() {
1399 handler(next);
1400 }
1401 }
1402
1403 fn begin_reconnect_attempt(&self, attempt: u64) {
1404 *self.reconnect_info.write().unwrap() = Some(json!({ "attempt": attempt }));
1405 self.set_connection_state(ROOM_STATE_RECONNECTING);
1406 }
1407
1408 fn upsert_member(&self, member: Value) {
1409 let member_id = member["memberId"].as_str().unwrap_or("").to_string();
1410 if member_id.is_empty() {
1411 return;
1412 }
1413
1414 let mut members = self.members.write().unwrap();
1415 let list = members.as_array_mut().expect("members array");
1416 if let Some(existing) = list
1417 .iter_mut()
1418 .find(|entry| entry["memberId"].as_str() == Some(member_id.as_str()))
1419 {
1420 *existing = member;
1421 } else {
1422 list.push(member);
1423 }
1424 drop(members);
1425 self.merge_members_into_media();
1426 }
1427
1428 fn remove_member(&self, member_id: &str) {
1429 {
1430 let mut members = self.members.write().unwrap();
1431 if let Some(list) = members.as_array_mut() {
1432 list.retain(|entry| entry["memberId"].as_str() != Some(member_id));
1433 }
1434 }
1435 {
1436 let mut media_members = self.media_members.write().unwrap();
1437 if let Some(list) = media_members.as_array_mut() {
1438 list.retain(|entry| entry["member"]["memberId"].as_str() != Some(member_id));
1439 }
1440 }
1441 }
1442
1443 fn merge_members_into_media(&self) {
1444 let members = self.members.read().unwrap().clone();
1445 let mut media_members = self.media_members.write().unwrap();
1446 if let Some(list) = media_members.as_array_mut() {
1447 for media_member in list.iter_mut() {
1448 let member_id = media_member["member"]["memberId"].as_str().unwrap_or("");
1449 if let Some(member) = members
1450 .as_array()
1451 .and_then(|entries| {
1452 entries
1453 .iter()
1454 .find(|entry| entry["memberId"].as_str() == Some(member_id))
1455 })
1456 {
1457 media_member["member"] = member.clone();
1458 }
1459 }
1460 }
1461 }
1462
1463 fn ensure_media_member(&self, member: &Value) -> usize {
1464 self.upsert_member(member.clone());
1465 let member_id = member["memberId"].as_str().unwrap_or("");
1466 let mut media_members = self.media_members.write().unwrap();
1467 let list = media_members.as_array_mut().expect("media members array");
1468 if let Some(index) = list
1469 .iter()
1470 .position(|entry| entry["member"]["memberId"].as_str() == Some(member_id))
1471 {
1472 list[index]["member"] = member.clone();
1473 return index;
1474 }
1475
1476 list.push(json!({
1477 "member": member,
1478 "state": {},
1479 "tracks": [],
1480 }));
1481 list.len() - 1
1482 }
1483
1484 fn upsert_media_track(&self, member: &Value, track: &Value) {
1485 let index = self.ensure_media_member(member);
1486 let mut media_members = self.media_members.write().unwrap();
1487 let list = media_members.as_array_mut().expect("media members array");
1488 let tracks = list[index]["tracks"].as_array_mut().expect("tracks array");
1489 let kind = track["kind"].as_str().unwrap_or("");
1490 if let Some(existing) = tracks
1491 .iter_mut()
1492 .find(|entry| entry["kind"].as_str() == Some(kind))
1493 {
1494 *existing = track.clone();
1495 } else {
1496 tracks.push(track.clone());
1497 }
1498 apply_track_to_state(&mut list[index]["state"], track, true);
1499 }
1500
1501 fn remove_media_track(&self, member: &Value, track: &Value) {
1502 let index = self.ensure_media_member(member);
1503 let mut media_members = self.media_members.write().unwrap();
1504 let list = media_members.as_array_mut().expect("media members array");
1505 let kind = track["kind"].as_str().unwrap_or("");
1506 if let Some(tracks) = list[index]["tracks"].as_array_mut() {
1507 tracks.retain(|entry| entry["kind"].as_str() != Some(kind));
1508 }
1509 apply_track_to_state(&mut list[index]["state"], track, false);
1510 }
1511
1512 fn upsert_media_state(&self, member: &Value, state: Value) {
1513 let index = self.ensure_media_member(member);
1514 let mut media_members = self.media_members.write().unwrap();
1515 let list = media_members.as_array_mut().expect("media members array");
1516 list[index]["state"] = state;
1517 }
1518
1519 fn apply_media_device_change(&self, member: &Value, change: &Value) {
1520 let index = self.ensure_media_member(member);
1521 let mut media_members = self.media_members.write().unwrap();
1522 let list = media_members.as_array_mut().expect("media members array");
1523 let kind = change["kind"].as_str().unwrap_or("");
1524 let device_id = change["deviceId"].clone();
1525 if let Some(state) = list[index]["state"].as_object_mut() {
1526 let kind_state = state.entry(kind.to_string()).or_insert_with(|| json!({}));
1527 if let Some(kind_state_map) = kind_state.as_object_mut() {
1528 kind_state_map.insert("deviceId".to_string(), device_id);
1529 }
1530 }
1531 }
1532
1533 fn next_handler_id(&self) -> u64 {
1534 let mut counter = self.handler_id_counter.lock().unwrap();
1535 *counter += 1;
1536 *counter
1537 }
1538}
1539
1540fn deep_set(obj: &mut Value, path: &str, value: Value) {
1543 if let Some(dot) = path.find('.') {
1544 let head = &path[..dot];
1545 let tail = &path[dot + 1..];
1546 if let Value::Object(map) = obj {
1547 let nested = map.entry(head.to_string()).or_insert(json!({}));
1548 deep_set(nested, tail, value);
1549 }
1550 } else if let Value::Object(map) = obj {
1551 if value.is_null() {
1552 map.remove(path);
1553 } else {
1554 map.insert(path.to_string(), value);
1555 }
1556 }
1557}
1558
1559fn object_or_empty(value: &Value) -> Value {
1560 if value.is_object() {
1561 value.clone()
1562 } else {
1563 json!({})
1564 }
1565}
1566
1567fn normalize_member(value: &Value) -> Option<Value> {
1568 let member_id = value["memberId"].as_str()?;
1569 let user_id = value["userId"].as_str()?;
1570 let mut member = json!({
1571 "memberId": member_id,
1572 "userId": user_id,
1573 "state": object_or_empty(&value["state"]),
1574 });
1575
1576 if let Some(connection_id) = value["connectionId"].as_str() {
1577 member["connectionId"] = Value::String(connection_id.to_string());
1578 }
1579 if let Some(connection_count) = value["connectionCount"].as_u64() {
1580 member["connectionCount"] = Value::from(connection_count);
1581 }
1582 if let Some(role) = value["role"].as_str() {
1583 member["role"] = Value::String(role.to_string());
1584 }
1585 Some(member)
1586}
1587
1588fn normalize_members(value: &Value) -> Value {
1589 Value::Array(
1590 value
1591 .as_array()
1592 .into_iter()
1593 .flatten()
1594 .filter_map(normalize_member)
1595 .collect(),
1596 )
1597}
1598
1599fn normalize_track(value: &Value) -> Option<Value> {
1600 let kind = value["kind"].as_str()?;
1601 let mut track = json!({
1602 "kind": kind,
1603 "muted": value["muted"].as_bool().unwrap_or(false),
1604 });
1605
1606 if let Some(track_id) = value["trackId"].as_str() {
1607 track["trackId"] = Value::String(track_id.to_string());
1608 }
1609 if let Some(device_id) = value["deviceId"].as_str() {
1610 track["deviceId"] = Value::String(device_id.to_string());
1611 }
1612 if let Some(published_at) = value["publishedAt"].as_u64() {
1613 track["publishedAt"] = Value::from(published_at);
1614 }
1615 if let Some(admin_disabled) = value["adminDisabled"].as_bool() {
1616 track["adminDisabled"] = Value::Bool(admin_disabled);
1617 }
1618 Some(track)
1619}
1620
1621fn normalize_tracks(value: &Value) -> Value {
1622 Value::Array(
1623 value
1624 .as_array()
1625 .into_iter()
1626 .flatten()
1627 .filter_map(normalize_track)
1628 .collect(),
1629 )
1630}
1631
1632fn normalize_media_members(value: &Value) -> Value {
1633 Value::Array(
1634 value
1635 .as_array()
1636 .into_iter()
1637 .flatten()
1638 .filter_map(|entry| {
1639 let member = normalize_member(&entry["member"])?;
1640 Some(json!({
1641 "member": member,
1642 "state": object_or_empty(&entry["state"]),
1643 "tracks": normalize_tracks(&entry["tracks"]),
1644 }))
1645 })
1646 .collect(),
1647 )
1648}
1649
1650fn apply_track_to_state(state: &mut Value, track: &Value, published: bool) {
1651 if !state.is_object() {
1652 *state = json!({});
1653 }
1654
1655 let kind = match track["kind"].as_str() {
1656 Some(kind) => kind,
1657 None => return,
1658 };
1659
1660 let state_map = state.as_object_mut().expect("state object");
1661 let kind_state = state_map
1662 .entry(kind.to_string())
1663 .or_insert_with(|| json!({}));
1664 let kind_state_map = kind_state.as_object_mut().expect("kind state object");
1665 kind_state_map.insert(
1666 "published".to_string(),
1667 Value::Bool(published),
1668 );
1669 kind_state_map.insert(
1670 "muted".to_string(),
1671 Value::Bool(track["muted"].as_bool().unwrap_or(false)),
1672 );
1673
1674 if published {
1675 if let Some(track_id) = track.get("trackId") {
1676 kind_state_map.insert("trackId".to_string(), track_id.clone());
1677 }
1678 if let Some(device_id) = track.get("deviceId") {
1679 kind_state_map.insert("deviceId".to_string(), device_id.clone());
1680 }
1681 if let Some(published_at) = track.get("publishedAt") {
1682 kind_state_map.insert("publishedAt".to_string(), published_at.clone());
1683 }
1684 if let Some(admin_disabled) = track.get("adminDisabled") {
1685 kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1686 }
1687 } else {
1688 kind_state_map.remove("trackId");
1689 kind_state_map.remove("publishedAt");
1690 if let Some(admin_disabled) = track.get("adminDisabled") {
1691 kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1692 }
1693 }
1694}
1695
1696pub struct RoomStateNamespace {
1697 client: Arc<RoomClient>,
1698}
1699
1700impl RoomStateNamespace {
1701 fn new(client: Arc<RoomClient>) -> Self {
1702 Self { client }
1703 }
1704
1705 pub fn get_shared(&self) -> Value {
1706 self.client.get_shared_state()
1707 }
1708
1709 pub fn get_mine(&self) -> Value {
1710 self.client.get_player_state()
1711 }
1712
1713 pub fn on_shared_change(
1714 &self,
1715 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1716 ) -> Subscription {
1717 self.client.on_shared_state(handler)
1718 }
1719
1720 pub fn on_mine_change(
1721 &self,
1722 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1723 ) -> Subscription {
1724 self.client.on_player_state(handler)
1725 }
1726
1727 pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
1728 self.client.send(action_type, payload).await
1729 }
1730}
1731
1732pub struct RoomMetaNamespace {
1733 client: Arc<RoomClient>,
1734}
1735
1736impl RoomMetaNamespace {
1737 fn new(client: Arc<RoomClient>) -> Self {
1738 Self { client }
1739 }
1740
1741 pub async fn get(&self) -> Result<Value, Error> {
1742 self.client.get_metadata().await
1743 }
1744}
1745
1746pub struct RoomSignalsNamespace {
1747 client: Arc<RoomClient>,
1748}
1749
1750impl RoomSignalsNamespace {
1751 fn new(client: Arc<RoomClient>) -> Self {
1752 Self { client }
1753 }
1754
1755 pub async fn send(
1756 &self,
1757 event: &str,
1758 payload: Option<Value>,
1759 options: Option<Value>,
1760 ) -> Result<(), Error> {
1761 self.client.send_signal(event, payload, options).await
1762 }
1763
1764 pub async fn send_to(
1765 &self,
1766 member_id: &str,
1767 event: &str,
1768 payload: Option<Value>,
1769 ) -> Result<(), Error> {
1770 self.client
1771 .send_signal(event, payload, Some(json!({ "memberId": member_id })))
1772 .await
1773 }
1774
1775 pub fn on(
1776 &self,
1777 event: &str,
1778 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1779 ) -> Subscription {
1780 self.client.on_signal(event, handler)
1781 }
1782
1783 pub fn on_any(
1784 &self,
1785 handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
1786 ) -> Subscription {
1787 self.client.on_any_signal(handler)
1788 }
1789}
1790
1791pub struct RoomMembersNamespace {
1792 client: Arc<RoomClient>,
1793}
1794
1795impl RoomMembersNamespace {
1796 fn new(client: Arc<RoomClient>) -> Self {
1797 Self { client }
1798 }
1799
1800 pub fn list(&self) -> Value {
1801 self.client.list_members()
1802 }
1803
1804 pub fn on_sync(
1805 &self,
1806 handler: impl Fn(&Value) + Send + Sync + 'static,
1807 ) -> Subscription {
1808 self.client.on_members_sync(handler)
1809 }
1810
1811 pub fn on_join(
1812 &self,
1813 handler: impl Fn(&Value) + Send + Sync + 'static,
1814 ) -> Subscription {
1815 self.client.on_member_join(handler)
1816 }
1817
1818 pub fn on_leave(
1819 &self,
1820 handler: impl Fn(&Value, &str) + Send + Sync + 'static,
1821 ) -> Subscription {
1822 self.client.on_member_leave(handler)
1823 }
1824
1825 pub async fn set_state(&self, state: Value) -> Result<(), Error> {
1826 self.client.send_member_state(state).await
1827 }
1828
1829 pub async fn clear_state(&self) -> Result<(), Error> {
1830 self.client.clear_member_state().await
1831 }
1832
1833 pub fn on_state_change(
1834 &self,
1835 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1836 ) -> Subscription {
1837 self.client.on_member_state_change(handler)
1838 }
1839}
1840
1841pub struct RoomAdminNamespace {
1842 client: Arc<RoomClient>,
1843}
1844
1845impl RoomAdminNamespace {
1846 fn new(client: Arc<RoomClient>) -> Self {
1847 Self { client }
1848 }
1849
1850 pub async fn kick(&self, member_id: &str) -> Result<(), Error> {
1851 self.client.send_admin("kick", member_id, None).await
1852 }
1853
1854 pub async fn mute(&self, member_id: &str) -> Result<(), Error> {
1855 self.client.send_admin("mute", member_id, None).await
1856 }
1857
1858 pub async fn block(&self, member_id: &str) -> Result<(), Error> {
1859 self.client.send_admin("block", member_id, None).await
1860 }
1861
1862 pub async fn set_role(&self, member_id: &str, role: &str) -> Result<(), Error> {
1863 self.client
1864 .send_admin("setRole", member_id, Some(json!({ "role": role })))
1865 .await
1866 }
1867
1868 pub async fn disable_video(&self, member_id: &str) -> Result<(), Error> {
1869 self.client.send_admin("disableVideo", member_id, None).await
1870 }
1871
1872 pub async fn stop_screen_share(&self, member_id: &str) -> Result<(), Error> {
1873 self.client
1874 .send_admin("stopScreenShare", member_id, None)
1875 .await
1876 }
1877}
1878
1879pub struct RoomMediaKindNamespace {
1880 client: Arc<RoomClient>,
1881 kind: &'static str,
1882}
1883
1884impl RoomMediaKindNamespace {
1885 fn new(client: Arc<RoomClient>, kind: &'static str) -> Self {
1886 Self { client, kind }
1887 }
1888
1889 pub async fn enable(&self, payload: Option<Value>) -> Result<(), Error> {
1890 self.client.send_media("publish", self.kind, payload).await
1891 }
1892
1893 pub async fn disable(&self) -> Result<(), Error> {
1894 self.client.send_media("unpublish", self.kind, None).await
1895 }
1896
1897 pub async fn set_muted(&self, muted: bool) -> Result<(), Error> {
1898 self.client
1899 .send_media("mute", self.kind, Some(json!({ "muted": muted })))
1900 .await
1901 }
1902}
1903
1904pub struct RoomScreenMediaNamespace {
1905 client: Arc<RoomClient>,
1906}
1907
1908impl RoomScreenMediaNamespace {
1909 fn new(client: Arc<RoomClient>) -> Self {
1910 Self { client }
1911 }
1912
1913 pub async fn start(&self, payload: Option<Value>) -> Result<(), Error> {
1914 self.client.send_media("publish", "screen", payload).await
1915 }
1916
1917 pub async fn stop(&self) -> Result<(), Error> {
1918 self.client.send_media("unpublish", "screen", None).await
1919 }
1920}
1921
1922pub struct RoomMediaDevicesNamespace {
1923 client: Arc<RoomClient>,
1924}
1925
1926impl RoomMediaDevicesNamespace {
1927 fn new(client: Arc<RoomClient>) -> Self {
1928 Self { client }
1929 }
1930
1931 pub async fn switch_inputs(&self, payload: Value) -> Result<(), Error> {
1932 self.client.switch_media_devices(payload).await
1933 }
1934}
1935
1936pub struct RoomMediaNamespace {
1937 client: Arc<RoomClient>,
1938}
1939
1940impl RoomMediaNamespace {
1941 fn new(client: Arc<RoomClient>) -> Self {
1942 Self { client }
1943 }
1944
1945 pub fn list(&self) -> Value {
1946 self.client.list_media_members()
1947 }
1948
1949 pub fn audio(&self) -> RoomMediaKindNamespace {
1950 RoomMediaKindNamespace::new(Arc::clone(&self.client), "audio")
1951 }
1952
1953 pub fn video(&self) -> RoomMediaKindNamespace {
1954 RoomMediaKindNamespace::new(Arc::clone(&self.client), "video")
1955 }
1956
1957 pub fn screen(&self) -> RoomScreenMediaNamespace {
1958 RoomScreenMediaNamespace::new(Arc::clone(&self.client))
1959 }
1960
1961 pub fn devices(&self) -> RoomMediaDevicesNamespace {
1962 RoomMediaDevicesNamespace::new(Arc::clone(&self.client))
1963 }
1964
1965 pub fn on_track(
1966 &self,
1967 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1968 ) -> Subscription {
1969 self.client.on_media_track(handler)
1970 }
1971
1972 pub fn on_track_removed(
1973 &self,
1974 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1975 ) -> Subscription {
1976 self.client.on_media_track_removed(handler)
1977 }
1978
1979 pub fn on_state_change(
1980 &self,
1981 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1982 ) -> Subscription {
1983 self.client.on_media_state_change(handler)
1984 }
1985
1986 pub fn on_device_change(
1987 &self,
1988 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1989 ) -> Subscription {
1990 self.client.on_media_device_change(handler)
1991 }
1992}
1993
1994pub struct RoomSessionNamespace {
1995 client: Arc<RoomClient>,
1996}
1997
1998impl RoomSessionNamespace {
1999 fn new(client: Arc<RoomClient>) -> Self {
2000 Self { client }
2001 }
2002
2003 pub fn on_error(
2004 &self,
2005 handler: impl Fn(&str, &str) + Send + Sync + 'static,
2006 ) -> Subscription {
2007 self.client.on_error(handler)
2008 }
2009
2010 pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
2011 self.client.on_kicked(handler)
2012 }
2013
2014 pub fn on_reconnect(
2015 &self,
2016 handler: impl Fn(&Value) + Send + Sync + 'static,
2017 ) -> Subscription {
2018 self.client.on_reconnect(handler)
2019 }
2020
2021 pub fn on_connection_state_change(
2022 &self,
2023 handler: impl Fn(&str) + Send + Sync + 'static,
2024 ) -> Subscription {
2025 self.client.on_connection_state_change(handler)
2026 }
2027
2028 pub fn connection_state(&self) -> String {
2029 self.client.connection_state()
2030 }
2031
2032 pub fn user_id(&self) -> Option<String> {
2033 self.client.current_user_id.lock().unwrap().clone()
2034 }
2035
2036 pub fn connection_id(&self) -> Option<String> {
2037 self.client.current_connection_id.lock().unwrap().clone()
2038 }
2039}