Skip to main content

edgebase_core/
room.rs

1// room.rs — RoomClient v2 for EdgeBase Rust SDK.
2//
3// Complete redesign from v1:
4//   - 2 client-visible state areas: sharedState (all clients), playerState (per-player)
5//   - Client can only read + subscribe + send(). All writes are server-only.
6//   - send() returns a Result resolved by requestId matching via oneshot channel
7//   - Subscription returns a Subscription struct with unsubscribe()
8//   - namespace + roomId identification (replaces single roomId)
9//
10// Usage:
11//   let room = RoomClient::new(&url, "game", "lobby-1", move || token.clone(), None);
12//   room.join().await?;
13//   let sub = room.on_shared_state(|state, changes| { ... });
14//   let result = room.send("SET_SCORE", Some(json!({"score": 42}))).await?;
15//   sub.unsubscribe();
16//   room.leave().await;
17
18use crate::error::Error;
19use serde_json::{json, Value};
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, RwLock};
22use tokio::sync::{mpsc, oneshot};
23use tokio::time::{sleep, timeout, Duration};
24use uuid::Uuid;
25
26// ── Public types ──────────────────────────────────────────────────────────────
27
/// Reconnect and timeout settings for a `RoomClient`.
///
/// Use `RoomOptions::default()` (or struct-update syntax on top of it) to
/// override only the fields you care about.
#[derive(Debug, Clone)]
pub struct RoomOptions {
    /// Retry the connection after it fails or drops (default: true)
    pub auto_reconnect: bool,
    /// Number of reconnect attempts allowed before giving up (default: 10)
    pub max_reconnect_attempts: u32,
    /// Base reconnect delay in ms; doubled on every further attempt (default: 1000)
    pub reconnect_base_delay_ms: u64,
    /// Timeout for send() requests in ms (default: 10000)
    pub send_timeout_ms: u64,
    /// Timeout for WebSocket connection establishment in ms (default: 15000)
    pub connection_timeout_ms: u64,
}

impl Default for RoomOptions {
    /// Returns the documented defaults: auto-reconnect enabled, 10 attempts,
    /// 1 s base delay, 10 s send timeout, 15 s connection timeout.
    fn default() -> Self {
        Self {
            auto_reconnect: true,
            max_reconnect_attempts: 10,
            reconnect_base_delay_ms: 1000,
            send_timeout_ms: 10_000,
            connection_timeout_ms: 15_000,
        }
    }
}
49
50// ── Subscription ──────────────────────────────────────────────────────────────
51
/// Handle returned by on_* methods.
///
/// Call `unsubscribe()` to remove the handler explicitly. Dropping the handle
/// has the same effect, so keep it alive for as long as the handler should
/// stay registered.
#[must_use = "dropping a Subscription immediately removes its handler"]
pub struct Subscription {
    /// One-shot removal closure; `None` once it has been taken and run.
    remove_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
}

impl Subscription {
    /// Wrap the closure that removes the associated handler.
    fn new(remove_fn: impl FnOnce() + Send + 'static) -> Self {
        Self {
            remove_fn: Mutex::new(Some(Box::new(remove_fn))),
        }
    }

    /// Take the removal closure out of the slot, leaving `None` behind.
    ///
    /// The lock is released before this returns, so the closure never runs
    /// while the guard is held. A poisoned lock is tolerated: the slot only
    /// holds an `Option`, which is always in a valid state.
    fn take_remove_fn(&self) -> Option<Box<dyn FnOnce() + Send>> {
        self.remove_fn
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take()
    }

    /// Remove the handler now.
    ///
    /// Consumes the handle; the removal closure runs at most once, and the
    /// `Drop` that follows is a no-op.
    pub fn unsubscribe(self) {
        if let Some(remove) = self.take_remove_fn() {
            remove();
        }
    }
}

impl Drop for Subscription {
    /// Remove the handler if `unsubscribe()` has not already done so.
    fn drop(&mut self) {
        // Must not panic here: this can run while another panic is unwinding,
        // and a second panic would abort the process.
        if let Some(remove) = self.take_remove_fn() {
            remove();
        }
    }
}
79
80// ── Handler type aliases (internal) ──────────────────────────────────────────
81
82type StateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
83type MessageHandler = Arc<dyn Fn(&Value) + Send + Sync>;
84type ErrorHandler = Arc<dyn Fn(&str, &str) + Send + Sync>;
85type KickedHandler = Arc<dyn Fn() + Send + Sync>;
86type MembersSyncHandler = Arc<dyn Fn(&Value) + Send + Sync>;
87type MemberHandler = Arc<dyn Fn(&Value) + Send + Sync>;
88type MemberLeaveHandler = Arc<dyn Fn(&Value, &str) + Send + Sync>;
89type MemberStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
90type SignalHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
91type AnySignalHandler = Arc<dyn Fn(&str, &Value, &Value) + Send + Sync>;
92type ReconnectHandler = Arc<dyn Fn(&Value) + Send + Sync>;
93type ConnectionStateHandler = Arc<dyn Fn(&str) + Send + Sync>;
94
95/// ID-tagged handler entry for removal by Subscription.
96type HandlerList<H> = Arc<Mutex<Vec<(u64, H)>>>;
97
/// Command pushed onto the WebSocket write channel (`send_tx`).
pub(crate) enum RoomWsCommand {
    /// Transmit the given text; callers in this file pass serialized JSON.
    Send(String),
    /// Request that the socket be closed; `leave()` sends this after its "leave" message.
    Close,
}
102
/// Pause between sending the explicit "leave" message and closing the socket in `leave()`.
const ROOM_EXPLICIT_LEAVE_CLOSE_DELAY: Duration = Duration::from_millis(40);

// Connection state labels, as reported by `connection_state()` and passed to
// connection-state handlers.

