pjson_rs/application/handlers/
query_handlers.rs1use crate::{
4 application::{ApplicationError, ApplicationResult, handlers::QueryHandler, queries::*},
5 domain::{
6 aggregates::StreamSession,
7 entities::Stream,
8 events::EventStore,
9 ports::{StreamRepository, StreamStore},
10 },
11};
12use async_trait::async_trait;
13use std::sync::Arc;
14
15#[derive(Debug)]
17pub struct SessionQueryHandler<R>
18where
19 R: StreamRepository,
20{
21 repository: Arc<R>,
22}
23
24impl<R> SessionQueryHandler<R>
25where
26 R: StreamRepository,
27{
28 pub fn new(repository: Arc<R>) -> Self {
29 Self { repository }
30 }
31}
32
33#[async_trait]
34impl<R> QueryHandler<GetSessionQuery, SessionResponse> for SessionQueryHandler<R>
35where
36 R: StreamRepository + Send + Sync,
37{
38 async fn handle(&self, query: GetSessionQuery) -> ApplicationResult<SessionResponse> {
39 let session = self
40 .repository
41 .find_session(query.session_id.into())
42 .await
43 .map_err(ApplicationError::Domain)?
44 .ok_or_else(|| {
45 ApplicationError::NotFound(format!("Session {} not found", query.session_id))
46 })?;
47
48 Ok(SessionResponse { session })
49 }
50}
51
52#[async_trait]
53impl<R> QueryHandler<GetActiveSessionsQuery, SessionsResponse> for SessionQueryHandler<R>
54where
55 R: StreamRepository + Send + Sync,
56{
57 async fn handle(&self, query: GetActiveSessionsQuery) -> ApplicationResult<SessionsResponse> {
58 let mut sessions = self
59 .repository
60 .find_active_sessions()
61 .await
62 .map_err(ApplicationError::Domain)?;
63
64 let total_count = sessions.len();
66
67 if let Some(offset) = query.offset {
68 if offset < sessions.len() {
69 sessions = sessions.into_iter().skip(offset).collect();
70 } else {
71 sessions.clear();
72 }
73 }
74
75 if let Some(limit) = query.limit {
76 sessions.truncate(limit);
77 }
78
79 Ok(SessionsResponse {
80 sessions,
81 total_count,
82 })
83 }
84}
85
86#[async_trait]
87impl<R> QueryHandler<GetSessionHealthQuery, HealthResponse> for SessionQueryHandler<R>
88where
89 R: StreamRepository + Send + Sync,
90{
91 async fn handle(&self, query: GetSessionHealthQuery) -> ApplicationResult<HealthResponse> {
92 let session = self
93 .repository
94 .find_session(query.session_id.into())
95 .await
96 .map_err(ApplicationError::Domain)?
97 .ok_or_else(|| {
98 ApplicationError::NotFound(format!("Session {} not found", query.session_id))
99 })?;
100
101 let health = session.health_check();
102
103 Ok(HealthResponse { health })
104 }
105}
106
107#[async_trait]
108impl<R> QueryHandler<SearchSessionsQuery, SessionsResponse> for SessionQueryHandler<R>
109where
110 R: StreamRepository + Send + Sync,
111{
112 async fn handle(&self, query: SearchSessionsQuery) -> ApplicationResult<SessionsResponse> {
113 let mut sessions = self
115 .repository
116 .find_active_sessions()
117 .await
118 .map_err(ApplicationError::Domain)?;
119
120 sessions.retain(|session| self.matches_filters(session, &query.filters));
122
123 if let Some(sort_field) = &query.sort_by {
125 let ascending = query
126 .sort_order
127 .as_ref()
128 .is_none_or(|order| matches!(order, SortOrder::Ascending));
129
130 sessions.sort_by(|a, b| {
131 let cmp = match sort_field {
132 SessionSortField::CreatedAt => a.created_at().cmp(&b.created_at()),
133 SessionSortField::UpdatedAt => a.updated_at().cmp(&b.updated_at()),
134 SessionSortField::StreamCount => a.streams().len().cmp(&b.streams().len()),
135 SessionSortField::TotalBytes => {
136 a.stats().total_bytes.cmp(&b.stats().total_bytes)
137 }
138 };
139
140 if ascending { cmp } else { cmp.reverse() }
141 });
142 }
143
144 let total_count = sessions.len();
146
147 if let Some(offset) = query.offset {
148 if offset < sessions.len() {
149 sessions = sessions.into_iter().skip(offset).collect();
150 } else {
151 sessions.clear();
152 }
153 }
154
155 if let Some(limit) = query.limit {
156 sessions.truncate(limit);
157 }
158
159 Ok(SessionsResponse {
160 sessions,
161 total_count,
162 })
163 }
164}
165
166impl<R> SessionQueryHandler<R>
167where
168 R: StreamRepository,
169{
170 fn matches_filters(&self, session: &StreamSession, filters: &SessionFilters) -> bool {
171 if let Some(ref state_filter) = filters.state {
173 let state_str = format!("{:?}", session.state()).to_lowercase();
174 if !state_str.contains(&state_filter.to_lowercase()) {
175 return false;
176 }
177 }
178
179 if let Some(after) = filters.created_after
181 && session.created_at() <= after {
182 return false;
183 }
184
185 if let Some(before) = filters.created_before
186 && session.created_at() >= before {
187 return false;
188 }
189
190 if let Some(ref client_filter) = filters.client_info {
192 let _ = client_filter; }
196
197 if let Some(has_active) = filters.has_active_streams {
199 let has_active_streams = session.streams().values().any(|stream| stream.is_active());
200 if has_active != has_active_streams {
201 return false;
202 }
203 }
204
205 true
206 }
207}
208
209#[derive(Debug)]
211pub struct StreamQueryHandler<R, S>
212where
213 R: StreamRepository,
214 S: StreamStore,
215{
216 session_repository: Arc<R>,
217 #[allow(dead_code)]
218 stream_store: Arc<S>,
219}
220
221impl<R, S> StreamQueryHandler<R, S>
222where
223 R: StreamRepository,
224 S: StreamStore,
225{
226 pub fn new(session_repository: Arc<R>, stream_store: Arc<S>) -> Self {
227 Self {
228 session_repository,
229 stream_store,
230 }
231 }
232}
233
234#[async_trait]
235impl<R, S> QueryHandler<GetStreamQuery, StreamResponse> for StreamQueryHandler<R, S>
236where
237 R: StreamRepository + Send + Sync,
238 S: StreamStore + Send + Sync,
239{
240 async fn handle(&self, query: GetStreamQuery) -> ApplicationResult<StreamResponse> {
241 let session = self
242 .session_repository
243 .find_session(query.session_id.into())
244 .await
245 .map_err(ApplicationError::Domain)?
246 .ok_or_else(|| {
247 ApplicationError::NotFound(format!("Session {} not found", query.session_id))
248 })?;
249
250 let stream = session
251 .get_stream(query.stream_id.into())
252 .ok_or_else(|| {
253 ApplicationError::NotFound(format!("Stream {} not found", query.stream_id))
254 })?
255 .clone();
256
257 Ok(StreamResponse { stream })
258 }
259}
260
261#[async_trait]
262impl<R, S> QueryHandler<GetStreamsForSessionQuery, StreamsResponse> for StreamQueryHandler<R, S>
263where
264 R: StreamRepository + Send + Sync,
265 S: StreamStore + Send + Sync,
266{
267 async fn handle(&self, query: GetStreamsForSessionQuery) -> ApplicationResult<StreamsResponse> {
268 let session = self
269 .session_repository
270 .find_session(query.session_id.into())
271 .await
272 .map_err(ApplicationError::Domain)?
273 .ok_or_else(|| {
274 ApplicationError::NotFound(format!("Session {} not found", query.session_id))
275 })?;
276
277 let streams: Vec<Stream> = session
278 .streams()
279 .values()
280 .filter(|stream| query.include_inactive || stream.is_active())
281 .cloned()
282 .collect();
283
284 Ok(StreamsResponse { streams })
285 }
286}
287
288#[derive(Debug)]
290pub struct EventQueryHandler<E>
291where
292 E: EventStore,
293{
294 event_store: Arc<E>,
295}
296
297impl<E> EventQueryHandler<E>
298where
299 E: EventStore,
300{
301 pub fn new(event_store: Arc<E>) -> Self {
302 Self { event_store }
303 }
304}
305
306#[async_trait]
307impl<E> QueryHandler<GetSessionEventsQuery, EventsResponse> for EventQueryHandler<E>
308where
309 E: EventStore + Send + Sync,
310{
311 async fn handle(&self, query: GetSessionEventsQuery) -> ApplicationResult<EventsResponse> {
312 let mut events = self
313 .event_store
314 .get_events_for_session(query.session_id.into())
315 .map_err(ApplicationError::Logic)?;
316
317 if let Some(since) = query.since {
319 events.retain(|event| event.timestamp() > since);
320 }
321
322 if let Some(ref event_types) = query.event_types {
324 events.retain(|event| event_types.contains(&event.event_type().to_string()));
325 }
326
327 let total_count = events.len();
328
329 if let Some(limit) = query.limit {
331 events.truncate(limit);
332 }
333
334 Ok(EventsResponse {
335 events,
336 total_count,
337 })
338 }
339}
340
341#[async_trait]
342impl<E> QueryHandler<GetStreamEventsQuery, EventsResponse> for EventQueryHandler<E>
343where
344 E: EventStore + Send + Sync,
345{
346 async fn handle(&self, query: GetStreamEventsQuery) -> ApplicationResult<EventsResponse> {
347 let mut events = self
348 .event_store
349 .get_events_for_stream(query.stream_id.into())
350 .map_err(ApplicationError::Logic)?;
351
352 if let Some(since) = query.since {
354 events.retain(|event| event.timestamp() > since);
355 }
356
357 let total_count = events.len();
358
359 if let Some(limit) = query.limit {
361 events.truncate(limit);
362 }
363
364 Ok(EventsResponse {
365 events,
366 total_count,
367 })
368 }
369}
370
371#[derive(Debug)]
373pub struct SystemQueryHandler<R>
374where
375 R: StreamRepository,
376{
377 repository: Arc<R>,
378}
379
380impl<R> SystemQueryHandler<R>
381where
382 R: StreamRepository,
383{
384 pub fn new(repository: Arc<R>) -> Self {
385 Self { repository }
386 }
387}
388
389#[async_trait]
390impl<R> QueryHandler<GetSystemStatsQuery, SystemStatsResponse> for SystemQueryHandler<R>
391where
392 R: StreamRepository + Send + Sync,
393{
394 async fn handle(&self, _query: GetSystemStatsQuery) -> ApplicationResult<SystemStatsResponse> {
395 let sessions = self
396 .repository
397 .find_active_sessions()
398 .await
399 .map_err(ApplicationError::Domain)?;
400
401 let total_sessions = sessions.len() as u64;
402 let active_sessions = sessions.iter().filter(|s| s.is_active()).count() as u64;
403
404 let mut total_streams = 0u64;
405 let mut active_streams = 0u64;
406 let mut total_frames = 0u64;
407 let mut total_bytes = 0u64;
408 let mut total_duration_ms = 0f64;
409 let mut completed_sessions = 0u64;
410
411 for session in &sessions {
412 let stats = session.stats();
413 total_streams += stats.total_streams;
414 active_streams += stats.active_streams;
415 total_frames += stats.total_frames;
416 total_bytes += stats.total_bytes;
417
418 if let Some(duration) = session.duration() {
419 total_duration_ms += duration.num_milliseconds() as f64;
420 completed_sessions += 1;
421 }
422 }
423
424 let average_session_duration_seconds = if completed_sessions > 0 {
425 total_duration_ms / completed_sessions as f64 / 1000.0
426 } else {
427 0.0
428 };
429
430 let uptime_seconds = 3600; let frames_per_second = total_frames as f64 / uptime_seconds as f64;
433 let bytes_per_second = total_bytes as f64 / uptime_seconds as f64;
434
435 Ok(SystemStatsResponse {
436 total_sessions,
437 active_sessions,
438 total_streams,
439 active_streams,
440 total_frames,
441 total_bytes,
442 average_session_duration_seconds,
443 frames_per_second,
444 bytes_per_second,
445 uptime_seconds: uptime_seconds as u64,
446 })
447 }
448}
449
450#[cfg(test)]
451mod tests {
452 use super::*;
453 use crate::domain::{
454 aggregates::{StreamSession, stream_session::SessionConfig},
455 value_objects::{SessionId, StreamId},
456 };
457 use std::collections::HashMap;
458
459 struct MockRepository {
461 sessions: std::sync::Mutex<HashMap<SessionId, StreamSession>>,
462 }
463
464 impl MockRepository {
465 fn new() -> Self {
466 Self {
467 sessions: std::sync::Mutex::new(HashMap::new()),
468 }
469 }
470
471 fn add_session(&self, session: StreamSession) {
472 self.sessions.lock().unwrap().insert(session.id(), session);
473 }
474 }
475
476 #[async_trait]
477 impl StreamRepository for MockRepository {
478 async fn find_session(
479 &self,
480 session_id: SessionId,
481 ) -> crate::domain::DomainResult<Option<StreamSession>> {
482 Ok(self.sessions.lock().unwrap().get(&session_id).cloned())
483 }
484
485 async fn save_session(&self, session: StreamSession) -> crate::domain::DomainResult<()> {
486 self.sessions.lock().unwrap().insert(session.id(), session);
487 Ok(())
488 }
489
490 async fn remove_session(&self, session_id: SessionId) -> crate::domain::DomainResult<()> {
491 self.sessions.lock().unwrap().remove(&session_id);
492 Ok(())
493 }
494
495 async fn find_active_sessions(&self) -> crate::domain::DomainResult<Vec<StreamSession>> {
496 Ok(self.sessions.lock().unwrap().values().cloned().collect())
497 }
498 }
499
500 struct MockStreamStore;
501
502 #[async_trait]
503 impl StreamStore for MockStreamStore {
504 async fn store_stream(&self, _stream: crate::domain::entities::Stream) -> crate::domain::DomainResult<()> {
505 Ok(())
506 }
507
508 async fn get_stream(&self, _stream_id: StreamId) -> crate::domain::DomainResult<Option<crate::domain::entities::Stream>> {
509 Ok(None)
510 }
511
512 async fn delete_stream(&self, _stream_id: StreamId) -> crate::domain::DomainResult<()> {
513 Ok(())
514 }
515
516 async fn list_streams_for_session(&self, _session_id: SessionId) -> crate::domain::DomainResult<Vec<crate::domain::entities::Stream>> {
517 Ok(vec![])
518 }
519 }
520
521 struct MockEventStore;
522
523 impl MockEventStore {
524 fn new() -> Self {
525 Self
526 }
527 }
528
529 impl EventStore for MockEventStore {
530 fn append_events(&mut self, _events: Vec<crate::domain::events::DomainEvent>) -> Result<(), String> {
531 Ok(())
532 }
533
534 fn get_events_for_session(&self, _session_id: SessionId) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
535 Ok(vec![])
536 }
537
538 fn get_events_for_stream(&self, _stream_id: StreamId) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
539 Ok(vec![])
540 }
541
542 fn get_events_since(&self, _since: chrono::DateTime<chrono::Utc>) -> Result<Vec<crate::domain::events::DomainEvent>, String> {
543 Ok(vec![])
544 }
545 }
546
547 #[tokio::test]
548 async fn test_get_session_query() {
549 let repository = Arc::new(MockRepository::new());
550 let handler = SessionQueryHandler::new(repository.clone());
551
552 let mut session = StreamSession::new(SessionConfig::default());
554 let _ = session.activate();
555 let session_id = session.id();
556 repository.add_session(session);
557
558 let query = GetSessionQuery { session_id: session_id.into() };
560 let result = handler.handle(query).await;
561
562 assert!(result.is_ok());
563 let response = result.unwrap();
564 assert_eq!(response.session.id(), session_id);
565 }
566
567 #[tokio::test]
568 async fn test_get_session_not_found() {
569 let repository = Arc::new(MockRepository::new());
570 let handler = SessionQueryHandler::new(repository);
571
572 let query = GetSessionQuery {
573 session_id: SessionId::new().into(),
574 };
575 let result = handler.handle(query).await;
576
577 assert!(result.is_err());
578 match result.err().unwrap() {
579 ApplicationError::NotFound(_) => {}
580 _ => panic!("Expected NotFound error"),
581 }
582 }
583
584 #[tokio::test]
585 async fn test_get_active_sessions_query() {
586 let repository = Arc::new(MockRepository::new());
587 let handler = SessionQueryHandler::new(repository.clone());
588
589 for i in 0..5 {
591 let mut session = StreamSession::new(SessionConfig::default());
592 if i < 3 {
593 let _ = session.activate();
594 }
595 repository.add_session(session);
596 }
597
598 let query = GetActiveSessionsQuery {
600 offset: None,
601 limit: None,
602 };
603 let result = handler.handle(query).await;
604
605 assert!(result.is_ok());
606 let response = result.unwrap();
607 assert_eq!(response.sessions.len(), 5);
608 assert_eq!(response.total_count, 5);
609 }
610
611 #[tokio::test]
612 async fn test_get_active_sessions_with_pagination() {
613 let repository = Arc::new(MockRepository::new());
614 let handler = SessionQueryHandler::new(repository.clone());
615
616 for _ in 0..10 {
618 let mut session = StreamSession::new(SessionConfig::default());
619 let _ = session.activate();
620 repository.add_session(session);
621 }
622
623 let query = GetActiveSessionsQuery {
625 offset: Some(3),
626 limit: Some(4),
627 };
628 let result = handler.handle(query).await;
629
630 assert!(result.is_ok());
631 let response = result.unwrap();
632 assert_eq!(response.sessions.len(), 4);
633 assert_eq!(response.total_count, 10);
634 }
635
636 #[tokio::test]
637 async fn test_get_session_health_query() {
638 let repository = Arc::new(MockRepository::new());
639 let handler = SessionQueryHandler::new(repository.clone());
640
641 let mut session = StreamSession::new(SessionConfig::default());
643 let _ = session.activate();
644 let session_id = session.id();
645 repository.add_session(session);
646
647 let query = GetSessionHealthQuery { session_id: session_id.into() };
649 let result = handler.handle(query).await;
650
651 assert!(result.is_ok());
652 let response = result.unwrap();
653 assert!(response.health.is_healthy);
654 }
655
656 #[tokio::test]
657 async fn test_session_handler_creation() {
658 let repository = Arc::new(MockRepository::new());
659 let handler = SessionQueryHandler::new(repository.clone());
660
661 assert!(std::ptr::eq(handler.repository.as_ref(), repository.as_ref()));
663 }
664
665 #[tokio::test]
666 async fn test_stream_handler_creation() {
667 let session_repository = Arc::new(MockRepository::new());
668 let stream_store = Arc::new(MockStreamStore);
669 let handler = StreamQueryHandler::new(session_repository.clone(), stream_store.clone());
670
671 assert!(std::ptr::eq(handler.session_repository.as_ref(), session_repository.as_ref()));
673 }
674
675 #[tokio::test]
676 async fn test_event_handler_creation() {
677 let event_store = Arc::new(MockEventStore::new());
678 let handler = EventQueryHandler::new(event_store.clone());
679
680 assert!(std::ptr::eq(handler.event_store.as_ref(), event_store.as_ref()));
682 }
683
684 #[tokio::test]
685 async fn test_system_handler_creation() {
686 let repository = Arc::new(MockRepository::new());
687 let handler = SystemQueryHandler::new(repository.clone());
688
689 assert!(std::ptr::eq(handler.repository.as_ref(), repository.as_ref()));
691 }
692}