use crate::error::RealtimeError;
use crate::subscription::{Channel, ClientSubscriptions};
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::mpsc;
pub type ClientId = String;
#[derive(Debug)]
pub struct Client {
pub id: ClientId,
sender: mpsc::UnboundedSender<String>,
subscriptions: RwLock<ClientSubscriptions>,
pub metadata: ClientMetadata,
}
impl Client {
pub fn new(id: ClientId, sender: mpsc::UnboundedSender<String>) -> Self {
Self {
id,
sender,
subscriptions: RwLock::new(ClientSubscriptions::new()),
metadata: ClientMetadata::default(),
}
}
pub fn send(&self, message: String) -> Result<(), RealtimeError> {
self.sender
.send(message)
.map_err(|_| RealtimeError::ChannelClosed)
}
pub fn subscribe(&self, channel: Channel) -> Result<bool, RealtimeError> {
self.subscriptions.write().subscribe(channel)
}
pub fn unsubscribe(&self, channel: &Channel) -> bool {
self.subscriptions.write().unsubscribe(channel)
}
pub fn is_subscribed(&self, channel: &Channel) -> bool {
self.subscriptions.read().is_subscribed(channel)
}
pub fn matches_event(&self, event_channel: &str) -> bool {
self.subscriptions.read().matches_event(event_channel)
}
pub fn subscription_count(&self) -> usize {
self.subscriptions.read().count()
}
pub fn clear_subscriptions(&self) {
self.subscriptions.write().clear();
}
}
#[derive(Debug, Default)]
pub struct ClientMetadata {
pub connected_at: u64,
pub user_id: Option<String>,
pub ip_address: Option<String>,
pub user_agent: Option<String>,
}
impl ClientMetadata {
pub fn now() -> Self {
Self {
connected_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
user_id: None,
ip_address: None,
user_agent: None,
}
}
}
pub type ClientReceiver = mpsc::UnboundedReceiver<String>;
pub fn create_client(id: ClientId) -> (Arc<Client>, ClientReceiver) {
let (sender, receiver) = mpsc::unbounded_channel();
let client = Arc::new(Client::new(id, sender));
(client, receiver)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_client_creation() {
let (client, _rx) = create_client("test-client".to_string());
assert_eq!(client.id, "test-client");
assert_eq!(client.subscription_count(), 0);
}
#[test]
fn test_client_subscribe() {
let (client, _rx) = create_client("test-client".to_string());
let channel = Channel::parse("repo:alice/myrepo").unwrap();
assert!(client.subscribe(channel.clone()).unwrap());
assert!(client.is_subscribed(&channel));
assert_eq!(client.subscription_count(), 1);
}
#[test]
fn test_client_unsubscribe() {
let (client, _rx) = create_client("test-client".to_string());
let channel = Channel::parse("repo:alice/myrepo").unwrap();
client.subscribe(channel.clone()).unwrap();
assert!(client.unsubscribe(&channel));
assert!(!client.is_subscribed(&channel));
assert_eq!(client.subscription_count(), 0);
}
#[test]
fn test_client_matches_event() {
let (client, _rx) = create_client("test-client".to_string());
client
.subscribe(Channel::parse("repo:alice/myrepo").unwrap())
.unwrap();
assert!(client.matches_event("repo:alice/myrepo"));
assert!(client.matches_event("repo:alice/myrepo/prs"));
assert!(!client.matches_event("repo:bob/otherrepo"));
}
#[test]
fn test_client_send() {
let (client, mut rx) = create_client("test-client".to_string());
client.send("test message".to_string()).unwrap();
let msg = rx.try_recv().unwrap();
assert_eq!(msg, "test message");
}
#[test]
fn test_client_clear_subscriptions() {
let (client, _rx) = create_client("test-client".to_string());
client
.subscribe(Channel::parse("repo:alice/myrepo").unwrap())
.unwrap();
client
.subscribe(Channel::parse("user:alice").unwrap())
.unwrap();
assert_eq!(client.subscription_count(), 2);
client.clear_subscriptions();
assert_eq!(client.subscription_count(), 0);
}
}