use crate::{
application::{
ApplicationResult,
commands::*,
handlers::{CommandHandler, QueryHandler},
queries::*,
},
domain::{
aggregates::{StreamSession, stream_session::SessionHealth},
value_objects::{SessionId, StreamId},
},
};
use std::sync::Arc;
#[derive(Debug)]
pub struct SessionService<CH, QH>
where
CH: CommandHandler<CreateSessionCommand, SessionId>
+ CommandHandler<CreateStreamCommand, StreamId>
+ CommandHandler<StartStreamCommand, ()>
+ CommandHandler<CompleteStreamCommand, ()>
+ CommandHandler<CloseSessionCommand, ()>,
QH: QueryHandler<GetSessionQuery, SessionResponse>
+ QueryHandler<GetSessionHealthQuery, HealthResponse>
+ QueryHandler<GetActiveSessionsQuery, SessionsResponse>,
{
command_handler: Arc<CH>,
query_handler: Arc<QH>,
}
impl<CH, QH> SessionService<CH, QH>
where
CH: CommandHandler<CreateSessionCommand, SessionId>
+ CommandHandler<CreateStreamCommand, StreamId>
+ CommandHandler<StartStreamCommand, ()>
+ CommandHandler<CompleteStreamCommand, ()>
+ CommandHandler<CloseSessionCommand, ()>
+ Send
+ Sync,
QH: QueryHandler<GetSessionQuery, SessionResponse>
+ QueryHandler<GetSessionHealthQuery, HealthResponse>
+ QueryHandler<GetActiveSessionsQuery, SessionsResponse>
+ Send
+ Sync,
{
pub fn new(command_handler: Arc<CH>, query_handler: Arc<QH>) -> Self {
Self {
command_handler,
query_handler,
}
}
pub async fn create_and_activate_session(
&self,
config: crate::domain::aggregates::stream_session::SessionConfig,
client_info: Option<String>,
user_agent: Option<String>,
ip_address: Option<String>,
) -> ApplicationResult<SessionId> {
let create_command = CreateSessionCommand {
config,
client_info,
user_agent,
ip_address,
};
self.command_handler.handle(create_command).await
}
pub async fn create_and_start_stream(
&self,
session_id: SessionId,
source_data: serde_json::Value,
config: Option<crate::domain::entities::stream::StreamConfig>,
) -> ApplicationResult<StreamId> {
let create_command = CreateStreamCommand {
session_id: session_id.into(),
source_data,
config,
};
let stream_id = self.command_handler.handle(create_command).await?;
let start_command = StartStreamCommand {
session_id: session_id.into(),
stream_id: stream_id.into(),
};
self.command_handler.handle(start_command).await?;
Ok(stream_id)
}
pub async fn get_session_with_health(
&self,
session_id: SessionId,
) -> ApplicationResult<SessionWithHealth> {
let session_query = GetSessionQuery { session_id: session_id.into() };
let session_response = self.query_handler.handle(session_query).await?;
let health_query = GetSessionHealthQuery { session_id: session_id.into() };
let health_response = self.query_handler.handle(health_query).await?;
Ok(SessionWithHealth {
session: session_response.session,
health: health_response.health,
})
}
pub async fn complete_stream_and_maybe_close_session(
&self,
session_id: SessionId,
stream_id: StreamId,
) -> ApplicationResult<SessionCompletionResult> {
let complete_command = CompleteStreamCommand {
session_id: session_id.into(),
stream_id: stream_id.into(),
checksum: None,
};
self.command_handler.handle(complete_command).await?;
let session_query = GetSessionQuery { session_id: session_id.into() };
let session_response = self.query_handler.handle(session_query).await?;
let active_streams = session_response
.session
.streams()
.values()
.filter(|s| s.is_active())
.count();
let session_closed = if active_streams == 0 {
let close_command = CloseSessionCommand { session_id: session_id.into() };
self.command_handler.handle(close_command).await?;
true
} else {
false
};
Ok(SessionCompletionResult {
stream_id,
session_closed,
remaining_active_streams: active_streams,
})
}
pub async fn get_sessions_overview(
&self,
limit: Option<usize>,
) -> ApplicationResult<SessionsOverview> {
let query = GetActiveSessionsQuery {
limit,
offset: None,
};
let response = self.query_handler.handle(query).await?;
let mut total_streams = 0u64;
let mut total_frames = 0u64;
let mut total_bytes = 0u64;
let mut healthy_sessions = 0usize;
for session in &response.sessions {
let stats = session.stats();
total_streams += stats.total_streams;
total_frames += stats.total_frames;
total_bytes += stats.total_bytes;
if session.health_check().is_healthy {
healthy_sessions += 1;
}
}
Ok(SessionsOverview {
sessions: response.sessions,
total_count: response.total_count,
healthy_count: healthy_sessions,
total_streams,
total_frames,
total_bytes,
})
}
pub async fn graceful_shutdown_session(
&self,
session_id: SessionId,
) -> ApplicationResult<SessionShutdownResult> {
let session_query = GetSessionQuery { session_id: session_id.into() };
let session_response = self.query_handler.handle(session_query).await?;
let active_stream_ids: Vec<StreamId> = session_response
.session
.streams()
.iter()
.filter(|(_, stream)| stream.is_active())
.map(|(id, _)| *id)
.collect();
let mut completed_streams = Vec::new();
let mut failed_streams = Vec::new();
for stream_id in &active_stream_ids {
let complete_command = CompleteStreamCommand {
session_id: session_id.into(),
stream_id: (*stream_id).into(),
checksum: None,
};
match self.command_handler.handle(complete_command).await {
Ok(_) => completed_streams.push(*stream_id),
Err(_) => failed_streams.push(*stream_id),
}
}
let close_command = CloseSessionCommand { session_id: session_id.into() };
let session_closed = self.command_handler.handle(close_command).await.is_ok();
Ok(SessionShutdownResult {
session_id,
session_closed,
completed_streams,
failed_streams,
})
}
}
#[derive(Debug, Clone)]
pub struct SessionWithHealth {
pub session: StreamSession,
pub health: SessionHealth,
}
#[derive(Debug, Clone)]
pub struct SessionCompletionResult {
pub stream_id: StreamId,
pub session_closed: bool,
pub remaining_active_streams: usize,
}
#[derive(Debug, Clone)]
pub struct SessionsOverview {
pub sessions: Vec<StreamSession>,
pub total_count: usize,
pub healthy_count: usize,
pub total_streams: u64,
pub total_frames: u64,
pub total_bytes: u64,
}
#[derive(Debug, Clone)]
pub struct SessionShutdownResult {
pub session_id: SessionId,
pub session_closed: bool,
pub completed_streams: Vec<StreamId>,
pub failed_streams: Vec<StreamId>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
application::{ApplicationError, ApplicationResult, dto::priority_dto::FromDto},
domain::aggregates::stream_session::SessionConfig,
};
use async_trait::async_trait;
use std::collections::HashMap;
struct MockCommandHandler {
sessions: std::sync::Mutex<HashMap<SessionId, StreamSession>>,
}
impl MockCommandHandler {
fn new() -> Self {
Self {
sessions: std::sync::Mutex::new(HashMap::new()),
}
}
}
#[async_trait]
impl CommandHandler<CreateSessionCommand, SessionId> for MockCommandHandler {
async fn handle(&self, command: CreateSessionCommand) -> ApplicationResult<SessionId> {
let mut session = StreamSession::new(command.config);
let _ = session.activate();
let session_id = session.id();
self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))?
.insert(session_id, session);
Ok(session_id)
}
}
#[async_trait]
impl CommandHandler<CreateStreamCommand, StreamId> for MockCommandHandler {
async fn handle(&self, command: CreateStreamCommand) -> ApplicationResult<StreamId> {
let mut sessions = self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))?;
let session_id = SessionId::from_dto(command.session_id).map_err(ApplicationError::Domain)?;
if let Some(session) = sessions.get_mut(&session_id) {
let stream_id = session
.create_stream(command.source_data)
.map_err(ApplicationError::Domain)?;
Ok(stream_id)
} else {
Err(ApplicationError::NotFound("Session not found".to_string()))
}
}
}
#[async_trait]
impl CommandHandler<StartStreamCommand, ()> for MockCommandHandler {
async fn handle(&self, command: StartStreamCommand) -> ApplicationResult<()> {
let mut sessions = self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))?;
let session_id = SessionId::from_dto(command.session_id).map_err(ApplicationError::Domain)?;
if let Some(session) = sessions.get_mut(&session_id) {
let stream_id = StreamId::from_dto(command.stream_id).map_err(ApplicationError::Domain)?;
session
.start_stream(stream_id)
.map_err(ApplicationError::Domain)?;
Ok(())
} else {
Err(ApplicationError::NotFound("Session not found".to_string()))
}
}
}
#[async_trait]
impl CommandHandler<CompleteStreamCommand, ()> for MockCommandHandler {
async fn handle(&self, command: CompleteStreamCommand) -> ApplicationResult<()> {
let mut sessions = self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))?;
let session_id = SessionId::from_dto(command.session_id).map_err(ApplicationError::Domain)?;
if let Some(session) = sessions.get_mut(&session_id) {
let stream_id = StreamId::from_dto(command.stream_id).map_err(ApplicationError::Domain)?;
session
.complete_stream(stream_id)
.map_err(ApplicationError::Domain)?;
Ok(())
} else {
Err(ApplicationError::NotFound("Session not found".to_string()))
}
}
}
#[async_trait]
impl CommandHandler<CloseSessionCommand, ()> for MockCommandHandler {
async fn handle(&self, command: CloseSessionCommand) -> ApplicationResult<()> {
let mut sessions = self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))?;
let session_id = SessionId::from_dto(command.session_id).map_err(ApplicationError::Domain)?;
if let Some(session) = sessions.get_mut(&session_id) {
session.close().map_err(ApplicationError::Domain)?;
Ok(())
} else {
Err(ApplicationError::NotFound("Session not found".to_string()))
}
}
}
struct MockQueryHandler {
sessions: std::sync::Mutex<HashMap<SessionId, StreamSession>>,
}
impl MockQueryHandler {
fn new() -> Self {
Self {
sessions: std::sync::Mutex::new(HashMap::new()),
}
}
#[allow(dead_code)]
fn sync_sessions(&self, sessions: &HashMap<SessionId, StreamSession>) -> ApplicationResult<()> {
*self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))? = sessions.clone();
Ok(())
}
}
#[async_trait]
impl QueryHandler<GetSessionQuery, SessionResponse> for MockQueryHandler {
async fn handle(&self, query: GetSessionQuery) -> ApplicationResult<SessionResponse> {
let sessions = self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))?;
let session_id = SessionId::from_dto(query.session_id).map_err(ApplicationError::Domain)?;
if let Some(session) = sessions.get(&session_id) {
Ok(SessionResponse {
session: session.clone(),
})
} else {
Err(ApplicationError::NotFound("Session not found".to_string()))
}
}
}
#[async_trait]
impl QueryHandler<GetSessionHealthQuery, HealthResponse> for MockQueryHandler {
async fn handle(&self, query: GetSessionHealthQuery) -> ApplicationResult<HealthResponse> {
let sessions = self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))?;
let session_id = SessionId::from_dto(query.session_id).map_err(ApplicationError::Domain)?;
if let Some(session) = sessions.get(&session_id) {
Ok(HealthResponse {
health: session.health_check(),
})
} else {
Err(ApplicationError::NotFound("Session not found".to_string()))
}
}
}
#[async_trait]
impl QueryHandler<GetActiveSessionsQuery, SessionsResponse> for MockQueryHandler {
async fn handle(
&self,
query: GetActiveSessionsQuery,
) -> ApplicationResult<SessionsResponse> {
let sessions: Vec<_> = self.sessions.lock()
.map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))?
.values().cloned().collect();
let limited_sessions = if let Some(limit) = query.limit {
sessions.into_iter().take(limit).collect()
} else {
sessions
};
Ok(SessionsResponse {
sessions: limited_sessions.clone(),
total_count: limited_sessions.len(),
})
}
}
#[tokio::test]
async fn test_create_and_activate_session() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
let result = service
.create_and_activate_session(
SessionConfig::default(),
Some("test-client".to_string()),
None,
None,
)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_session_service_creation() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler.clone(), query_handler.clone());
assert!(std::ptr::eq(service.command_handler.as_ref(), command_handler.as_ref()));
assert!(std::ptr::eq(service.query_handler.as_ref(), query_handler.as_ref()));
}
#[tokio::test]
async fn test_create_and_start_stream() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
let session_id = service
.create_and_activate_session(
SessionConfig::default(),
None,
None,
None,
)
.await
.unwrap();
let command_sessions = service.command_handler.sessions.lock().unwrap().clone();
*service.query_handler.sessions.lock().unwrap() = command_sessions;
let stream_data = serde_json::json!({"test": "data"});
let result = service
.create_and_start_stream(session_id, stream_data, None)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_get_session_with_health() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
let session_id = service
.create_and_activate_session(
SessionConfig::default(),
None,
None,
None,
)
.await
.unwrap();
let command_sessions = service.command_handler.sessions.lock().unwrap().clone();
*service.query_handler.sessions.lock().unwrap() = command_sessions;
let result = service.get_session_with_health(session_id).await;
assert!(result.is_ok());
let session_with_health = result.unwrap();
assert_eq!(session_with_health.session.id(), session_id);
assert!(session_with_health.health.is_healthy);
}
#[tokio::test]
async fn test_get_session_with_health_not_found() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
let non_existent_session_id = SessionId::new();
let result = service.get_session_with_health(non_existent_session_id).await;
assert!(result.is_err());
assert!(matches!(result.err().unwrap(), ApplicationError::NotFound(_)));
}
#[tokio::test]
async fn test_get_sessions_overview() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
let mut session_ids = Vec::new();
for _ in 0..3 {
let session_id = service
.create_and_activate_session(
SessionConfig::default(),
None,
None,
None,
)
.await
.unwrap();
session_ids.push(session_id);
}
let command_sessions = service.command_handler.sessions.lock().unwrap().clone();
*service.query_handler.sessions.lock().unwrap() = command_sessions;
let result = service.get_sessions_overview(None).await;
assert!(result.is_ok());
let overview = result.unwrap();
assert_eq!(overview.sessions.len(), 3);
assert_eq!(overview.total_count, 3);
assert_eq!(overview.healthy_count, 3); }
#[tokio::test]
async fn test_get_sessions_overview_with_limit() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
for _ in 0..5 {
let _ = service
.create_and_activate_session(
SessionConfig::default(),
None,
None,
None,
)
.await
.unwrap();
}
let command_sessions = service.command_handler.sessions.lock().unwrap().clone();
*service.query_handler.sessions.lock().unwrap() = command_sessions;
let result = service.get_sessions_overview(Some(2)).await;
assert!(result.is_ok());
let overview = result.unwrap();
assert_eq!(overview.sessions.len(), 2);
}
#[tokio::test]
async fn test_complete_stream_and_maybe_close_session() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
let session_id = service
.create_and_activate_session(
SessionConfig::default(),
None,
None,
None,
)
.await
.unwrap();
let command_sessions = service.command_handler.sessions.lock().unwrap().clone();
*service.query_handler.sessions.lock().unwrap() = command_sessions.clone();
let stream_id = service
.create_and_start_stream(session_id, serde_json::json!({"test": "data"}), None)
.await
.unwrap();
let command_sessions = service.command_handler.sessions.lock().unwrap().clone();
*service.query_handler.sessions.lock().unwrap() = command_sessions;
let result = service
.complete_stream_and_maybe_close_session(session_id, stream_id)
.await;
assert!(result.is_ok());
let completion_result = result.unwrap();
assert_eq!(completion_result.stream_id, stream_id);
}
#[tokio::test]
async fn test_graceful_shutdown_session() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
let session_id = service
.create_and_activate_session(
SessionConfig::default(),
None,
None,
None,
)
.await
.unwrap();
let command_sessions = service.command_handler.sessions.lock().unwrap().clone();
*service.query_handler.sessions.lock().unwrap() = command_sessions.clone();
let stream1_id = service
.create_and_start_stream(session_id, serde_json::json!({"stream": 1}), None)
.await
.unwrap();
let stream2_id = service
.create_and_start_stream(session_id, serde_json::json!({"stream": 2}), None)
.await
.unwrap();
let command_sessions = service.command_handler.sessions.lock().unwrap().clone();
*service.query_handler.sessions.lock().unwrap() = command_sessions;
let result = service.graceful_shutdown_session(session_id).await;
assert!(result.is_ok());
let shutdown_result = result.unwrap();
assert_eq!(shutdown_result.session_id, session_id);
assert!(shutdown_result.session_closed);
assert_eq!(shutdown_result.completed_streams.len(), 2);
assert_eq!(shutdown_result.failed_streams.len(), 0);
assert!(shutdown_result.completed_streams.contains(&stream1_id));
assert!(shutdown_result.completed_streams.contains(&stream2_id));
}
#[tokio::test]
async fn test_graceful_shutdown_session_not_found() {
let command_handler = Arc::new(MockCommandHandler::new());
let query_handler = Arc::new(MockQueryHandler::new());
let service = SessionService::new(command_handler, query_handler);
let non_existent_session_id = SessionId::new();
let result = service.graceful_shutdown_session(non_existent_session_id).await;
assert!(result.is_err());
assert!(matches!(result.err().unwrap(), ApplicationError::NotFound(_)));
}
#[tokio::test]
async fn test_session_with_health_structure() {
let session = StreamSession::new(SessionConfig::default());
let health = session.health_check();
let session_with_health = SessionWithHealth {
session: session.clone(),
health: health.clone(),
};
assert_eq!(session_with_health.session.id(), session.id());
assert_eq!(session_with_health.health.is_healthy, health.is_healthy);
}
#[tokio::test]
async fn test_session_completion_result_structure() {
let stream_id = StreamId::new();
let result = SessionCompletionResult {
stream_id,
session_closed: true,
remaining_active_streams: 0,
};
assert_eq!(result.stream_id, stream_id);
assert!(result.session_closed);
assert_eq!(result.remaining_active_streams, 0);
}
#[tokio::test]
async fn test_sessions_overview_structure() {
let sessions = vec![StreamSession::new(SessionConfig::default())];
let overview = SessionsOverview {
sessions: sessions.clone(),
total_count: 1,
healthy_count: 1,
total_streams: 0,
total_frames: 0,
total_bytes: 0,
};
assert_eq!(overview.sessions.len(), 1);
assert_eq!(overview.total_count, 1);
assert_eq!(overview.healthy_count, 1);
assert_eq!(overview.total_streams, 0);
}
#[tokio::test]
async fn test_session_shutdown_result_structure() {
let session_id = SessionId::new();
let stream_id = StreamId::new();
let result = SessionShutdownResult {
session_id,
session_closed: true,
completed_streams: vec![stream_id],
failed_streams: vec![],
};
assert_eq!(result.session_id, session_id);
assert!(result.session_closed);
assert_eq!(result.completed_streams.len(), 1);
assert_eq!(result.failed_streams.len(), 0);
}
}