use super::backends::{SessionBackend, SessionError};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc;
/// How writes (`save` / `delete`) are propagated from the primary backend to
/// the secondary backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationStrategy {
/// The write is awaited on the primary only; the matching operation for the
/// secondary is queued on a channel and applied later by a background
/// worker task. Failures on the secondary are logged by the worker and are
/// not reported to the caller.
AsyncReplication,
/// The write is issued to the primary and the secondary concurrently via
/// `tokio::join!`. Both are awaited; the primary's error is reported first,
/// then the secondary's.
SyncReplication,
/// The write is awaited on the primary, and only after it succeeds is it
/// awaited on the secondary. An error from either backend is returned.
AcknowledgedReplication,
}
/// A write that still has to be applied to the secondary backend.
///
/// Events are produced by the asynchronous replication path and consumed by
/// the background worker.
///
/// `Debug` is implemented by hand rather than derived: the worker logs events
/// with `?event` when replication fails, and a derived implementation would
/// print every byte of the session payload into the log output. The manual
/// implementation reports only the payload length.
#[derive(Clone)]
enum ReplicationEvent {
    /// Store `data` under `session_key` on the secondary.
    Save {
        /// Key the session is stored under.
        session_key: String,
        /// Session payload encoded as JSON bytes.
        data: Vec<u8>,
        /// Optional time-to-live, forwarded unchanged to the secondary.
        ttl: Option<u64>,
    },
    /// Remove `session_key` from the secondary.
    Delete {
        /// Key of the session to remove.
        session_key: String,
    },
}

impl std::fmt::Debug for ReplicationEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Save {
                session_key,
                data,
                ttl,
            } => f
                .debug_struct("Save")
                .field("session_key", session_key)
                // Deliberately the length only: the payload is session content
                // and must not end up in log files.
                .field("data_len", &data.len())
                .field("ttl", ttl)
                .finish(),
            Self::Delete { session_key } => f
                .debug_struct("Delete")
                .field("session_key", session_key)
                .finish(),
        }
    }
}
/// Tunables for the background replication worker.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Intended capacity of the replication channel.
    ///
    /// NOTE(review): nothing in this file reads this value; the channel is
    /// created with `mpsc::unbounded_channel()`. Confirm whether a bounded
    /// channel was intended.
    pub channel_buffer_size: usize,
    /// Number of failed attempts after which the worker gives up on an event.
    /// The worker always makes at least one attempt.
    pub retry_attempts: u32,
    /// Pause between two attempts for the same event, in milliseconds.
    pub retry_delay_ms: u64,
}