/// Initial state, and the state after `leave()`.
const ROOM_STATE_IDLE: &str = "idle";
/// Set by `join()` before the first connection attempt.
const ROOM_STATE_CONNECTING: &str = "connecting";
const ROOM_STATE_CONNECTED: &str = "connected";
const ROOM_STATE_RECONNECTING: &str = "reconnecting";
/// Set when reconnecting is disabled or the attempt budget is exhausted.
const ROOM_STATE_DISCONNECTED: &str = "disconnected";
const ROOM_STATE_KICKED: &str = "kicked";
110
111// ── RoomClient v2 ─────────────────────────────────────────────────────────────
112
113pub struct RoomClient {
114    /// Room namespace (e.g. "game", "chat")
115    pub namespace: String,
116    /// Room instance ID within the namespace
117    pub room_id: String,
118
119    // ── State ───
120    shared_state: RwLock<Value>,
121    shared_version: RwLock<u64>,
122    player_state: RwLock<Value>,
123    player_version: RwLock<u64>,
124    members: RwLock<Value>,
125    current_user_id: Mutex<Option<String>>,
126    current_connection_id: Mutex<Option<String>>,
127    connection_state: RwLock<String>,
128    reconnect_info: RwLock<Option<Value>>,
129
130    // ── Config ───
131    base_url: String,
132    token_fn: Box<dyn Fn() -> String + Send + Sync>,
133    opts: RoomOptions,
134
135    // ── Handlers (Arc-wrapped so Subscription closures can hold clones) ───
136    shared_state_handlers: HandlerList<StateHandler>,
137    player_state_handlers: HandlerList<StateHandler>,
138    message_handlers: Arc<Mutex<HashMap<String, Vec<(u64, MessageHandler)>>>>,
139    error_handlers: HandlerList<ErrorHandler>,
140    kicked_handlers: HandlerList<KickedHandler>,
141    member_sync_handlers: HandlerList<MembersSyncHandler>,
142    member_join_handlers: HandlerList<MemberHandler>,
143    member_leave_handlers: HandlerList<MemberLeaveHandler>,
144    member_state_handlers: HandlerList<MemberStateHandler>,
145    signal_handlers: Arc<Mutex<HashMap<String, Vec<(u64, SignalHandler)>>>>,
146    any_signal_handlers: HandlerList<AnySignalHandler>,
147    reconnect_handlers: HandlerList<ReconnectHandler>,
148    connection_state_handlers: HandlerList<ConnectionStateHandler>,
149    handler_id_counter: Mutex<u64>,
150
151    // ── Pending send() requests (requestId → oneshot Sender) ───
152    pending_requests: Mutex<HashMap<String, oneshot::Sender<Result<Value, Error>>>>,
153    pending_signal_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
154    pending_admin_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
155    pending_member_state_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
156    // ── WS send channel ───
157    send_tx: Mutex<Option<mpsc::Sender<RoomWsCommand>>>,
158
159    // ── Control ───
160    stop_tx: Mutex<Option<mpsc::Sender<()>>>,
161    intentionally_left: Mutex<bool>,
162}
163
164impl RoomClient {
165    /// Create a new v2 RoomClient.
166    ///
167    /// # Arguments
168    /// * `base_url` - EdgeBase server URL (http or https)
169    /// * `namespace` - Room namespace (e.g. "game", "chat")
170    /// * `room_id` - Room instance ID within the namespace
171    /// * `token_fn` - Closure that returns the current access token
172    /// * `opts` - Optional RoomOptions for reconnect and timeout configuration
173    pub fn new(
174        base_url: &str,
175        namespace: &str,
176        room_id: &str,
177        token_fn: impl Fn() -> String + Send + Sync + 'static,
178        opts: Option<RoomOptions>,
179    ) -> Arc<Self> {
180        Arc::new(Self {
181            namespace: namespace.to_string(),
182            room_id: room_id.to_string(),
183            shared_state: RwLock::new(json!({})),
184            shared_version: RwLock::new(0),
185            player_state: RwLock::new(json!({})),
186            player_version: RwLock::new(0),
187            members: RwLock::new(json!([])),
188            current_user_id: Mutex::new(None),
189            current_connection_id: Mutex::new(None),
190            connection_state: RwLock::new(ROOM_STATE_IDLE.to_string()),
191            reconnect_info: RwLock::new(None),
192            base_url: base_url.trim_end_matches('/').to_string(),
193            token_fn: Box::new(token_fn),
194            opts: opts.unwrap_or_default(),
195            shared_state_handlers: Arc::new(Mutex::new(vec![])),
196            player_state_handlers: Arc::new(Mutex::new(vec![])),
197            message_handlers: Arc::new(Mutex::new(HashMap::new())),
198            error_handlers: Arc::new(Mutex::new(vec![])),
199            kicked_handlers: Arc::new(Mutex::new(vec![])),
200            member_sync_handlers: Arc::new(Mutex::new(vec![])),
201            member_join_handlers: Arc::new(Mutex::new(vec![])),
202            member_leave_handlers: Arc::new(Mutex::new(vec![])),
203            member_state_handlers: Arc::new(Mutex::new(vec![])),
204            signal_handlers: Arc::new(Mutex::new(HashMap::new())),
205            any_signal_handlers: Arc::new(Mutex::new(vec![])),
206            reconnect_handlers: Arc::new(Mutex::new(vec![])),
207            connection_state_handlers: Arc::new(Mutex::new(vec![])),
208            handler_id_counter: Mutex::new(0),
209            pending_requests: Mutex::new(HashMap::new()),
210            pending_signal_requests: Mutex::new(HashMap::new()),
211            pending_admin_requests: Mutex::new(HashMap::new()),
212            pending_member_state_requests: Mutex::new(HashMap::new()),
213            send_tx: Mutex::new(None),
214            stop_tx: Mutex::new(None),
215            intentionally_left: Mutex::new(false),
216        })
217    }
218
219    // ── State Accessors ──────────────────────────────────────────────────────
220
221    /// Get current shared state (read-only snapshot).
222    pub fn get_shared_state(&self) -> Value {
223        self.shared_state.read().unwrap().clone()
224    }
225
226    /// Get current player state (read-only snapshot).
227    pub fn get_player_state(&self) -> Value {
228        self.player_state.read().unwrap().clone()
229    }
230
231    /// Get the current logical room members snapshot.
232    pub fn list_members(&self) -> Value {
233        self.members.read().unwrap().clone()
234    }
235
236    /// Get the current session connection state.
237    pub fn connection_state(&self) -> String {
238        self.connection_state.read().unwrap().clone()
239    }
240
241    pub fn state(self: &Arc<Self>) -> RoomStateNamespace {
242        RoomStateNamespace::new(Arc::clone(self))
243    }
244
245    pub fn meta(self: &Arc<Self>) -> RoomMetaNamespace {
246        RoomMetaNamespace::new(Arc::clone(self))
247    }
248
249    pub fn signals(self: &Arc<Self>) -> RoomSignalsNamespace {
250        RoomSignalsNamespace::new(Arc::clone(self))
251    }
252
253    pub fn members(self: &Arc<Self>) -> RoomMembersNamespace {
254        RoomMembersNamespace::new(Arc::clone(self))
255    }
256
257    pub fn admin(self: &Arc<Self>) -> RoomAdminNamespace {
258        RoomAdminNamespace::new(Arc::clone(self))
259    }
260
261    pub fn session(self: &Arc<Self>) -> RoomSessionNamespace {
262        RoomSessionNamespace::new(Arc::clone(self))
263    }
264
265    // ── Metadata (HTTP, no WebSocket needed) ─────────────────────────────────
266
267    /// Get room metadata without joining (HTTP GET).
268    /// Returns developer-defined metadata set by room.setMetadata() on the server.
269    pub async fn get_metadata(&self) -> Result<Value, Error> {
270        Self::get_metadata_static(&self.base_url, &self.namespace, &self.room_id).await
271    }
272
273    /// Static: Get room metadata without creating a RoomClient instance.
274    /// Useful for lobby screens where you need room info before joining.
275    pub async fn get_metadata_static(
276        base_url: &str,
277        namespace: &str,
278        room_id: &str,
279    ) -> Result<Value, Error> {
280        let url = format!(
281            "{}/api/room/metadata?namespace={}&id={}",
282            base_url.trim_end_matches('/'),
283            urlencoding::encode(namespace),
284            urlencoding::encode(room_id)
285        );
286        let resp = reqwest::get(&url)
287            .await
288            .map_err(|e| Error::Room(format!(
289                "Room metadata request could not reach {}. Make sure the EdgeBase server is running and reachable. Cause: {}",
290                url, e
291            )))?;
292        let status = resp.status();
293        let body = resp
294            .text()
295            .await
296            .map_err(|e| Error::Room(format!("Failed to read room metadata body from {}: {}", url, e)))?;
297        if !status.is_success() {
298            if let Ok(json) = serde_json::from_str::<Value>(&body) {
299                for key in ["message", "error", "detail"] {
300                    if let Some(message) = json.get(key).and_then(|value| value.as_str()) {
301                        let trimmed = message.trim();
302                        if !trimmed.is_empty() {
303                            return Err(Error::Room(trimmed.to_string()));
304                        }
305                    }
306                }
307            }
308            return Err(Error::Room(format!(
309                "Failed to load room metadata for '{}' in namespace '{}' (HTTP {}).",
310                room_id, namespace, status
311            )));
312        }
313        serde_json::from_str(&body)
314            .map_err(|e| Error::Room(format!("Failed to parse room metadata: {}", e)))
315    }
316
317    // ── Connection Lifecycle ─────────────────────────────────────────────────
318
319    /// Connect to the room, authenticate, and join.
320    pub async fn join(self: &Arc<Self>) -> Result<(), Error> {
321        *self.intentionally_left.lock().unwrap() = false;
322        self.set_connection_state(ROOM_STATE_CONNECTING);
323
324        let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
325        *self.stop_tx.lock().unwrap() = Some(stop_tx);
326
327        let this = Arc::clone(self);
328        tokio::spawn(async move {
329            let mut attempts = 0u32;
330            loop {
331                match this.establish().await {
332                    Ok(mut ws_rx) => {
333                        attempts = 0;
334                        loop {
335                            tokio::select! {
336                                _ = stop_rx.recv() => return,
337                                msg = ws_rx.recv() => {
338                                    match msg {
339                                        Some(raw) => this.handle_message(&raw),
340                                        None => {
341                                            // WS closed — reject pending requests
342                                            // if disconnect was not intentional
343                                            if !*this.intentionally_left.lock().unwrap() {
344                                                this.reject_all_pending(
345                                                    "WebSocket disconnected",
346                                                );
347                                            }
348                                            break;
349                                        }
350                                    }
351                                }
352                            }
353                        }
354                    }
355                    Err(_) => {}
356                }
357                if *this.intentionally_left.lock().unwrap() {
358                    return;
359                }
360                if !this.opts.auto_reconnect || attempts >= this.opts.max_reconnect_attempts {
361                    this.set_connection_state(ROOM_STATE_DISCONNECTED);
362                    return;
363                }
364                let delay = (this.opts.reconnect_base_delay_ms * (1u64 << attempts)).min(30_000);
365                attempts += 1;
366                this.begin_reconnect_attempt(attempts as u64);
367                sleep(Duration::from_millis(delay)).await;
368            }
369        });
370        Ok(())
371    }
372
373    /// Leave the room and disconnect. Cleans up all pending send() requests.
374    pub async fn leave(&self) {
375        *self.intentionally_left.lock().unwrap() = true;
376
377        let send_tx = self.send_tx.lock().unwrap().clone();
378        if let Some(tx) = send_tx.as_ref() {
379            let _ = tx
380                .send(RoomWsCommand::Send(
381                    json!({"type": "leave"}).to_string(),
382                ))
383                .await;
384            sleep(ROOM_EXPLICIT_LEAVE_CLOSE_DELAY).await;
385            let _ = tx.send(RoomWsCommand::Close).await;
386        }
387
388        if let Some(tx) = self.stop_tx.lock().unwrap().take() {
389            let _ = tx.send(()).await;
390        }
391
392        *self.send_tx.lock().unwrap() = None;
393
394        // Reject all pending requests with explicit error
395        self.reject_all_pending("Room left");
396
397        // Reset state
398        *self.shared_state.write().unwrap() = json!({});
399        *self.shared_version.write().unwrap() = 0;
400        *self.player_state.write().unwrap() = json!({});
401        *self.player_version.write().unwrap() = 0;
402        *self.members.write().unwrap() = json!([]);
403        *self.current_user_id.lock().unwrap() = None;
404        *self.current_connection_id.lock().unwrap() = None;
405        *self.reconnect_info.write().unwrap() = None;
406        self.set_connection_state(ROOM_STATE_IDLE);
407    }
408
409    // ── Actions ──────────────────────────────────────────────────────────────
410
411    /// Send an action to the server.
412    /// Returns a Result that resolves with the action result from the server.
413    ///
414    /// # Example
415    /// ```ignore
416    /// let result = room.send("SET_SCORE", Some(json!({"score": 42}))).await?;
417    /// ```
418    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
419        if self.send_tx.lock().unwrap().is_none() {
420            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
421        }
422
423        let request_id = Uuid::new_v4().to_string();
424        let (tx, rx) = oneshot::channel::<Result<Value, Error>>();
425
426        self.pending_requests
427            .lock()
428            .unwrap()
429            .insert(request_id.clone(), tx);
430
431        self.ws_send(json!({
432            "type": "send",
433            "actionType": action_type,
434            "payload": payload.unwrap_or(json!({})),
435            "requestId": request_id,
436        }));
437
438        let timeout_ms = self.opts.send_timeout_ms;
439        let action_type_owned = action_type.to_string();
440        let req_id = request_id.clone();
441
442        match timeout(Duration::from_millis(timeout_ms), rx).await {
443            Ok(Ok(result)) => result,
444            Ok(Err(_)) => {
445                // oneshot channel closed (room left or sender dropped)
446                self.pending_requests.lock().unwrap().remove(&req_id);
447                Err(Error::Room(
448                    "Room left while waiting for action result".to_string(),
449                ))
450            }
451            Err(_) => {
452                // Timeout
453                self.pending_requests.lock().unwrap().remove(&req_id);
454                Err(Error::RoomTimeout(format!(
455                    "Action '{}' timed out",
456                    action_type_owned
457                )))
458            }
459        }
460    }
461
462    // ── Subscriptions (v2 API) ───────────────────────────────────────────────
463
464    /// Subscribe to shared state changes.
465    /// Handler receives (full_state, changes) on each sync/delta.
466    pub fn on_shared_state(
467        &self,
468        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
469    ) -> Subscription {
470        let id = self.next_handler_id();
471        let handler = Arc::new(handler) as StateHandler;
472        self.shared_state_handlers.lock().unwrap().push((id, handler));
473
474        let list = Arc::clone(&self.shared_state_handlers);
475        Subscription::new(move || {
476            list.lock().unwrap().retain(|(hid, _)| *hid != id);
477        })
478    }
479
480    /// Subscribe to player state changes.
481    /// Handler receives (full_state, changes) on each sync/delta.
482    pub fn on_player_state(
483        &self,
484        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
485    ) -> Subscription {
486        let id = self.next_handler_id();
487        let handler = Arc::new(handler) as StateHandler;
488        self.player_state_handlers.lock().unwrap().push((id, handler));
489
490        let list = Arc::clone(&self.player_state_handlers);
491        Subscription::new(move || {
492            list.lock().unwrap().retain(|(hid, _)| *hid != id);
493        })
494    }
495
496    /// Subscribe to messages of a specific type sent by room.sendMessage().
497    ///
498    /// # Example
499    /// ```ignore
500    /// let sub = room.on_message("game_over", |data| { println!("{:?}", data); });
501    /// ```
502    pub fn on_message(
503        &self,
504        msg_type: &str,
505        handler: impl Fn(&Value) + Send + Sync + 'static,
506    ) -> Subscription {
507        let id = self.next_handler_id();
508        let handler = Arc::new(handler) as MessageHandler;
509        let msg_type = msg_type.to_string();
510        {
511            let mut map = self.message_handlers.lock().unwrap();
512            map.entry(msg_type.clone())
513                .or_insert_with(Vec::new)
514                .push((id, handler));
515        }
516
517        let map = Arc::clone(&self.message_handlers);
518        Subscription::new(move || {
519            if let Some(list) = map.lock().unwrap().get_mut(&msg_type) {
520                list.retain(|(hid, _)| *hid != id);
521            }
522        })
523    }
524
525    /// Subscribe to error events.
526    pub fn on_error(
527        &self,
528        handler: impl Fn(&str, &str) + Send + Sync + 'static,
529    ) -> Subscription {
530        let id = self.next_handler_id();
531        let handler = Arc::new(handler) as ErrorHandler;
532        self.error_handlers.lock().unwrap().push((id, handler));
533
534        let list = Arc::clone(&self.error_handlers);
535        Subscription::new(move || {
536            list.lock().unwrap().retain(|(hid, _)| *hid != id);
537        })
538    }
539
540    /// Subscribe to kick events. After being kicked, auto-reconnect is disabled.
541    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
542        let id = self.next_handler_id();
543        let handler = Arc::new(handler) as KickedHandler;
544        self.kicked_handlers.lock().unwrap().push((id, handler));
545
546        let list = Arc::clone(&self.kicked_handlers);
547        Subscription::new(move || {
548            list.lock().unwrap().retain(|(hid, _)| *hid != id);
549        })
550    }
551
552    pub fn on_members_sync(
553        &self,
554        handler: impl Fn(&Value) + Send + Sync + 'static,
555    ) -> Subscription {
556        let id = self.next_handler_id();
557        let handler = Arc::new(handler) as MembersSyncHandler;
558        self.member_sync_handlers.lock().unwrap().push((id, handler));
559
560        let list = Arc::clone(&self.member_sync_handlers);
561        Subscription::new(move || {
562            list.lock().unwrap().retain(|(hid, _)| *hid != id);
563        })
564    }
565
566    pub fn on_member_join(
567        &self,
568        handler: impl Fn(&Value) + Send + Sync + 'static,
569    ) -> Subscription {
570        let id = self.next_handler_id();
571        let handler = Arc::new(handler) as MemberHandler;
572        self.member_join_handlers.lock().unwrap().push((id, handler));
573
574        let list = Arc::clone(&self.member_join_handlers);
575        Subscription::new(move || {
576            list.lock().unwrap().retain(|(hid, _)| *hid != id);
577        })
578    }
579
580    pub fn on_member_leave(
581        &self,
582        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
583    ) -> Subscription {
584        let id = self.next_handler_id();
585        let handler = Arc::new(handler) as MemberLeaveHandler;
586        self.member_leave_handlers.lock().unwrap().push((id, handler));
587
588        let list = Arc::clone(&self.member_leave_handlers);
589        Subscription::new(move || {
590            list.lock().unwrap().retain(|(hid, _)| *hid != id);
591        })
592    }
593
594    pub fn on_member_state_change(
595        &self,
596        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
597    ) -> Subscription {
598        let id = self.next_handler_id();
599        let handler = Arc::new(handler) as MemberStateHandler;
600        self.member_state_handlers.lock().unwrap().push((id, handler));
601
602        let list = Arc::clone(&self.member_state_handlers);
603        Subscription::new(move || {
604            list.lock().unwrap().retain(|(hid, _)| *hid != id);
605        })
606    }
607
608    pub fn on_signal(
609        &self,
610        event: &str,
611        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
612    ) -> Subscription {
613        let id = self.next_handler_id();
614        let handler = Arc::new(handler) as SignalHandler;
615        let event = event.to_string();
616        {
617            let mut map = self.signal_handlers.lock().unwrap();
618            map.entry(event.clone())
619                .or_insert_with(Vec::new)
620                .push((id, handler));
621        }
622
623        let map = Arc::clone(&self.signal_handlers);
624        Subscription::new(move || {
625            if let Some(list) = map.lock().unwrap().get_mut(&event) {
626                list.retain(|(hid, _)| *hid != id);
627            }
628        })
629    }
630
631    pub fn on_any_signal(
632        &self,
633        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
634    ) -> Subscription {
635        let id = self.next_handler_id();
636        let handler = Arc::new(handler) as AnySignalHandler;
637        self.any_signal_handlers.lock().unwrap().push((id, handler));
638
639        let list = Arc::clone(&self.any_signal_handlers);
640        Subscription::new(move || {
641            list.lock().unwrap().retain(|(hid, _)| *hid != id);
642        })
643    }
644
645    pub fn on_reconnect(
646        &self,
647        handler: impl Fn(&Value) + Send + Sync + 'static,
648    ) -> Subscription {
649        let id = self.next_handler_id();
650        let handler = Arc::new(handler) as ReconnectHandler;
651        self.reconnect_handlers.lock().unwrap().push((id, handler));
652
653        let list = Arc::clone(&self.reconnect_handlers);
654        Subscription::new(move || {
655            list.lock().unwrap().retain(|(hid, _)| *hid != id);
656        })
657    }
658
659    pub fn on_connection_state_change(
660        &self,
661        handler: impl Fn(&str) + Send + Sync + 'static,
662    ) -> Subscription {
663        let id = self.next_handler_id();
664        let handler = Arc::new(handler) as ConnectionStateHandler;
665        self.connection_state_handlers
666            .lock()
667            .unwrap()
668            .push((id, handler));
669
670        let list = Arc::clone(&self.connection_state_handlers);
671        Subscription::new(move || {
672            list.lock().unwrap().retain(|(hid, _)| *hid != id);
673        })
674    }
675
676    pub async fn send_signal(
677        &self,
678        event: &str,
679        payload: Option<Value>,
680        options: Option<Value>,
681    ) -> Result<(), Error> {
682        if self.send_tx.lock().unwrap().is_none() {
683            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
684        }
685
686        let request_id = Uuid::new_v4().to_string();
687        let options = options.unwrap_or_else(|| json!({}));
688        let message = json!({
689            "type": "signal",
690            "event": event,
691            "payload": payload.unwrap_or_else(|| json!({})),
692            "requestId": request_id,
693            "memberId": options.get("memberId").cloned().unwrap_or(Value::Null),
694            "includeSelf": options.get("includeSelf").and_then(Value::as_bool).unwrap_or(false),
695        });
696
697        self.send_unit_request(&self.pending_signal_requests, request_id, message, format!("Signal '{}' timed out", event)).await
698    }
699
700    pub async fn send_member_state(&self, state: Value) -> Result<(), Error> {
701        if self.send_tx.lock().unwrap().is_none() {
702            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
703        }
704
705        let request_id = Uuid::new_v4().to_string();
706        let message = json!({
707            "type": "member_state",
708            "state": state,
709            "requestId": request_id,
710        });
711
712        self.send_unit_request(
713            &self.pending_member_state_requests,
714            request_id,
715            message,
716            "Member state update timed out".to_string(),
717        ).await
718    }
719
720    pub async fn clear_member_state(&self) -> Result<(), Error> {
721        if self.send_tx.lock().unwrap().is_none() {
722            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
723        }
724
725        let request_id = Uuid::new_v4().to_string();
726        let message = json!({
727            "type": "member_state_clear",
728            "requestId": request_id,
729        });
730
731        self.send_unit_request(
732            &self.pending_member_state_requests,
733            request_id,
734            message,
735            "Member state clear timed out".to_string(),
736        ).await
737    }
738
739    pub async fn send_admin(
740        &self,
741        operation: &str,
742        member_id: &str,
743        payload: Option<Value>,
744    ) -> Result<(), Error> {
745        if self.send_tx.lock().unwrap().is_none() {
746            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
747        }
748
749        let request_id = Uuid::new_v4().to_string();
750        let message = json!({
751            "type": "admin",
752            "operation": operation,
753            "memberId": member_id,
754            "payload": payload.unwrap_or_else(|| json!({})),
755            "requestId": request_id,
756        });
757
758        self.send_unit_request(
759            &self.pending_admin_requests,
760            request_id,
761            message,
762            format!("Admin '{}' timed out", operation),
763        ).await
764    }
765
766    /// Reject all pending requests across all pending maps with an error message.
767    /// Called when the WebSocket disconnects unexpectedly.
768    fn reject_all_pending(&self, message: &str) {
769        let error_msg = message.to_string();
770
771        // Reject pending action requests
772        for (_, tx) in self.pending_requests.lock().unwrap().drain() {
773            let _ = tx.send(Err(Error::Room(error_msg.clone())));
774        }
775
776        // Reject pending signal requests
777        for (_, tx) in self.pending_signal_requests.lock().unwrap().drain() {
778            let _ = tx.send(Err(Error::Room(error_msg.clone())));
779        }
780
781        // Reject pending admin requests
782        for (_, tx) in self.pending_admin_requests.lock().unwrap().drain() {
783            let _ = tx.send(Err(Error::Room(error_msg.clone())));
784        }
785
786        // Reject pending member state requests
787        for (_, tx) in self.pending_member_state_requests.lock().unwrap().drain() {
788            let _ = tx.send(Err(Error::Room(error_msg.clone())));
789        }
790
791    }
792
793    /// Leave the room, clear all handler lists, and release resources.
794    /// After calling destroy(), this RoomClient instance should not be reused.
795    pub async fn destroy(self: &Arc<Self>) {
796        self.leave().await;
797
798        // Clear all handler lists
799        self.shared_state_handlers.lock().unwrap().clear();
800        self.player_state_handlers.lock().unwrap().clear();
801        self.message_handlers.lock().unwrap().clear();
802        self.error_handlers.lock().unwrap().clear();
803        self.kicked_handlers.lock().unwrap().clear();
804        self.member_sync_handlers.lock().unwrap().clear();
805        self.member_join_handlers.lock().unwrap().clear();
806        self.member_leave_handlers.lock().unwrap().clear();
807        self.member_state_handlers.lock().unwrap().clear();
808        self.signal_handlers.lock().unwrap().clear();
809        self.any_signal_handlers.lock().unwrap().clear();
810        self.reconnect_handlers.lock().unwrap().clear();
811        self.connection_state_handlers.lock().unwrap().clear();
812    }
813
814    // ── Private: Connection ──────────────────────────────────────────────────
815
816    fn ws_url(&self) -> String {
817        let u = self
818            .base_url
819            .replace("https://", "wss://")
820            .replace("http://", "ws://");
821        format!(
822            "{}/api/room?namespace={}&id={}",
823            u,
824            urlencoding::encode(&self.namespace),
825            urlencoding::encode(&self.room_id)
826        )
827    }
828
829    /// Send a raw JSON message over the WS write channel.
830    fn ws_send(&self, msg: Value) {
831        let s = msg.to_string();
832        if let Some(tx) = self.send_tx.lock().unwrap().as_ref() {
833            let tx = tx.clone();
834            tokio::spawn(async move {
835                let _ = tx.send(RoomWsCommand::Send(s)).await;
836            });
837        }
838    }
839
840    #[cfg(test)]
841    pub(crate) fn attach_send_channel_for_testing(&self, tx: mpsc::Sender<RoomWsCommand>) {
842        *self.send_tx.lock().unwrap() = Some(tx);
843    }
844
845    #[cfg(test)]
846    pub(crate) fn handle_message_for_testing(&self, raw: &str) {
847        self.handle_message(raw);
848    }
849
850    async fn establish(&self) -> anyhow::Result<mpsc::Receiver<String>> {
851        use futures_util::{SinkExt, StreamExt};
852        use tokio_tungstenite::{connect_async, tungstenite::Message};
853
854        let (ws_stream, _response) = tokio::time::timeout(
855            std::time::Duration::from_millis(self.opts.connection_timeout_ms),
856            connect_async(self.ws_url()),
857        )
858        .await
859        .map_err(|_| anyhow::anyhow!(
860            "Room WebSocket connection timed out after {}ms. Is the server running?",
861            self.opts.connection_timeout_ms,
862        ))??;
863        let (mut write, mut read) = ws_stream.split();
864
865        // Auth
866        let auth = json!({"type": "auth", "token": (self.token_fn)(), "sdkVersion": "0.2.8"});
867        write.send(Message::Text(auth.to_string().into())).await?;
868
869        // Wait for auth_success
870        let join_raw = if let Some(Ok(Message::Text(raw))) = read.next().await {
871            let raw_str: &str = &raw;
872            let resp: Value = serde_json::from_str(raw_str)?;
873            let t = resp["type"].as_str().unwrap_or("");
874            if t != "auth_success" && t != "auth_refreshed" {
875                anyhow::bail!("Room auth failed: {}", resp["message"]);
876            }
877
878            *self.current_user_id.lock().unwrap() = resp["userId"].as_str().map(|value| value.to_string());
879            *self.current_connection_id.lock().unwrap() = resp["connectionId"].as_str().map(|value| value.to_string());
880
881            // v2: join with last known shared + player state for eviction recovery
882            let join_msg = json!({
883                "type": "join",
884                "lastSharedState": *self.shared_state.read().unwrap(),
885                "lastSharedVersion": *self.shared_version.read().unwrap(),
886                "lastPlayerState": *self.player_state.read().unwrap(),
887                "lastPlayerVersion": *self.player_version.read().unwrap(),
888            });
889            join_msg.to_string()
890        } else {
891            anyhow::bail!("Room: no auth response");
892        };
893
894        // Shared WS write channel
895        let (write_tx, mut write_rx) = mpsc::channel::<RoomWsCommand>(128);
896        *self.send_tx.lock().unwrap() = Some(write_tx.clone());
897
898        // WS writer task
899        tokio::spawn(async move {
900            while let Some(command) = write_rx.recv().await {
901                match command {
902                    RoomWsCommand::Send(msg) => {
903                        if write.send(Message::Text(msg.into())).await.is_err() {
904                            break;
905                        }
906                    }
907                    RoomWsCommand::Close => {
908                        let _ = write.send(Message::Close(None)).await;
909                        break;
910                    }
911                }
912            }
913        });
914
915        // Send join message
916        let _ = write_tx.send(RoomWsCommand::Send(join_raw)).await;
917
918        // Heartbeat
919        let htx = write_tx.clone();
920        tokio::spawn(async move {
921            loop {
922                sleep(Duration::from_secs(30)).await;
923                if htx
924                    .send(RoomWsCommand::Send(json!({"type":"ping"}).to_string()))
925                    .await
926                    .is_err()
927                {
928                    break;
929                }
930            }
931        });
932
933        // WS reader → msg_rx channel consumed by join() loop
934        let (msg_tx, msg_rx) = mpsc::channel::<String>(128);
935        tokio::spawn(async move {
936            while let Some(Ok(Message::Text(raw))) = read.next().await {
937                let raw_str: String = raw.into();
938                if msg_tx.send(raw_str).await.is_err() {
939                    break;
940                }
941            }
942        });
943
944        Ok(msg_rx)
945    }
946
947    // ── Private: Message Handling ────────────────────────────────────────────
948
949    fn handle_message(&self, raw: &str) {
950        let msg: Value = match serde_json::from_str(raw) {
951            Ok(v) => v,
952            Err(_) => return,
953        };
954        let t = msg["type"].as_str().unwrap_or("");
955
956        match t {
957            "sync" => self.handle_sync(&msg),
958            "shared_delta" => self.handle_shared_delta(&msg),
959            "player_delta" => self.handle_player_delta(&msg),
960            "action_result" => self.handle_action_result(&msg),
961            "action_error" => self.handle_action_error(&msg),
962            "message" => self.handle_server_message(&msg),
963            "signal" => self.handle_signal(&msg),
964            "signal_sent" => self.resolve_pending_unit_request(&self.pending_signal_requests, &msg),
965            "signal_error" => self.reject_pending_unit_request(&self.pending_signal_requests, &msg, "Signal error"),
966            "members_sync" => self.handle_members_sync(&msg),
967            "member_join" => self.handle_member_join(&msg),
968            "member_leave" => self.handle_member_leave(&msg),
969            "member_state" => self.handle_member_state(&msg),
970            "member_state_error" => self.reject_pending_unit_request(&self.pending_member_state_requests, &msg, "Member state error"),
971            "admin_result" => self.resolve_pending_unit_request(&self.pending_admin_requests, &msg),
972            "admin_error" => self.reject_pending_unit_request(&self.pending_admin_requests, &msg, "Admin error"),
973            "kicked" => self.handle_kicked(),
974            "error" => self.handle_error(&msg),
975            "pong" => {}
976            _ => {}
977        }
978    }
979
980    fn handle_sync(&self, msg: &Value) {
981        *self.shared_state.write().unwrap() = msg["sharedState"].clone();
982        *self.shared_version.write().unwrap() = msg["sharedVersion"].as_u64().unwrap_or(0);
983        *self.player_state.write().unwrap() = msg["playerState"].clone();
984        *self.player_version.write().unwrap() = msg["playerVersion"].as_u64().unwrap_or(0);
985        self.set_connection_state(ROOM_STATE_CONNECTED);
986
987        if let Some(info) = self.reconnect_info.write().unwrap().take() {
988            for (_, handler) in self.reconnect_handlers.lock().unwrap().iter() {
989                handler(&info);
990            }
991        }
992
993        // Notify shared state handlers (full state as changes on sync)
994        let shared = self.shared_state.read().unwrap().clone();
995        for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
996            handler(&shared, &shared);
997        }
998        let player = self.player_state.read().unwrap().clone();
999        for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1000            handler(&player, &player);
1001        }
1002    }
1003
1004    fn handle_shared_delta(&self, msg: &Value) {
1005        let delta = &msg["delta"];
1006        *self.shared_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1007
1008        if let Value::Object(map) = delta {
1009            let mut state = self.shared_state.write().unwrap();
1010            for (path, value) in map {
1011                deep_set(&mut state, path, value.clone());
1012            }
1013        }
1014
1015        let state = self.shared_state.read().unwrap().clone();
1016        for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1017            handler(&state, delta);
1018        }
1019    }
1020
1021    fn handle_player_delta(&self, msg: &Value) {
1022        let delta = &msg["delta"];
1023        *self.player_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1024
1025        if let Value::Object(map) = delta {
1026            let mut state = self.player_state.write().unwrap();
1027            for (path, value) in map {
1028                deep_set(&mut state, path, value.clone());
1029            }
1030        }
1031
1032        let state = self.player_state.read().unwrap().clone();
1033        for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1034            handler(&state, delta);
1035        }
1036    }
1037
1038    fn handle_action_result(&self, msg: &Value) {
1039        let request_id = msg["requestId"].as_str().unwrap_or("");
1040        if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1041            let _ = tx.send(Ok(msg["result"].clone()));
1042        }
1043    }
1044
1045    fn handle_action_error(&self, msg: &Value) {
1046        let request_id = msg["requestId"].as_str().unwrap_or("");
1047        if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1048            let message = msg["message"].as_str().unwrap_or("Unknown EdgeBase error. Check the server response or logs for details.");
1049            let _ = tx.send(Err(Error::Room(message.to_string())));
1050        }
1051    }
1052
1053    fn handle_server_message(&self, msg: &Value) {
1054        let message_type = msg["messageType"].as_str().unwrap_or("");
1055        let data = &msg["data"];
1056
1057        let handlers = self.message_handlers.lock().unwrap();
1058        if let Some(list) = handlers.get(message_type) {
1059            for (_, handler) in list {
1060                handler(data);
1061            }
1062        }
1063    }
1064
1065    fn handle_signal(&self, msg: &Value) {
1066        let event = msg["event"].as_str().unwrap_or("");
1067        let payload = &msg["payload"];
1068        let meta = &msg["meta"];
1069
1070        if let Some(list) = self.signal_handlers.lock().unwrap().get(event) {
1071            for (_, handler) in list {
1072                handler(payload, meta);
1073            }
1074        }
1075
1076        for (_, handler) in self.any_signal_handlers.lock().unwrap().iter() {
1077            handler(event, payload, meta);
1078        }
1079    }
1080
1081    fn handle_members_sync(&self, msg: &Value) {
1082        let members = normalize_members(&msg["members"]);
1083        *self.members.write().unwrap() = members.clone();
1084
1085        for (_, handler) in self.member_sync_handlers.lock().unwrap().iter() {
1086            handler(&members);
1087        }
1088    }
1089
1090    fn handle_member_join(&self, msg: &Value) {
1091        if let Some(member) = normalize_member(&msg["member"]) {
1092            self.upsert_member(member.clone());
1093            for (_, handler) in self.member_join_handlers.lock().unwrap().iter() {
1094                handler(&member);
1095            }
1096        }
1097    }
1098
1099    fn handle_member_leave(&self, msg: &Value) {
1100        if let Some(member) = normalize_member(&msg["member"]) {
1101            let member_id = member["memberId"].as_str().unwrap_or("");
1102            self.remove_member(member_id);
1103            let reason = msg["reason"].as_str().unwrap_or("leave");
1104            for (_, handler) in self.member_leave_handlers.lock().unwrap().iter() {
1105                handler(&member, reason);
1106            }
1107        }
1108    }
1109
1110    fn handle_member_state(&self, msg: &Value) {
1111        if let Some(mut member) = normalize_member(&msg["member"]) {
1112            let state = object_or_empty(&msg["state"]);
1113            member["state"] = state.clone();
1114            self.upsert_member(member.clone());
1115            self.resolve_pending_unit_request(&self.pending_member_state_requests, msg);
1116
1117            for (_, handler) in self.member_state_handlers.lock().unwrap().iter() {
1118                handler(&member, &state);
1119            }
1120        }
1121    }
1122
1123    fn handle_kicked(&self) {
1124        self.set_connection_state(ROOM_STATE_KICKED);
1125        *self.intentionally_left.lock().unwrap() = true;
1126        for (_, handler) in self.kicked_handlers.lock().unwrap().iter() {
1127            handler();
1128        }
1129    }
1130
1131    fn handle_error(&self, msg: &Value) {
1132        let code = msg["code"].as_str().unwrap_or("");
1133        let message = msg["message"].as_str().unwrap_or("");
1134        for (_, handler) in self.error_handlers.lock().unwrap().iter() {
1135            handler(code, message);
1136        }
1137    }
1138
1139    // ── Private: Helpers ─────────────────────────────────────────────────────
1140
1141    async fn send_unit_request(
1142        &self,
1143        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1144        request_id: String,
1145        message: Value,
1146        timeout_message: String,
1147    ) -> Result<(), Error> {
1148        let (tx, rx) = oneshot::channel::<Result<(), Error>>();
1149        pending.lock().unwrap().insert(request_id.clone(), tx);
1150        self.ws_send(message);
1151
1152        match timeout(Duration::from_millis(self.opts.send_timeout_ms), rx).await {
1153            Ok(Ok(result)) => result,
1154            Ok(Err(_)) => {
1155                pending.lock().unwrap().remove(&request_id);
1156                Err(Error::Room(
1157                    "Room left while waiting for room control result".to_string(),
1158                ))
1159            }
1160            Err(_) => {
1161                pending.lock().unwrap().remove(&request_id);
1162                Err(Error::RoomTimeout(timeout_message))
1163            }
1164        }
1165    }
1166
1167    fn resolve_pending_unit_request(
1168        &self,
1169        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1170        msg: &Value,
1171    ) {
1172        let request_id = msg["requestId"].as_str().unwrap_or("");
1173        if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1174            let _ = tx.send(Ok(()));
1175        }
1176    }
1177
1178    fn reject_pending_unit_request(
1179        &self,
1180        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1181        msg: &Value,
1182        fallback: &str,
1183    ) {
1184        let request_id = msg["requestId"].as_str().unwrap_or("");
1185        if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1186            let message = msg["message"].as_str().unwrap_or(fallback);
1187            let _ = tx.send(Err(Error::Room(message.to_string())));
1188        }
1189    }
1190
1191    fn set_connection_state(&self, next: &str) {
1192        let mut state = self.connection_state.write().unwrap();
1193        if state.as_str() == next {
1194            return;
1195        }
1196        *state = next.to_string();
1197        drop(state);
1198
1199        for (_, handler) in self.connection_state_handlers.lock().unwrap().iter() {
1200            handler(next);
1201        }
1202    }
1203
1204    fn begin_reconnect_attempt(&self, attempt: u64) {
1205        *self.reconnect_info.write().unwrap() = Some(json!({ "attempt": attempt }));
1206        self.set_connection_state(ROOM_STATE_RECONNECTING);
1207    }
1208
1209    fn upsert_member(&self, member: Value) {
1210        let member_id = member["memberId"].as_str().unwrap_or("").to_string();
1211        if member_id.is_empty() {
1212            return;
1213        }
1214
1215        let mut members = self.members.write().unwrap();
1216        let list = members.as_array_mut().expect("members array");
1217        if let Some(existing) = list
1218            .iter_mut()
1219            .find(|entry| entry["memberId"].as_str() == Some(member_id.as_str()))
1220        {
1221            *existing = member;
1222        } else {
1223            list.push(member);
1224        }
1225    }
1226
1227    fn remove_member(&self, member_id: &str) {
1228        let mut members = self.members.write().unwrap();
1229        if let Some(list) = members.as_array_mut() {
1230            list.retain(|entry| entry["memberId"].as_str() != Some(member_id));
1231        }
1232    }
1233
1234    fn next_handler_id(&self) -> u64 {
1235        let mut counter = self.handler_id_counter.lock().unwrap();
1236        *counter += 1;
1237        *counter
1238    }
1239}
1240
1241// ── Helper ────────────────────────────────────────────────────────────────────
1242
1243fn deep_set(obj: &mut Value, path: &str, value: Value) {
1244    if let Some(dot) = path.find('.') {
1245        let head = &path[..dot];
1246        let tail = &path[dot + 1..];
1247        if let Value::Object(map) = obj {
1248            let nested = map.entry(head.to_string()).or_insert(json!({}));
1249            deep_set(nested, tail, value);
1250        }
1251    } else if let Value::Object(map) = obj {
1252        if value.is_null() {
1253            map.remove(path);
1254        } else {
1255            map.insert(path.to_string(), value);
1256        }
1257    }
1258}
1259
1260fn object_or_empty(value: &Value) -> Value {
1261    if value.is_object() {
1262        value.clone()
1263    } else {
1264        json!({})
1265    }
1266}
1267
1268fn normalize_member(value: &Value) -> Option<Value> {
1269    let member_id = value["memberId"].as_str()?;
1270    let user_id = value["userId"].as_str()?;
1271    let mut member = json!({
1272        "memberId": member_id,
1273        "userId": user_id,
1274        "state": object_or_empty(&value["state"]),
1275    });
1276
1277    if let Some(connection_id) = value["connectionId"].as_str() {
1278        member["connectionId"] = Value::String(connection_id.to_string());
1279    }
1280    if let Some(connection_count) = value["connectionCount"].as_u64() {
1281        member["connectionCount"] = Value::from(connection_count);
1282    }
1283    if let Some(role) = value["role"].as_str() {
1284        member["role"] = Value::String(role.to_string());
1285    }
1286    Some(member)
1287}
1288
1289fn normalize_members(value: &Value) -> Value {
1290    Value::Array(
1291        value
1292            .as_array()
1293            .into_iter()
1294            .flatten()
1295            .filter_map(normalize_member)
1296            .collect(),
1297    )
1298}
1299
/// State-focused facade over a shared [`RoomClient`]; every method delegates
/// to the corresponding `RoomClient` method.
pub struct RoomStateNamespace {
    client: Arc<RoomClient>,
}

