use bytes::Bytes;
use clasp_core::{Action, Message, Scope, WelcomeMessage, PROTOCOL_VERSION};
use clasp_transport::TransportSender;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use uuid::Uuid;
pub type SessionId = String;
const DROP_NOTIFICATION_THRESHOLD: u32 = 100; const DROP_WINDOW_SECONDS: u64 = 10; const DROP_NOTIFICATION_COOLDOWN_SECONDS: u64 = 10;
pub struct Session {
pub id: SessionId,
pub name: String,
pub features: Vec<String>,
sender: Arc<dyn TransportSender>,
subscriptions: RwLock<HashSet<u32>>,
pub created_at: Instant,
pub last_activity: RwLock<Instant>,
pub authenticated: bool,
pub token: Option<String>,
pub subject: Option<String>,
scopes: Vec<Scope>,
messages_this_second: AtomicU32,
last_rate_limit_second: AtomicU64,
drops_in_window: AtomicU32,
drop_window_start: AtomicU64,
last_drop_notification: AtomicU64,
total_drops: AtomicU64,
#[cfg(feature = "federation")]
federation_peer: bool,
#[cfg(feature = "federation")]
federation_router_id: parking_lot::RwLock<Option<String>>,
#[cfg(feature = "federation")]
federation_namespaces: parking_lot::RwLock<Vec<String>>,
}
#[doc(hidden)]
struct StubSender;
#[async_trait::async_trait]
impl TransportSender for StubSender {
async fn send(&self, _data: Bytes) -> clasp_transport::Result<()> {
Ok(())
}
fn try_send(&self, _data: Bytes) -> clasp_transport::Result<()> {
Ok(())
}
fn is_connected(&self) -> bool {
false
}
async fn close(&self) -> clasp_transport::Result<()> {
Ok(())
}
}
impl Session {
#[doc(hidden)]
pub fn stub(subject: Option<String>) -> Self {
let mut s = Self::new(Arc::new(StubSender), "test-stub".to_string(), vec![]);
s.subject = subject;
s
}
#[doc(hidden)]
#[cfg(feature = "federation")]
pub fn stub_federation(name: &str) -> Self {
Self::new(
Arc::new(StubSender),
name.to_string(),
vec!["federation".to_string()],
)
}
pub fn new(sender: Arc<dyn TransportSender>, name: String, features: Vec<String>) -> Self {
let now = Instant::now();
#[cfg(feature = "federation")]
let is_federation_peer = features.iter().any(|f| f == "federation");
Self {
id: Uuid::new_v4().to_string(),
name,
features,
sender,
subscriptions: RwLock::new(HashSet::new()),
created_at: now,
last_activity: RwLock::new(now),
authenticated: false,
token: None,
subject: None,
scopes: Vec::new(),
messages_this_second: AtomicU32::new(0),
last_rate_limit_second: AtomicU64::new(0),
drops_in_window: AtomicU32::new(0),
drop_window_start: AtomicU64::new(0),
last_drop_notification: AtomicU64::new(0),
total_drops: AtomicU64::new(0),
#[cfg(feature = "federation")]
federation_peer: is_federation_peer,
#[cfg(feature = "federation")]
federation_router_id: parking_lot::RwLock::new(None),
#[cfg(feature = "federation")]
federation_namespaces: parking_lot::RwLock::new(Vec::new()),
}
}
pub fn set_authenticated(
&mut self,
token: String,
subject: Option<String>,
scopes: Vec<Scope>,
) {
self.authenticated = true;
self.token = Some(token);
self.subject = subject;
self.scopes = scopes;
}
pub fn has_scope(&self, action: Action, address: &str) -> bool {
if self.scopes.is_empty() && !self.authenticated {
return true;
}
self.scopes
.iter()
.any(|scope| scope.allows(action, address))
}
pub fn has_strict_read_scope(&self, address: &str) -> bool {
if self.scopes.is_empty() && !self.authenticated {
return true;
}
self.scopes
.iter()
.any(|scope| scope.action() == Action::Read && scope.allows(Action::Read, address))
}
pub fn scopes(&self) -> &[Scope] {
&self.scopes
}
pub async fn send(&self, data: Bytes) -> Result<(), clasp_transport::TransportError> {
self.sender.send(data).await?;
*self.last_activity.write() = Instant::now();
Ok(())
}
pub fn try_send(&self, data: Bytes) -> Result<(), clasp_transport::TransportError> {
self.sender.try_send(data)?;
*self.last_activity.write() = Instant::now();
Ok(())
}
pub async fn send_message(&self, message: &Message) -> Result<(), clasp_core::Error> {
let data = clasp_core::codec::encode(message)?;
self.send(data)
.await
.map_err(|e| clasp_core::Error::ConnectionError(e.to_string()))?;
Ok(())
}
pub fn welcome_message(&self, server_name: &str, server_features: &[String]) -> Message {
Message::Welcome(WelcomeMessage {
version: PROTOCOL_VERSION,
session: self.id.clone(),
name: server_name.to_string(),
features: server_features.to_vec(),
time: clasp_core::time::now(),
token: None,
})
}
pub fn add_subscription(&self, id: u32) {
self.subscriptions.write().insert(id);
}
pub fn remove_subscription(&self, id: u32) -> bool {
self.subscriptions.write().remove(&id)
}
pub fn subscriptions(&self) -> Vec<u32> {
self.subscriptions.read().iter().cloned().collect()
}
pub fn is_connected(&self) -> bool {
self.sender.is_connected()
}
pub fn touch(&self) {
*self.last_activity.write() = Instant::now();
}
pub fn idle_duration(&self) -> std::time::Duration {
self.last_activity.read().elapsed()
}
pub fn check_rate_limit(&self, max_per_second: u32) -> bool {
if max_per_second == 0 {
return true; }
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let last_second = self.last_rate_limit_second.load(Ordering::Relaxed);
if now != last_second {
self.messages_this_second.store(1, Ordering::Relaxed);
self.last_rate_limit_second.store(now, Ordering::Relaxed);
true
} else {
let count = self.messages_this_second.fetch_add(1, Ordering::Relaxed) + 1;
count <= max_per_second
}
}
pub fn messages_per_second(&self) -> u32 {
self.messages_this_second.load(Ordering::Relaxed)
}
pub fn record_drop(&self) -> bool {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.total_drops.fetch_add(1, Ordering::Relaxed);
let window_start = self.drop_window_start.load(Ordering::Relaxed);
if now >= window_start + DROP_WINDOW_SECONDS {
self.drops_in_window.store(1, Ordering::Relaxed);
self.drop_window_start.store(now, Ordering::Relaxed);
return false; }
let drops = self.drops_in_window.fetch_add(1, Ordering::Relaxed) + 1;
if drops >= DROP_NOTIFICATION_THRESHOLD {
let last_notification = self.last_drop_notification.load(Ordering::Relaxed);
if now >= last_notification + DROP_NOTIFICATION_COOLDOWN_SECONDS {
self.last_drop_notification.store(now, Ordering::Relaxed);
return true;
}
}
false
}
pub fn total_drops(&self) -> u64 {
self.total_drops.load(Ordering::Relaxed)
}
pub fn drops_in_window(&self) -> u32 {
self.drops_in_window.load(Ordering::Relaxed)
}
#[cfg(feature = "federation")]
pub fn is_federation_peer(&self) -> bool {
self.federation_peer
}
#[cfg(feature = "federation")]
pub fn federation_router_id(&self) -> Option<String> {
self.federation_router_id.read().clone()
}
#[cfg(feature = "federation")]
pub fn set_federation_router_id(&self, id: String) {
*self.federation_router_id.write() = Some(id);
}
#[cfg(feature = "federation")]
pub fn federation_namespaces(&self) -> Vec<String> {
self.federation_namespaces.read().clone()
}
#[cfg(feature = "federation")]
pub fn set_federation_namespaces(&self, patterns: Vec<String>) {
*self.federation_namespaces.write() = patterns;
}
}
impl std::fmt::Debug for Session {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Session")
.field("id", &self.id)
.field("name", &self.name)
.field("features", &self.features)
.field("authenticated", &self.authenticated)
.field("subject", &self.subject)
.field("scopes", &self.scopes.len())
.finish()
}
}