pub struct ClientManager { /* private fields */ }
Manages all connected WebSocket clients using lock-free DashMap.
Key design decisions:
- Uses DashMap for lock-free concurrent access to client registry
- Uses try_send instead of send to never block on slow clients
- Disconnects clients that are backpressured (queue full) to prevent cascade failures
- All public methods are non-blocking or use fine-grained per-key locks
- Supports configurable rate limiting per IP, subject, and global defaults
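The try_send and backpressure decisions above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `Registry`, the `u64` client ids, and the string errors are hypothetical, a `Mutex<HashMap>` stands in for DashMap, and `std::sync::mpsc::sync_channel` stands in for the per-client async queue.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Mutex;

/// Hypothetical stand-in for the client registry (the real type uses DashMap).
pub struct Registry {
    clients: Mutex<HashMap<u64, SyncSender<Vec<u8>>>>,
}

impl Registry {
    pub fn new() -> Self {
        Registry { clients: Mutex::new(HashMap::new()) }
    }

    /// Register a client with a bounded queue and return the receiving end.
    pub fn add_client(&self, id: u64, queue_size: usize) -> Receiver<Vec<u8>> {
        let (tx, rx) = sync_channel(queue_size);
        self.clients.lock().unwrap().insert(id, tx);
        rx
    }

    /// Non-blocking send: a full or closed queue removes the client.
    pub fn send_to_client(&self, id: u64, data: Vec<u8>) -> Result<(), &'static str> {
        let mut clients = self.clients.lock().unwrap();
        let result = match clients.get(&id) {
            None => return Err("unknown client"),
            // try_send returns immediately instead of waiting for queue space.
            Some(tx) => tx.try_send(data),
        };
        match result {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(_)) => {
                // Backpressured client: drop it rather than stall everyone else.
                clients.remove(&id);
                Err("queue full, client disconnected")
            }
            Err(TrySendError::Disconnected(_)) => {
                clients.remove(&id);
                Err("client gone")
            }
        }
    }

    pub fn has_client(&self, id: u64) -> bool {
        self.clients.lock().unwrap().contains_key(&id)
    }
}
```

With a queue size of 2 and nothing draining the receiver, the third send fails and the client is no longer in the registry.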
Implementations
impl ClientManager
pub fn new() -> Self
pub fn with_config(config: RateLimitConfig) -> Self
Create a new ClientManager with the given rate limit configuration.
pub fn from_env() -> Self
Create a new ClientManager with configuration loaded from environment variables.
See RateLimitConfig::from_env for supported variables.
pub fn with_timeout(self, timeout: Duration) -> Self
Set the client timeout used for stale client cleanup.
pub fn with_message_queue_size(self, queue_size: usize) -> Self
Set the message queue size per client.
pub fn with_max_connections_per_ip(self, max: usize) -> Self
Set a global limit on connections per IP address.
pub fn with_rate_limit_window(self, window: Duration) -> Self
Set the rate limit window duration.
pub fn with_default_limits(self, limits: Limits) -> Self
Set the default limits applied when the auth token doesn’t specify limits.
These limits act as server-wide fallbacks for connections where the authentication token doesn’t include explicit limits.
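The fallback described above might look like the following sketch. The `Limits` fields here are hypothetical (the real type's fields are not shown on this page), and the sketch assumes the defaults replace the token's limits as a whole when the token carries none; the crate may instead merge field by field.

```rust
/// Hypothetical limits type; field names are illustrative only.
#[derive(Clone, Debug, PartialEq)]
pub struct Limits {
    pub max_subscriptions: Option<u32>,
    pub max_messages_per_window: Option<u32>,
}

/// Use the token's limits when present, otherwise the server-wide defaults.
pub fn effective_limits(token_limits: Option<&Limits>, defaults: &Limits) -> Limits {
    token_limits.cloned().unwrap_or_else(|| defaults.clone())
}
```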
pub fn with_rate_limiter(self, rate_limiter: Arc<WebSocketRateLimiter>) -> Self
Set a WebSocket rate limiter for granular rate control.
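The `with_*` methods above take `self` by value and return `Self`, so they can be chained. A minimal sketch of that consuming-builder shape, using a hypothetical `ManagerSettings` type with placeholder defaults rather than the real ClientManager:

```rust
use std::time::Duration;

/// Hypothetical settings type mirroring a few of the `with_*` methods above.
#[derive(Debug, Clone, PartialEq)]
pub struct ManagerSettings {
    pub client_timeout: Duration,
    pub message_queue_size: usize,
    pub max_connections_per_ip: Option<usize>,
}

impl ManagerSettings {
    /// Placeholder defaults, not the crate's real values.
    pub fn new() -> Self {
        ManagerSettings {
            client_timeout: Duration::from_secs(60),
            message_queue_size: 256,
            max_connections_per_ip: None,
        }
    }

    /// Each builder method consumes `self`, updates one field, and returns it.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.client_timeout = timeout;
        self
    }

    pub fn with_message_queue_size(mut self, queue_size: usize) -> Self {
        self.message_queue_size = queue_size;
        self
    }

    pub fn with_max_connections_per_ip(mut self, max: usize) -> Self {
        self.max_connections_per_ip = Some(max);
        self
    }
}
```

A chain such as `ManagerSettings::new().with_timeout(Duration::from_secs(30)).with_message_queue_size(512)` then reads the same way a ClientManager configuration would.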
pub fn rate_limiter(&self) -> Option<&WebSocketRateLimiter>
Get the rate limiter, if one is configured.
pub fn rate_limit_config(&self) -> &RateLimitConfig
Get the current rate limit configuration.
pub fn add_client(&self, client_id: Uuid, ws_sender: WebSocketSender, auth_context: Option<AuthContext>, remote_addr: SocketAddr)
Add a new client connection.
Spawns a dedicated sender task for this client that reads from its mpsc channel and writes to the WebSocket. If the WebSocket write fails, the client is automatically removed from the registry.
pub fn remove_client(&self, client_id: Uuid)
Remove a client from the registry.
pub fn update_client_auth(&self, client_id: Uuid, auth_context: AuthContext) -> bool
Update the auth context for a client.
Used for in-band auth refresh without reconnecting.
pub fn check_and_remove_expired(&self, client_id: Uuid) -> bool
Check if a client’s token has expired.
Returns true if the client has an auth context and it has expired. If expired, the client is removed from the registry.
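The check-then-remove behaviour can be sketched as below. The `Client` struct, its `expires_at` field, the `u64` ids, and the explicit `now` parameter are assumptions for illustration; the real method reads the expiry from the client's AuthContext.

```rust
use std::collections::HashMap;
use std::time::SystemTime;

/// Hypothetical client record; `None` means no auth context (never expires here).
pub struct Client {
    pub expires_at: Option<SystemTime>,
}

/// Returns true and removes the client if it has an expiry that has passed.
pub fn check_and_remove_expired(
    clients: &mut HashMap<u64, Client>,
    id: u64,
    now: SystemTime,
) -> bool {
    let expired = match clients.get(&id) {
        Some(Client { expires_at: Some(t) }) => *t <= now,
        // Unknown client, or a client without an expiry: nothing to do.
        _ => false,
    };
    if expired {
        clients.remove(&id);
    }
    expired
}
```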
pub fn client_count(&self) -> usize
Get the current number of connected clients.
This is lock-free and returns an approximate count (may be slightly stale under high concurrency, which is fine for max_clients checks).
pub fn send_to_client(&self, client_id: Uuid, data: Arc<Bytes>) -> Result<(), SendError>
Send data to a specific client (non-blocking).
This method NEVER blocks. If the client’s queue is full, the client is considered too slow and is disconnected to prevent cascade failures. Use this for live streaming updates.
For initial snapshots where you expect to send many messages at once, use send_to_client_async instead, which will wait for queue space.
pub async fn send_to_client_async(&self, client_id: Uuid, data: Arc<Bytes>) -> Result<(), SendError>
Send data to a specific client (async, waits for queue space).
This method will wait if the client’s queue is full, allowing the client time to catch up. Use this for initial snapshots where you need to send many messages at once.
For live streaming updates, use send_to_client instead, which will disconnect slow clients rather than blocking.
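To illustrate why the waiting variant suits snapshots, here is a standard-library analogue: a blocking `send` on a bounded channel waits for space, so a burst larger than the queue is still delivered in full. This is only a sketch; the real method is async, and `send_snapshot` and `demo_snapshot` are hypothetical helpers.

```rust
use std::sync::mpsc::{sync_channel, SendError, SyncSender};
use std::thread;

/// Send every message, waiting for queue space whenever the queue is full.
pub fn send_snapshot(
    tx: &SyncSender<String>,
    messages: Vec<String>,
) -> Result<(), SendError<String>> {
    for msg in messages {
        // Blocks until the consumer frees a slot; fails only if it hung up.
        tx.send(msg)?;
    }
    Ok(())
}

/// Push `count` messages through a queue of `queue_size` and return what arrived.
pub fn demo_snapshot(queue_size: usize, count: usize) -> Vec<String> {
    let (tx, rx) = sync_channel::<String>(queue_size);
    // The consumer drains the queue until every sender has been dropped.
    let consumer = thread::spawn(move || rx.iter().collect::<Vec<String>>());
    let messages = (0..count).map(|i| format!("row {}", i)).collect();
    send_snapshot(&tx, messages).expect("consumer is alive");
    drop(tx);
    consumer.join().expect("consumer thread panicked")
}
```

Even with a queue of one slot, all messages arrive in order, whereas a try_send-style sender would have dropped the client on the second message.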
pub async fn send_text_to_client(&self, client_id: Uuid, text: String) -> Result<(), SendError>
Send a text message to a specific client (async).
This method sends a text message directly to the client’s WebSocket. Used for control messages like auth refresh responses.
pub async fn send_compressed_async(&self, client_id: Uuid, payload: CompressedPayload) -> Result<(), SendError>
Send a potentially compressed payload to a client (async).
Compressed payloads are sent as binary frames (raw gzip). Uncompressed payloads are sent as text frames (JSON).
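The frame selection rule can be sketched as a simple mapping. The `Payload` and `Frame` enums below are hypothetical; the actual shape of CompressedPayload is not shown on this page.

```rust
/// Hypothetical payload type: either raw gzip bytes or a JSON string.
#[derive(Debug, PartialEq)]
pub enum Payload {
    Compressed(Vec<u8>),
    Uncompressed(String),
}

/// Hypothetical WebSocket frame kinds.
#[derive(Debug, PartialEq)]
pub enum Frame {
    Binary(Vec<u8>),
    Text(String),
}

/// Compressed payloads become binary frames; uncompressed ones become text frames.
pub fn to_frame(payload: Payload) -> Frame {
    match payload {
        Payload::Compressed(bytes) => Frame::Binary(bytes),
        Payload::Uncompressed(json) => Frame::Text(json),
    }
}
```

A receiver can then distinguish the two cases by frame type alone, without inspecting the bytes.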
pub fn update_subscription(&self, client_id: Uuid, subscription: Subscription) -> bool
Update the subscription for a client.
pub fn update_client_last_seen(&self, client_id: Uuid)
Update the last_seen timestamp for a client.
pub fn check_inbound_message_allowed(&self, client_id: Uuid) -> Result<(), AuthDeny>
Check whether an inbound message is allowed for a client.
pub fn get_subscription(&self, client_id: Uuid) -> Option<Subscription>
Get the subscription for a client.
pub fn has_client(&self, client_id: Uuid) -> bool
Check if a client exists.
pub async fn add_client_subscription(&self, client_id: Uuid, sub_key: String, token: CancellationToken) -> bool
pub async fn remove_client_subscription(&self, client_id: Uuid, sub_key: &str) -> bool
pub async fn cancel_all_client_subscriptions(&self, client_id: Uuid)
pub fn cleanup_stale_clients(&self) -> usize
Remove stale clients that haven’t been seen within the timeout period.
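A minimal sketch of stale-client cleanup, assuming the returned usize is the number of clients removed. The map of `u64` ids to last-seen instants and the explicit `now` and `timeout` parameters are hypothetical; the real method reads these from the manager's own state.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Drop every client idle for longer than `timeout`; return how many were removed.
pub fn cleanup_stale(
    last_seen: &mut HashMap<u64, Instant>,
    now: Instant,
    timeout: Duration,
) -> usize {
    let before = last_seen.len();
    // saturating_duration_since yields zero if `seen` is later than `now`.
    last_seen.retain(|_, seen| now.saturating_duration_since(*seen) <= timeout);
    before - last_seen.len()
}
```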
pub fn start_cleanup_task(&self)
Start a background task that periodically cleans up stale clients.
pub async fn check_connection_allowed(&self, remote_addr: SocketAddr, auth_context: &Option<AuthContext>) -> Result<(), AuthDeny>
Enforcement hooks
These methods provide hooks for enforcing limits based on auth context. They check limits before allowing operations and return errors if limits are exceeded.
Check if a connection is allowed for the given auth context.
Returns Ok(()) if the connection is allowed, or an error with a reason if not.
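One check such a hook might perform is the per-IP connection limit. The sketch below is an assumption about the shape of that check, not the crate's logic: the `Deny` enum, the per-IP counter map, and the synchronous signature are all hypothetical.

```rust
use std::collections::HashMap;
use std::net::IpAddr;

/// Hypothetical denial reason; the real AuthDeny type is not shown here.
#[derive(Debug, PartialEq)]
pub enum Deny {
    TooManyConnections { limit: usize },
}

/// Allow the connection unless the IP already holds `max_per_ip` connections.
pub fn check_connection_allowed(
    per_ip: &HashMap<IpAddr, usize>,
    ip: IpAddr,
    max_per_ip: Option<usize>,
) -> Result<(), Deny> {
    let current = per_ip.get(&ip).copied().unwrap_or(0);
    match max_per_ip {
        Some(limit) if current >= limit => Err(Deny::TooManyConnections { limit }),
        // No limit configured, or still under it.
        _ => Ok(()),
    }
}
```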
pub async fn check_subscription_allowed(&self, client_id: Uuid) -> Result<(), AuthDeny>
Check if a subscription is allowed for the given client.
Returns Ok(()) if the subscription is allowed, or an error with a reason if not.
pub fn get_metering_key(&self, client_id: Uuid) -> Option<String>
Get the metering key for a client.
pub fn get_auth_context(&self, client_id: Uuid) -> Option<AuthContext>
Get auth context for a client.
Trait Implementations
impl Clone for ClientManager
fn clone(&self) -> ClientManager
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.