impl RoomStateNamespace {
    /// Wrap an existing client handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Return the shared state via `RoomClient::get_shared_state`.
    pub fn get_shared(&self) -> Value {
        self.client.get_shared_state()
    }

    /// Return this player's state via `RoomClient::get_player_state`.
    pub fn get_mine(&self) -> Value {
        self.client.get_player_state()
    }

    /// Register a shared-state handler via `RoomClient::on_shared_state`.
    /// The returned `Subscription` can be used to unsubscribe.
    pub fn on_shared_change(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_shared_state(handler)
    }

    /// Register a player-state handler via `RoomClient::on_player_state`.
    /// The returned `Subscription` can be used to unsubscribe.
    pub fn on_mine_change(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_player_state(handler)
    }

    /// Send an action to the server via `RoomClient::send` and await its result.
    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
        self.client.send(action_type, payload).await
    }
}
1335
/// Metadata facade over a shared [`RoomClient`].
pub struct RoomMetaNamespace {
    client: Arc<RoomClient>,
}

impl RoomMetaNamespace {
    /// Wrap an existing client handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Fetch the room metadata via `RoomClient::get_metadata`.
    pub async fn get(&self) -> Result<Value, Error> {
        self.client.get_metadata().await
    }
}
1349
/// Signal facade over a shared [`RoomClient`]; every method delegates to the
/// corresponding `RoomClient` signal method.
pub struct RoomSignalsNamespace {
    client: Arc<RoomClient>,
}

