pjson_rs/application/handlers/
query_handlers.rs

1//! Query handlers for read operations
2
3use crate::{
4    application::{ApplicationError, ApplicationResult, handlers::QueryHandler, queries::*},
5    domain::{
6        aggregates::StreamSession,
7        entities::Stream,
8        events::EventStore,
9        ports::{StreamRepository, StreamStore},
10    },
11};
12use async_trait::async_trait;
13use std::sync::Arc;
14
15/// Handler for session-related queries
16#[derive(Debug)]
17pub struct SessionQueryHandler<R>
18where
19    R: StreamRepository,
20{
21    repository: Arc<R>,
22}
23
24impl<R> SessionQueryHandler<R>
25where
26    R: StreamRepository,
27{
28    pub fn new(repository: Arc<R>) -> Self {
29        Self { repository }
30    }
31}
32
33#[async_trait]
34impl<R> QueryHandler<GetSessionQuery, SessionResponse> for SessionQueryHandler<R>
35where
36    R: StreamRepository + Send + Sync,
37{
38    async fn handle(&self, query: GetSessionQuery) -> ApplicationResult<SessionResponse> {
39        let session = self
40            .repository
41            .find_session(query.session_id.into())
42            .await
43            .map_err(ApplicationError::Domain)?
44            .ok_or_else(|| {
45                ApplicationError::NotFound(format!("Session {} not found", query.session_id))
46            })?;
47
48        Ok(SessionResponse { session })
49    }
50}
51
52#[async_trait]
53impl<R> QueryHandler<GetActiveSessionsQuery, SessionsResponse> for SessionQueryHandler<R>
54where
55    R: StreamRepository + Send + Sync,
56{
57    async fn handle(&self, query: GetActiveSessionsQuery) -> ApplicationResult<SessionsResponse> {
58        let mut sessions = self
59            .repository
60            .find_active_sessions()
61            .await
62            .map_err(ApplicationError::Domain)?;
63
64        // Apply pagination
65        let total_count = sessions.len();
66
67        if let Some(offset) = query.offset {
68            if offset < sessions.len() {
69                sessions = sessions.into_iter().skip(offset).collect();
70            } else {
71                sessions.clear();
72            }
73        }
74
75        if let Some(limit) = query.limit {
76            sessions.truncate(limit);
77        }
78
79        Ok(SessionsResponse {
80            sessions,
81            total_count,
82        })
83    }
84}
85
86#[async_trait]
87impl<R> QueryHandler<GetSessionHealthQuery, HealthResponse> for SessionQueryHandler<R>
88where
89    R: StreamRepository + Send + Sync,
90{
91    async fn handle(&self, query: GetSessionHealthQuery) -> ApplicationResult<HealthResponse> {
92        let session = self
93            .repository
94            .find_session(query.session_id.into())
95            .await
96            .map_err(ApplicationError::Domain)?
97            .ok_or_else(|| {
98                ApplicationError::NotFound(format!("Session {} not found", query.session_id))
99            })?;
100
101        let health = session.health_check();
102
103        Ok(HealthResponse { health })
104    }
105}
106
107#[async_trait]
108impl<R> QueryHandler<SearchSessionsQuery, SessionsResponse> for SessionQueryHandler<R>
109where
110    R: StreamRepository + Send + Sync,
111{
112    async fn handle(&self, query: SearchSessionsQuery) -> ApplicationResult<SessionsResponse> {
113        // Load all active sessions (in production, this would be more efficient with database filtering)
114        let mut sessions = self
115            .repository
116            .find_active_sessions()
117            .await
118            .map_err(ApplicationError::Domain)?;
119
120        // Apply filters
121        sessions.retain(|session| self.matches_filters(session, &query.filters));
122
123        // Apply sorting
124        if let Some(sort_field) = &query.sort_by {
125            let ascending = query
126                .sort_order
127                .as_ref()
128                .is_none_or(|order| matches!(order, SortOrder::Ascending));
129
130            sessions.sort_by(|a, b| {
131                let cmp = match sort_field {
132                    SessionSortField::CreatedAt => a.created_at().cmp(&b.created_at()),
133                    SessionSortField::UpdatedAt => a.updated_at().cmp(&b.updated_at()),
134                    SessionSortField::StreamCount => a.streams().len().cmp(&b.streams().len()),
135                    SessionSortField::TotalBytes => {
136                        a.stats().total_bytes.cmp(&b.stats().total_bytes)
137                    }
138                };
139
140                if ascending { cmp } else { cmp.reverse() }
141            });
142        }
143
144        // Apply pagination
145        let total_count = sessions.len();
146
147        if let Some(offset) = query.offset {
148            if offset < sessions.len() {
149                sessions = sessions.into_iter().skip(offset).collect();
150            } else {
151                sessions.clear();
152            }
153        }
154
155        if let Some(limit) = query.limit {
156            sessions.truncate(limit);
157        }
158
159        Ok(SessionsResponse {
160            sessions,
161            total_count,
162        })
163    }
164}
165
166impl<R> SessionQueryHandler<R>
167where
168    R: StreamRepository,
169{
170    fn matches_filters(&self, session: &StreamSession, filters: &SessionFilters) -> bool {
171        // State filter
172        if let Some(ref state_filter) = filters.state {
173            let state_str = format!("{:?}", session.state()).to_lowercase();
174            if !state_str.contains(&state_filter.to_lowercase()) {
175                return false;
176            }
177        }
178
179        // Date range filters
180        if let Some(after) = filters.created_after
181            && session.created_at() <= after {
182                return false;
183            }
184
185        if let Some(before) = filters.created_before
186            && session.created_at() >= before {
187                return false;
188            }
189
190        // Client info filter
191        if let Some(ref client_filter) = filters.client_info {
192            // Note: In real implementation, we'd need to store and access client info
193            // This is a placeholder for the filtering logic
194            let _ = client_filter; // Suppress unused warning
195        }
196
197        // Active streams filter
198        if let Some(has_active) = filters.has_active_streams {
199            let has_active_streams = session.streams().values().any(|stream| stream.is_active());
200            if has_active != has_active_streams {
201                return false;
202            }
203        }
204
205        true
206    }
207}
208
209/// Handler for stream-related queries
210#[derive(Debug)]
211pub struct StreamQueryHandler<R, S>
212where
213    R: StreamRepository,
214    S: StreamStore,
215{
216    session_repository: Arc<R>,
217    #[allow(dead_code)]
218    stream_store: Arc<S>,
219}
220
221impl<R, S> StreamQueryHandler<R, S>
222where
223    R: StreamRepository,
224    S: StreamStore,
225{
226    pub fn new(session_repository: Arc<R>, stream_store: Arc<S>) -> Self {
227        Self {
228            session_repository,
229            stream_store,
230        }
231    }
232}
233
234#[async_trait]
235impl<R, S> QueryHandler<GetStreamQuery, StreamResponse> for StreamQueryHandler<R, S>
236where
237    R: StreamRepository + Send + Sync,
238    S: StreamStore + Send + Sync,
239{
240    async fn handle(&self, query: GetStreamQuery) -> ApplicationResult<StreamResponse> {
241        let session = self
242            .session_repository
243            .find_session(query.session_id.into())
244            .await
245            .map_err(ApplicationError::Domain)?
246            .ok_or_else(|| {
247                ApplicationError::NotFound(format!("Session {} not found", query.session_id))
248            })?;
249
250        let stream = session
251            .get_stream(query.stream_id.into())
252            .ok_or_else(|| {
253                ApplicationError::NotFound(format!("Stream {} not found", query.stream_id))
254            })?
255            .clone();
256
257        Ok(StreamResponse { stream })
258    }
259}
260
261#[async_trait]
262impl<R, S> QueryHandler<GetStreamsForSessionQuery, StreamsResponse> for StreamQueryHandler<R, S>
263where
264    R: StreamRepository + Send + Sync,
265    S: StreamStore + Send + Sync,
266{
267    async fn handle(&self, query: GetStreamsForSessionQuery) -> ApplicationResult<StreamsResponse> {
268        let session = self
269            .session_repository
270            .find_session(query.session_id.into())
271            .await
272            .map_err(ApplicationError::Domain)?
273            .ok_or_else(|| {
274                ApplicationError::NotFound(format!("Session {} not found", query.session_id))
275            })?;
276
277        let streams: Vec<Stream> = session
278            .streams()
279            .values()
280            .filter(|stream| query.include_inactive || stream.is_active())
281            .cloned()
282            .collect();
283
284        Ok(StreamsResponse { streams })
285    }
286}
287
288/// Handler for event-related queries
289#[derive(Debug)]
290pub struct EventQueryHandler<E>
291where
292    E: EventStore,
293{
294    event_store: Arc<E>,
295}
296
297impl<E> EventQueryHandler<E>
298where
299    E: EventStore,
300{
301    pub fn new(event_store: Arc<E>) -> Self {
302        Self { event_store }
303    }
304}
305
306#[async_trait]
307impl<E> QueryHandler<GetSessionEventsQuery, EventsResponse> for EventQueryHandler<E>
308where
309    E: EventStore + Send + Sync,
310{
311    async fn handle(&self, query: GetSessionEventsQuery) -> ApplicationResult<EventsResponse> {
312        let mut events = self
313            .event_store
314            .get_events_for_session(query.session_id.into())
315            .map_err(ApplicationError::Logic)?;
316
317        // Apply time filter
318        if let Some(since) = query.since {
319            events.retain(|event| event.timestamp() > since);
320        }
321
322        // Apply event type filter
323        if let Some(ref event_types) = query.event_types {
324            events.retain(|event| event_types.contains(&event.event_type().to_string()));
325        }
326
327        let total_count = events.len();
328
329        // Apply limit
330        if let Some(limit) = query.limit {
331            events.truncate(limit);
332        }
333
334        Ok(EventsResponse {
335            events,
336            total_count,
337        })
338    }
339}
340
341#[async_trait]
342impl<E> QueryHandler<GetStreamEventsQuery, EventsResponse> for EventQueryHandler<E>
343where
344    E: EventStore + Send + Sync,
345{
346    async fn handle(&self, query: GetStreamEventsQuery) -> ApplicationResult<EventsResponse> {
347        let mut events = self
348            .event_store
349            .get_events_for_stream(query.stream_id.into())
350            .map_err(ApplicationError::Logic)?;
351
352        // Apply time filter
353        if let Some(since) = query.since {
354            events.retain(|event| event.timestamp() > since);
355        }
356
357        let total_count = events.len();
358
359        // Apply limit
360        if let Some(limit) = query.limit {
361            events.truncate(limit);
362        }
363
364        Ok(EventsResponse {
365            events,
366            total_count,
367        })
368    }
369}
370
371/// Handler for system statistics
372#[derive(Debug)]
373pub struct SystemQueryHandler<R>
374where
375    R: StreamRepository,
376{
377    repository: Arc<R>,
378}
379
380impl<R> SystemQueryHandler<R>
381where
382    R: StreamRepository,
383{
384    pub fn new(repository: Arc<R>) -> Self {
385        Self { repository }
386    }
387}
388
389#[async_trait]
390impl<R> QueryHandler<GetSystemStatsQuery, SystemStatsResponse> for SystemQueryHandler<R>
391where
392    R: StreamRepository + Send + Sync,
393{
394    async fn handle(&self, _query: GetSystemStatsQuery) -> ApplicationResult<SystemStatsResponse> {
395        let sessions = self
396            .repository
397            .find_active_sessions()
398            .await
399            .map_err(ApplicationError::Domain)?;
400
401        let total_sessions = sessions.len() as u64;
402        let active_sessions = sessions.iter().filter(|s| s.is_active()).count() as u64;
403
404        let mut total_streams = 0u64;
405        let mut active_streams = 0u64;
406        let mut total_frames = 0u64;
407        let mut total_bytes = 0u64;
408        let mut total_duration_ms = 0f64;
409        let mut completed_sessions = 0u64;
410
411        for session in &sessions {
412            let stats = session.stats();
413            total_streams += stats.total_streams;
414            active_streams += stats.active_streams;
415            total_frames += stats.total_frames;
416            total_bytes += stats.total_bytes;
417
418            if let Some(duration) = session.duration() {
419                total_duration_ms += duration.num_milliseconds() as f64;
420                completed_sessions += 1;
421            }
422        }
423
424        let average_session_duration_seconds = if completed_sessions > 0 {
425            total_duration_ms / completed_sessions as f64 / 1000.0
426        } else {
427            0.0
428        };
429
430        // Calculate rates (simplified - would need time-based tracking in production)
431        let uptime_seconds = 3600; // Placeholder - would track actual uptime
432        let frames_per_second = total_frames as f64 / uptime_seconds as f64;
433        let bytes_per_second = total_bytes as f64 / uptime_seconds as f64;
434
435        Ok(SystemStatsResponse {
436            total_sessions,
437            active_sessions,
438            total_streams,
439            active_streams,
440            total_frames,
441            total_bytes,
442            average_session_duration_seconds,
443            frames_per_second,
444            bytes_per_second,
445            uptime_seconds: uptime_seconds as u64,
446        })
447    }
448}
449
450#[cfg(test)]
451mod tests {
452    use super::*;
453    use crate::domain::{
454        aggregates::{StreamSession, stream_session::SessionConfig},
455        value_objects::{SessionId, StreamId},
456    };
457    use std::collections::HashMap;
458
459    // Mock implementations for testing
460    struct MockRepository {
461        sessions: std::sync::Mutex<HashMap<SessionId, StreamSession>>,
462    }
463
464    impl MockRepository {
465        fn new() -> Self {
466            Self {
467                sessions: std::sync::Mutex::new(HashMap::new()),
468            }
469        }
470
471        fn add_session(&self, session: StreamSession) {
472            self.sessions.lock().unwrap().insert(session.id(), session);
473        }
474    }
475
476    #[async_trait]
477    impl StreamRepository for MockRepository {
478        async fn find_session(
479            &self,
480            session_id: SessionId,
481        ) -> crate::domain::DomainResult<Option<StreamSession>> {
482            Ok(self.sessions.lock().unwrap().get(&session_id).cloned())
483        }
484
485        async fn save_session(&self, session: StreamSession) -> crate::domain::DomainResult<()> {
486            self.sessions.lock().unwrap().insert(session.id(), session);
487            Ok(())
488        }
489
490        async fn remove_session(&self, session_id: SessionId) -> crate::domain::DomainResult<()> {
491            self.sessions.lock().unwrap().remove(&session_id);
492            Ok(())
493        }
494
495        async fn find_active_sessions(&self) -> crate::domain::DomainResult<Vec<StreamSession>> {
496            Ok(self.sessions.lock().unwrap().values().cloned().collect())
497        }
498    }
499
500    struct MockStreamStore;
501
502    #[async_trait]
503    impl StreamStore for MockStreamStore {
504        async fn store_stream(&self, _stream: crate::domain::entities::Stream) -> crate::domain::DomainResult<()> {
505            Ok(())
506        }
507
508        async fn get_stream(&self, _stream_id: StreamId) -> crate::domain::DomainResult<Option<crate::domain::entities::Stream>> {
509            Ok(None)
510        }
511
512        async fn delete_stream(&self, _stream_id: StreamId) -> crate::domain::DomainResult<()> {
513            Ok(())
514        }
515
516        async fn list_streams_for_session(&self, _session_id: SessionId) -> crate::domain::DomainResult<Vec<crate::domain::entities::Stream>> {
517            Ok(vec![])
518        }
519    }
520
521    struct MockEventStore;
522
523    impl MockEventStore {
524        fn new() -> Self {
525            Self
526        }
527    }
528
529    impl EventStore for MockEventStore {
530        fn append_events(&mut self, _events: Vec<crate::domain::events::DomainEvent>) -> Result<(), String> {
531            Ok(())
532        }
533
534        fn get_events_for_session(&self, _session_id: SessionId) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
535            Ok(vec![])
536        }
537
538        fn get_events_for_stream(&self, _stream_id: StreamId) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
539            Ok(vec![])
540        }
541
542        fn get_events_since(&self, _since: chrono::DateTime<chrono::Utc>) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
543            Ok(vec![])
544        }
545    }
546
547    #[tokio::test]
548    async fn test_get_session_query() {
549        let repository = Arc::new(MockRepository::new());
550        let handler = SessionQueryHandler::new(repository.clone());
551
552        // Create and add a session
553        let mut session = StreamSession::new(SessionConfig::default());
554        let _ = session.activate();
555        let session_id = session.id();
556        repository.add_session(session);
557
558        // Query the session
559        let query = GetSessionQuery { session_id: session_id.into() };
560        let result = handler.handle(query).await;
561
562        assert!(result.is_ok());
563        let response = result.unwrap();
564        assert_eq!(response.session.id(), session_id);
565    }
566
567    #[tokio::test]
568    async fn test_get_session_not_found() {
569        let repository = Arc::new(MockRepository::new());
570        let handler = SessionQueryHandler::new(repository);
571
572        let query = GetSessionQuery {
573            session_id: SessionId::new().into(),
574        };
575        let result = handler.handle(query).await;
576
577        assert!(result.is_err());
578        match result.err().unwrap() {
579            ApplicationError::NotFound(_) => {}
580            _ => panic!("Expected NotFound error"),
581        }
582    }
583
584    #[tokio::test]
585    async fn test_get_active_sessions_query() {
586        let repository = Arc::new(MockRepository::new());
587        let handler = SessionQueryHandler::new(repository.clone());
588
589        // Add multiple sessions
590        for i in 0..5 {
591            let mut session = StreamSession::new(SessionConfig::default());
592            if i < 3 {
593                let _ = session.activate();
594            }
595            repository.add_session(session);
596        }
597
598        // Query active sessions
599        let query = GetActiveSessionsQuery {
600            offset: None,
601            limit: None,
602        };
603        let result = handler.handle(query).await;
604
605        assert!(result.is_ok());
606        let response = result.unwrap();
607        assert_eq!(response.sessions.len(), 5);
608        assert_eq!(response.total_count, 5);
609    }
610
611    #[tokio::test]
612    async fn test_get_active_sessions_with_pagination() {
613        let repository = Arc::new(MockRepository::new());
614        let handler = SessionQueryHandler::new(repository.clone());
615
616        // Add 10 sessions
617        for _ in 0..10 {
618            let mut session = StreamSession::new(SessionConfig::default());
619            let _ = session.activate();
620            repository.add_session(session);
621        }
622
623        // Query with pagination
624        let query = GetActiveSessionsQuery {
625            offset: Some(3),
626            limit: Some(4),
627        };
628        let result = handler.handle(query).await;
629
630        assert!(result.is_ok());
631        let response = result.unwrap();
632        assert_eq!(response.sessions.len(), 4);
633        assert_eq!(response.total_count, 10);
634    }
635
636    #[tokio::test]
637    async fn test_get_session_health_query() {
638        let repository = Arc::new(MockRepository::new());
639        let handler = SessionQueryHandler::new(repository.clone());
640
641        // Create and add a session
642        let mut session = StreamSession::new(SessionConfig::default());
643        let _ = session.activate();
644        let session_id = session.id();
645        repository.add_session(session);
646
647        // Query session health
648        let query = GetSessionHealthQuery { session_id: session_id.into() };
649        let result = handler.handle(query).await;
650
651        assert!(result.is_ok());
652        let response = result.unwrap();
653        assert!(response.health.is_healthy);
654    }
655
656    #[tokio::test] 
657    async fn test_session_handler_creation() {
658        let repository = Arc::new(MockRepository::new());
659        let handler = SessionQueryHandler::new(repository.clone());
660        
661        // Test that handlers can be created successfully
662        assert!(std::ptr::eq(handler.repository.as_ref(), repository.as_ref()));
663    }
664
665    #[tokio::test]
666    async fn test_stream_handler_creation() {
667        let session_repository = Arc::new(MockRepository::new());
668        let stream_store = Arc::new(MockStreamStore);
669        let handler = StreamQueryHandler::new(session_repository.clone(), stream_store.clone());
670
671        // Test that handlers can be created successfully
672        assert!(std::ptr::eq(handler.session_repository.as_ref(), session_repository.as_ref()));
673    }
674
675    #[tokio::test]
676    async fn test_event_handler_creation() {
677        let event_store = Arc::new(MockEventStore::new());
678        let handler = EventQueryHandler::new(event_store.clone());
679
680        // Test that handlers can be created successfully
681        assert!(std::ptr::eq(handler.event_store.as_ref(), event_store.as_ref()));
682    }
683
684    #[tokio::test]
685    async fn test_system_handler_creation() {
686        let repository = Arc::new(MockRepository::new());
687        let handler = SystemQueryHandler::new(repository.clone());
688
689        // Test that handlers can be created successfully
690        assert!(std::ptr::eq(handler.repository.as_ref(), repository.as_ref()));
691    }
692}