Skip to main content

edgebase_core/
room.rs

1// room.rs — RoomClient v2 for EdgeBase Rust SDK.
2//
3// Complete redesign from v1:
4//   - 2 client-visible state areas: sharedState (all clients), playerState (per-player)
5//   - Client can only read + subscribe + send(). All writes are server-only.
6//   - send() returns a Result resolved by requestId matching via oneshot channel
7//   - Subscription returns a Subscription struct with unsubscribe()
8//   - namespace + roomId identification (replaces single roomId)
9//
10// Usage:
11//   let room = RoomClient::new(&url, "game", "lobby-1", move || token.clone(), None);
12//   room.join().await?;
13//   let sub = room.on_shared_state(|state, changes| { ... });
14//   let result = room.send("SET_SCORE", Some(json!({"score": 42}))).await?;
15//   sub.unsubscribe();
16//   room.leave().await;
17
18use crate::error::Error;
19use serde_json::{json, Value};
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, RwLock};
22use tokio::sync::{mpsc, oneshot};
23use tokio::time::{sleep, timeout, Duration};
24use uuid::Uuid;
25
26// ── Public types ──────────────────────────────────────────────────────────────
27
/// Connection tuning knobs accepted by `RoomClient::new`.
pub struct RoomOptions {
    /// Retry the connection after it ends unexpectedly (default: true).
    pub auto_reconnect: bool,
    /// Maximum number of consecutive reconnect attempts (default: 10).
    pub max_reconnect_attempts: u32,
    /// Base delay in ms that the reconnect backoff is derived from (default: 1000).
    pub reconnect_base_delay_ms: u64,
    /// Timeout for send() requests in ms (default: 10000)
    pub send_timeout_ms: u64,
}

impl Default for RoomOptions {
    /// Reconnect enabled, 10 attempts, 1 s base delay, 10 s send timeout.
    fn default() -> Self {
        const MAX_ATTEMPTS: u32 = 10;
        const BASE_DELAY_MS: u64 = 1000;
        const SEND_TIMEOUT_MS: u64 = 10_000;

        RoomOptions {
            auto_reconnect: true,
            max_reconnect_attempts: MAX_ATTEMPTS,
            reconnect_base_delay_ms: BASE_DELAY_MS,
            send_timeout_ms: SEND_TIMEOUT_MS,
        }
    }
}
46
47// ── Subscription ──────────────────────────────────────────────────────────────
48
/// Handle returned by on_* methods. Call `unsubscribe()` to remove the handler.
///
/// Dropping the handle removes the handler as well, so the handle must be kept
/// alive for as long as the handler should stay registered.
#[must_use = "dropping a Subscription immediately removes the handler"]
pub struct Subscription {
    /// Removal closure; `None` once it has been taken and run.
    remove_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
}

impl Subscription {
    /// Wrap the closure that removes the handler from its registry.
    fn new(remove_fn: impl FnOnce() + Send + 'static) -> Self {
        Self {
            remove_fn: Mutex::new(Some(Box::new(remove_fn))),
        }
    }

    /// Take the removal closure out of its slot, leaving `None` behind.
    ///
    /// Uses `get_mut` (exclusive access, no locking) and tolerates a poisoned
    /// mutex so that this can never panic — it is called from `Drop`.
    fn take_remove_fn(&mut self) -> Option<Box<dyn FnOnce() + Send>> {
        self.remove_fn
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take()
    }

    /// Remove the handler now.
    ///
    /// Consumes the handle, so the removal closure runs at most once; the
    /// implicit drop at the end of this call is a no-op.
    pub fn unsubscribe(mut self) {
        // Take the closure out first so nothing is borrowed or locked while it
        // runs; if it panics, `Drop` finds an empty slot instead of panicking
        // a second time during unwinding.
        let remove_fn = self.take_remove_fn();
        if let Some(f) = remove_fn {
            f();
        }
    }
}

impl Drop for Subscription {
    /// Run the removal closure if `unsubscribe()` has not already done so.
    fn drop(&mut self) {
        if let Some(f) = self.take_remove_fn() {
            f();
        }
    }
}
76
77// ── Handler type aliases (internal) ──────────────────────────────────────────
78
// Every alias below is a shared, thread-safe callback (`Arc<dyn Fn .. + Send + Sync>`)
// so it can be stored in a handler list and cloned out of it.