impl RoomSignalsNamespace {
    /// Wrap an existing client handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Send signal `event` with an optional payload and options via
    /// `RoomClient::send_signal`.
    pub async fn send(
        &self,
        event: &str,
        payload: Option<Value>,
        options: Option<Value>,
    ) -> Result<(), Error> {
        self.client.send_signal(event, payload, options).await
    }

    /// Send signal `event` with options `{ "memberId": member_id }`.
    pub async fn send_to(
        &self,
        member_id: &str,
        event: &str,
        payload: Option<Value>,
    ) -> Result<(), Error> {
        self.client
            .send_signal(event, payload, Some(json!({ "memberId": member_id })))
            .await
    }

    /// Register a handler for signals named `event` via `RoomClient::on_signal`.
    pub fn on(
        &self,
        event: &str,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_signal(event, handler)
    }

    /// Register a handler for every signal via `RoomClient::on_any_signal`.
    pub fn on_any(
        &self,
        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_any_signal(handler)
    }
}
1394
/// Member/presence facade over a shared [`RoomClient`]; every method
/// delegates to the corresponding `RoomClient` member method.
pub struct RoomMembersNamespace {
    client: Arc<RoomClient>,
}

impl RoomMembersNamespace {
    /// Wrap an existing client handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Return the member list via `RoomClient::list_members`.
    pub fn list(&self) -> Value {
        self.client.list_members()
    }

