use super::backends::{SessionBackend, SessionError};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
mod logger;
pub use logger::LoggerAnalytics;
#[cfg(feature = "analytics-prometheus")]
mod prometheus;
#[cfg(feature = "analytics-prometheus")]
pub use self::prometheus::PrometheusAnalytics;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeletionReason {
Explicit,
Expired,
Invalidated,
Replaced,
}
#[derive(Debug, Clone)]
pub enum SessionEvent {
Created {
session_key: String,
size_bytes: usize,
ttl_secs: Option<u64>,
timestamp: DateTime<Utc>,
},
Accessed {
session_key: String,
latency_ms: u64,
hit: bool,
timestamp: DateTime<Utc>,
},
Deleted {
session_key: String,
reason: DeletionReason,
timestamp: DateTime<Utc>,
},
Expired {
session_key: String,
age_secs: u64,
timestamp: DateTime<Utc>,
},
}
#[async_trait]
pub trait SessionAnalytics: Send + Sync {
async fn record_event(&self, event: SessionEvent);
}
#[derive(Clone)]
pub struct CompositeAnalytics {
backends: Vec<Arc<dyn SessionAnalytics>>,
}
impl CompositeAnalytics {
pub fn new() -> Self {
Self {
backends: Vec::new(),
}
}
pub fn add<A: SessionAnalytics + 'static>(&mut self, analytics: A) {
self.backends.push(Arc::new(analytics));
}
}
impl Default for CompositeAnalytics {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl SessionAnalytics for CompositeAnalytics {
async fn record_event(&self, event: SessionEvent) {
for backend in &self.backends {
backend.record_event(event.clone()).await;
}
}
}
#[derive(Clone)]
pub struct InstrumentedSessionBackend<B, A> {
backend: B,
analytics: A,
}
impl<B, A> InstrumentedSessionBackend<B, A>
where
B: SessionBackend + Clone,
A: SessionAnalytics + Clone,
{
pub fn new(backend: B, analytics: A) -> Self {
Self { backend, analytics }
}
pub fn backend(&self) -> &B {
&self.backend
}
pub fn analytics(&self) -> &A {
&self.analytics
}
}
#[async_trait]
impl<B, A> SessionBackend for InstrumentedSessionBackend<B, A>
where
B: SessionBackend + Clone,
A: SessionAnalytics + Clone,
{
async fn load<T>(&self, session_key: &str) -> Result<Option<T>, SessionError>
where
T: for<'de> Deserialize<'de> + Serialize + Send + Sync,
{
let start = std::time::Instant::now();
let result = self.backend.load(session_key).await;
let latency_ms = start.elapsed().as_millis() as u64;
let hit = result.as_ref().map(|opt| opt.is_some()).unwrap_or(false);
self.analytics
.record_event(SessionEvent::Accessed {
session_key: session_key.to_string(),
latency_ms,
hit,
timestamp: Utc::now(),
})
.await;
result
}
async fn save<T>(
&self,
session_key: &str,
data: &T,
ttl: Option<u64>,
) -> Result<(), SessionError>
where
T: Serialize + Send + Sync,
{
let serialized = serde_json::to_vec(data)
.map_err(|e| SessionError::SerializationError(e.to_string()))?;
let size_bytes = serialized.len();
let result = self.backend.save(session_key, data, ttl).await;
if result.is_ok() {
self.analytics
.record_event(SessionEvent::Created {
session_key: session_key.to_string(),
size_bytes,
ttl_secs: ttl,
timestamp: Utc::now(),
})
.await;
}
result
}
async fn delete(&self, session_key: &str) -> Result<(), SessionError> {
let result = self.backend.delete(session_key).await;
if result.is_ok() {
self.analytics
.record_event(SessionEvent::Deleted {
session_key: session_key.to_string(),
reason: DeletionReason::Explicit,
timestamp: Utc::now(),
})
.await;
}
result
}
async fn exists(&self, session_key: &str) -> Result<bool, SessionError> {
self.backend.exists(session_key).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sessions::InMemorySessionBackend;
#[tokio::test]
async fn test_instrumented_backend_save() {
let backend = InMemorySessionBackend::new();
let analytics = LoggerAnalytics::new();
let instrumented = InstrumentedSessionBackend::new(backend, analytics);
let data = serde_json::json!({"key": "value"});
instrumented
.save("test_key", &data, Some(3600))
.await
.unwrap();
let loaded: Option<serde_json::Value> = instrumented.load("test_key").await.unwrap();
assert_eq!(loaded.unwrap(), data);
}
#[tokio::test]
async fn test_instrumented_backend_delete() {
let backend = InMemorySessionBackend::new();
let analytics = LoggerAnalytics::new();
let instrumented = InstrumentedSessionBackend::new(backend, analytics);
let data = serde_json::json!({"key": "value"});
instrumented.save("test_key", &data, None).await.unwrap();
assert!(instrumented.exists("test_key").await.unwrap());
instrumented.delete("test_key").await.unwrap();
assert!(!instrumented.exists("test_key").await.unwrap());
}
#[tokio::test]
async fn test_composite_analytics() {
let mut composite = CompositeAnalytics::new();
composite.add(LoggerAnalytics::new());
let backend = InMemorySessionBackend::new();
let instrumented = InstrumentedSessionBackend::new(backend, composite);
let data = serde_json::json!({"key": "value"});
instrumented.save("test_key", &data, None).await.unwrap();
}
}