/// Callback for shared/player state updates; receives `(full_state, changes)`.
type StateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback for a typed room message; receives the message data.
type MessageHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback for error events; two string arguments (presumably code and message — confirm
/// against the dispatch code).
type ErrorHandler = Arc<dyn Fn(&str, &str) + Send + Sync>;
/// Callback for kick events; takes no arguments.
type KickedHandler = Arc<dyn Fn() + Send + Sync>;
/// Callback registered through `on_members_sync`.
type MembersSyncHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback registered through `on_member_join`.
type MemberHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback registered through `on_member_leave`; the meaning of the `&str` argument is
/// defined by the dispatch code, which is not shown here.
type MemberLeaveHandler = Arc<dyn Fn(&Value, &str) + Send + Sync>;
/// Callback registered through `on_member_state_change`.
type MemberStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback registered through `on_signal` for one named event.
type SignalHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback registered through `on_any_signal`; the `&str` is presumably the event name —
/// confirm against the dispatch code.
type AnySignalHandler = Arc<dyn Fn(&str, &Value, &Value) + Send + Sync>;
/// Callback registered through `on_media_track` / `on_media_track_removed`.
type MediaTrackHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback registered through `on_media_state_change`.
type MediaStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback registered through `on_media_device_change`.
type MediaDeviceHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
/// Callback registered through `on_reconnect`.
type ReconnectHandler = Arc<dyn Fn(&Value) + Send + Sync>;
/// Callback registered through `on_connection_state_change`; receives a state string.
type ConnectionStateHandler = Arc<dyn Fn(&str) + Send + Sync>;

/// ID-tagged handler entry for removal by Subscription.
///
/// The list is `Arc`-wrapped so the removal closure held by a `Subscription` can own a
/// clone of it; entries are `(handler_id, handler)` pairs.
type HandlerList<H> = Arc<Mutex<Vec<(u64, H)>>>;
97
/// Command sent over the `send_tx` channel to whatever drives the WebSocket.
pub(crate) enum RoomWsCommand {
    /// Transmit the given text (callers in this file pass serialized JSON).
    Send(String),
    /// Close the connection; `leave()` sends this after its "leave" message.
    Close,
}
102
/// Pause `leave()` inserts between sending the "leave" message and the `Close` command.
const ROOM_EXPLICIT_LEAVE_CLOSE_DELAY: Duration = Duration::from_millis(40);
// Connection state names stored in `RoomClient::connection_state` and passed to
// `set_connection_state`.
/// Initial state, and the state restored by `leave()`.
const ROOM_STATE_IDLE: &str = "idle";
/// Set by `join()` before the background connection task starts.
const ROOM_STATE_CONNECTING: &str = "connecting";
const ROOM_STATE_CONNECTED: &str = "connected";
const ROOM_STATE_RECONNECTING: &str = "reconnecting";
/// Set when the connection task gives up (auto-reconnect off or attempts exhausted).
const ROOM_STATE_DISCONNECTED: &str = "disconnected";
const ROOM_STATE_KICKED: &str = "kicked";
110
111// ── RoomClient v2 ─────────────────────────────────────────────────────────────
112
/// Client for a single room, identified by `namespace` + `room_id`.
///
/// Constructed through `RoomClient::new`, which returns it inside an `Arc`. All mutable
/// state lives behind `Mutex`/`RwLock` fields so methods can take `&self`.
pub struct RoomClient {
    /// Room namespace (e.g. "game", "chat")
    pub namespace: String,
    /// Room instance ID within the namespace
    pub room_id: String,

    // ── State ───
    // Cached snapshots exposed through the getters; all of them are reset by `leave()`.
    shared_state: RwLock<Value>,
    shared_version: RwLock<u64>,
    player_state: RwLock<Value>,
    player_version: RwLock<u64>,
    members: RwLock<Value>,
    media_members: RwLock<Value>,
    current_user_id: Mutex<Option<String>>,
    current_connection_id: Mutex<Option<String>>,
    // One of the ROOM_STATE_* strings; starts as ROOM_STATE_IDLE.
    connection_state: RwLock<String>,
    reconnect_info: RwLock<Option<Value>>,

    // ── Config ───
    // Server URL with any trailing '/' already trimmed by `new`.
    base_url: String,
    // Returns the current access token.
    token_fn: Box<dyn Fn() -> String + Send + Sync>,
    opts: RoomOptions,

    // ── Handlers (Arc-wrapped so Subscription closures can hold clones) ───
    shared_state_handlers: HandlerList<StateHandler>,
    player_state_handlers: HandlerList<StateHandler>,
    // Keyed by message type.
    message_handlers: Arc<Mutex<HashMap<String, Vec<(u64, MessageHandler)>>>>,
    error_handlers: HandlerList<ErrorHandler>,
    kicked_handlers: HandlerList<KickedHandler>,
    member_sync_handlers: HandlerList<MembersSyncHandler>,
    member_join_handlers: HandlerList<MemberHandler>,
    member_leave_handlers: HandlerList<MemberLeaveHandler>,
    member_state_handlers: HandlerList<MemberStateHandler>,
    // Keyed by signal event name.
    signal_handlers: Arc<Mutex<HashMap<String, Vec<(u64, SignalHandler)>>>>,
    any_signal_handlers: HandlerList<AnySignalHandler>,
    media_track_handlers: HandlerList<MediaTrackHandler>,
    media_track_removed_handlers: HandlerList<MediaTrackHandler>,
    media_state_handlers: HandlerList<MediaStateHandler>,
    media_device_handlers: HandlerList<MediaDeviceHandler>,
    reconnect_handlers: HandlerList<ReconnectHandler>,
    connection_state_handlers: HandlerList<ConnectionStateHandler>,
    // Source of the ids that tag handler entries (see `HandlerList`).
    handler_id_counter: Mutex<u64>,

    // ── Pending send() requests (requestId → oneshot Sender) ───
    // `leave()` clears every map; dropping a sender makes the waiting receiver fail.
    pending_requests: Mutex<HashMap<String, oneshot::Sender<Result<Value, Error>>>>,
    pending_signal_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_admin_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_member_state_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_media_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,

    // ── WS send channel ───
    // `None` means "not connected": the send_* methods fail fast in that case.
    send_tx: Mutex<Option<mpsc::Sender<RoomWsCommand>>>,

    // ── Control ───
    // Stop signal for the background task spawned by `join()`.
    stop_tx: Mutex<Option<mpsc::Sender<()>>>,
    // Set by `leave()`, cleared by `join()`; suppresses reconnect attempts.
    intentionally_left: Mutex<bool>,
}
170
171impl RoomClient {
172    /// Create a new v2 RoomClient.
173    ///
174    /// # Arguments
175    /// * `base_url` - EdgeBase server URL (http or https)
176    /// * `namespace` - Room namespace (e.g. "game", "chat")
177    /// * `room_id` - Room instance ID within the namespace
178    /// * `token_fn` - Closure that returns the current access token
179    /// * `opts` - Optional RoomOptions for reconnect and timeout configuration
180    pub fn new(
181        base_url: &str,
182        namespace: &str,
183        room_id: &str,
184        token_fn: impl Fn() -> String + Send + Sync + 'static,
185        opts: Option<RoomOptions>,
186    ) -> Arc<Self> {
187        Arc::new(Self {
188            namespace: namespace.to_string(),
189            room_id: room_id.to_string(),
190            shared_state: RwLock::new(json!({})),
191            shared_version: RwLock::new(0),
192            player_state: RwLock::new(json!({})),
193            player_version: RwLock::new(0),
194            members: RwLock::new(json!([])),
195            media_members: RwLock::new(json!([])),
196            current_user_id: Mutex::new(None),
197            current_connection_id: Mutex::new(None),
198            connection_state: RwLock::new(ROOM_STATE_IDLE.to_string()),
199            reconnect_info: RwLock::new(None),
200            base_url: base_url.trim_end_matches('/').to_string(),
201            token_fn: Box::new(token_fn),
202            opts: opts.unwrap_or_default(),
203            shared_state_handlers: Arc::new(Mutex::new(vec![])),
204            player_state_handlers: Arc::new(Mutex::new(vec![])),
205            message_handlers: Arc::new(Mutex::new(HashMap::new())),
206            error_handlers: Arc::new(Mutex::new(vec![])),
207            kicked_handlers: Arc::new(Mutex::new(vec![])),
208            member_sync_handlers: Arc::new(Mutex::new(vec![])),
209            member_join_handlers: Arc::new(Mutex::new(vec![])),
210            member_leave_handlers: Arc::new(Mutex::new(vec![])),
211            member_state_handlers: Arc::new(Mutex::new(vec![])),
212            signal_handlers: Arc::new(Mutex::new(HashMap::new())),
213            any_signal_handlers: Arc::new(Mutex::new(vec![])),
214            media_track_handlers: Arc::new(Mutex::new(vec![])),
215            media_track_removed_handlers: Arc::new(Mutex::new(vec![])),
216            media_state_handlers: Arc::new(Mutex::new(vec![])),
217            media_device_handlers: Arc::new(Mutex::new(vec![])),
218            reconnect_handlers: Arc::new(Mutex::new(vec![])),
219            connection_state_handlers: Arc::new(Mutex::new(vec![])),
220            handler_id_counter: Mutex::new(0),
221            pending_requests: Mutex::new(HashMap::new()),
222            pending_signal_requests: Mutex::new(HashMap::new()),
223            pending_admin_requests: Mutex::new(HashMap::new()),
224            pending_member_state_requests: Mutex::new(HashMap::new()),
225            pending_media_requests: Mutex::new(HashMap::new()),
226            send_tx: Mutex::new(None),
227            stop_tx: Mutex::new(None),
228            intentionally_left: Mutex::new(false),
229        })
230    }
231
232    // ── State Accessors ──────────────────────────────────────────────────────
233
234    /// Get current shared state (read-only snapshot).
235    pub fn get_shared_state(&self) -> Value {
236        self.shared_state.read().unwrap().clone()
237    }
238
239    /// Get current player state (read-only snapshot).
240    pub fn get_player_state(&self) -> Value {
241        self.player_state.read().unwrap().clone()
242    }
243
244    /// Get the current logical room members snapshot.
245    pub fn list_members(&self) -> Value {
246        self.members.read().unwrap().clone()
247    }
248
249    /// Get the current media member snapshot.
250    pub fn list_media_members(&self) -> Value {
251        self.media_members.read().unwrap().clone()
252    }
253
254    /// Get the current session connection state.
255    pub fn connection_state(&self) -> String {
256        self.connection_state.read().unwrap().clone()
257    }
258
259    pub fn state(self: &Arc<Self>) -> RoomStateNamespace {
260        RoomStateNamespace::new(Arc::clone(self))
261    }
262
263    pub fn meta(self: &Arc<Self>) -> RoomMetaNamespace {
264        RoomMetaNamespace::new(Arc::clone(self))
265    }
266
267    pub fn signals(self: &Arc<Self>) -> RoomSignalsNamespace {
268        RoomSignalsNamespace::new(Arc::clone(self))
269    }
270
271    pub fn members(self: &Arc<Self>) -> RoomMembersNamespace {
272        RoomMembersNamespace::new(Arc::clone(self))
273    }
274
275    pub fn admin(self: &Arc<Self>) -> RoomAdminNamespace {
276        RoomAdminNamespace::new(Arc::clone(self))
277    }
278
279    pub fn media(self: &Arc<Self>) -> RoomMediaNamespace {
280        RoomMediaNamespace::new(Arc::clone(self))
281    }
282
283    pub fn session(self: &Arc<Self>) -> RoomSessionNamespace {
284        RoomSessionNamespace::new(Arc::clone(self))
285    }
286
287    // ── Metadata (HTTP, no WebSocket needed) ─────────────────────────────────
288
289    /// Get room metadata without joining (HTTP GET).
290    /// Returns developer-defined metadata set by room.setMetadata() on the server.
291    pub async fn get_metadata(&self) -> Result<Value, Error> {
292        Self::get_metadata_static(&self.base_url, &self.namespace, &self.room_id).await
293    }
294
295    /// Static: Get room metadata without creating a RoomClient instance.
296    /// Useful for lobby screens where you need room info before joining.
297    pub async fn get_metadata_static(
298        base_url: &str,
299        namespace: &str,
300        room_id: &str,
301    ) -> Result<Value, Error> {
302        let url = format!(
303            "{}/api/room/metadata?namespace={}&id={}",
304            base_url.trim_end_matches('/'),
305            urlencoding::encode(namespace),
306            urlencoding::encode(room_id)
307        );
308        let resp = reqwest::get(&url)
309            .await
310            .map_err(|e| Error::Room(format!("Failed to get room metadata: {}", e)))?;
311        if !resp.status().is_success() {
312            return Err(Error::Room(format!(
313                "Failed to get room metadata: {}",
314                resp.status()
315            )));
316        }
317        let body = resp
318            .text()
319            .await
320            .map_err(|e| Error::Room(format!("Failed to read room metadata body: {}", e)))?;
321        serde_json::from_str(&body)
322            .map_err(|e| Error::Room(format!("Failed to parse room metadata: {}", e)))
323    }
324
325    // ── Connection Lifecycle ─────────────────────────────────────────────────
326
327    /// Connect to the room, authenticate, and join.
328    pub async fn join(self: &Arc<Self>) -> Result<(), Error> {
329        *self.intentionally_left.lock().unwrap() = false;
330        self.set_connection_state(ROOM_STATE_CONNECTING);
331
332        let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
333        *self.stop_tx.lock().unwrap() = Some(stop_tx);
334
335        let this = Arc::clone(self);
336        tokio::spawn(async move {
337            let mut attempts = 0u32;
338            loop {
339                match this.establish().await {
340                    Ok(mut ws_rx) => {
341                        attempts = 0;
342                        loop {
343                            tokio::select! {
344                                _ = stop_rx.recv() => return,
345                                msg = ws_rx.recv() => {
346                                    match msg {
347                                        Some(raw) => this.handle_message(&raw),
348                                        None => break, // WS closed
349                                    }
350                                }
351                            }
352                        }
353                    }
354                    Err(_) => {}
355                }
356                if *this.intentionally_left.lock().unwrap() {
357                    return;
358                }
359                if !this.opts.auto_reconnect || attempts >= this.opts.max_reconnect_attempts {
360                    this.set_connection_state(ROOM_STATE_DISCONNECTED);
361                    return;
362                }
363                let delay = (this.opts.reconnect_base_delay_ms * (1u64 << attempts)).min(30_000);
364                attempts += 1;
365                this.begin_reconnect_attempt(attempts as u64);
366                sleep(Duration::from_millis(delay)).await;
367            }
368        });
369        Ok(())
370    }
371
372    /// Leave the room and disconnect. Cleans up all pending send() requests.
373    pub async fn leave(&self) {
374        *self.intentionally_left.lock().unwrap() = true;
375
376        let send_tx = self.send_tx.lock().unwrap().clone();
377        if let Some(tx) = send_tx.as_ref() {
378            let _ = tx
379                .send(RoomWsCommand::Send(
380                    json!({"type": "leave"}).to_string(),
381                ))
382                .await;
383            sleep(ROOM_EXPLICIT_LEAVE_CLOSE_DELAY).await;
384            let _ = tx.send(RoomWsCommand::Close).await;
385        }
386
387        if let Some(tx) = self.stop_tx.lock().unwrap().take() {
388            let _ = tx.send(()).await;
389        }
390
391        *self.send_tx.lock().unwrap() = None;
392
393        // Drop all pending send() requests — senders are dropped, receivers get RecvError
394        self.pending_requests.lock().unwrap().clear();
395
396        // Reset state
397        *self.shared_state.write().unwrap() = json!({});
398        *self.shared_version.write().unwrap() = 0;
399        *self.player_state.write().unwrap() = json!({});
400        *self.player_version.write().unwrap() = 0;
401        *self.members.write().unwrap() = json!([]);
402        *self.media_members.write().unwrap() = json!([]);
403        *self.current_user_id.lock().unwrap() = None;
404        *self.current_connection_id.lock().unwrap() = None;
405        *self.reconnect_info.write().unwrap() = None;
406        self.pending_signal_requests.lock().unwrap().clear();
407        self.pending_admin_requests.lock().unwrap().clear();
408        self.pending_member_state_requests.lock().unwrap().clear();
409        self.pending_media_requests.lock().unwrap().clear();
410        self.set_connection_state(ROOM_STATE_IDLE);
411    }
412
413    // ── Actions ──────────────────────────────────────────────────────────────
414
415    /// Send an action to the server.
416    /// Returns a Result that resolves with the action result from the server.
417    ///
418    /// # Example
419    /// ```ignore
420    /// let result = room.send("SET_SCORE", Some(json!({"score": 42}))).await?;
421    /// ```
422    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
423        if self.send_tx.lock().unwrap().is_none() {
424            return Err(Error::Room("Not connected to room".to_string()));
425        }
426
427        let request_id = Uuid::new_v4().to_string();
428        let (tx, rx) = oneshot::channel::<Result<Value, Error>>();
429
430        self.pending_requests
431            .lock()
432            .unwrap()
433            .insert(request_id.clone(), tx);
434
435        self.ws_send(json!({
436            "type": "send",
437            "actionType": action_type,
438            "payload": payload.unwrap_or(json!({})),
439            "requestId": request_id,
440        }));
441
442        let timeout_ms = self.opts.send_timeout_ms;
443        let action_type_owned = action_type.to_string();
444        let req_id = request_id.clone();
445
446        match timeout(Duration::from_millis(timeout_ms), rx).await {
447            Ok(Ok(result)) => result,
448            Ok(Err(_)) => {
449                // oneshot channel closed (room left or sender dropped)
450                self.pending_requests.lock().unwrap().remove(&req_id);
451                Err(Error::Room(
452                    "Room left while waiting for action result".to_string(),
453                ))
454            }
455            Err(_) => {
456                // Timeout
457                self.pending_requests.lock().unwrap().remove(&req_id);
458                Err(Error::RoomTimeout(format!(
459                    "Action '{}' timed out",
460                    action_type_owned
461                )))
462            }
463        }
464    }
465
466    // ── Subscriptions (v2 API) ───────────────────────────────────────────────
467
468    /// Subscribe to shared state changes.
469    /// Handler receives (full_state, changes) on each sync/delta.
470    pub fn on_shared_state(
471        &self,
472        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
473    ) -> Subscription {
474        let id = self.next_handler_id();
475        let handler = Arc::new(handler) as StateHandler;
476        self.shared_state_handlers.lock().unwrap().push((id, handler));
477
478        let list = Arc::clone(&self.shared_state_handlers);
479        Subscription::new(move || {
480            list.lock().unwrap().retain(|(hid, _)| *hid != id);
481        })
482    }
483
484    /// Subscribe to player state changes.
485    /// Handler receives (full_state, changes) on each sync/delta.
486    pub fn on_player_state(
487        &self,
488        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
489    ) -> Subscription {
490        let id = self.next_handler_id();
491        let handler = Arc::new(handler) as StateHandler;
492        self.player_state_handlers.lock().unwrap().push((id, handler));
493
494        let list = Arc::clone(&self.player_state_handlers);
495        Subscription::new(move || {
496            list.lock().unwrap().retain(|(hid, _)| *hid != id);
497        })
498    }
499
500    /// Subscribe to messages of a specific type sent by room.sendMessage().
501    ///
502    /// # Example
503    /// ```ignore
504    /// let sub = room.on_message("game_over", |data| { println!("{:?}", data); });
505    /// ```
506    pub fn on_message(
507        &self,
508        msg_type: &str,
509        handler: impl Fn(&Value) + Send + Sync + 'static,
510    ) -> Subscription {
511        let id = self.next_handler_id();
512        let handler = Arc::new(handler) as MessageHandler;
513        let msg_type = msg_type.to_string();
514        {
515            let mut map = self.message_handlers.lock().unwrap();
516            map.entry(msg_type.clone())
517                .or_insert_with(Vec::new)
518                .push((id, handler));
519        }
520
521        let map = Arc::clone(&self.message_handlers);
522        Subscription::new(move || {
523            if let Some(list) = map.lock().unwrap().get_mut(&msg_type) {
524                list.retain(|(hid, _)| *hid != id);
525            }
526        })
527    }
528
529    /// Subscribe to error events.
530    pub fn on_error(
531        &self,
532        handler: impl Fn(&str, &str) + Send + Sync + 'static,
533    ) -> Subscription {
534        let id = self.next_handler_id();
535        let handler = Arc::new(handler) as ErrorHandler;
536        self.error_handlers.lock().unwrap().push((id, handler));
537
538        let list = Arc::clone(&self.error_handlers);
539        Subscription::new(move || {
540            list.lock().unwrap().retain(|(hid, _)| *hid != id);
541        })
542    }
543
544    /// Subscribe to kick events. After being kicked, auto-reconnect is disabled.
545    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
546        let id = self.next_handler_id();
547        let handler = Arc::new(handler) as KickedHandler;
548        self.kicked_handlers.lock().unwrap().push((id, handler));
549
550        let list = Arc::clone(&self.kicked_handlers);
551        Subscription::new(move || {
552            list.lock().unwrap().retain(|(hid, _)| *hid != id);
553        })
554    }
555
556    pub fn on_members_sync(
557        &self,
558        handler: impl Fn(&Value) + Send + Sync + 'static,
559    ) -> Subscription {
560        let id = self.next_handler_id();
561        let handler = Arc::new(handler) as MembersSyncHandler;
562        self.member_sync_handlers.lock().unwrap().push((id, handler));
563
564        let list = Arc::clone(&self.member_sync_handlers);
565        Subscription::new(move || {
566            list.lock().unwrap().retain(|(hid, _)| *hid != id);
567        })
568    }
569
570    pub fn on_member_join(
571        &self,
572        handler: impl Fn(&Value) + Send + Sync + 'static,
573    ) -> Subscription {
574        let id = self.next_handler_id();
575        let handler = Arc::new(handler) as MemberHandler;
576        self.member_join_handlers.lock().unwrap().push((id, handler));
577
578        let list = Arc::clone(&self.member_join_handlers);
579        Subscription::new(move || {
580            list.lock().unwrap().retain(|(hid, _)| *hid != id);
581        })
582    }
583
584    pub fn on_member_leave(
585        &self,
586        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
587    ) -> Subscription {
588        let id = self.next_handler_id();
589        let handler = Arc::new(handler) as MemberLeaveHandler;
590        self.member_leave_handlers.lock().unwrap().push((id, handler));
591
592        let list = Arc::clone(&self.member_leave_handlers);
593        Subscription::new(move || {
594            list.lock().unwrap().retain(|(hid, _)| *hid != id);
595        })
596    }
597
598    pub fn on_member_state_change(
599        &self,
600        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
601    ) -> Subscription {
602        let id = self.next_handler_id();
603        let handler = Arc::new(handler) as MemberStateHandler;
604        self.member_state_handlers.lock().unwrap().push((id, handler));
605
606        let list = Arc::clone(&self.member_state_handlers);
607        Subscription::new(move || {
608            list.lock().unwrap().retain(|(hid, _)| *hid != id);
609        })
610    }
611
612    pub fn on_signal(
613        &self,
614        event: &str,
615        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
616    ) -> Subscription {
617        let id = self.next_handler_id();
618        let handler = Arc::new(handler) as SignalHandler;
619        let event = event.to_string();
620        {
621            let mut map = self.signal_handlers.lock().unwrap();
622            map.entry(event.clone())
623                .or_insert_with(Vec::new)
624                .push((id, handler));
625        }
626
627        let map = Arc::clone(&self.signal_handlers);
628        Subscription::new(move || {
629            if let Some(list) = map.lock().unwrap().get_mut(&event) {
630                list.retain(|(hid, _)| *hid != id);
631            }
632        })
633    }
634
635    pub fn on_any_signal(
636        &self,
637        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
638    ) -> Subscription {
639        let id = self.next_handler_id();
640        let handler = Arc::new(handler) as AnySignalHandler;
641        self.any_signal_handlers.lock().unwrap().push((id, handler));
642
643        let list = Arc::clone(&self.any_signal_handlers);
644        Subscription::new(move || {
645            list.lock().unwrap().retain(|(hid, _)| *hid != id);
646        })
647    }
648
649    pub fn on_media_track(
650        &self,
651        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
652    ) -> Subscription {
653        let id = self.next_handler_id();
654        let handler = Arc::new(handler) as MediaTrackHandler;
655        self.media_track_handlers.lock().unwrap().push((id, handler));
656
657        let list = Arc::clone(&self.media_track_handlers);
658        Subscription::new(move || {
659            list.lock().unwrap().retain(|(hid, _)| *hid != id);
660        })
661    }
662
663    pub fn on_media_track_removed(
664        &self,
665        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
666    ) -> Subscription {
667        let id = self.next_handler_id();
668        let handler = Arc::new(handler) as MediaTrackHandler;
669        self.media_track_removed_handlers
670            .lock()
671            .unwrap()
672            .push((id, handler));
673
674        let list = Arc::clone(&self.media_track_removed_handlers);
675        Subscription::new(move || {
676            list.lock().unwrap().retain(|(hid, _)| *hid != id);
677        })
678    }
679
680    pub fn on_media_state_change(
681        &self,
682        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
683    ) -> Subscription {
684        let id = self.next_handler_id();
685        let handler = Arc::new(handler) as MediaStateHandler;
686        self.media_state_handlers.lock().unwrap().push((id, handler));
687
688        let list = Arc::clone(&self.media_state_handlers);
689        Subscription::new(move || {
690            list.lock().unwrap().retain(|(hid, _)| *hid != id);
691        })
692    }
693
694    pub fn on_media_device_change(
695        &self,
696        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
697    ) -> Subscription {
698        let id = self.next_handler_id();
699        let handler = Arc::new(handler) as MediaDeviceHandler;
700        self.media_device_handlers.lock().unwrap().push((id, handler));
701
702        let list = Arc::clone(&self.media_device_handlers);
703        Subscription::new(move || {
704            list.lock().unwrap().retain(|(hid, _)| *hid != id);
705        })
706    }
707
708    pub fn on_reconnect(
709        &self,
710        handler: impl Fn(&Value) + Send + Sync + 'static,
711    ) -> Subscription {
712        let id = self.next_handler_id();
713        let handler = Arc::new(handler) as ReconnectHandler;
714        self.reconnect_handlers.lock().unwrap().push((id, handler));
715
716        let list = Arc::clone(&self.reconnect_handlers);
717        Subscription::new(move || {
718            list.lock().unwrap().retain(|(hid, _)| *hid != id);
719        })
720    }
721
722    pub fn on_connection_state_change(
723        &self,
724        handler: impl Fn(&str) + Send + Sync + 'static,
725    ) -> Subscription {
726        let id = self.next_handler_id();
727        let handler = Arc::new(handler) as ConnectionStateHandler;
728        self.connection_state_handlers
729            .lock()
730            .unwrap()
731            .push((id, handler));
732
733        let list = Arc::clone(&self.connection_state_handlers);
734        Subscription::new(move || {
735            list.lock().unwrap().retain(|(hid, _)| *hid != id);
736        })
737    }
738
739    pub async fn send_signal(
740        &self,
741        event: &str,
742        payload: Option<Value>,
743        options: Option<Value>,
744    ) -> Result<(), Error> {
745        if self.send_tx.lock().unwrap().is_none() {
746            return Err(Error::Room("Not connected to room".to_string()));
747        }
748
749        let request_id = Uuid::new_v4().to_string();
750        let options = options.unwrap_or_else(|| json!({}));
751        let message = json!({
752            "type": "signal",
753            "event": event,
754            "payload": payload.unwrap_or_else(|| json!({})),
755            "requestId": request_id,
756            "memberId": options.get("memberId").cloned().unwrap_or(Value::Null),
757            "includeSelf": options.get("includeSelf").and_then(Value::as_bool).unwrap_or(false),
758        });
759
760        self.send_unit_request(&self.pending_signal_requests, request_id, message, format!("Signal '{}' timed out", event)).await
761    }
762
763    pub async fn send_member_state(&self, state: Value) -> Result<(), Error> {
764        if self.send_tx.lock().unwrap().is_none() {
765            return Err(Error::Room("Not connected to room".to_string()));
766        }
767
768        let request_id = Uuid::new_v4().to_string();
769        let message = json!({
770            "type": "member_state",
771            "state": state,
772            "requestId": request_id,
773        });
774
775        self.send_unit_request(
776            &self.pending_member_state_requests,
777            request_id,
778            message,
779            "Member state update timed out".to_string(),
780        ).await
781    }
782
783    pub async fn clear_member_state(&self) -> Result<(), Error> {
784        if self.send_tx.lock().unwrap().is_none() {
785            return Err(Error::Room("Not connected to room".to_string()));
786        }
787
788        let request_id = Uuid::new_v4().to_string();
789        let message = json!({
790            "type": "member_state_clear",
791            "requestId": request_id,
792        });
793
794        self.send_unit_request(
795            &self.pending_member_state_requests,
796            request_id,
797            message,
798            "Member state clear timed out".to_string(),
799        ).await
800    }
801
802    pub async fn send_admin(
803        &self,
804        operation: &str,
805        member_id: &str,
806        payload: Option<Value>,
807    ) -> Result<(), Error> {
808        if self.send_tx.lock().unwrap().is_none() {
809            return Err(Error::Room("Not connected to room".to_string()));
810        }
811
812        let request_id = Uuid::new_v4().to_string();
813        let message = json!({
814            "type": "admin",
815            "operation": operation,
816            "memberId": member_id,
817            "payload": payload.unwrap_or_else(|| json!({})),
818            "requestId": request_id,
819        });
820
821        self.send_unit_request(
822            &self.pending_admin_requests,
823            request_id,
824            message,
825            format!("Admin '{}' timed out", operation),
826        ).await
827    }
828
829    pub async fn send_media(
830        &self,
831        operation: &str,
832        kind: &str,
833        payload: Option<Value>,
834    ) -> Result<(), Error> {
835        if self.send_tx.lock().unwrap().is_none() {
836            return Err(Error::Room("Not connected to room".to_string()));
837        }
838
839        let request_id = Uuid::new_v4().to_string();
840        let message = json!({
841            "type": "media",
842            "operation": operation,
843            "kind": kind,
844            "payload": payload.unwrap_or_else(|| json!({})),
845            "requestId": request_id,
846        });
847
848        self.send_unit_request(
849            &self.pending_media_requests,
850            request_id,
851            message,
852            format!("Media '{}:{}' timed out", operation, kind),
853        ).await
854    }
855
856    pub async fn switch_media_devices(&self, payload: Value) -> Result<(), Error> {
857        if let Some(device_id) = payload.get("audioInputId").and_then(Value::as_str) {
858            self.send_media("device", "audio", Some(json!({ "deviceId": device_id }))).await?;
859        }
860        if let Some(device_id) = payload.get("videoInputId").and_then(Value::as_str) {
861            self.send_media("device", "video", Some(json!({ "deviceId": device_id }))).await?;
862        }
863        if let Some(device_id) = payload.get("screenInputId").and_then(Value::as_str) {
864            self.send_media("device", "screen", Some(json!({ "deviceId": device_id }))).await?;
865        }
866        Ok(())
867    }
868
869    // ── Private: Connection ──────────────────────────────────────────────────
870
871    fn ws_url(&self) -> String {
872        let u = self
873            .base_url
874            .replace("https://", "wss://")
875            .replace("http://", "ws://");
876        format!(
877            "{}/api/room?namespace={}&id={}",
878            u,
879            urlencoding::encode(&self.namespace),
880            urlencoding::encode(&self.room_id)
881        )
882    }
883
884    /// Send a raw JSON message over the WS write channel.
885    fn ws_send(&self, msg: Value) {
886        let s = msg.to_string();
887        if let Some(tx) = self.send_tx.lock().unwrap().as_ref() {
888            let tx = tx.clone();
889            tokio::spawn(async move {
890                let _ = tx.send(RoomWsCommand::Send(s)).await;
891            });
892        }
893    }
894
895    #[cfg(test)]
896    pub(crate) fn attach_send_channel_for_testing(&self, tx: mpsc::Sender<RoomWsCommand>) {
897        *self.send_tx.lock().unwrap() = Some(tx);
898    }
899
900    #[cfg(test)]
901    pub(crate) fn handle_message_for_testing(&self, raw: &str) {
902        self.handle_message(raw);
903    }
904
905    async fn establish(&self) -> anyhow::Result<mpsc::Receiver<String>> {
906        use futures_util::{SinkExt, StreamExt};
907        use tokio_tungstenite::{connect_async, tungstenite::Message};
908
909        let (ws_stream, _response) = connect_async(self.ws_url()).await?;
910        let (mut write, mut read) = ws_stream.split();
911
912        // Auth
913        let auth = json!({"type": "auth", "token": (self.token_fn)(), "sdkVersion": "0.1.3"});
914        write.send(Message::Text(auth.to_string().into())).await?;
915
916        // Wait for auth_success
917        let join_raw = if let Some(Ok(Message::Text(raw))) = read.next().await {
918            let raw_str: &str = &raw;
919            let resp: Value = serde_json::from_str(raw_str)?;
920            let t = resp["type"].as_str().unwrap_or("");
921            if t != "auth_success" && t != "auth_refreshed" {
922                anyhow::bail!("Room auth failed: {}", resp["message"]);
923            }
924
925            *self.current_user_id.lock().unwrap() = resp["userId"].as_str().map(|value| value.to_string());
926            *self.current_connection_id.lock().unwrap() = resp["connectionId"].as_str().map(|value| value.to_string());
927
928            // v2: join with last known shared + player state for eviction recovery
929            let join_msg = json!({
930                "type": "join",
931                "lastSharedState": *self.shared_state.read().unwrap(),
932                "lastSharedVersion": *self.shared_version.read().unwrap(),
933                "lastPlayerState": *self.player_state.read().unwrap(),
934                "lastPlayerVersion": *self.player_version.read().unwrap(),
935            });
936            join_msg.to_string()
937        } else {
938            anyhow::bail!("Room: no auth response");
939        };
940
941        // Shared WS write channel
942        let (write_tx, mut write_rx) = mpsc::channel::<RoomWsCommand>(128);
943        *self.send_tx.lock().unwrap() = Some(write_tx.clone());
944
945        // WS writer task
946        tokio::spawn(async move {
947            while let Some(command) = write_rx.recv().await {
948                match command {
949                    RoomWsCommand::Send(msg) => {
950                        if write.send(Message::Text(msg.into())).await.is_err() {
951                            break;
952                        }
953                    }
954                    RoomWsCommand::Close => {
955                        let _ = write.send(Message::Close(None)).await;
956                        break;
957                    }
958                }
959            }
960        });
961
962        // Send join message
963        let _ = write_tx.send(RoomWsCommand::Send(join_raw)).await;
964
965        // Heartbeat
966        let htx = write_tx.clone();
967        tokio::spawn(async move {
968            loop {
969                sleep(Duration::from_secs(30)).await;
970                if htx
971                    .send(RoomWsCommand::Send(json!({"type":"ping"}).to_string()))
972                    .await
973                    .is_err()
974                {
975                    break;
976                }
977            }
978        });
979
980        // WS reader → msg_rx channel consumed by join() loop
981        let (msg_tx, msg_rx) = mpsc::channel::<String>(128);
982        tokio::spawn(async move {
983            while let Some(Ok(Message::Text(raw))) = read.next().await {
984                let raw_str: String = raw.into();
985                if msg_tx.send(raw_str).await.is_err() {
986                    break;
987                }
988            }
989        });
990
991        Ok(msg_rx)
992    }
993
994    // ── Private: Message Handling ────────────────────────────────────────────
995
996    fn handle_message(&self, raw: &str) {
997        let msg: Value = match serde_json::from_str(raw) {
998            Ok(v) => v,
999            Err(_) => return,
1000        };
1001        let t = msg["type"].as_str().unwrap_or("");
1002
1003        match t {
1004            "sync" => self.handle_sync(&msg),
1005            "shared_delta" => self.handle_shared_delta(&msg),
1006            "player_delta" => self.handle_player_delta(&msg),
1007            "action_result" => self.handle_action_result(&msg),
1008            "action_error" => self.handle_action_error(&msg),
1009            "message" => self.handle_server_message(&msg),
1010            "signal" => self.handle_signal(&msg),
1011            "signal_sent" => self.resolve_pending_unit_request(&self.pending_signal_requests, &msg),
1012            "signal_error" => self.reject_pending_unit_request(&self.pending_signal_requests, &msg, "Signal error"),
1013            "members_sync" => self.handle_members_sync(&msg),
1014            "member_join" => self.handle_member_join(&msg),
1015            "member_leave" => self.handle_member_leave(&msg),
1016            "member_state" => self.handle_member_state(&msg),
1017            "member_state_error" => self.reject_pending_unit_request(&self.pending_member_state_requests, &msg, "Member state error"),
1018            "admin_result" => self.resolve_pending_unit_request(&self.pending_admin_requests, &msg),
1019            "admin_error" => self.reject_pending_unit_request(&self.pending_admin_requests, &msg, "Admin error"),
1020            "media_sync" => self.handle_media_sync(&msg),
1021            "media_track" => self.handle_media_track(&msg),
1022            "media_track_removed" => self.handle_media_track_removed(&msg),
1023            "media_state" => self.handle_media_state(&msg),
1024            "media_device" => self.handle_media_device(&msg),
1025            "media_result" => self.resolve_pending_unit_request(&self.pending_media_requests, &msg),
1026            "media_error" => self.reject_pending_unit_request(&self.pending_media_requests, &msg, "Media error"),
1027            "kicked" => self.handle_kicked(),
1028            "error" => self.handle_error(&msg),
1029            "pong" => {}
1030            _ => {}
1031        }
1032    }
1033
1034    fn handle_sync(&self, msg: &Value) {
1035        *self.shared_state.write().unwrap() = msg["sharedState"].clone();
1036        *self.shared_version.write().unwrap() = msg["sharedVersion"].as_u64().unwrap_or(0);
1037        *self.player_state.write().unwrap() = msg["playerState"].clone();
1038        *self.player_version.write().unwrap() = msg["playerVersion"].as_u64().unwrap_or(0);
1039        self.set_connection_state(ROOM_STATE_CONNECTED);
1040
1041        if let Some(info) = self.reconnect_info.write().unwrap().take() {
1042            for (_, handler) in self.reconnect_handlers.lock().unwrap().iter() {
1043                handler(&info);
1044            }
1045        }
1046
1047        // Notify shared state handlers (full state as changes on sync)
1048        let shared = self.shared_state.read().unwrap().clone();
1049        for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1050            handler(&shared, &shared);
1051        }
1052        let player = self.player_state.read().unwrap().clone();
1053        for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1054            handler(&player, &player);
1055        }
1056    }
1057
1058    fn handle_shared_delta(&self, msg: &Value) {
1059        let delta = &msg["delta"];
1060        *self.shared_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1061
1062        if let Value::Object(map) = delta {
1063            let mut state = self.shared_state.write().unwrap();
1064            for (path, value) in map {
1065                deep_set(&mut state, path, value.clone());
1066            }
1067        }
1068
1069        let state = self.shared_state.read().unwrap().clone();
1070        for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1071            handler(&state, delta);
1072        }
1073    }
1074
1075    fn handle_player_delta(&self, msg: &Value) {
1076        let delta = &msg["delta"];
1077        *self.player_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1078
1079        if let Value::Object(map) = delta {
1080            let mut state = self.player_state.write().unwrap();
1081            for (path, value) in map {
1082                deep_set(&mut state, path, value.clone());
1083            }
1084        }
1085
1086        let state = self.player_state.read().unwrap().clone();
1087        for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1088            handler(&state, delta);
1089        }
1090    }
1091
1092    fn handle_action_result(&self, msg: &Value) {
1093        let request_id = msg["requestId"].as_str().unwrap_or("");
1094        if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1095            let _ = tx.send(Ok(msg["result"].clone()));
1096        }
1097    }
1098
1099    fn handle_action_error(&self, msg: &Value) {
1100        let request_id = msg["requestId"].as_str().unwrap_or("");
1101        if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1102            let message = msg["message"].as_str().unwrap_or("Unknown error");
1103            let _ = tx.send(Err(Error::Room(message.to_string())));
1104        }
1105    }
1106
1107    fn handle_server_message(&self, msg: &Value) {
1108        let message_type = msg["messageType"].as_str().unwrap_or("");
1109        let data = &msg["data"];
1110
1111        let handlers = self.message_handlers.lock().unwrap();
1112        if let Some(list) = handlers.get(message_type) {
1113            for (_, handler) in list {
1114                handler(data);
1115            }
1116        }
1117    }
1118
1119    fn handle_signal(&self, msg: &Value) {
1120        let event = msg["event"].as_str().unwrap_or("");
1121        let payload = &msg["payload"];
1122        let meta = &msg["meta"];
1123
1124        if let Some(list) = self.signal_handlers.lock().unwrap().get(event) {
1125            for (_, handler) in list {
1126                handler(payload, meta);
1127            }
1128        }
1129
1130        for (_, handler) in self.any_signal_handlers.lock().unwrap().iter() {
1131            handler(event, payload, meta);
1132        }
1133    }
1134
1135    fn handle_members_sync(&self, msg: &Value) {
1136        let members = normalize_members(&msg["members"]);
1137        *self.members.write().unwrap() = members.clone();
1138        self.merge_members_into_media();
1139
1140        for (_, handler) in self.member_sync_handlers.lock().unwrap().iter() {
1141            handler(&members);
1142        }
1143    }
1144
1145    fn handle_member_join(&self, msg: &Value) {
1146        if let Some(member) = normalize_member(&msg["member"]) {
1147            self.upsert_member(member.clone());
1148            for (_, handler) in self.member_join_handlers.lock().unwrap().iter() {
1149                handler(&member);
1150            }
1151        }
1152    }
1153
1154    fn handle_member_leave(&self, msg: &Value) {
1155        if let Some(member) = normalize_member(&msg["member"]) {
1156            let member_id = member["memberId"].as_str().unwrap_or("");
1157            self.remove_member(member_id);
1158            let reason = msg["reason"].as_str().unwrap_or("leave");
1159            for (_, handler) in self.member_leave_handlers.lock().unwrap().iter() {
1160                handler(&member, reason);
1161            }
1162        }
1163    }
1164
1165    fn handle_member_state(&self, msg: &Value) {
1166        if let Some(mut member) = normalize_member(&msg["member"]) {
1167            let state = object_or_empty(&msg["state"]);
1168            member["state"] = state.clone();
1169            self.upsert_member(member.clone());
1170            self.resolve_pending_unit_request(&self.pending_member_state_requests, msg);
1171
1172            for (_, handler) in self.member_state_handlers.lock().unwrap().iter() {
1173                handler(&member, &state);
1174            }
1175        }
1176    }
1177
1178    fn handle_media_sync(&self, msg: &Value) {
1179        let media_members = normalize_media_members(&msg["members"]);
1180        *self.media_members.write().unwrap() = media_members.clone();
1181        self.merge_members_into_media();
1182    }
1183
1184    fn handle_media_track(&self, msg: &Value) {
1185        if let (Some(member), Some(track)) = (
1186            normalize_member(&msg["member"]),
1187            normalize_track(&msg["track"]),
1188        ) {
1189            self.upsert_media_track(&member, &track);
1190            for (_, handler) in self.media_track_handlers.lock().unwrap().iter() {
1191                handler(&track, &member);
1192            }
1193        }
1194    }
1195
1196    fn handle_media_track_removed(&self, msg: &Value) {
1197        if let (Some(member), Some(track)) = (
1198            normalize_member(&msg["member"]),
1199            normalize_track(&msg["track"]),
1200        ) {
1201            self.remove_media_track(&member, &track);
1202            for (_, handler) in self
1203                .media_track_removed_handlers
1204                .lock()
1205                .unwrap()
1206                .iter()
1207            {
1208                handler(&track, &member);
1209            }
1210        }
1211    }
1212
1213    fn handle_media_state(&self, msg: &Value) {
1214        if let Some(member) = normalize_member(&msg["member"]) {
1215            let state = object_or_empty(&msg["state"]);
1216            self.upsert_media_state(&member, state.clone());
1217            for (_, handler) in self.media_state_handlers.lock().unwrap().iter() {
1218                handler(&member, &state);
1219            }
1220        }
1221    }
1222
1223    fn handle_media_device(&self, msg: &Value) {
1224        if let Some(member) = normalize_member(&msg["member"]) {
1225            let change = json!({
1226                "kind": msg["kind"].clone(),
1227                "deviceId": msg["deviceId"].clone(),
1228            });
1229            self.apply_media_device_change(&member, &change);
1230            for (_, handler) in self.media_device_handlers.lock().unwrap().iter() {
1231                handler(&member, &change);
1232            }
1233        }
1234    }
1235
1236    fn handle_kicked(&self) {
1237        self.set_connection_state(ROOM_STATE_KICKED);
1238        *self.intentionally_left.lock().unwrap() = true;
1239        for (_, handler) in self.kicked_handlers.lock().unwrap().iter() {
1240            handler();
1241        }
1242    }
1243
1244    fn handle_error(&self, msg: &Value) {
1245        let code = msg["code"].as_str().unwrap_or("");
1246        let message = msg["message"].as_str().unwrap_or("");
1247        for (_, handler) in self.error_handlers.lock().unwrap().iter() {
1248            handler(code, message);
1249        }
1250    }
1251
1252    // ── Private: Helpers ─────────────────────────────────────────────────────
1253
1254    async fn send_unit_request(
1255        &self,
1256        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1257        request_id: String,
1258        message: Value,
1259        timeout_message: String,
1260    ) -> Result<(), Error> {
1261        let (tx, rx) = oneshot::channel::<Result<(), Error>>();
1262        pending.lock().unwrap().insert(request_id.clone(), tx);
1263        self.ws_send(message);
1264
1265        match timeout(Duration::from_millis(self.opts.send_timeout_ms), rx).await {
1266            Ok(Ok(result)) => result,
1267            Ok(Err(_)) => {
1268                pending.lock().unwrap().remove(&request_id);
1269                Err(Error::Room(
1270                    "Room left while waiting for room control result".to_string(),
1271                ))
1272            }
1273            Err(_) => {
1274                pending.lock().unwrap().remove(&request_id);
1275                Err(Error::RoomTimeout(timeout_message))
1276            }
1277        }
1278    }
1279
1280    fn resolve_pending_unit_request(
1281        &self,
1282        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1283        msg: &Value,
1284    ) {
1285        let request_id = msg["requestId"].as_str().unwrap_or("");
1286        if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1287            let _ = tx.send(Ok(()));
1288        }
1289    }
1290
1291    fn reject_pending_unit_request(
1292        &self,
1293        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1294        msg: &Value,
1295        fallback: &str,
1296    ) {
1297        let request_id = msg["requestId"].as_str().unwrap_or("");
1298        if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1299            let message = msg["message"].as_str().unwrap_or(fallback);
1300            let _ = tx.send(Err(Error::Room(message.to_string())));
1301        }
1302    }
1303
1304    fn set_connection_state(&self, next: &str) {
1305        let mut state = self.connection_state.write().unwrap();
1306        if state.as_str() == next {
1307            return;
1308        }
1309        *state = next.to_string();
1310        drop(state);
1311
1312        for (_, handler) in self.connection_state_handlers.lock().unwrap().iter() {
1313            handler(next);
1314        }
1315    }
1316
1317    fn begin_reconnect_attempt(&self, attempt: u64) {
1318        *self.reconnect_info.write().unwrap() = Some(json!({ "attempt": attempt }));
1319        self.set_connection_state(ROOM_STATE_RECONNECTING);
1320    }
1321
1322    fn upsert_member(&self, member: Value) {
1323        let member_id = member["memberId"].as_str().unwrap_or("").to_string();
1324        if member_id.is_empty() {
1325            return;
1326        }
1327
1328        let mut members = self.members.write().unwrap();
1329        let list = members.as_array_mut().expect("members array");
1330        if let Some(existing) = list
1331            .iter_mut()
1332            .find(|entry| entry["memberId"].as_str() == Some(member_id.as_str()))
1333        {
1334            *existing = member;
1335        } else {
1336            list.push(member);
1337        }
1338        drop(members);
1339        self.merge_members_into_media();
1340    }
1341
1342    fn remove_member(&self, member_id: &str) {
1343        {
1344            let mut members = self.members.write().unwrap();
1345            if let Some(list) = members.as_array_mut() {
1346                list.retain(|entry| entry["memberId"].as_str() != Some(member_id));
1347            }
1348        }
1349        {
1350            let mut media_members = self.media_members.write().unwrap();
1351            if let Some(list) = media_members.as_array_mut() {
1352                list.retain(|entry| entry["member"]["memberId"].as_str() != Some(member_id));
1353            }
1354        }
1355    }
1356
1357    fn merge_members_into_media(&self) {
1358        let members = self.members.read().unwrap().clone();
1359        let mut media_members = self.media_members.write().unwrap();
1360        if let Some(list) = media_members.as_array_mut() {
1361            for media_member in list.iter_mut() {
1362                let member_id = media_member["member"]["memberId"].as_str().unwrap_or("");
1363                if let Some(member) = members
1364                    .as_array()
1365                    .and_then(|entries| {
1366                        entries
1367                            .iter()
1368                            .find(|entry| entry["memberId"].as_str() == Some(member_id))
1369                    })
1370                {
1371                    media_member["member"] = member.clone();
1372                }
1373            }
1374        }
1375    }
1376
1377    fn ensure_media_member(&self, member: &Value) -> usize {
1378        self.upsert_member(member.clone());
1379        let member_id = member["memberId"].as_str().unwrap_or("");
1380        let mut media_members = self.media_members.write().unwrap();
1381        let list = media_members.as_array_mut().expect("media members array");
1382        if let Some(index) = list
1383            .iter()
1384            .position(|entry| entry["member"]["memberId"].as_str() == Some(member_id))
1385        {
1386            list[index]["member"] = member.clone();
1387            return index;
1388        }
1389
1390        list.push(json!({
1391            "member": member,
1392            "state": {},
1393            "tracks": [],
1394        }));
1395        list.len() - 1
1396    }
1397
1398    fn upsert_media_track(&self, member: &Value, track: &Value) {
1399        let index = self.ensure_media_member(member);
1400        let mut media_members = self.media_members.write().unwrap();
1401        let list = media_members.as_array_mut().expect("media members array");
1402        let tracks = list[index]["tracks"].as_array_mut().expect("tracks array");
1403        let kind = track["kind"].as_str().unwrap_or("");
1404        if let Some(existing) = tracks
1405            .iter_mut()
1406            .find(|entry| entry["kind"].as_str() == Some(kind))
1407        {
1408            *existing = track.clone();
1409        } else {
1410            tracks.push(track.clone());
1411        }
1412        apply_track_to_state(&mut list[index]["state"], track, true);
1413    }
1414
1415    fn remove_media_track(&self, member: &Value, track: &Value) {
1416        let index = self.ensure_media_member(member);
1417        let mut media_members = self.media_members.write().unwrap();
1418        let list = media_members.as_array_mut().expect("media members array");
1419        let kind = track["kind"].as_str().unwrap_or("");
1420        if let Some(tracks) = list[index]["tracks"].as_array_mut() {
1421            tracks.retain(|entry| entry["kind"].as_str() != Some(kind));
1422        }
1423        apply_track_to_state(&mut list[index]["state"], track, false);
1424    }
1425
1426    fn upsert_media_state(&self, member: &Value, state: Value) {
1427        let index = self.ensure_media_member(member);
1428        let mut media_members = self.media_members.write().unwrap();
1429        let list = media_members.as_array_mut().expect("media members array");
1430        list[index]["state"] = state;
1431    }
1432
1433    fn apply_media_device_change(&self, member: &Value, change: &Value) {
1434        let index = self.ensure_media_member(member);
1435        let mut media_members = self.media_members.write().unwrap();
1436        let list = media_members.as_array_mut().expect("media members array");
1437        let kind = change["kind"].as_str().unwrap_or("");
1438        let device_id = change["deviceId"].clone();
1439        if let Some(state) = list[index]["state"].as_object_mut() {
1440            let kind_state = state.entry(kind.to_string()).or_insert_with(|| json!({}));
1441            if let Some(kind_state_map) = kind_state.as_object_mut() {
1442                kind_state_map.insert("deviceId".to_string(), device_id);
1443            }
1444        }
1445    }
1446
1447    fn next_handler_id(&self) -> u64 {
1448        let mut counter = self.handler_id_counter.lock().unwrap();
1449        *counter += 1;
1450        *counter
1451    }
1452}
1453
1454// ── Helper ────────────────────────────────────────────────────────────────────
1455
1456fn deep_set(obj: &mut Value, path: &str, value: Value) {
1457    if let Some(dot) = path.find('.') {
1458        let head = &path[..dot];
1459        let tail = &path[dot + 1..];
1460        if let Value::Object(map) = obj {
1461            let nested = map.entry(head.to_string()).or_insert(json!({}));
1462            deep_set(nested, tail, value);
1463        }
1464    } else if let Value::Object(map) = obj {
1465        if value.is_null() {
1466            map.remove(path);
1467        } else {
1468            map.insert(path.to_string(), value);
1469        }
1470    }
1471}
1472
1473fn object_or_empty(value: &Value) -> Value {
1474    if value.is_object() {
1475        value.clone()
1476    } else {
1477        json!({})
1478    }
1479}
1480
1481fn normalize_member(value: &Value) -> Option<Value> {
1482    let member_id = value["memberId"].as_str()?;
1483    let user_id = value["userId"].as_str()?;
1484    let mut member = json!({
1485        "memberId": member_id,
1486        "userId": user_id,
1487        "state": object_or_empty(&value["state"]),
1488    });
1489
1490    if let Some(connection_id) = value["connectionId"].as_str() {
1491        member["connectionId"] = Value::String(connection_id.to_string());
1492    }
1493    if let Some(connection_count) = value["connectionCount"].as_u64() {
1494        member["connectionCount"] = Value::from(connection_count);
1495    }
1496    if let Some(role) = value["role"].as_str() {
1497        member["role"] = Value::String(role.to_string());
1498    }
1499    Some(member)
1500}
1501
1502fn normalize_members(value: &Value) -> Value {
1503    Value::Array(
1504        value
1505            .as_array()
1506            .into_iter()
1507            .flatten()
1508            .filter_map(normalize_member)
1509            .collect(),
1510    )
1511}
1512
1513fn normalize_track(value: &Value) -> Option<Value> {
1514    let kind = value["kind"].as_str()?;
1515    let mut track = json!({
1516        "kind": kind,
1517        "muted": value["muted"].as_bool().unwrap_or(false),
1518    });
1519
1520    if let Some(track_id) = value["trackId"].as_str() {
1521        track["trackId"] = Value::String(track_id.to_string());
1522    }
1523    if let Some(device_id) = value["deviceId"].as_str() {
1524        track["deviceId"] = Value::String(device_id.to_string());
1525    }
1526    if let Some(published_at) = value["publishedAt"].as_u64() {
1527        track["publishedAt"] = Value::from(published_at);
1528    }
1529    if let Some(admin_disabled) = value["adminDisabled"].as_bool() {
1530        track["adminDisabled"] = Value::Bool(admin_disabled);
1531    }
1532    Some(track)
1533}
1534
1535fn normalize_tracks(value: &Value) -> Value {
1536    Value::Array(
1537        value
1538            .as_array()
1539            .into_iter()
1540            .flatten()
1541            .filter_map(normalize_track)
1542            .collect(),
1543    )
1544}
1545
1546fn normalize_media_members(value: &Value) -> Value {
1547    Value::Array(
1548        value
1549            .as_array()
1550            .into_iter()
1551            .flatten()
1552            .filter_map(|entry| {
1553                let member = normalize_member(&entry["member"])?;
1554                Some(json!({
1555                    "member": member,
1556                    "state": object_or_empty(&entry["state"]),
1557                    "tracks": normalize_tracks(&entry["tracks"]),
1558                }))
1559            })
1560            .collect(),
1561    )
1562}
1563
1564fn apply_track_to_state(state: &mut Value, track: &Value, published: bool) {
1565    if !state.is_object() {
1566        *state = json!({});
1567    }
1568
1569    let kind = match track["kind"].as_str() {
1570        Some(kind) => kind,
1571        None => return,
1572    };
1573
1574    let state_map = state.as_object_mut().expect("state object");
1575    let kind_state = state_map
1576        .entry(kind.to_string())
1577        .or_insert_with(|| json!({}));
1578    let kind_state_map = kind_state.as_object_mut().expect("kind state object");
1579    kind_state_map.insert(
1580        "published".to_string(),
1581        Value::Bool(published),
1582    );
1583    kind_state_map.insert(
1584        "muted".to_string(),
1585        Value::Bool(track["muted"].as_bool().unwrap_or(false)),
1586    );
1587
1588    if published {
1589        if let Some(track_id) = track.get("trackId") {
1590            kind_state_map.insert("trackId".to_string(), track_id.clone());
1591        }
1592        if let Some(device_id) = track.get("deviceId") {
1593            kind_state_map.insert("deviceId".to_string(), device_id.clone());
1594        }
1595        if let Some(published_at) = track.get("publishedAt") {
1596            kind_state_map.insert("publishedAt".to_string(), published_at.clone());
1597        }
1598        if let Some(admin_disabled) = track.get("adminDisabled") {
1599            kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1600        }
1601    } else {
1602        kind_state_map.remove("trackId");
1603        kind_state_map.remove("publishedAt");
1604        if let Some(admin_disabled) = track.get("adminDisabled") {
1605            kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1606        }
1607    }
1608}
1609
1610pub struct RoomStateNamespace {
1611    client: Arc<RoomClient>,
1612}
1613
1614impl RoomStateNamespace {
1615    fn new(client: Arc<RoomClient>) -> Self {
1616        Self { client }
1617    }
1618
1619    pub fn get_shared(&self) -> Value {
1620        self.client.get_shared_state()
1621    }
1622
1623    pub fn get_mine(&self) -> Value {
1624        self.client.get_player_state()
1625    }
1626
1627    pub fn on_shared_change(
1628        &self,
1629        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1630    ) -> Subscription {
1631        self.client.on_shared_state(handler)
1632    }
1633
1634    pub fn on_mine_change(
1635        &self,
1636        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1637    ) -> Subscription {
1638        self.client.on_player_state(handler)
1639    }
1640
1641    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
1642        self.client.send(action_type, payload).await
1643    }
1644}
1645
1646pub struct RoomMetaNamespace {
1647    client: Arc<RoomClient>,
1648}
1649
1650impl RoomMetaNamespace {
1651    fn new(client: Arc<RoomClient>) -> Self {
1652        Self { client }
1653    }
1654
1655    pub async fn get(&self) -> Result<Value, Error> {
1656        self.client.get_metadata().await
1657    }
1658}
1659
1660pub struct RoomSignalsNamespace {
1661    client: Arc<RoomClient>,
1662}
1663
1664impl RoomSignalsNamespace {
1665    fn new(client: Arc<RoomClient>) -> Self {
1666        Self { client }
1667    }
1668
1669    pub async fn send(
1670        &self,
1671        event: &str,
1672        payload: Option<Value>,
1673        options: Option<Value>,
1674    ) -> Result<(), Error> {
1675        self.client.send_signal(event, payload, options).await
1676    }
1677
1678    pub async fn send_to(
1679        &self,
1680        member_id: &str,
1681        event: &str,
1682        payload: Option<Value>,
1683    ) -> Result<(), Error> {
1684        self.client
1685            .send_signal(event, payload, Some(json!({ "memberId": member_id })))
1686            .await
1687    }
1688
1689    pub fn on(
1690        &self,
1691        event: &str,
1692        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1693    ) -> Subscription {
1694        self.client.on_signal(event, handler)
1695    }
1696
1697    pub fn on_any(
1698        &self,
1699        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
1700    ) -> Subscription {
1701        self.client.on_any_signal(handler)
1702    }
1703}
1704
1705pub struct RoomMembersNamespace {
1706    client: Arc<RoomClient>,
1707}
1708
1709impl RoomMembersNamespace {
1710    fn new(client: Arc<RoomClient>) -> Self {
1711        Self { client }
1712    }
1713
1714    pub fn list(&self) -> Value {
1715        self.client.list_members()
1716    }
1717
1718    pub fn on_sync(
1719        &self,
1720        handler: impl Fn(&Value) + Send + Sync + 'static,
1721    ) -> Subscription {
1722        self.client.on_members_sync(handler)
1723    }
1724
1725    pub fn on_join(
1726        &self,
1727        handler: impl Fn(&Value) + Send + Sync + 'static,
1728    ) -> Subscription {
1729        self.client.on_member_join(handler)
1730    }
1731
1732    pub fn on_leave(
1733        &self,
1734        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
1735    ) -> Subscription {
1736        self.client.on_member_leave(handler)
1737    }
1738
1739    pub async fn set_state(&self, state: Value) -> Result<(), Error> {
1740        self.client.send_member_state(state).await
1741    }
1742
1743    pub async fn clear_state(&self) -> Result<(), Error> {
1744        self.client.clear_member_state().await
1745    }
1746
1747    pub fn on_state_change(
1748        &self,
1749        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1750    ) -> Subscription {
1751        self.client.on_member_state_change(handler)
1752    }
1753}
1754
1755pub struct RoomAdminNamespace {
1756    client: Arc<RoomClient>,
1757}
1758
1759impl RoomAdminNamespace {
1760    fn new(client: Arc<RoomClient>) -> Self {
1761        Self { client }
1762    }
1763
1764    pub async fn kick(&self, member_id: &str) -> Result<(), Error> {
1765        self.client.send_admin("kick", member_id, None).await
1766    }
1767
1768    pub async fn mute(&self, member_id: &str) -> Result<(), Error> {
1769        self.client.send_admin("mute", member_id, None).await
1770    }
1771
1772    pub async fn block(&self, member_id: &str) -> Result<(), Error> {
1773        self.client.send_admin("block", member_id, None).await
1774    }
1775
1776    pub async fn set_role(&self, member_id: &str, role: &str) -> Result<(), Error> {
1777        self.client
1778            .send_admin("setRole", member_id, Some(json!({ "role": role })))
1779            .await
1780    }
1781
1782    pub async fn disable_video(&self, member_id: &str) -> Result<(), Error> {
1783        self.client.send_admin("disableVideo", member_id, None).await
1784    }
1785
1786    pub async fn stop_screen_share(&self, member_id: &str) -> Result<(), Error> {
1787        self.client
1788            .send_admin("stopScreenShare", member_id, None)
1789            .await
1790    }
1791}
1792
1793pub struct RoomMediaKindNamespace {
1794    client: Arc<RoomClient>,
1795    kind: &'static str,
1796}
1797
1798impl RoomMediaKindNamespace {
1799    fn new(client: Arc<RoomClient>, kind: &'static str) -> Self {
1800        Self { client, kind }
1801    }
1802
1803    pub async fn enable(&self, payload: Option<Value>) -> Result<(), Error> {
1804        self.client.send_media("publish", self.kind, payload).await
1805    }
1806
1807    pub async fn disable(&self) -> Result<(), Error> {
1808        self.client.send_media("unpublish", self.kind, None).await
1809    }
1810
1811    pub async fn set_muted(&self, muted: bool) -> Result<(), Error> {
1812        self.client
1813            .send_media("mute", self.kind, Some(json!({ "muted": muted })))
1814            .await
1815    }
1816}
1817
1818pub struct RoomScreenMediaNamespace {
1819    client: Arc<RoomClient>,
1820}
1821
1822impl RoomScreenMediaNamespace {
1823    fn new(client: Arc<RoomClient>) -> Self {
1824        Self { client }
1825    }
1826
1827    pub async fn start(&self, payload: Option<Value>) -> Result<(), Error> {
1828        self.client.send_media("publish", "screen", payload).await
1829    }
1830
1831    pub async fn stop(&self) -> Result<(), Error> {
1832        self.client.send_media("unpublish", "screen", None).await
1833    }
1834}
1835
1836pub struct RoomMediaDevicesNamespace {
1837    client: Arc<RoomClient>,
1838}
1839
1840impl RoomMediaDevicesNamespace {
1841    fn new(client: Arc<RoomClient>) -> Self {
1842        Self { client }
1843    }
1844
1845    pub async fn switch_inputs(&self, payload: Value) -> Result<(), Error> {
1846        self.client.switch_media_devices(payload).await
1847    }
1848}
1849
1850pub struct RoomMediaNamespace {
1851    client: Arc<RoomClient>,
1852}
1853
1854impl RoomMediaNamespace {
1855    fn new(client: Arc<RoomClient>) -> Self {
1856        Self { client }
1857    }
1858
1859    pub fn list(&self) -> Value {
1860        self.client.list_media_members()
1861    }
1862
1863    pub fn audio(&self) -> RoomMediaKindNamespace {
1864        RoomMediaKindNamespace::new(Arc::clone(&self.client), "audio")
1865    }
1866
1867    pub fn video(&self) -> RoomMediaKindNamespace {
1868        RoomMediaKindNamespace::new(Arc::clone(&self.client), "video")
1869    }
1870
1871    pub fn screen(&self) -> RoomScreenMediaNamespace {
1872        RoomScreenMediaNamespace::new(Arc::clone(&self.client))
1873    }
1874
1875    pub fn devices(&self) -> RoomMediaDevicesNamespace {
1876        RoomMediaDevicesNamespace::new(Arc::clone(&self.client))
1877    }
1878
1879    pub fn on_track(
1880        &self,
1881        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1882    ) -> Subscription {
1883        self.client.on_media_track(handler)
1884    }
1885
1886    pub fn on_track_removed(
1887        &self,
1888        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1889    ) -> Subscription {
1890        self.client.on_media_track_removed(handler)
1891    }
1892
1893    pub fn on_state_change(
1894        &self,
1895        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1896    ) -> Subscription {
1897        self.client.on_media_state_change(handler)
1898    }
1899
1900    pub fn on_device_change(
1901        &self,
1902        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1903    ) -> Subscription {
1904        self.client.on_media_device_change(handler)
1905    }
1906}
1907
1908pub struct RoomSessionNamespace {
1909    client: Arc<RoomClient>,
1910}
1911
1912impl RoomSessionNamespace {
1913    fn new(client: Arc<RoomClient>) -> Self {
1914        Self { client }
1915    }
1916
1917    pub fn on_error(
1918        &self,
1919        handler: impl Fn(&str, &str) + Send + Sync + 'static,
1920    ) -> Subscription {
1921        self.client.on_error(handler)
1922    }
1923
1924    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
1925        self.client.on_kicked(handler)
1926    }
1927
1928    pub fn on_reconnect(
1929        &self,
1930        handler: impl Fn(&Value) + Send + Sync + 'static,
1931    ) -> Subscription {
1932        self.client.on_reconnect(handler)
1933    }
1934
1935    pub fn on_connection_state_change(
1936        &self,
1937        handler: impl Fn(&str) + Send + Sync + 'static,
1938    ) -> Subscription {
1939        self.client.on_connection_state_change(handler)
1940    }
1941
1942    pub fn connection_state(&self) -> String {
1943        self.client.connection_state()
1944    }
1945
1946    pub fn user_id(&self) -> Option<String> {
1947        self.client.current_user_id.lock().unwrap().clone()
1948    }
1949
1950    pub fn connection_id(&self) -> Option<String> {
1951        self.client.current_connection_id.lock().unwrap().clone()
1952    }
1953}