    /// Register a handler via `RoomClient::on_members_sync`.
    pub fn on_sync(
        &self,
        handler: impl Fn(&Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_members_sync(handler)
    }

    /// Register a handler via `RoomClient::on_member_join`.
    pub fn on_join(
        &self,
        handler: impl Fn(&Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_member_join(handler)
    }

    /// Register a handler via `RoomClient::on_member_leave`; the second
    /// handler argument is the leave reason.
    pub fn on_leave(
        &self,
        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_member_leave(handler)
    }

    /// Send a member state update via `RoomClient::send_member_state`.
    pub async fn set_state(&self, state: Value) -> Result<(), Error> {
        self.client.send_member_state(state).await
    }

    /// Clear the member state via `RoomClient::clear_member_state`.
    pub async fn clear_state(&self) -> Result<(), Error> {
        self.client.clear_member_state().await
    }

    /// Register a handler via `RoomClient::on_member_state_change`.
    pub fn on_state_change(
        &self,
        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_member_state_change(handler)
    }
}
1444
/// Admin facade over a shared [`RoomClient`]; each method issues one admin
/// operation through `RoomClient::send_admin`.
pub struct RoomAdminNamespace {
    client: Arc<RoomClient>,
}

impl RoomAdminNamespace {
    /// Wrap an existing client handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Send the `"kick"` admin operation for `member_id` (no payload).
    pub async fn kick(&self, member_id: &str) -> Result<(), Error> {
        self.client.send_admin("kick", member_id, None).await
    }

    /// Send the `"block"` admin operation for `member_id` (no payload).
    pub async fn block(&self, member_id: &str) -> Result<(), Error> {
        self.client.send_admin("block", member_id, None).await
    }

    /// Send the `"setRole"` admin operation for `member_id` with payload
    /// `{ "role": role }`.
    pub async fn set_role(&self, member_id: &str, role: &str) -> Result<(), Error> {
        self.client
            .send_admin("setRole", member_id, Some(json!({ "role": role })))
            .await
    }

}
1469
/// Session/connection facade over a shared [`RoomClient`].
pub struct RoomSessionNamespace {
    client: Arc<RoomClient>,
}

impl RoomSessionNamespace {
    /// Wrap an existing client handle.
    fn new(client: Arc<RoomClient>) -> Self {
        Self { client }
    }

    /// Register an error handler via `RoomClient::on_error`.
    pub fn on_error(
        &self,
        handler: impl Fn(&str, &str) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_error(handler)
    }

    /// Register a kicked handler via `RoomClient::on_kicked`.
    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
        self.client.on_kicked(handler)
    }

    /// Register a reconnect handler via `RoomClient::on_reconnect`.
    pub fn on_reconnect(
        &self,
        handler: impl Fn(&Value) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_reconnect(handler)
    }

    /// Register a handler via `RoomClient::on_connection_state_change`.
    pub fn on_connection_state_change(
        &self,
        handler: impl Fn(&str) + Send + Sync + 'static,
    ) -> Subscription {
        self.client.on_connection_state_change(handler)
    }

    /// Return the current connection state via `RoomClient::connection_state`.
    pub fn connection_state(&self) -> String {
        self.client.connection_state()
    }

    /// Return a clone of the client's stored `current_user_id`, if any.
    pub fn user_id(&self) -> Option<String> {
        self.client.current_user_id.lock().unwrap().clone()
    }

    /// Return a clone of the client's stored `current_connection_id`, if any.
    pub fn connection_id(&self) -> Option<String> {
        self.client.current_connection_id.lock().unwrap().clone()
    }
}