use crate::agents::request_reply::{create_request_reply, send_response, ResponseChannel};
use crate::agents::default_agent_config;
use crate::auth::session::{FlashMessage, SessionData, SessionId};
use acton_reactive::prelude::*;
use chrono::{DateTime, Duration, Utc};
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};
use tokio::sync::oneshot;
/// Not-yet-started (`Idle`) managed agent wrapping a [`SessionManagerAgent`];
/// this is the builder type that `configure_handlers` accepts and starts.
type SessionAgentBuilder = ManagedAgent<Idle, SessionManagerAgent>;
#[cfg(feature = "redis")]
use deadpool_redis::Pool as RedisPool;
/// State owned by the session manager agent: the session store plus an
/// expiry-ordered queue consulted when a `CleanupExpired` message arrives.
#[derive(Debug, Default, Clone)]
pub struct SessionManagerAgent {
/// Session data keyed by session ID.
sessions: HashMap<SessionId, SessionData>,
/// Min-heap (via `Reverse`) of `(expiry time, session ID)` pairs, so the
/// entry with the earliest expiry is always at the top.
expiry_queue: BinaryHeap<Reverse<(DateTime<Utc>, SessionId)>>,
/// Optional Redis pool installed by `spawn_with_redis`.
/// NOTE(review): no handler visible in this file reads this field - confirm
/// whether Redis persistence is implemented elsewhere.
#[cfg(feature = "redis")]
redis: Option<RedisPool>,
}
/// Message asking the session manager for the session stored under
/// `session_id`.
#[derive(Clone, Debug)]
pub struct LoadSession {
/// ID of the session to look up.
pub session_id: SessionId,
/// Optional channel on which the lookup result is delivered; `None` when
/// the request was built with `LoadSession::new`.
pub response_tx: Option<ResponseChannel<Option<SessionData>>>,
}
impl LoadSession {
#[must_use]
pub const fn new(session_id: SessionId) -> Self {
Self {
session_id,
response_tx: None,
}
}
#[must_use]
pub fn with_response(session_id: SessionId) -> (Self, oneshot::Receiver<Option<SessionData>>) {
let (response_tx, rx) = create_request_reply();
let request = Self {
session_id,
response_tx: Some(response_tx),
};
(request, rx)
}
}
/// Message asking the session manager to store `data` under `session_id`,
/// replacing any session already stored under that ID.
#[derive(Clone, Debug)]
pub struct SaveSession {
/// ID under which the session is stored.
pub session_id: SessionId,
/// Session contents to store; its `expires_at` is also queued for cleanup.
pub data: SessionData,
/// Optional channel on which the handler sends `true` once the session has
/// been stored; `None` when built with `SaveSession::new`.
pub response_tx: Option<ResponseChannel<bool>>,
}
impl SaveSession {
#[must_use]
pub const fn new(session_id: SessionId, data: SessionData) -> Self {
Self {
session_id,
data,
response_tx: None,
}
}
#[must_use]
pub fn with_confirmation(
session_id: SessionId,
data: SessionData,
) -> (Self, oneshot::Receiver<bool>) {
let (response_tx, rx) = create_request_reply();
let request = Self {
session_id,
data,
response_tx: Some(response_tx),
};
(request, rx)
}
}
/// Message asking the session manager to remove and return every flash
/// message queued on the session `session_id`.
#[derive(Clone, Debug)]
pub struct TakeFlashes {
/// ID of the session whose flash messages are taken.
pub session_id: SessionId,
/// Optional channel on which the taken messages are delivered; `None` when
/// the request was built with `TakeFlashes::new`.
pub response_tx: Option<ResponseChannel<Vec<FlashMessage>>>,
}
impl TakeFlashes {
#[must_use]
pub const fn new(session_id: SessionId) -> Self {
Self {
session_id,
response_tx: None,
}
}
#[must_use]
pub fn with_response(session_id: SessionId) -> (Self, oneshot::Receiver<Vec<FlashMessage>>) {
let (response_tx, rx) = create_request_reply();
let request = Self {
session_id,
response_tx: Some(response_tx),
};
(request, rx)
}
}
/// Message asking the session manager to remove the session stored under
/// `session_id`. It carries no response channel.
#[derive(Clone, Debug)]
pub struct DeleteSession {
/// ID of the session to remove.
pub session_id: SessionId,
}
/// Message asking the session manager to drop sessions whose queued expiry
/// time is at or before the current UTC time.
#[derive(Clone, Debug)]
pub struct CleanupExpired;
/// Message asking the session manager to append a flash message to an
/// existing session; it is ignored when no session is stored under
/// `session_id`.
#[derive(Clone, Debug)]
pub struct AddFlash {
/// ID of the session that receives the flash message.
pub session_id: SessionId,
/// Flash message to append.
pub message: FlashMessage,
}
impl SessionManagerAgent {
    /// Number of hours passed to `SessionData::validate_and_touch` whenever a
    /// session is loaded.
    /// NOTE(review): presumably the sliding session lifetime - confirm against
    /// `SessionData`.
    const SESSION_TTL_HOURS: i64 = 24;

