1use crate::error::RealtimeError;
4use crate::subscription::{Channel, ClientSubscriptions};
5use parking_lot::RwLock;
6use std::sync::Arc;
7use tokio::sync::mpsc;
8
9pub type ClientId = String;
11
12#[derive(Debug)]
14pub struct Client {
15 pub id: ClientId,
17 sender: mpsc::UnboundedSender<String>,
19 subscriptions: RwLock<ClientSubscriptions>,
21 pub metadata: ClientMetadata,
23}
24
25impl Client {
26 pub fn new(id: ClientId, sender: mpsc::UnboundedSender<String>) -> Self {
28 Self {
29 id,
30 sender,
31 subscriptions: RwLock::new(ClientSubscriptions::new()),
32 metadata: ClientMetadata::default(),
33 }
34 }
35
36 pub fn send(&self, message: String) -> Result<(), RealtimeError> {
38 self.sender
39 .send(message)
40 .map_err(|_| RealtimeError::ChannelClosed)
41 }
42
43 pub fn subscribe(&self, channel: Channel) -> Result<bool, RealtimeError> {
45 self.subscriptions.write().subscribe(channel)
46 }
47
48 pub fn unsubscribe(&self, channel: &Channel) -> bool {
50 self.subscriptions.write().unsubscribe(channel)
51 }
52
53 pub fn is_subscribed(&self, channel: &Channel) -> bool {
55 self.subscriptions.read().is_subscribed(channel)
56 }
57
58 pub fn matches_event(&self, event_channel: &str) -> bool {
60 self.subscriptions.read().matches_event(event_channel)
61 }
62
63 pub fn subscription_count(&self) -> usize {
65 self.subscriptions.read().count()
66 }
67
68 pub fn clear_subscriptions(&self) {
70 self.subscriptions.write().clear();
71 }
72}
73
74#[derive(Debug, Default)]
76pub struct ClientMetadata {
77 pub connected_at: u64,
79 pub user_id: Option<String>,
81 pub ip_address: Option<String>,
83 pub user_agent: Option<String>,
85}
86
87impl ClientMetadata {
88 pub fn now() -> Self {
90 Self {
91 connected_at: std::time::SystemTime::now()
92 .duration_since(std::time::UNIX_EPOCH)
93 .unwrap_or_default()
94 .as_secs(),
95 user_id: None,
96 ip_address: None,
97 user_agent: None,
98 }
99 }
100}
101
102pub type ClientReceiver = mpsc::UnboundedReceiver<String>;
104
105pub fn create_client(id: ClientId) -> (Arc<Client>, ClientReceiver) {
107 let (sender, receiver) = mpsc::unbounded_channel();
108 let client = Arc::new(Client::new(id, sender));
109 (client, receiver)
110}
111
112#[cfg(test)]
113mod tests {
114 use super::*;
115
116 #[test]
117 fn test_client_creation() {
118 let (client, _rx) = create_client("test-client".to_string());
119 assert_eq!(client.id, "test-client");
120 assert_eq!(client.subscription_count(), 0);
121 }
122
123 #[test]
124 fn test_client_subscribe() {
125 let (client, _rx) = create_client("test-client".to_string());
126
127 let channel = Channel::parse("repo:alice/myrepo").unwrap();
128 assert!(client.subscribe(channel.clone()).unwrap());
129 assert!(client.is_subscribed(&channel));
130 assert_eq!(client.subscription_count(), 1);
131 }
132
133 #[test]
134 fn test_client_unsubscribe() {
135 let (client, _rx) = create_client("test-client".to_string());
136
137 let channel = Channel::parse("repo:alice/myrepo").unwrap();
138 client.subscribe(channel.clone()).unwrap();
139 assert!(client.unsubscribe(&channel));
140 assert!(!client.is_subscribed(&channel));
141 assert_eq!(client.subscription_count(), 0);
142 }
143
144 #[test]
145 fn test_client_matches_event() {
146 let (client, _rx) = create_client("test-client".to_string());
147
148 client
149 .subscribe(Channel::parse("repo:alice/myrepo").unwrap())
150 .unwrap();
151
152 assert!(client.matches_event("repo:alice/myrepo"));
153 assert!(client.matches_event("repo:alice/myrepo/prs"));
154 assert!(!client.matches_event("repo:bob/otherrepo"));
155 }
156
157 #[test]
158 fn test_client_send() {
159 let (client, mut rx) = create_client("test-client".to_string());
160
161 client.send("test message".to_string()).unwrap();
162
163 let msg = rx.try_recv().unwrap();
165 assert_eq!(msg, "test message");
166 }
167
168 #[test]
169 fn test_client_clear_subscriptions() {
170 let (client, _rx) = create_client("test-client".to_string());
171
172 client
173 .subscribe(Channel::parse("repo:alice/myrepo").unwrap())
174 .unwrap();
175 client
176 .subscribe(Channel::parse("user:alice").unwrap())
177 .unwrap();
178
179 assert_eq!(client.subscription_count(), 2);
180
181 client.clear_subscriptions();
182 assert_eq!(client.subscription_count(), 0);
183 }
184}