use crate::{
application::{ApplicationError, ApplicationResult, handlers::QueryHandlerGat, queries::*},
domain::{
aggregates::StreamSession,
entities::Stream,
events::EventStore,
ports::{StreamRepositoryGat, StreamStoreGat},
},
};
use std::sync::Arc;
#[derive(Debug)]
pub struct SessionQueryHandler<R>
where
R: StreamRepositoryGat + 'static,
{
repository: Arc<R>,
}
impl<R> SessionQueryHandler<R>
where
R: StreamRepositoryGat + 'static,
{
pub fn new(repository: Arc<R>) -> Self {
Self { repository }
}
}
impl<R> QueryHandlerGat<GetSessionQuery> for SessionQueryHandler<R>
where
R: StreamRepositoryGat + Send + Sync,
{
type Response = SessionResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, query: GetSessionQuery) -> Self::HandleFuture<'_> {
async move {
let session = self
.repository
.find_session(query.session_id.into())
.await
.map_err(ApplicationError::Domain)?
.ok_or_else(|| {
ApplicationError::NotFound(format!("Session {} not found", query.session_id))
})?;
Ok(SessionResponse { session })
}
}
}
impl<R> QueryHandlerGat<GetActiveSessionsQuery> for SessionQueryHandler<R>
where
R: StreamRepositoryGat + Send + Sync,
{
type Response = SessionsResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, query: GetActiveSessionsQuery) -> Self::HandleFuture<'_> {
async move {
let mut sessions = self
.repository
.find_active_sessions()
.await
.map_err(ApplicationError::Domain)?;
let total_count = sessions.len();
if let Some(offset) = query.offset {
if offset < sessions.len() {
sessions = sessions.into_iter().skip(offset).collect();
} else {
sessions.clear();
}
}
if let Some(limit) = query.limit {
sessions.truncate(limit);
}
Ok(SessionsResponse {
sessions,
total_count,
})
}
}
}
impl<R> QueryHandlerGat<GetSessionHealthQuery> for SessionQueryHandler<R>
where
R: StreamRepositoryGat + Send + Sync,
{
type Response = HealthResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, query: GetSessionHealthQuery) -> Self::HandleFuture<'_> {
async move {
let session = self
.repository
.find_session(query.session_id.into())
.await
.map_err(ApplicationError::Domain)?
.ok_or_else(|| {
ApplicationError::NotFound(format!("Session {} not found", query.session_id))
})?;
let health = session.health_check();
Ok(HealthResponse { health })
}
}
}
impl<R> QueryHandlerGat<SearchSessionsQuery> for SessionQueryHandler<R>
where
R: StreamRepositoryGat + Send + Sync,
{
type Response = SessionsResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, query: SearchSessionsQuery) -> Self::HandleFuture<'_> {
async move {
let mut sessions = self
.repository
.find_active_sessions()
.await
.map_err(ApplicationError::Domain)?;
sessions.retain(|session| self.matches_filters(session, &query.filters));
if let Some(sort_field) = &query.sort_by {
let ascending = query
.sort_order
.as_ref()
.is_none_or(|order| matches!(order, SortOrder::Ascending));
sessions.sort_by(|a, b| {
let cmp = match sort_field {
SessionSortField::CreatedAt => a.created_at().cmp(&b.created_at()),
SessionSortField::UpdatedAt => a.updated_at().cmp(&b.updated_at()),
SessionSortField::StreamCount => a.streams().len().cmp(&b.streams().len()),
SessionSortField::TotalBytes => {
a.stats().total_bytes.cmp(&b.stats().total_bytes)
}
};
if ascending { cmp } else { cmp.reverse() }
});
}
let total_count = sessions.len();
if let Some(offset) = query.offset {
if offset < sessions.len() {
sessions = sessions.into_iter().skip(offset).collect();
} else {
sessions.clear();
}
}
if let Some(limit) = query.limit {
sessions.truncate(limit);
}
Ok(SessionsResponse {
sessions,
total_count,
})
}
}
}
impl<R> SessionQueryHandler<R>
where
R: StreamRepositoryGat + 'static,
{
fn matches_filters(&self, session: &StreamSession, filters: &SessionFilters) -> bool {
if let Some(ref state_filter) = filters.state {
let state_str = format!("{:?}", session.state()).to_lowercase();
if !state_str.contains(&state_filter.to_lowercase()) {
return false;
}
}
if let Some(after) = filters.created_after
&& session.created_at() <= after
{
return false;
}
if let Some(before) = filters.created_before
&& session.created_at() >= before
{
return false;
}
if let Some(ref client_filter) = filters.client_info {
let _ = client_filter; }
if let Some(has_active) = filters.has_active_streams {
let has_active_streams = session.streams().values().any(|stream| stream.is_active());
if has_active != has_active_streams {
return false;
}
}
true
}
}
#[derive(Debug)]
pub struct StreamQueryHandler<R, S>
where
R: StreamRepositoryGat + 'static,
S: StreamStoreGat + 'static,
{
session_repository: Arc<R>,
#[allow(dead_code)]
stream_store: Arc<S>,
}
impl<R, S> StreamQueryHandler<R, S>
where
R: StreamRepositoryGat + 'static,
S: StreamStoreGat + 'static,
{
pub fn new(session_repository: Arc<R>, stream_store: Arc<S>) -> Self {
Self {
session_repository,
stream_store,
}
}
}
impl<R, S> QueryHandlerGat<GetStreamQuery> for StreamQueryHandler<R, S>
where
R: StreamRepositoryGat + Send + Sync,
S: StreamStoreGat + Send + Sync,
{
type Response = StreamResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, query: GetStreamQuery) -> Self::HandleFuture<'_> {
async move {
let session = self
.session_repository
.find_session(query.session_id.into())
.await
.map_err(ApplicationError::Domain)?
.ok_or_else(|| {
ApplicationError::NotFound(format!("Session {} not found", query.session_id))
})?;
let stream = session
.get_stream(query.stream_id.into())
.ok_or_else(|| {
ApplicationError::NotFound(format!("Stream {} not found", query.stream_id))
})?
.clone();
Ok(StreamResponse { stream })
}
}
}
impl<R, S> QueryHandlerGat<GetStreamsForSessionQuery> for StreamQueryHandler<R, S>
where
R: StreamRepositoryGat + Send + Sync,
S: StreamStoreGat + Send + Sync,
{
type Response = StreamsResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, query: GetStreamsForSessionQuery) -> Self::HandleFuture<'_> {
async move {
let session = self
.session_repository
.find_session(query.session_id.into())
.await
.map_err(ApplicationError::Domain)?
.ok_or_else(|| {
ApplicationError::NotFound(format!("Session {} not found", query.session_id))
})?;
let streams: Vec<Stream> = session
.streams()
.values()
.filter(|stream| query.include_inactive || stream.is_active())
.cloned()
.collect();
Ok(StreamsResponse { streams })
}
}
}
#[derive(Debug)]
pub struct EventQueryHandler<E>
where
E: EventStore,
{
event_store: Arc<E>,
}
impl<E> EventQueryHandler<E>
where
E: EventStore,
{
pub fn new(event_store: Arc<E>) -> Self {
Self { event_store }
}
}
impl<E> QueryHandlerGat<GetSessionEventsQuery> for EventQueryHandler<E>
where
E: EventStore + Send + Sync,
{
type Response = EventsResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, query: GetSessionEventsQuery) -> Self::HandleFuture<'_> {
async move {
let mut events = self
.event_store
.get_events_for_session(query.session_id.into())
.map_err(ApplicationError::Logic)?;
if let Some(since) = query.since {
events.retain(|event| event.timestamp() > since);
}
if let Some(ref event_types) = query.event_types {
events.retain(|event| event_types.contains(&event.event_type().to_string()));
}
let total_count = events.len();
if let Some(limit) = query.limit {
events.truncate(limit);
}
Ok(EventsResponse {
events,
total_count,
})
}
}
}
impl<E> QueryHandlerGat<GetStreamEventsQuery> for EventQueryHandler<E>
where
E: EventStore + Send + Sync,
{
type Response = EventsResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, query: GetStreamEventsQuery) -> Self::HandleFuture<'_> {
async move {
let mut events = self
.event_store
.get_events_for_stream(query.stream_id.into())
.map_err(ApplicationError::Logic)?;
if let Some(since) = query.since {
events.retain(|event| event.timestamp() > since);
}
let total_count = events.len();
if let Some(limit) = query.limit {
events.truncate(limit);
}
Ok(EventsResponse {
events,
total_count,
})
}
}
}
#[derive(Debug)]
pub struct SystemQueryHandler<R>
where
R: StreamRepositoryGat + 'static,
{
repository: Arc<R>,
}
impl<R> SystemQueryHandler<R>
where
R: StreamRepositoryGat + 'static,
{
pub fn new(repository: Arc<R>) -> Self {
Self { repository }
}
}
impl<R> QueryHandlerGat<GetSystemStatsQuery> for SystemQueryHandler<R>
where
R: StreamRepositoryGat + Send + Sync,
{
type Response = SystemStatsResponse;
type HandleFuture<'a>
= impl std::future::Future<Output = ApplicationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn handle(&self, _query: GetSystemStatsQuery) -> Self::HandleFuture<'_> {
async move {
let sessions = self
.repository
.find_active_sessions()
.await
.map_err(ApplicationError::Domain)?;
let total_sessions = sessions.len() as u64;
let active_sessions = sessions.iter().filter(|s| s.is_active()).count() as u64;
let mut total_streams = 0u64;
let mut active_streams = 0u64;
let mut total_frames = 0u64;
let mut total_bytes = 0u64;
let mut total_duration_ms = 0f64;
let mut completed_sessions = 0u64;
for session in &sessions {
let stats = session.stats();
total_streams += stats.total_streams;
active_streams += stats.active_streams;
total_frames += stats.total_frames;
total_bytes += stats.total_bytes;
if let Some(duration) = session.duration() {
total_duration_ms += duration.num_milliseconds() as f64;
completed_sessions += 1;
}
}
let average_session_duration_seconds = if completed_sessions > 0 {
total_duration_ms / completed_sessions as f64 / 1000.0
} else {
0.0
};
let uptime_seconds = 3600; let frames_per_second = total_frames as f64 / uptime_seconds as f64;
let bytes_per_second = total_bytes as f64 / uptime_seconds as f64;
Ok(SystemStatsResponse {
total_sessions,
active_sessions,
total_streams,
active_streams,
total_frames,
total_bytes,
average_session_duration_seconds,
frames_per_second,
bytes_per_second,
uptime_seconds: uptime_seconds as u64,
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::domain::{
aggregates::{StreamSession, stream_session::SessionConfig},
ports::{
Pagination, PriorityDistribution, SessionHealthSnapshot, SessionQueryCriteria,
SessionQueryResult, StreamFilter, StreamStatistics, StreamStatus,
},
value_objects::{SessionId, StreamId},
};
use chrono::Utc;
use std::collections::HashMap;
struct MockRepository {
sessions: parking_lot::Mutex<HashMap<SessionId, StreamSession>>,
}
impl MockRepository {
fn new() -> Self {
Self {
sessions: parking_lot::Mutex::new(HashMap::new()),
}
}
fn add_session(&self, session: StreamSession) {
self.sessions.lock().insert(session.id(), session);
}
}
impl StreamRepositoryGat for MockRepository {
type FindSessionFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<Option<StreamSession>>>
+ Send
+ 'a
where
Self: 'a;
type SaveSessionFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
where
Self: 'a;
type RemoveSessionFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
where
Self: 'a;
type FindActiveSessionsFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<Vec<StreamSession>>>
+ Send
+ 'a
where
Self: 'a;
type FindSessionsByCriteriaFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<SessionQueryResult>>
+ Send
+ 'a
where
Self: 'a;
type GetSessionHealthFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<SessionHealthSnapshot>>
+ Send
+ 'a
where
Self: 'a;
type SessionExistsFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<bool>> + Send + 'a
where
Self: 'a;
fn find_session(&self, session_id: SessionId) -> Self::FindSessionFuture<'_> {
async move { Ok(self.sessions.lock().get(&session_id).cloned()) }
}
fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_> {
async move {
self.sessions.lock().insert(session.id(), session);
Ok(())
}
}
fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_> {
async move {
self.sessions.lock().remove(&session_id);
Ok(())
}
}
fn find_active_sessions(&self) -> Self::FindActiveSessionsFuture<'_> {
async move { Ok(self.sessions.lock().values().cloned().collect()) }
}
fn find_sessions_by_criteria(
&self,
_criteria: SessionQueryCriteria,
pagination: Pagination,
) -> Self::FindSessionsByCriteriaFuture<'_> {
async move {
let sessions: Vec<_> = self.sessions.lock().values().cloned().collect();
let total_count = sessions.len();
let paginated: Vec<_> = sessions
.into_iter()
.skip(pagination.offset)
.take(pagination.limit)
.collect();
let has_more = pagination.offset + paginated.len() < total_count;
Ok(SessionQueryResult {
sessions: paginated,
total_count,
has_more,
query_duration_ms: 0,
scan_limit_reached: false,
})
}
}
fn get_session_health(&self, session_id: SessionId) -> Self::GetSessionHealthFuture<'_> {
async move {
Ok(SessionHealthSnapshot {
session_id,
is_healthy: true,
active_streams: 0,
total_frames: 0,
last_activity: Utc::now(),
error_rate: 0.0,
metrics: HashMap::new(),
})
}
}
fn session_exists(&self, session_id: SessionId) -> Self::SessionExistsFuture<'_> {
async move { Ok(self.sessions.lock().contains_key(&session_id)) }
}
}
struct MockStreamStore;
impl StreamStoreGat for MockStreamStore {
type StoreStreamFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
where
Self: 'a;
type GetStreamFuture<'a>
= impl std::future::Future<
Output = crate::domain::DomainResult<Option<crate::domain::entities::Stream>>,
> + Send
+ 'a
where
Self: 'a;
type DeleteStreamFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
where
Self: 'a;
type ListStreamsForSessionFuture<'a>
= impl std::future::Future<
Output = crate::domain::DomainResult<Vec<crate::domain::entities::Stream>>,
> + Send
+ 'a
where
Self: 'a;
type FindStreamsBySessionFuture<'a>
= impl std::future::Future<
Output = crate::domain::DomainResult<Vec<crate::domain::entities::Stream>>,
> + Send
+ 'a
where
Self: 'a;
type UpdateStreamStatusFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
where
Self: 'a;
type GetStreamStatisticsFuture<'a>
= impl std::future::Future<Output = crate::domain::DomainResult<StreamStatistics>>
+ Send
+ 'a
where
Self: 'a;
fn store_stream(
&self,
_stream: crate::domain::entities::Stream,
) -> Self::StoreStreamFuture<'_> {
async move { Ok(()) }
}
fn get_stream(&self, _stream_id: StreamId) -> Self::GetStreamFuture<'_> {
async move { Ok(None) }
}
fn delete_stream(&self, _stream_id: StreamId) -> Self::DeleteStreamFuture<'_> {
async move { Ok(()) }
}
fn list_streams_for_session(
&self,
_session_id: SessionId,
) -> Self::ListStreamsForSessionFuture<'_> {
async move { Ok(vec![]) }
}
fn find_streams_by_session(
&self,
_session_id: SessionId,
_filter: StreamFilter,
) -> Self::FindStreamsBySessionFuture<'_> {
async move { Ok(vec![]) }
}
fn update_stream_status(
&self,
_stream_id: StreamId,
_status: StreamStatus,
) -> Self::UpdateStreamStatusFuture<'_> {
async move { Ok(()) }
}
fn get_stream_statistics(
&self,
_stream_id: StreamId,
) -> Self::GetStreamStatisticsFuture<'_> {
async move {
Ok(StreamStatistics {
total_frames: 0,
total_bytes: 0,
priority_distribution: PriorityDistribution::default(),
avg_frame_size: 0.0,
creation_time: Utc::now(),
completion_time: None,
processing_duration: None,
})
}
}
}
struct MockEventStore;
impl MockEventStore {
fn new() -> Self {
Self
}
}
impl EventStore for MockEventStore {
fn append_events(
&mut self,
_events: Vec<crate::domain::events::DomainEvent>,
) -> Result<(), String> {
Ok(())
}
fn get_events_for_session(
&self,
_session_id: SessionId,
) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
Ok(vec![])
}
fn get_events_for_stream(
&self,
_stream_id: StreamId,
) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
Ok(vec![])
}
fn get_events_since(
&self,
_since: chrono::DateTime<chrono::Utc>,
) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
Ok(vec![])
}
}
#[tokio::test]
async fn test_get_session_query() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository.clone());
let mut session = StreamSession::new(SessionConfig::default());
let _ = session.activate();
let session_id = session.id();
repository.add_session(session);
let query = GetSessionQuery {
session_id: session_id.into(),
};
let result = handler.handle(query).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.session.id(), session_id);
}
#[tokio::test]
async fn test_get_session_not_found() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository);
let query = GetSessionQuery {
session_id: SessionId::new().into(),
};
let result = handler.handle(query).await;
assert!(result.is_err());
match result.err().unwrap() {
ApplicationError::NotFound(_) => {}
_ => panic!("Expected NotFound error"),
}
}
#[tokio::test]
async fn test_get_active_sessions_query() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository.clone());
for i in 0..5 {
let mut session = StreamSession::new(SessionConfig::default());
if i < 3 {
let _ = session.activate();
}
repository.add_session(session);
}
let query = GetActiveSessionsQuery {
offset: None,
limit: None,
};
let result = handler.handle(query).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.sessions.len(), 5);
assert_eq!(response.total_count, 5);
}
#[tokio::test]
async fn test_get_active_sessions_with_pagination() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository.clone());
for _ in 0..10 {
let mut session = StreamSession::new(SessionConfig::default());
let _ = session.activate();
repository.add_session(session);
}
let query = GetActiveSessionsQuery {
offset: Some(3),
limit: Some(4),
};
let result = handler.handle(query).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.sessions.len(), 4);
assert_eq!(response.total_count, 10);
}
#[tokio::test]
async fn test_get_session_health_query() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository.clone());
let mut session = StreamSession::new(SessionConfig::default());
let _ = session.activate();
let session_id = session.id();
repository.add_session(session);
let query = GetSessionHealthQuery {
session_id: session_id.into(),
};
let result = handler.handle(query).await;
assert!(result.is_ok());
let response = result.unwrap();
assert!(response.health.is_healthy);
}
#[tokio::test]
async fn test_session_handler_creation() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository.clone());
assert!(std::ptr::eq(
handler.repository.as_ref(),
repository.as_ref()
));
}
#[tokio::test]
async fn test_stream_handler_creation() {
let session_repository = Arc::new(MockRepository::new());
let stream_store = Arc::new(MockStreamStore);
let handler = StreamQueryHandler::new(session_repository.clone(), stream_store.clone());
assert!(std::ptr::eq(
handler.session_repository.as_ref(),
session_repository.as_ref()
));
}
#[tokio::test]
async fn test_event_handler_creation() {
let event_store = Arc::new(MockEventStore::new());
let handler = EventQueryHandler::new(event_store.clone());
assert!(std::ptr::eq(
handler.event_store.as_ref(),
event_store.as_ref()
));
}
#[tokio::test]
async fn test_system_handler_creation() {
let repository = Arc::new(MockRepository::new());
let handler = SystemQueryHandler::new(repository.clone());
assert!(std::ptr::eq(
handler.repository.as_ref(),
repository.as_ref()
));
}
#[tokio::test]
async fn test_get_active_sessions_empty() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository);
let query = GetActiveSessionsQuery {
limit: Some(10),
offset: Some(0),
};
let result = QueryHandlerGat::handle(&handler, query).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.sessions.len(), 0);
assert_eq!(response.total_count, 0);
}
#[tokio::test]
async fn test_get_active_sessions_with_limit() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository);
let query = GetActiveSessionsQuery {
limit: Some(5),
offset: None,
};
let result = QueryHandlerGat::handle(&handler, query).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_get_active_sessions_with_offset() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository);
let query = GetActiveSessionsQuery {
limit: None,
offset: Some(10),
};
let result = QueryHandlerGat::handle(&handler, query).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_get_active_sessions_offset_beyond_count() {
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository);
let query = GetActiveSessionsQuery {
limit: Some(10),
offset: Some(1000),
};
let result = QueryHandlerGat::handle(&handler, query).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.sessions.len(), 0);
}
#[tokio::test]
async fn test_get_stream_not_found() {
use crate::domain::value_objects::{SessionId, StreamId};
let session_repository = Arc::new(MockRepository::new());
let stream_store = Arc::new(MockStreamStore);
let handler = StreamQueryHandler::new(session_repository, stream_store);
let query = GetStreamQuery {
session_id: SessionId::new().into(),
stream_id: StreamId::new().into(),
};
let result: ApplicationResult<StreamResponse> =
QueryHandlerGat::handle(&handler, query).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_session_query_not_found() {
use crate::domain::value_objects::SessionId;
let repository = Arc::new(MockRepository::new());
let handler = SessionQueryHandler::new(repository);
let query = GetSessionQuery {
session_id: SessionId::new().into(),
};
let result = QueryHandlerGat::handle(&handler, query).await;
assert!(result.is_err());
}
}