    /// Spawns a session manager agent on `runtime` and returns its handle.
    ///
    /// # Errors
    ///
    /// Returns an error when `default_agent_config` cannot build the agent
    /// configuration.
    pub async fn spawn(runtime: &mut AgentRuntime) -> anyhow::Result<AgentHandle> {
        let config = default_agent_config("session_manager")?;
        let builder = runtime.new_agent_with_config::<Self>(config).await;
        Self::configure_handlers(builder).await
    }

    /// Spawns a session manager agent whose state carries `redis_pool`.
    ///
    /// # Errors
    ///
    /// Returns an error when `default_agent_config` cannot build the agent
    /// configuration.
    #[cfg(feature = "redis")]
    pub async fn spawn_with_redis(
        runtime: &mut AgentRuntime,
        redis_pool: RedisPool,
    ) -> anyhow::Result<AgentHandle> {
        let config = default_agent_config("session_manager")?;
        let mut builder = runtime.new_agent_with_config::<Self>(config).await;
        builder.model.redis = Some(redis_pool);
        Self::configure_handlers(builder).await
    }

    /// Registers one handler per session message on `builder`, starts the
    /// agent and returns its handle.
    ///
    /// Handlers that answer (`LoadSession`, `SaveSession`, `TakeFlashes`)
    /// send the result on the message's `response_tx` when one is present;
    /// `LoadSession` and `TakeFlashes` additionally send it through the
    /// envelope's reply envelope.
    async fn configure_handlers(mut builder: SessionAgentBuilder) -> anyhow::Result<AgentHandle> {
        builder
            // Look up a session and validate it. The handler works on a
            // clone of the stored entry, so the refreshed expiry is returned
            // to the caller but NOT written back to `sessions`.
            // NOTE(review): callers presumably persist the touched data with
            // a follow-up `SaveSession` - confirm.
            .act_on::<LoadSession>(|agent, envelope| {
                let session_id = envelope.message().session_id.clone();
                let response_tx = envelope.message().response_tx.clone();
                let session = agent.model.sessions.get(&session_id).cloned();
                let reply_envelope = envelope.reply_envelope();
                Box::pin(async move {
                    // A session that fails validation is reported as absent.
                    let result = session.and_then(|mut data| {
                        data.validate_and_touch(Duration::hours(Self::SESSION_TTL_HOURS))
                            .then_some(data)
                    });
                    if let Some(tx) = response_tx {
                        let _ = send_response(tx, result.clone()).await;
                    }
                    let _: () = reply_envelope.send(result).await;
                })
            })
            // Store (or overwrite) a session and queue its expiry time.
            .mutate_on::<SaveSession>(|agent, envelope| {
                let session_id = envelope.message().session_id.clone();
                let data = envelope.message().data.clone();
                let response_tx = envelope.message().response_tx.clone();
                // Read the expiry before `data` is moved into the map so the
                // session does not have to be cloned a second time.
                let expires_at = data.expires_at;
                agent
                    .model
                    .expiry_queue
                    .push(Reverse((expires_at, session_id.clone())));
                agent.model.sessions.insert(session_id, data);
                AgentReply::from_async(async move {
                    if let Some(tx) = response_tx {
                        let _ = send_response(tx, true).await;
                    }
                })
            })
            // Drain the session's flash messages; an unknown session yields
            // an empty list.
            .mutate_on::<TakeFlashes>(|agent, envelope| {
                let session_id = envelope.message().session_id.clone();
                let response_tx = envelope.message().response_tx.clone();
                let reply_envelope = envelope.reply_envelope();
                let messages = agent
                    .model
                    .sessions
                    .get_mut(&session_id)
                    .map(|session| std::mem::take(&mut session.flash_messages))
                    .unwrap_or_default();
                AgentReply::from_async(async move {
                    if let Some(tx) = response_tx {
                        let _ = send_response(tx, messages.clone()).await;
                    }
                    let _: () = reply_envelope.send(messages).await;
                })
            })
            // Remove a session. Its queue entries stay behind and are
            // discarded the next time `CleanupExpired` pops them.
            .mutate_on::<DeleteSession>(|agent, envelope| {
                agent.model.sessions.remove(&envelope.message().session_id);
                AgentReply::immediate()
            })
            // Pop every queue entry that is due and drop the sessions that
            // have really expired.
            .mutate_on::<CleanupExpired>(|agent, _envelope| {
                let now = Utc::now();
                while agent
                    .model
                    .expiry_queue
                    .peek()
                    .is_some_and(|Reverse((expiry, _))| *expiry <= now)
                {
                    let Some(Reverse((_, session_id))) = agent.model.expiry_queue.pop() else {
                        break;
                    };
                    // Every save pushes a fresh queue entry, so an entry may
                    // be stale: the session could have been re-saved with a
                    // later expiry (or deleted) since it was queued. Only
                    // evict when the *stored* expiry is due as well; a newer
                    // entry for a still-live session remains in the queue.
                    let expired = agent
                        .model
                        .sessions
                        .get(&session_id)
                        .is_some_and(|session| session.expires_at <= now);
                    if expired {
                        agent.model.sessions.remove(&session_id);
                    }
                }
                AgentReply::immediate()
            })
            // Append a flash message; silently ignored for unknown sessions.
            .mutate_on::<AddFlash>(|agent, envelope| {
                let session_id = envelope.message().session_id.clone();
                let message = envelope.message().message.clone();
                if let Some(session) = agent.model.sessions.get_mut(&session_id) {
                    session.flash_messages.push(message);
                }
                AgentReply::immediate()
            });
        Ok(builder.start().await)
    }
}
// Integration-style tests that drive a live agent through its handle.
// Requests built with `with_response` / `with_confirmation` are awaited on
// their receiver with a one-second timeout. Fire-and-forget sends are
// followed by a short sleep, presumably to give the agent time to process
// them before the next step.
#[cfg(test)]
mod tests {
use super::*;
// Spawning the agent on a fresh runtime succeeds.
#[tokio::test(flavor = "multi_thread")]
async fn test_session_manager_creation() {
let mut runtime = ActonApp::launch();
let result = SessionManagerAgent::spawn(&mut runtime).await;
assert!(result.is_ok());
runtime.shutdown_all().await.expect("Failed to shutdown");
}
// A saved session can be loaded again and still contains the stored value.
#[tokio::test(flavor = "multi_thread")]
async fn test_session_save_and_load_with_verification() {
let mut runtime = ActonApp::launch();
let session_manager = SessionManagerAgent::spawn(&mut runtime).await.unwrap();
let session_id = SessionId::generate();
let mut data = SessionData::new();
data.set("test_key".to_string(), "test_value".to_string())
.unwrap();
session_manager
.send(SaveSession::new(session_id.clone(), data.clone()))
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let (request, rx) = LoadSession::with_response(session_id.clone());
session_manager.send(request).await;
let loaded_data = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout waiting for response")
.expect("Channel closed");
assert!(loaded_data.is_some(), "Session should exist");
let loaded = loaded_data.unwrap();
let loaded_value: Option<String> = loaded.get("test_key").unwrap();
assert_eq!(loaded_value, Some("test_value".to_string()));
runtime.shutdown_all().await.expect("Failed to shutdown");
}
// Loading an ID that was never saved yields `None`.
#[tokio::test(flavor = "multi_thread")]
async fn test_session_not_found() {
let mut runtime = ActonApp::launch();
let session_manager = SessionManagerAgent::spawn(&mut runtime).await.unwrap();
let session_id = SessionId::generate();
let (request, rx) = LoadSession::with_response(session_id);
session_manager.send(request).await;
let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout waiting for response")
.expect("Channel closed");
assert!(result.is_none(), "Session should not exist");
runtime.shutdown_all().await.expect("Failed to shutdown");
}
// A session is loadable before `DeleteSession` and gone afterwards.
#[tokio::test(flavor = "multi_thread")]
async fn test_session_delete_with_verification() {
let mut runtime = ActonApp::launch();
let session_manager = SessionManagerAgent::spawn(&mut runtime).await.unwrap();
let session_id = SessionId::generate();
let data = SessionData::new();
session_manager
.send(SaveSession::new(session_id.clone(), data))
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// First load: the session must still be present.
let (request, rx) = LoadSession::with_response(session_id.clone());
session_manager.send(request).await;
let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout")
.expect("Channel closed");
assert!(result.is_some(), "Session should exist before deletion");
session_manager
.send(DeleteSession {
session_id: session_id.clone(),
})
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// Second load: the session must have been removed.
let (request, rx) = LoadSession::with_response(session_id);
session_manager.send(request).await;
let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout")
.expect("Channel closed");
assert!(result.is_none(), "Session should not exist after deletion");
runtime.shutdown_all().await.expect("Failed to shutdown");
}
// Flash messages come back in insertion order and are cleared once taken.
#[tokio::test(flavor = "multi_thread")]
async fn test_flash_messages_with_verification() {
let mut runtime = ActonApp::launch();
let session_manager = SessionManagerAgent::spawn(&mut runtime).await.unwrap();
let session_id = SessionId::generate();
let data = SessionData::new();
session_manager
.send(SaveSession::new(session_id.clone(), data))
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
session_manager
.send(AddFlash {
session_id: session_id.clone(),
message: FlashMessage::success("Success message"),
})
.await;
session_manager
.send(AddFlash {
session_id: session_id.clone(),
message: FlashMessage::error("Error message"),
})
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let (request, rx) = TakeFlashes::with_response(session_id.clone());
session_manager.send(request).await;
let flashes = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout waiting for response")
.expect("Channel closed");
assert_eq!(flashes.len(), 2, "Should have 2 flash messages");
assert_eq!(flashes[0].message, "Success message");
assert_eq!(flashes[1].message, "Error message");
// Taking a second time must return nothing.
let (request, rx) = TakeFlashes::with_response(session_id);
session_manager.send(request).await;
let flashes = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout")
.expect("Channel closed");
assert_eq!(flashes.len(), 0, "Flashes should be cleared after taking");
runtime.shutdown_all().await.expect("Failed to shutdown");
}
// A session saved with an expiry one hour in the past is not returned after
// `CleanupExpired`.
// NOTE(review): the `LoadSession` handler also validates the session before
// returning it, so this assertion may pass even if cleanup removed nothing -
// confirm what `validate_and_touch` rejects.
#[tokio::test(flavor = "multi_thread")]
async fn test_session_expiry_cleanup() {
let mut runtime = ActonApp::launch();
let session_manager = SessionManagerAgent::spawn(&mut runtime).await.unwrap();
let session_id = SessionId::generate();
let mut data = SessionData::new();
data.expires_at = Utc::now() - Duration::hours(1);
session_manager
.send(SaveSession::new(session_id.clone(), data))
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
session_manager.send(CleanupExpired).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let (request, rx) = LoadSession::with_response(session_id);
session_manager.send(request).await;
let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout")
.expect("Channel closed");
assert!(result.is_none(), "Expired session should not be returned");
runtime.shutdown_all().await.expect("Failed to shutdown");
}
// Loading a session that expires in one hour returns data whose expiry is
// later than the one that was saved.
#[tokio::test(flavor = "multi_thread")]
async fn test_session_touch_extends_expiry() {
let mut runtime = ActonApp::launch();
let session_manager = SessionManagerAgent::spawn(&mut runtime).await.unwrap();
let session_id = SessionId::generate();
let mut data = SessionData::new();
let original_expiry = Utc::now() + Duration::hours(1);
data.expires_at = original_expiry;
session_manager
.send(SaveSession::new(session_id.clone(), data))
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let (request, rx) = LoadSession::with_response(session_id);
session_manager.send(request).await;
let loaded = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout")
.expect("Channel closed");
assert!(loaded.is_some(), "Session should exist");
let loaded_data = loaded.unwrap();
assert!(
loaded_data.expires_at > original_expiry,
"Expiry should be extended after touch"
);
runtime.shutdown_all().await.expect("Failed to shutdown");
}
// `SaveSession::with_confirmation` receives `true` once the save is handled.
#[tokio::test(flavor = "multi_thread")]
async fn test_save_with_confirmation() {
let mut runtime = ActonApp::launch();
let session_manager = SessionManagerAgent::spawn(&mut runtime).await.unwrap();
let session_id = SessionId::generate();
let data = SessionData::new();
let (request, rx) = SaveSession::with_confirmation(session_id, data);
session_manager.send(request).await;
let confirmed = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout waiting for confirmation")
.expect("Channel closed");
assert!(confirmed, "Save should be confirmed");
runtime.shutdown_all().await.expect("Failed to shutdown");
}
// Ten `AddFlash` messages sent from separate tasks all end up on the session.
#[tokio::test(flavor = "multi_thread")]
async fn test_concurrent_flash_messages() {
let mut runtime = ActonApp::launch();
let session_manager = SessionManagerAgent::spawn(&mut runtime).await.unwrap();
let session_id = SessionId::generate();
let data = SessionData::new();
session_manager
.send(SaveSession::new(session_id.clone(), data))
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// One spawned task per message, each with its own clone of the handle.
let handles: Vec<_> = (0..10)
.map(|i| {
let sm = session_manager.clone();
let sid = session_id.clone();
tokio::spawn(async move {
sm.send(AddFlash {
session_id: sid,
message: FlashMessage::info(format!("Message {i}")),
})
.await;
})
})
.collect();
// Wait until every task has finished sending.
for handle in handles {
handle.await.unwrap();
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let (request, rx) = TakeFlashes::with_response(session_id);
session_manager.send(request).await;
let flashes = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx)
.await
.expect("Timeout")
.expect("Channel closed");
assert_eq!(
flashes.len(),
10,
"Should have all 10 flash messages despite concurrent adds"
);
runtime.shutdown_all().await.expect("Failed to shutdown");
}
}