impl Default for ReplicationConfig {
    /// Defaults: buffer size 1000, 3 attempts, 100 ms between attempts.
    fn default() -> Self {
        ReplicationConfig {
            retry_delay_ms: 100,
            retry_attempts: 3,
            channel_buffer_size: 1000,
        }
    }
}
/// Session backend that stores sessions in a primary backend `P` and mirrors
/// writes to a secondary backend `S` according to a [`ReplicationStrategy`].
///
/// Both backends are held behind `Arc`, so cloning this value shares the same
/// backend instances (and the same replication channel) rather than copying
/// them.
#[derive(Clone)]
pub struct ReplicatedSessionBackend<P, S> {
/// Backend that every write goes to first and every read consults first.
primary: Arc<P>,
/// Backend that receives replicated writes and serves reads the primary misses.
secondary: Arc<S>,
/// How writes are propagated to `secondary`.
strategy: ReplicationStrategy,
// Kept for inspection; the worker task gets its own clone at construction
// time, so non-test code in this file never reads this field.
#[allow(dead_code)]
config: ReplicationConfig,
/// Sending half of the channel feeding the background worker. `Some` only
/// when the strategy is `AsyncReplication`.
replication_tx: Option<mpsc::UnboundedSender<ReplicationEvent>>,
}
impl<P, S> ReplicatedSessionBackend<P, S>
where
P: SessionBackend + Clone + 'static,
S: SessionBackend + Clone + 'static,
{
pub fn new(primary: P, secondary: S, strategy: ReplicationStrategy) -> Self {
Self::with_config(primary, secondary, strategy, ReplicationConfig::default())
}
pub fn with_config(
primary: P,
secondary: S,
strategy: ReplicationStrategy,
config: ReplicationConfig,
) -> Self {
let primary = Arc::new(primary);
let secondary = Arc::new(secondary);
let replication_tx = if matches!(strategy, ReplicationStrategy::AsyncReplication) {
let (tx, rx) = mpsc::unbounded_channel();
let secondary_clone = Arc::clone(&secondary);
let config_clone = config.clone();
tokio::spawn(async move {
Self::replication_worker(rx, secondary_clone, config_clone).await;
});
Some(tx)
} else {
None
};
Self {
primary,
secondary,
strategy,
config,
replication_tx,
}
}
async fn replication_worker(
mut rx: mpsc::UnboundedReceiver<ReplicationEvent>,
secondary: Arc<S>,
config: ReplicationConfig,
) {
while let Some(event) = rx.recv().await {
let mut attempts = 0;
loop {
let result = match &event {
ReplicationEvent::Save {
session_key,
data,
ttl,
} => {
match serde_json::from_slice::<serde_json::Value>(data) {
Ok(value) => secondary.save(session_key, &value, *ttl).await,
Err(e) => Err(SessionError::SerializationError(e.to_string())),
}
}
ReplicationEvent::Delete { session_key } => secondary.delete(session_key).await,
};
match result {
Ok(_) => break, Err(e) => {
attempts += 1;
if attempts >= config.retry_attempts {
tracing::error!(
event = ?event,
attempts = attempts,
error = %e,
"Replication failed after retries"
);
break;
}
tracing::warn!(
event = ?event,
attempt = attempts,
error = %e,
"Replication failed, retrying"
);
tokio::time::sleep(tokio::time::Duration::from_millis(
config.retry_delay_ms,
))
.await;
}
}
}
}
}
pub fn primary(&self) -> &P {
&self.primary
}
pub fn secondary(&self) -> &S {
&self.secondary
}
pub fn strategy(&self) -> ReplicationStrategy {
self.strategy
}
}
#[async_trait]
impl<P, S> SessionBackend for ReplicatedSessionBackend<P, S>
where
P: SessionBackend + Clone + 'static,
S: SessionBackend + Clone + 'static,
{
async fn load<T>(&self, session_key: &str) -> Result<Option<T>, SessionError>
where
T: for<'de> Deserialize<'de> + Serialize + Send + Sync,
{
let result = self.primary.load(session_key).await?;
if result.is_none() {
return self.secondary.load(session_key).await;
}
Ok(result)
}
async fn save<T>(
&self,
session_key: &str,
data: &T,
ttl: Option<u64>,
) -> Result<(), SessionError>
where
T: Serialize + Send + Sync,
{
match self.strategy {
ReplicationStrategy::AsyncReplication => {
self.primary.save(session_key, data, ttl).await?;
if let Some(ref tx) = self.replication_tx {
let serialized = serde_json::to_vec(data)
.map_err(|e| SessionError::SerializationError(e.to_string()))?;
let _ = tx.send(ReplicationEvent::Save {
session_key: session_key.to_string(),
data: serialized,
ttl,
});
}
Ok(())
}
ReplicationStrategy::SyncReplication => {
let primary_future = self.primary.save(session_key, data, ttl);
let secondary_future = self.secondary.save(session_key, data, ttl);
let (primary_result, secondary_result) =
tokio::join!(primary_future, secondary_future);
primary_result?;
secondary_result?;
Ok(())
}
ReplicationStrategy::AcknowledgedReplication => {
self.primary.save(session_key, data, ttl).await?;
self.secondary.save(session_key, data, ttl).await?;
Ok(())
}
}
}
async fn delete(&self, session_key: &str) -> Result<(), SessionError> {
match self.strategy {
ReplicationStrategy::AsyncReplication => {
self.primary.delete(session_key).await?;
if let Some(ref tx) = self.replication_tx {
let _ = tx.send(ReplicationEvent::Delete {
session_key: session_key.to_string(),
});
}
Ok(())
}
ReplicationStrategy::SyncReplication => {
let primary_future = self.primary.delete(session_key);
let secondary_future = self.secondary.delete(session_key);
let (primary_result, secondary_result) =
tokio::join!(primary_future, secondary_future);
primary_result?;
secondary_result?;
Ok(())
}
ReplicationStrategy::AcknowledgedReplication => {
self.primary.delete(session_key).await?;
self.secondary.delete(session_key).await?;
Ok(())
}
}
}
async fn exists(&self, session_key: &str) -> Result<bool, SessionError> {
let exists = self.primary.exists(session_key).await?;
if !exists {
return self.secondary.exists(session_key).await;
}
Ok(exists)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sessions::InMemorySessionBackend;
    use rstest::rstest;

    /// Replicated backend over two in-memory stores, as used by every test.
    type Replicated = ReplicatedSessionBackend<InMemorySessionBackend, InMemorySessionBackend>;

    /// Session key shared by all tests.
    const KEY: &str = "test_key";

    /// Creates a fresh (primary, secondary) pair of in-memory backends.
    fn backend_pair() -> (InMemorySessionBackend, InMemorySessionBackend) {
        let primary = InMemorySessionBackend::new();
        let secondary = InMemorySessionBackend::new();
        (primary, secondary)
    }

    /// Wraps clones of the given backends so the test can still inspect each
    /// store directly.
    fn replicate_over(
        primary: &InMemorySessionBackend,
        secondary: &InMemorySessionBackend,
        strategy: ReplicationStrategy,
    ) -> Replicated {
        ReplicatedSessionBackend::new(primary.clone(), secondary.clone(), strategy)
    }

    /// Reads the JSON value stored under `key` directly from one backend.
    async fn stored(backend: &InMemorySessionBackend, key: &str) -> Option<serde_json::Value> {
        backend.load(key).await.unwrap()
    }

    /// Gives the background replication worker time to drain its queue.
    async fn let_worker_run() {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }

    #[rstest]
    #[tokio::test]
    async fn test_async_replication_save() {
        let (primary, secondary) = backend_pair();
        let replicated =
            replicate_over(&primary, &secondary, ReplicationStrategy::AsyncReplication);
        let data = serde_json::json!({"key": "value"});

        replicated.save(KEY, &data, None).await.unwrap();

        // The primary is written before `save` returns.
        assert_eq!(stored(&primary, KEY).await.unwrap(), data);
        // The secondary catches up in the background.
        let_worker_run().await;
        assert_eq!(stored(&secondary, KEY).await.unwrap(), data);
    }

    #[rstest]
    #[tokio::test]
    async fn test_sync_replication_save() {
        let (primary, secondary) = backend_pair();
        let replicated = replicate_over(&primary, &secondary, ReplicationStrategy::SyncReplication);
        let data = serde_json::json!({"key": "value"});

        replicated.save(KEY, &data, None).await.unwrap();

        assert_eq!(stored(&primary, KEY).await.unwrap(), data);
        assert_eq!(stored(&secondary, KEY).await.unwrap(), data);
    }

    #[rstest]
    #[tokio::test]
    async fn test_acknowledged_replication_save() {
        let (primary, secondary) = backend_pair();
        let replicated = replicate_over(
            &primary,
            &secondary,
            ReplicationStrategy::AcknowledgedReplication,
        );
        let data = serde_json::json!({"key": "value"});

        replicated.save(KEY, &data, None).await.unwrap();

        assert_eq!(stored(&primary, KEY).await.unwrap(), data);
        assert_eq!(stored(&secondary, KEY).await.unwrap(), data);
    }

    #[rstest]
    #[tokio::test]
    async fn test_replication_delete() {
        let (primary, secondary) = backend_pair();
        let replicated = replicate_over(&primary, &secondary, ReplicationStrategy::SyncReplication);
        let data = serde_json::json!({"key": "value"});

        replicated.save(KEY, &data, None).await.unwrap();
        replicated.delete(KEY).await.unwrap();

        assert!(!primary.exists(KEY).await.unwrap());
        assert!(!secondary.exists(KEY).await.unwrap());
    }

    #[rstest]
    #[tokio::test]
    async fn test_replication_load_fallback() {
        let (primary, secondary) = backend_pair();
        let replicated =
            replicate_over(&primary, &secondary, ReplicationStrategy::AsyncReplication);
        let data = serde_json::json!({"key": "value"});

        // Only the secondary knows the key, so the load must fall through to it.
        secondary.save(KEY, &data, None).await.unwrap();

        let loaded: Option<serde_json::Value> = replicated.load(KEY).await.unwrap();
        assert_eq!(loaded.unwrap(), data);
    }

    #[rstest]
    #[tokio::test]
    async fn test_replication_config() {
        let config = ReplicationConfig {
            channel_buffer_size: 2000,
            retry_attempts: 5,
            retry_delay_ms: 200,
        };
        let (primary, secondary) = backend_pair();

        let replicated = ReplicatedSessionBackend::with_config(
            primary,
            secondary,
            ReplicationStrategy::AsyncReplication,
            config.clone(),
        );

        assert_eq!(replicated.config.channel_buffer_size, 2000);
        assert_eq!(replicated.config.retry_attempts, 5);
        assert_eq!(replicated.config.retry_delay_ms, 200);
    }

    #[rstest]
    #[tokio::test]
    async fn test_replication_strategy_getter() {
        let (primary, secondary) = backend_pair();

        let replicated =
            ReplicatedSessionBackend::new(primary, secondary, ReplicationStrategy::SyncReplication);

        assert_eq!(replicated.strategy(), ReplicationStrategy::SyncReplication);
    }

    #[rstest]
    #[tokio::test]
    async fn test_async_replication_delete_propagates() {
        let (primary, secondary) = backend_pair();
        let data = serde_json::json!({"key": "value"});
        // Seed both stores before the replicated wrapper exists.
        primary.save(KEY, &data, None).await.unwrap();
        secondary.save(KEY, &data, None).await.unwrap();
        let replicated =
            replicate_over(&primary, &secondary, ReplicationStrategy::AsyncReplication);

        replicated.delete(KEY).await.unwrap();
        let_worker_run().await;

        assert_eq!(stored(&primary, KEY).await, None);
        assert_eq!(stored(&secondary, KEY).await, None);
    }

    #[rstest]
    #[tokio::test]
    async fn test_sync_replication_delete_both_removed() {
        let (primary, secondary) = backend_pair();
        let data = serde_json::json!({"key": "value"});
        let replicated = replicate_over(&primary, &secondary, ReplicationStrategy::SyncReplication);

        replicated.save(KEY, &data, None).await.unwrap();
        replicated.delete(KEY).await.unwrap();

        assert_eq!(stored(&primary, KEY).await, None);
        assert_eq!(stored(&secondary, KEY).await, None);
    }

    #[rstest]
    #[tokio::test]
    async fn test_load_prefers_primary() {
        let (primary, secondary) = backend_pair();
        let primary_data = serde_json::json!({"source": "primary_data"});
        let secondary_data = serde_json::json!({"source": "secondary_data"});
        primary.save(KEY, &primary_data, None).await.unwrap();
        secondary.save(KEY, &secondary_data, None).await.unwrap();
        let replicated = replicate_over(&primary, &secondary, ReplicationStrategy::SyncReplication);

        let loaded: Option<serde_json::Value> = replicated.load(KEY).await.unwrap();

        assert_eq!(loaded.unwrap(), primary_data);
    }

    #[rstest]
    #[tokio::test]
    async fn test_acknowledged_replication_save_both() {
        let (primary, secondary) = backend_pair();
        let replicated = replicate_over(
            &primary,
            &secondary,
            ReplicationStrategy::AcknowledgedReplication,
        );
        let data = serde_json::json!({"key": "value"});

        replicated.save(KEY, &data, None).await.unwrap();

        assert!(primary.exists(KEY).await.unwrap());
        assert!(secondary.exists(KEY).await.unwrap());
    }
}