Skip to main content

myko/server/
client_registry.rs

//! Global client registry for sending messages to connected WebSocket clients.
//!
//! Provides a thread-safe mapping from `client_id` to `WsWriter`, so that any
//! part of the server can send messages to a specific client.
5
6use std::sync::{Arc, OnceLock};
7
8use dashmap::DashMap;
9use serde::Serialize;
10
11use super::WsWriter;
12use crate::{
13    command::{CommandId, CommandRequest},
14    wire::{MykoMessage, encode_command_message},
15};
16
17/// Thread-safe registry mapping client IDs to their WebSocket writers.
18pub struct ClientRegistry {
19    writers: DashMap<Arc<str>, Arc<dyn WsWriter>>,
20}
21
22impl ClientRegistry {
23    fn new() -> Self {
24        Self {
25            writers: DashMap::new(),
26        }
27    }
28
29    /// Register a client's writer.
30    pub fn register(&self, client_id: Arc<str>, writer: Arc<dyn WsWriter>) {
31        self.writers.insert(client_id, writer);
32    }
33
34    /// Unregister a client's writer.
35    pub fn unregister(&self, client_id: &str) {
36        self.writers.remove(client_id);
37    }
38
39    /// Send a message to a specific client.
40    ///
41    /// Returns `true` if the client was found and the message was sent.
42    pub fn send_to(&self, client_id: &str, msg: MykoMessage) -> bool {
43        if let Some(writer) = self.writers.get(client_id) {
44            writer.send(msg);
45            true
46        } else {
47            false
48        }
49    }
50
51    pub fn send_command_request_to<C>(&self, client_id: &str, request: &CommandRequest<C>) -> bool
52    where
53        C: CommandId + Serialize,
54    {
55        let Some(writer) = self.writers.get(client_id) else {
56            return false;
57        };
58
59        let command_id = request.command_id().to_string();
60        let protocol = writer.protocol();
61
62        match encode_command_message(protocol, request) {
63            Ok(payload) => {
64                writer.send_serialized_command(request.tx.clone(), command_id, payload);
65                true
66            }
67            Err(err) => {
68                log::error!(
69                    "Failed to serialize command {} for client {}: {}",
70                    request.command_id(),
71                    client_id,
72                    err
73                );
74                false
75            }
76        }
77    }
78
79    /// Number of currently connected clients.
80    pub fn len(&self) -> usize {
81        self.writers.len()
82    }
83
84    /// Returns true when there are no connected clients.
85    pub fn is_empty(&self) -> bool {
86        self.writers.is_empty()
87    }
88}
89
// ─────────────────────────────────────────────────────────────────────────────
// Global accessor (same pattern as sync_client)
// ─────────────────────────────────────────────────────────────────────────────
93
94static CLIENT_REGISTRY: OnceLock<Arc<ClientRegistry>> = OnceLock::new();
95
96/// Initialize the global client registry.
97///
98/// Safe to call multiple times — only the first call has effect.
99pub fn init_client_registry() {
100    let _ = CLIENT_REGISTRY.set(Arc::new(ClientRegistry::new()));
101}
102
103/// Get the global client registry.
104///
105/// # Panics
106/// Panics if `init_client_registry()` has not been called.
107pub fn client_registry() -> Arc<ClientRegistry> {
108    CLIENT_REGISTRY
109        .get()
110        .expect("Client registry not initialized - call init_client_registry() first")
111        .clone()
112}
113
114/// Try to get the global client registry.
115///
116/// Returns None if `init_client_registry()` has not been called.
117pub fn try_client_registry() -> Option<Arc<ClientRegistry>> {
118    CLIENT_REGISTRY.get().cloned()
119}