guts_realtime/
client.rs

1//! Client connection management.
2
3use crate::error::RealtimeError;
4use crate::subscription::{Channel, ClientSubscriptions};
5use parking_lot::RwLock;
6use std::sync::Arc;
7use tokio::sync::mpsc;
8
9/// Unique identifier for a connected client.
10pub type ClientId = String;
11
12/// A connected WebSocket client.
13#[derive(Debug)]
14pub struct Client {
15    /// Unique client identifier.
16    pub id: ClientId,
17    /// Channel for sending messages to this client.
18    sender: mpsc::UnboundedSender<String>,
19    /// Client's subscriptions.
20    subscriptions: RwLock<ClientSubscriptions>,
21    /// Connection metadata.
22    pub metadata: ClientMetadata,
23}
24
25impl Client {
26    /// Create a new client with a message sender.
27    pub fn new(id: ClientId, sender: mpsc::UnboundedSender<String>) -> Self {
28        Self {
29            id,
30            sender,
31            subscriptions: RwLock::new(ClientSubscriptions::new()),
32            metadata: ClientMetadata::default(),
33        }
34    }
35
36    /// Send a message to this client.
37    pub fn send(&self, message: String) -> Result<(), RealtimeError> {
38        self.sender
39            .send(message)
40            .map_err(|_| RealtimeError::ChannelClosed)
41    }
42
43    /// Subscribe to a channel.
44    pub fn subscribe(&self, channel: Channel) -> Result<bool, RealtimeError> {
45        self.subscriptions.write().subscribe(channel)
46    }
47
48    /// Unsubscribe from a channel.
49    pub fn unsubscribe(&self, channel: &Channel) -> bool {
50        self.subscriptions.write().unsubscribe(channel)
51    }
52
53    /// Check if subscribed to a channel.
54    pub fn is_subscribed(&self, channel: &Channel) -> bool {
55        self.subscriptions.read().is_subscribed(channel)
56    }
57
58    /// Check if any subscription matches an event channel.
59    pub fn matches_event(&self, event_channel: &str) -> bool {
60        self.subscriptions.read().matches_event(event_channel)
61    }
62
63    /// Get subscription count.
64    pub fn subscription_count(&self) -> usize {
65        self.subscriptions.read().count()
66    }
67
68    /// Clear all subscriptions.
69    pub fn clear_subscriptions(&self) {
70        self.subscriptions.write().clear();
71    }
72}
73
74/// Metadata about a client connection.
75#[derive(Debug, Default)]
76pub struct ClientMetadata {
77    /// When the client connected (Unix timestamp).
78    pub connected_at: u64,
79    /// Optional user identifier if authenticated.
80    pub user_id: Option<String>,
81    /// Client IP address.
82    pub ip_address: Option<String>,
83    /// User agent string.
84    pub user_agent: Option<String>,
85}
86
87impl ClientMetadata {
88    /// Create metadata with current timestamp.
89    pub fn now() -> Self {
90        Self {
91            connected_at: std::time::SystemTime::now()
92                .duration_since(std::time::UNIX_EPOCH)
93                .unwrap_or_default()
94                .as_secs(),
95            user_id: None,
96            ip_address: None,
97            user_agent: None,
98        }
99    }
100}
101
102/// Handle for receiving messages from the hub to send to WebSocket.
103pub type ClientReceiver = mpsc::UnboundedReceiver<String>;
104
105/// Create a new client with its message receiver.
106pub fn create_client(id: ClientId) -> (Arc<Client>, ClientReceiver) {
107    let (sender, receiver) = mpsc::unbounded_channel();
108    let client = Arc::new(Client::new(id, sender));
109    (client, receiver)
110}
111
112#[cfg(test)]
113mod tests {
114    use super::*;
115
116    #[test]
117    fn test_client_creation() {
118        let (client, _rx) = create_client("test-client".to_string());
119        assert_eq!(client.id, "test-client");
120        assert_eq!(client.subscription_count(), 0);
121    }
122
123    #[test]
124    fn test_client_subscribe() {
125        let (client, _rx) = create_client("test-client".to_string());
126
127        let channel = Channel::parse("repo:alice/myrepo").unwrap();
128        assert!(client.subscribe(channel.clone()).unwrap());
129        assert!(client.is_subscribed(&channel));
130        assert_eq!(client.subscription_count(), 1);
131    }
132
133    #[test]
134    fn test_client_unsubscribe() {
135        let (client, _rx) = create_client("test-client".to_string());
136
137        let channel = Channel::parse("repo:alice/myrepo").unwrap();
138        client.subscribe(channel.clone()).unwrap();
139        assert!(client.unsubscribe(&channel));
140        assert!(!client.is_subscribed(&channel));
141        assert_eq!(client.subscription_count(), 0);
142    }
143
144    #[test]
145    fn test_client_matches_event() {
146        let (client, _rx) = create_client("test-client".to_string());
147
148        client
149            .subscribe(Channel::parse("repo:alice/myrepo").unwrap())
150            .unwrap();
151
152        assert!(client.matches_event("repo:alice/myrepo"));
153        assert!(client.matches_event("repo:alice/myrepo/prs"));
154        assert!(!client.matches_event("repo:bob/otherrepo"));
155    }
156
157    #[test]
158    fn test_client_send() {
159        let (client, mut rx) = create_client("test-client".to_string());
160
161        client.send("test message".to_string()).unwrap();
162
163        // Message should be in the receiver
164        let msg = rx.try_recv().unwrap();
165        assert_eq!(msg, "test message");
166    }
167
168    #[test]
169    fn test_client_clear_subscriptions() {
170        let (client, _rx) = create_client("test-client".to_string());
171
172        client
173            .subscribe(Channel::parse("repo:alice/myrepo").unwrap())
174            .unwrap();
175        client
176            .subscribe(Channel::parse("user:alice").unwrap())
177            .unwrap();
178
179        assert_eq!(client.subscription_count(), 2);
180
181        client.clear_subscriptions();
182        assert_eq!(client.subscription_count(), 0);
183    }
184}