1use axum::{
4 Json, Router,
5 extract::{DefaultBodyLimit, Path as AxumPath, Query, State},
6 http::{
7 HeaderValue, Method, StatusCode,
8 header::{self, AUTHORIZATION, CONTENT_TYPE},
9 },
10 middleware,
11 response::{IntoResponse, Response},
12 routing::{get, post},
13};
14use serde::{Deserialize, Serialize};
15use serde_json::Value as JsonValue;
16use std::{sync::Arc, time::Instant};
17use tower_http::{
18 cors::{AllowOrigin, CorsLayer},
19 trace::TraceLayer,
20};
21
22use crate::{
23 application::{
24 commands::*,
25 handlers::{
26 CommandHandlerGat, QueryHandlerGat,
27 command_handlers::SessionCommandHandler,
28 query_handlers::{SessionQueryHandler, StreamQueryHandler, SystemQueryHandler},
29 },
30 queries::*,
31 },
32 domain::{
33 aggregates::stream_session::{SessionConfig, SessionHealth},
34 ports::{
35 DictionaryStore, EventPublisherGat, NoopDictionaryStore, StreamRepositoryGat,
36 StreamStoreGat,
37 },
38 value_objects::{Priority, SessionId, StreamId},
39 },
40 infrastructure::http::middleware::{RateLimitMiddleware, security_middleware},
41};
42
/// Configuration for the PJS HTTP router.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Origins accepted by the CORS layer.
    ///
    /// An empty list allows no origin, a list consisting only of `"*"` allows
    /// any origin, and mixing `"*"` with explicit origins is rejected when the
    /// CORS layer is built.
    pub allowed_origins: Vec<String>,
}
75
76impl Default for HttpServerConfig {
77 fn default() -> Self {
81 Self {
82 allowed_origins: vec!["http://localhost:3000".to_string()],
83 }
84 }
85}
86
87fn build_cors_layer(config: &HttpServerConfig) -> Result<CorsLayer, PjsError> {
95 let base = CorsLayer::new()
100 .allow_methods([Method::GET, Method::POST])
101 .allow_headers([CONTENT_TYPE, AUTHORIZATION])
102 .max_age(std::time::Duration::from_secs(3600));
103
104 let has_wildcard = config.allowed_origins.iter().any(|o| o == "*");
105 let has_explicit = config.allowed_origins.iter().any(|o| o != "*");
106
107 let layer = match (
108 config.allowed_origins.is_empty(),
109 has_wildcard,
110 has_explicit,
111 ) {
112 (true, _, _) => base.allow_origin(AllowOrigin::list(std::iter::empty::<HeaderValue>())),
113 (_, true, true) => {
114 return Err(PjsError::HttpError(
115 "CORS: wildcard '*' cannot be combined with explicit origins".into(),
116 ));
117 }
118 (_, true, false) => base.allow_origin(tower_http::cors::Any),
119 (_, false, _) => {
120 let origins: Vec<HeaderValue> = config
121 .allowed_origins
122 .iter()
123 .map(|o| {
124 o.parse::<HeaderValue>()
125 .map_err(|e| PjsError::HttpError(format!("invalid CORS origin {o:?}: {e}")))
126 })
127 .collect::<Result<_, _>>()?;
128 base.allow_origin(AllowOrigin::list(origins))
129 }
130 };
131 Ok(layer)
132}
133
/// Shared state handed to every PJS route handler.
///
/// Every member is stored behind an [`Arc`].
pub struct PjsAppState<R, P, S>
where
    R: StreamRepositoryGat + Send + Sync + 'static,
    P: EventPublisherGat + Send + Sync + 'static,
    S: StreamStoreGat + Send + Sync + 'static,
{
    /// Executes the session and stream commands issued by the handlers.
    command_handler: Arc<SessionCommandHandler<R, P>>,
    /// Answers session-level queries (lookup, health, stats, listing, search).
    session_query_handler: Arc<SessionQueryHandler<R>>,
    /// Answers stream-level queries (stream lookup and frames).
    stream_query_handler: Arc<StreamQueryHandler<R, S>>,
    /// Answers system-wide statistics queries.
    system_handler: Arc<SystemQueryHandler<R>>,
    /// Dictionary store shared with other modules of this crate.
    pub(crate) dictionary_store: Arc<dyn DictionaryStore>,
}
150
151impl<R, P, S> Clone for PjsAppState<R, P, S>
152where
153 R: StreamRepositoryGat + Send + Sync + 'static,
154 P: EventPublisherGat + Send + Sync + 'static,
155 S: StreamStoreGat + Send + Sync + 'static,
156{
157 fn clone(&self) -> Self {
158 Self {
159 command_handler: self.command_handler.clone(),
160 session_query_handler: self.session_query_handler.clone(),
161 stream_query_handler: self.stream_query_handler.clone(),
162 system_handler: self.system_handler.clone(),
163 dictionary_store: self.dictionary_store.clone(),
164 }
165 }
166}
167
168impl<R, P, S> PjsAppState<R, P, S>
169where
170 R: StreamRepositoryGat + Send + Sync + 'static,
171 P: EventPublisherGat + Send + Sync + 'static,
172 S: StreamStoreGat + Send + Sync + 'static,
173{
174 pub fn new(repository: Arc<R>, event_publisher: Arc<P>, stream_store: Arc<S>) -> Self {
182 Self::with_dictionary_store(
183 repository,
184 event_publisher,
185 stream_store,
186 Arc::new(NoopDictionaryStore),
187 )
188 }
189
190 pub fn with_dictionary_store(
195 repository: Arc<R>,
196 event_publisher: Arc<P>,
197 stream_store: Arc<S>,
198 dictionary_store: Arc<dyn DictionaryStore>,
199 ) -> Self {
200 let started_at = Instant::now();
201 Self {
202 command_handler: Arc::new(SessionCommandHandler::new(
203 repository.clone(),
204 event_publisher,
205 )),
206 session_query_handler: Arc::new(SessionQueryHandler::new(repository.clone())),
207 stream_query_handler: Arc::new(StreamQueryHandler::new(
208 repository.clone(),
209 stream_store,
210 )),
211 system_handler: Arc::new(SystemQueryHandler::with_start_time(repository, started_at)),
212 dictionary_store,
213 }
214 }
215}
216
/// JSON body accepted by the create-session endpoint.
#[derive(Debug, Deserialize)]
pub struct CreateSessionRequest {
    /// Maximum number of concurrent streams; the handler falls back to 10 when absent.
    pub max_concurrent_streams: Option<usize>,
    /// Session timeout in seconds; the handler falls back to 3600 when absent.
    pub timeout_seconds: Option<u64>,
    /// Optional client description forwarded to the create-session command.
    pub client_info: Option<String>,
}
224
/// JSON body returned by the create-session endpoint.
#[derive(Debug, Serialize)]
pub struct CreateSessionResponse {
    /// String form of the newly created session's identifier.
    pub session_id: String,
    /// Expiry time computed by the handler as "now + session timeout" (UTC).
    pub expires_at: chrono::DateTime<chrono::Utc>,
}
231
/// JSON body accepted by the create-stream endpoint.
#[derive(Debug, Deserialize)]
pub struct StartStreamRequest {
    /// Source JSON document for the stream; forwarded as the command's `source_data`.
    pub data: JsonValue,
    /// NOTE(review): not read by the handlers visible in this file — confirm intended use.
    pub priority_threshold: Option<u8>,
    /// NOTE(review): not read by the handlers visible in this file — confirm intended use.
    pub max_frames: Option<usize>,
}
239
/// Deserializable stream parameters.
///
/// NOTE(review): no handler visible in this file uses this type — confirm
/// where it is consumed before changing it.
#[derive(Debug, Deserialize)]
pub struct StreamParams {
    /// Session identifier in string form.
    pub session_id: String,
    /// Optional raw priority value.
    pub priority: Option<u8>,
    /// Optional format name.
    pub format: Option<String>,
}
247
/// JSON body returned by the session-health endpoint.
///
/// Each field is copied from the field of the same name on [`SessionHealth`].
#[derive(Debug, Serialize)]
pub struct SessionHealthResponse {
    /// Whether the session is reported as healthy.
    pub is_healthy: bool,
    /// Reported number of active streams.
    pub active_streams: usize,
    /// Reported number of failed streams.
    pub failed_streams: usize,
    /// Whether the session is reported as expired.
    pub is_expired: bool,
    /// Reported uptime in seconds.
    pub uptime_seconds: i64,
}
257
258impl From<SessionHealth> for SessionHealthResponse {
259 fn from(health: SessionHealth) -> Self {
260 Self {
261 is_healthy: health.is_healthy,
262 active_streams: health.active_streams,
263 failed_streams: health.failed_streams,
264 is_expired: health.is_expired,
265 uptime_seconds: health.uptime_seconds,
266 }
267 }
268}
269
270pub fn create_pjs_router<R, P, S>() -> Router<PjsAppState<R, P, S>>
282where
283 R: StreamRepositoryGat + Send + Sync + 'static,
284 P: EventPublisherGat + Send + Sync + 'static,
285 S: StreamStoreGat + Send + Sync + 'static,
286{
287 create_pjs_router_with_config::<R, P, S>(&HttpServerConfig::default())
288 .expect("default HttpServerConfig must always produce a valid CORS layer")
289}
290
291pub fn create_pjs_router_with_config<R, P, S>(
309 config: &HttpServerConfig,
310) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
311where
312 R: StreamRepositoryGat + Send + Sync + 'static,
313 P: EventPublisherGat + Send + Sync + 'static,
314 S: StreamStoreGat + Send + Sync + 'static,
315{
316 let all_routes = public_routes::<R, P, S>().merge(protected_routes::<R, P, S>());
317 apply_common_layers(all_routes, config)
318}
319
320pub fn create_pjs_router_with_rate_limit<R, P, S>(
334 rate_limit_middleware: RateLimitMiddleware,
335) -> Router<PjsAppState<R, P, S>>
336where
337 R: StreamRepositoryGat + Send + Sync + 'static,
338 P: EventPublisherGat + Send + Sync + 'static,
339 S: StreamStoreGat + Send + Sync + 'static,
340{
341 create_pjs_router_with_rate_limit_and_config::<R, P, S>(
342 &HttpServerConfig::default(),
343 rate_limit_middleware,
344 )
345 .expect("default HttpServerConfig must always produce a valid CORS layer")
346}
347
348pub fn create_pjs_router_with_rate_limit_and_config<R, P, S>(
354 config: &HttpServerConfig,
355 rate_limit_middleware: RateLimitMiddleware,
356) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
357where
358 R: StreamRepositoryGat + Send + Sync + 'static,
359 P: EventPublisherGat + Send + Sync + 'static,
360 S: StreamStoreGat + Send + Sync + 'static,
361{
362 let all_routes = public_routes::<R, P, S>()
363 .merge(protected_routes::<R, P, S>())
364 .layer(rate_limit_middleware);
365 apply_common_layers(all_routes, config)
366}
367
/// Builds the PJS router with API-key authentication on the protected routes.
///
/// The `auth` layer wraps only the protected routes; the public routes are
/// merged in afterwards without it. The common layers are then applied to the
/// merged router.
///
/// # Errors
///
/// Returns the error produced while applying the common layers, i.e. an
/// invalid CORS configuration.
#[cfg(feature = "http-server")]
pub fn create_pjs_router_with_auth<R, P, S>(
    config: &HttpServerConfig,
    auth: crate::infrastructure::http::auth::ApiKeyAuthLayer,
) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
where
    R: StreamRepositoryGat + Send + Sync + 'static,
    P: EventPublisherGat + Send + Sync + 'static,
    S: StreamStoreGat + Send + Sync + 'static,
{
    let authenticated = protected_routes::<R, P, S>().layer(auth);
    let public = public_routes::<R, P, S>();
    apply_common_layers(public.merge(authenticated), config)
}
407
/// Builds the PJS router with API-key authentication and rate limiting.
///
/// The `auth` layer wraps only the protected routes. The `rate_limit` layer
/// is applied to the merged public and protected routes, and the common
/// layers are added last.
///
/// # Errors
///
/// Returns the error produced while applying the common layers, i.e. an
/// invalid CORS configuration.
#[cfg(feature = "http-server")]
pub fn create_pjs_router_with_rate_limit_and_auth<R, P, S>(
    config: &HttpServerConfig,
    rate_limit: RateLimitMiddleware,
    auth: crate::infrastructure::http::auth::ApiKeyAuthLayer,
) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
where
    R: StreamRepositoryGat + Send + Sync + 'static,
    P: EventPublisherGat + Send + Sync + 'static,
    S: StreamStoreGat + Send + Sync + 'static,
{
    let authenticated = protected_routes::<R, P, S>().layer(auth);
    let public = public_routes::<R, P, S>();
    let rate_limited = public.merge(authenticated).layer(rate_limit);
    apply_common_layers(rate_limited, config)
}
445
446fn public_routes<R, P, S>() -> Router<PjsAppState<R, P, S>>
452where
453 R: StreamRepositoryGat + Send + Sync + 'static,
454 P: EventPublisherGat + Send + Sync + 'static,
455 S: StreamStoreGat + Send + Sync + 'static,
456{
457 let router = Router::new().route("/pjs/health", get(system_health));
458
459 #[cfg(feature = "metrics")]
460 let router = router.route(
461 "/metrics",
462 get(crate::infrastructure::http::metrics::metrics_handler),
463 );
464
465 router
466}
467
468fn protected_routes<R, P, S>() -> Router<PjsAppState<R, P, S>>
470where
471 R: StreamRepositoryGat + Send + Sync + 'static,
472 P: EventPublisherGat + Send + Sync + 'static,
473 S: StreamStoreGat + Send + Sync + 'static,
474{
475 let router = Router::new()
476 .route("/pjs/sessions", post(create_session::<R, P, S>))
477 .route("/pjs/sessions/{session_id}", get(get_session::<R, P, S>))
478 .route(
479 "/pjs/sessions/{session_id}/health",
480 get(session_health::<R, P, S>),
481 )
482 .route(
483 "/pjs/sessions/{session_id}/stats",
484 get(get_session_stats::<R, P, S>),
485 )
486 .route(
487 "/pjs/sessions/{session_id}/streams",
488 post(create_stream::<R, P, S>),
489 )
490 .route(
491 "/pjs/sessions/{session_id}/streams/{stream_id}/start",
492 post(start_stream::<R, P, S>),
493 )
494 .route(
495 "/pjs/sessions/{session_id}/streams/{stream_id}",
496 get(get_stream::<R, P, S>),
497 )
498 .route(
499 "/pjs/sessions/{session_id}/streams/{stream_id}/frames",
500 get(get_stream_frames::<R, P, S>),
501 )
502 .route("/pjs/sessions/search", get(search_sessions::<R, P, S>))
503 .route("/pjs/sessions", get(list_sessions::<R, P, S>))
504 .route("/pjs/stats", get(get_system_stats::<R, P, S>));
505
506 #[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
507 let router = router.route(
508 "/pjs/sessions/{session_id}/dictionary",
509 get(crate::infrastructure::http::dictionary::get_session_dictionary::<R, P, S>),
510 );
511
512 router
513}
514
515fn apply_common_layers<R, P, S>(
525 router: Router<PjsAppState<R, P, S>>,
526 config: &HttpServerConfig,
527) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
528where
529 R: StreamRepositoryGat + Send + Sync + 'static,
530 P: EventPublisherGat + Send + Sync + 'static,
531 S: StreamStoreGat + Send + Sync + 'static,
532{
533 let cors = build_cors_layer(config)?;
534 Ok(router
535 .layer(middleware::from_fn(security_middleware))
536 .layer(DefaultBodyLimit::max(10 * 1024 * 1024))
537 .layer(cors)
538 .layer(TraceLayer::new_for_http()))
539}
540
541async fn create_session<R, P, S>(
543 State(state): State<PjsAppState<R, P, S>>,
544 headers: axum::http::HeaderMap,
545 Json(request): Json<CreateSessionRequest>,
546) -> Result<Json<CreateSessionResponse>, PjsError>
547where
548 R: StreamRepositoryGat + Send + Sync + 'static,
549 P: EventPublisherGat + Send + Sync + 'static,
550 S: StreamStoreGat + Send + Sync + 'static,
551{
552 let config = SessionConfig {
553 max_concurrent_streams: request.max_concurrent_streams.unwrap_or(10),
554 session_timeout_seconds: request.timeout_seconds.unwrap_or(3600),
555 default_stream_config: Default::default(),
556 enable_compression: true,
557 metadata: Default::default(),
558 };
559
560 let user_agent = headers
561 .get(header::USER_AGENT)
562 .and_then(|h| h.to_str().ok())
563 .map(String::from);
564
565 let command = CreateSessionCommand {
566 config,
567 client_info: request.client_info,
568 user_agent,
569 ip_address: None,
570 };
571
572 let session_id: SessionId = CommandHandlerGat::handle(&*state.command_handler, command)
573 .await
574 .map_err(PjsError::Application)?;
575
576 let expires_at = chrono::Utc::now()
577 + chrono::Duration::seconds(request.timeout_seconds.unwrap_or(3600) as i64);
578
579 Ok(Json(CreateSessionResponse {
580 session_id: session_id.to_string(),
581 expires_at,
582 }))
583}
584
585async fn get_session<R, P, S>(
587 State(state): State<PjsAppState<R, P, S>>,
588 AxumPath(session_id): AxumPath<String>,
589) -> Result<Json<SessionResponse>, PjsError>
590where
591 R: StreamRepositoryGat + Send + Sync + 'static,
592 P: EventPublisherGat + Send + Sync + 'static,
593 S: StreamStoreGat + Send + Sync + 'static,
594{
595 let session_id =
596 SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
597
598 let query = GetSessionQuery {
599 session_id: session_id.into(),
600 };
601
602 let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionQuery>>::handle(
603 &*state.session_query_handler,
604 query,
605 )
606 .await
607 .map_err(PjsError::Application)?;
608
609 Ok(Json(response))
610}
611
612async fn session_health<R, P, S>(
614 State(state): State<PjsAppState<R, P, S>>,
615 AxumPath(session_id): AxumPath<String>,
616) -> Result<Json<SessionHealthResponse>, PjsError>
617where
618 R: StreamRepositoryGat + Send + Sync + 'static,
619 P: EventPublisherGat + Send + Sync + 'static,
620 S: StreamStoreGat + Send + Sync + 'static,
621{
622 let session_id =
623 SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
624
625 let query = GetSessionHealthQuery {
626 session_id: session_id.into(),
627 };
628
629 let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionHealthQuery>>::handle(
630 &*state.session_query_handler,
631 query,
632 )
633 .await
634 .map_err(PjsError::Application)?;
635
636 Ok(Json(SessionHealthResponse::from(response.health)))
637}
638
639async fn create_stream<R, P, S>(
645 State(state): State<PjsAppState<R, P, S>>,
646 AxumPath(session_id): AxumPath<String>,
647 Json(request): Json<StartStreamRequest>,
648) -> Result<Json<serde_json::Value>, PjsError>
649where
650 R: StreamRepositoryGat + Send + Sync + 'static,
651 P: EventPublisherGat + Send + Sync + 'static,
652 S: StreamStoreGat + Send + Sync + 'static,
653{
654 let session_id =
655 SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
656
657 let command = CreateStreamCommand {
658 session_id: session_id.into(),
659 source_data: request.data,
660 config: None,
661 };
662
663 let stream_id: StreamId = CommandHandlerGat::handle(&*state.command_handler, command)
664 .await
665 .map_err(PjsError::Application)?;
666
667 Ok(Json(serde_json::json!({
668 "stream_id": stream_id.to_string(),
669 "status": "created"
670 })))
671}
672
673async fn start_stream<R, P, S>(
675 State(state): State<PjsAppState<R, P, S>>,
676 AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
677) -> Result<Json<serde_json::Value>, PjsError>
678where
679 R: StreamRepositoryGat + Send + Sync + 'static,
680 P: EventPublisherGat + Send + Sync + 'static,
681 S: StreamStoreGat + Send + Sync + 'static,
682{
683 let session_id = SessionId::from_string(&session_id)
684 .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
685 let stream_id =
686 StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
687
688 let command = StartStreamCommand {
689 session_id: session_id.into(),
690 stream_id: stream_id.into(),
691 };
692
693 <SessionCommandHandler<R, P> as CommandHandlerGat<StartStreamCommand>>::handle(
694 &*state.command_handler,
695 command,
696 )
697 .await
698 .map_err(PjsError::Application)?;
699
700 Ok(Json(serde_json::json!({
701 "stream_id": stream_id.to_string(),
702 "status": "started"
703 })))
704}
705
706async fn get_stream<R, P, S>(
708 State(state): State<PjsAppState<R, P, S>>,
709 AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
710) -> Result<Json<StreamResponse>, PjsError>
711where
712 R: StreamRepositoryGat + Send + Sync + 'static,
713 P: EventPublisherGat + Send + Sync + 'static,
714 S: StreamStoreGat + Send + Sync + 'static,
715{
716 let session_id = SessionId::from_string(&session_id)
717 .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
718 let stream_id =
719 StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
720
721 let query = GetStreamQuery {
722 session_id: session_id.into(),
723 stream_id: stream_id.into(),
724 };
725
726 let response = <StreamQueryHandler<R, S> as QueryHandlerGat<GetStreamQuery>>::handle(
727 &*state.stream_query_handler,
728 query,
729 )
730 .await
731 .map_err(PjsError::Application)?;
732
733 Ok(Json(response))
734}
735
736async fn list_sessions<R, P, S>(
738 State(state): State<PjsAppState<R, P, S>>,
739 Query(params): Query<PaginationParams>,
740) -> Result<Json<SessionsResponse>, PjsError>
741where
742 R: StreamRepositoryGat + Send + Sync + 'static,
743 P: EventPublisherGat + Send + Sync + 'static,
744 S: StreamStoreGat + Send + Sync + 'static,
745{
746 let query = GetActiveSessionsQuery {
747 limit: params.limit,
748 offset: params.offset,
749 };
750
751 let response = <SessionQueryHandler<R> as QueryHandlerGat<GetActiveSessionsQuery>>::handle(
752 &*state.session_query_handler,
753 query,
754 )
755 .await
756 .map_err(PjsError::Application)?;
757
758 Ok(Json(response))
759}
760
761async fn search_sessions<R, P, S>(
763 State(state): State<PjsAppState<R, P, S>>,
764 Query(params): Query<SearchSessionsParams>,
765) -> Result<Json<SessionsResponse>, PjsError>
766where
767 R: StreamRepositoryGat + Send + Sync + 'static,
768 P: EventPublisherGat + Send + Sync + 'static,
769 S: StreamStoreGat + Send + Sync + 'static,
770{
771 let sort_by = params.sort_by.as_deref().and_then(|s| match s {
772 "created_at" => Some(SessionSortField::CreatedAt),
773 "updated_at" => Some(SessionSortField::UpdatedAt),
774 "stream_count" => Some(SessionSortField::StreamCount),
775 "total_bytes" => Some(SessionSortField::TotalBytes),
776 _ => None,
777 });
778 let sort_order = params.sort_order.as_deref().and_then(|s| match s {
779 "ascending" | "asc" => Some(SortOrder::Ascending),
780 "descending" | "desc" => Some(SortOrder::Descending),
781 _ => None,
782 });
783 let query = SearchSessionsQuery {
784 filters: SessionFilters {
785 state: params.state,
786 created_after: None,
787 created_before: None,
788 client_info: None,
789 has_active_streams: None,
790 },
791 sort_by,
792 sort_order,
793 limit: params.limit,
794 offset: params.offset,
795 };
796 let response = <SessionQueryHandler<R> as QueryHandlerGat<SearchSessionsQuery>>::handle(
797 &*state.session_query_handler,
798 query,
799 )
800 .await
801 .map_err(PjsError::Application)?;
802 Ok(Json(response))
803}
804
/// Pagination query parameters for the session listing endpoint.
#[derive(Debug, Deserialize)]
pub struct PaginationParams {
    /// Optional maximum number of results; forwarded to the query unchanged.
    pub limit: Option<usize>,
    /// Optional number of results to skip; forwarded to the query unchanged.
    pub offset: Option<usize>,
}
811
/// Query parameters for the session search endpoint.
#[derive(Debug, Deserialize)]
pub struct SearchSessionsParams {
    /// Optional session-state filter; passed to the search filters as-is.
    pub state: Option<String>,
    /// Sort field: `created_at`, `updated_at`, `stream_count` or `total_bytes`.
    /// Any other value is treated by the handler as "no sort field".
    pub sort_by: Option<String>,
    /// Sort direction: `ascending`/`asc` or `descending`/`desc`.
    /// Any other value is treated by the handler as "no sort order".
    pub sort_order: Option<String>,
    /// Optional maximum number of results; forwarded to the query unchanged.
    pub limit: Option<usize>,
    /// Optional number of results to skip; forwarded to the query unchanged.
    pub offset: Option<usize>,
}
821
822async fn system_health() -> Json<serde_json::Value> {
824 Json(serde_json::json!({
825 "status": "healthy",
826 "version": env!("CARGO_PKG_VERSION"),
827 "features": ["pjs_streaming", "axum_integration", "gat_handlers"]
828 }))
829}
830
831async fn get_system_stats<R, P, S>(
833 State(state): State<PjsAppState<R, P, S>>,
834) -> Result<Json<SystemStatsResponse>, PjsError>
835where
836 R: StreamRepositoryGat + Send + Sync + 'static,
837 P: EventPublisherGat + Send + Sync + 'static,
838 S: StreamStoreGat + Send + Sync + 'static,
839{
840 let query = GetSystemStatsQuery {
841 include_historical: false,
842 };
843
844 let response = <SystemQueryHandler<R> as QueryHandlerGat<GetSystemStatsQuery>>::handle(
845 &*state.system_handler,
846 query,
847 )
848 .await
849 .map_err(PjsError::Application)?;
850
851 Ok(Json(response))
852}
853
/// Query parameters for the stream frames endpoint.
#[derive(Debug, Deserialize)]
pub struct FrameQueryParams {
    /// Optional sequence number; forwarded to the frames query unchanged.
    pub since_sequence: Option<u64>,
    /// Optional raw priority; validated by the handler via `Priority::new`
    /// before being used as the priority filter.
    pub priority: Option<u8>,
    /// Optional maximum number of frames; forwarded to the query unchanged.
    pub limit: Option<usize>,
}
861
862async fn get_stream_frames<R, P, S>(
864 State(state): State<PjsAppState<R, P, S>>,
865 AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
866 Query(params): Query<FrameQueryParams>,
867) -> Result<Json<FramesResponse>, PjsError>
868where
869 R: StreamRepositoryGat + Send + Sync + 'static,
870 P: EventPublisherGat + Send + Sync + 'static,
871 S: StreamStoreGat + Send + Sync + 'static,
872{
873 let session_id = SessionId::from_string(&session_id)
874 .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
875 let stream_id =
876 StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
877
878 let priority_filter = params
879 .priority
880 .map(|p| Priority::new(p).map(Into::into))
881 .transpose()
882 .map_err(|e: crate::domain::DomainError| PjsError::InvalidPriority(e.to_string()))?;
883
884 let query = GetStreamFramesQuery {
885 session_id: session_id.into(),
886 stream_id: stream_id.into(),
887 since_sequence: params.since_sequence,
888 priority_filter,
889 limit: params.limit,
890 };
891
892 let response = <StreamQueryHandler<R, S> as QueryHandlerGat<GetStreamFramesQuery>>::handle(
893 &*state.stream_query_handler,
894 query,
895 )
896 .await
897 .map_err(PjsError::Application)?;
898
899 Ok(Json(response))
900}
901
902async fn get_session_stats<R, P, S>(
904 State(state): State<PjsAppState<R, P, S>>,
905 AxumPath(session_id): AxumPath<String>,
906) -> Result<Json<SessionStatsResponse>, PjsError>
907where
908 R: StreamRepositoryGat + Send + Sync + 'static,
909 P: EventPublisherGat + Send + Sync + 'static,
910 S: StreamStoreGat + Send + Sync + 'static,
911{
912 let session_id =
913 SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
914
915 let query = GetSessionStatsQuery {
916 session_id: session_id.into(),
917 };
918
919 let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionStatsQuery>>::handle(
920 &*state.session_query_handler,
921 query,
922 )
923 .await
924 .map_err(PjsError::Application)?;
925
926 Ok(Json(response))
927}
928
/// Errors produced by the PJS HTTP layer.
///
/// The `IntoResponse` implementation in this file turns each variant into a
/// JSON error body with a matching HTTP status code.
#[derive(Debug, thiserror::Error)]
pub enum PjsError {
    /// Failure reported by an application-layer command or query handler.
    /// The HTTP status depends on the wrapped `ApplicationError` variant.
    #[error("Application error: {0}")]
    Application(#[from] crate::application::ApplicationError),

    /// The session id in the request could not be parsed; carries the raw
    /// input. Rendered as `400 Bad Request`.
    #[error("Invalid session ID: {0}")]
    InvalidSessionId(String),

    /// The stream id in the request could not be parsed; carries the raw
    /// input. Rendered as `400 Bad Request`.
    #[error("Invalid stream ID: {0}")]
    InvalidStreamId(String),

    /// The requested priority was rejected; carries the validation message.
    /// Rendered as `400 Bad Request`.
    #[error("Invalid priority: {0}")]
    InvalidPriority(String),

    /// HTTP-layer failure such as an invalid CORS configuration.
    /// Rendered as `500 Internal Server Error`.
    #[error("HTTP error: {0}")]
    HttpError(String),
}
968
969impl IntoResponse for PjsError {
970 fn into_response(self) -> Response {
971 let (status, error_message) = match &self {
972 PjsError::Application(app_err) => {
973 use crate::application::ApplicationError;
974 let status = match app_err {
975 ApplicationError::NotFound(_) => StatusCode::NOT_FOUND,
976 ApplicationError::Validation(_) => StatusCode::BAD_REQUEST,
977 ApplicationError::Authorization(_) => StatusCode::UNAUTHORIZED,
978 ApplicationError::Concurrency(_) | ApplicationError::Conflict(_) => {
979 StatusCode::CONFLICT
980 }
981 ApplicationError::Domain(_) | ApplicationError::Logic(_) => {
982 StatusCode::INTERNAL_SERVER_ERROR
983 }
984 };
985 (status, self.to_string())
986 }
987 PjsError::InvalidSessionId(_) => (StatusCode::BAD_REQUEST, self.to_string()),
988 PjsError::InvalidStreamId(_) => (StatusCode::BAD_REQUEST, self.to_string()),
989 PjsError::InvalidPriority(_) => (StatusCode::BAD_REQUEST, self.to_string()),
990 PjsError::HttpError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
991 };
992
993 let body = Json(serde_json::json!({
994 "error": error_message
995 }));
996
997 (status, body).into_response()
998 }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003 use super::*;
1004
1005 #[test]
1008 fn cors_empty_origins_denies_all() {
1009 let config = HttpServerConfig {
1010 allowed_origins: vec![],
1011 };
1012 let result = build_cors_layer(&config);
1014 assert!(
1015 result.is_ok(),
1016 "empty origins should return Ok (deny-all layer)"
1017 );
1018 }
1019
1020 #[test]
1021 fn cors_wildcard_only_is_ok() {
1022 let config = HttpServerConfig {
1023 allowed_origins: vec!["*".to_string()],
1024 };
1025 let result = build_cors_layer(&config);
1026 assert!(result.is_ok(), "wildcard-only should return Ok");
1027 }
1028
1029 #[test]
1030 fn cors_mixed_wildcard_and_explicit_is_err() {
1031 let config = HttpServerConfig {
1032 allowed_origins: vec!["*".to_string(), "http://example.com".to_string()],
1033 };
1034 let result = build_cors_layer(&config);
1035 assert!(
1036 result.is_err(),
1037 "mixing wildcard with explicit origins must fail"
1038 );
1039 let msg = result.unwrap_err().to_string();
1040 assert!(
1041 msg.contains("wildcard"),
1042 "error message should mention wildcard: {msg}"
1043 );
1044 }
1045
1046 #[test]
1047 fn cors_valid_single_origin_is_ok() {
1048 let config = HttpServerConfig {
1049 allowed_origins: vec!["http://example.com".to_string()],
1050 };
1051 assert!(build_cors_layer(&config).is_ok());
1052 }
1053
1054 #[test]
1055 fn cors_valid_multiple_origins_is_ok() {
1056 let config = HttpServerConfig {
1057 allowed_origins: vec![
1058 "https://app.example.com".to_string(),
1059 "https://admin.example.com".to_string(),
1060 ],
1061 };
1062 assert!(build_cors_layer(&config).is_ok());
1063 }
1064
1065 #[test]
1066 fn cors_invalid_origin_string_is_err() {
1067 let config = HttpServerConfig {
1068 allowed_origins: vec!["not a\nvalid header".to_string()],
1070 };
1071 let result = build_cors_layer(&config);
1072 assert!(result.is_err(), "invalid origin string must return Err");
1073 }
1074
1075 #[test]
1076 fn default_config_is_valid() {
1077 assert!(
1080 build_cors_layer(&HttpServerConfig::default()).is_ok(),
1081 "default HttpServerConfig must produce a valid CORS layer"
1082 );
1083 }
1084
1085 use crate::domain::{
1088 aggregates::StreamSession,
1089 entities::Stream,
1090 events::DomainEvent,
1091 ports::{
1092 EventPublisherGat, Pagination, PriorityDistribution, SessionHealthSnapshot,
1093 SessionQueryCriteria, SessionQueryResult, StreamFilter, StreamRepositoryGat,
1094 StreamStatistics, StreamStatus, StreamStoreGat,
1095 },
1096 value_objects::{SessionId, StreamId},
1097 };
1098 use chrono::Utc;
1099 use std::collections::HashMap;
1100
1101 struct MockRepository {
1102 sessions: parking_lot::Mutex<HashMap<SessionId, StreamSession>>,
1103 }
1104
1105 impl MockRepository {
1106 fn new() -> Self {
1107 Self {
1108 sessions: parking_lot::Mutex::new(HashMap::new()),
1109 }
1110 }
1111 }
1112
1113 impl StreamRepositoryGat for MockRepository {
1114 type FindSessionFuture<'a>
1115 = impl std::future::Future<Output = crate::domain::DomainResult<Option<StreamSession>>>
1116 + Send
1117 + 'a
1118 where
1119 Self: 'a;
1120
1121 type SaveSessionFuture<'a>
1122 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1123 where
1124 Self: 'a;
1125
1126 type RemoveSessionFuture<'a>
1127 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1128 where
1129 Self: 'a;
1130
1131 type FindActiveSessionsFuture<'a>
1132 = impl std::future::Future<Output = crate::domain::DomainResult<Vec<StreamSession>>>
1133 + Send
1134 + 'a
1135 where
1136 Self: 'a;
1137
1138 type FindSessionsByCriteriaFuture<'a>
1139 = impl std::future::Future<Output = crate::domain::DomainResult<SessionQueryResult>>
1140 + Send
1141 + 'a
1142 where
1143 Self: 'a;
1144
1145 type GetSessionHealthFuture<'a>
1146 = impl std::future::Future<Output = crate::domain::DomainResult<SessionHealthSnapshot>>
1147 + Send
1148 + 'a
1149 where
1150 Self: 'a;
1151
1152 type SessionExistsFuture<'a>
1153 = impl std::future::Future<Output = crate::domain::DomainResult<bool>> + Send + 'a
1154 where
1155 Self: 'a;
1156
1157 fn find_session(&self, session_id: SessionId) -> Self::FindSessionFuture<'_> {
1158 async move { Ok(self.sessions.lock().get(&session_id).cloned()) }
1159 }
1160
1161 fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_> {
1162 async move {
1163 self.sessions.lock().insert(session.id(), session);
1164 Ok(())
1165 }
1166 }
1167
1168 fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_> {
1169 async move {
1170 self.sessions.lock().remove(&session_id);
1171 Ok(())
1172 }
1173 }
1174
1175 fn find_active_sessions(&self) -> Self::FindActiveSessionsFuture<'_> {
1176 async move { Ok(self.sessions.lock().values().cloned().collect()) }
1177 }
1178
1179 fn find_sessions_by_criteria(
1180 &self,
1181 _criteria: SessionQueryCriteria,
1182 pagination: Pagination,
1183 ) -> Self::FindSessionsByCriteriaFuture<'_> {
1184 async move {
1185 let sessions: Vec<_> = self.sessions.lock().values().cloned().collect();
1186 let total_count = sessions.len();
1187 let paginated: Vec<_> = sessions
1188 .into_iter()
1189 .skip(pagination.offset)
1190 .take(pagination.limit)
1191 .collect();
1192 let has_more = pagination.offset + paginated.len() < total_count;
1193 Ok(SessionQueryResult {
1194 sessions: paginated,
1195 total_count,
1196 has_more,
1197 query_duration_ms: 0,
1198 scan_limit_reached: false,
1199 })
1200 }
1201 }
1202
1203 fn get_session_health(&self, session_id: SessionId) -> Self::GetSessionHealthFuture<'_> {
1204 async move {
1205 Ok(SessionHealthSnapshot {
1206 session_id,
1207 is_healthy: true,
1208 active_streams: 0,
1209 total_frames: 0,
1210 last_activity: Utc::now(),
1211 error_rate: 0.0,
1212 metrics: HashMap::new(),
1213 })
1214 }
1215 }
1216
1217 fn session_exists(&self, session_id: SessionId) -> Self::SessionExistsFuture<'_> {
1218 async move { Ok(self.sessions.lock().contains_key(&session_id)) }
1219 }
1220 }
1221
1222 struct MockEventPublisher;
1223
1224 impl EventPublisherGat for MockEventPublisher {
1225 type PublishFuture<'a>
1226 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1227 where
1228 Self: 'a;
1229
1230 type PublishBatchFuture<'a>
1231 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1232 where
1233 Self: 'a;
1234
1235 fn publish(&self, _event: DomainEvent) -> Self::PublishFuture<'_> {
1236 async move { Ok(()) }
1237 }
1238
1239 fn publish_batch(&self, _events: Vec<DomainEvent>) -> Self::PublishBatchFuture<'_> {
1240 async move { Ok(()) }
1241 }
1242 }
1243
1244 struct MockStreamStore;
1245
1246 impl StreamStoreGat for MockStreamStore {
1247 type StoreStreamFuture<'a>
1248 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1249 where
1250 Self: 'a;
1251
1252 type GetStreamFuture<'a>
1253 = impl std::future::Future<Output = crate::domain::DomainResult<Option<Stream>>>
1254 + Send
1255 + 'a
1256 where
1257 Self: 'a;
1258
1259 type DeleteStreamFuture<'a>
1260 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1261 where
1262 Self: 'a;
1263
1264 type ListStreamsForSessionFuture<'a>
1265 =
1266 impl std::future::Future<Output = crate::domain::DomainResult<Vec<Stream>>> + Send + 'a
1267 where
1268 Self: 'a;
1269
1270 type FindStreamsBySessionFuture<'a>
1271 =
1272 impl std::future::Future<Output = crate::domain::DomainResult<Vec<Stream>>> + Send + 'a
1273 where
1274 Self: 'a;
1275
1276 type UpdateStreamStatusFuture<'a>
1277 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1278 where
1279 Self: 'a;
1280
1281 type GetStreamStatisticsFuture<'a>
1282 = impl std::future::Future<Output = crate::domain::DomainResult<StreamStatistics>>
1283 + Send
1284 + 'a
1285 where
1286 Self: 'a;
1287
1288 fn store_stream(&self, _stream: Stream) -> Self::StoreStreamFuture<'_> {
1289 async move { Ok(()) }
1290 }
1291
1292 fn get_stream(&self, _stream_id: StreamId) -> Self::GetStreamFuture<'_> {
1293 async move { Ok(None) }
1294 }
1295
1296 fn delete_stream(&self, _stream_id: StreamId) -> Self::DeleteStreamFuture<'_> {
1297 async move { Ok(()) }
1298 }
1299
1300 fn list_streams_for_session(
1301 &self,
1302 _session_id: SessionId,
1303 ) -> Self::ListStreamsForSessionFuture<'_> {
1304 async move { Ok(vec![]) }
1305 }
1306
1307 fn find_streams_by_session(
1308 &self,
1309 _session_id: SessionId,
1310 _filter: StreamFilter,
1311 ) -> Self::FindStreamsBySessionFuture<'_> {
1312 async move { Ok(vec![]) }
1313 }
1314
1315 fn update_stream_status(
1316 &self,
1317 _stream_id: StreamId,
1318 _status: StreamStatus,
1319 ) -> Self::UpdateStreamStatusFuture<'_> {
1320 async move { Ok(()) }
1321 }
1322
1323 fn get_stream_statistics(
1324 &self,
1325 _stream_id: StreamId,
1326 ) -> Self::GetStreamStatisticsFuture<'_> {
1327 async move {
1328 Ok(StreamStatistics {
1329 total_frames: 0,
1330 total_bytes: 0,
1331 priority_distribution: PriorityDistribution::default(),
1332 avg_frame_size: 0.0,
1333 creation_time: Utc::now(),
1334 completion_time: None,
1335 processing_duration: None,
1336 })
1337 }
1338 }
1339 }
1340
1341 #[tokio::test]
1342 async fn test_system_health() {
1343 let response = system_health().await;
1344 let health_data: serde_json::Value = response.0;
1345
1346 assert_eq!(health_data["status"], "healthy");
1347 assert!(!health_data["features"].as_array().unwrap().is_empty());
1348 }
1349
1350 #[tokio::test]
1351 async fn test_app_state_creation() {
1352 let repository = Arc::new(MockRepository::new());
1353 let event_publisher = Arc::new(MockEventPublisher);
1354 let stream_store = Arc::new(MockStreamStore);
1355
1356 let _state = PjsAppState::new(repository, event_publisher, stream_store);
1357 }
1358
1359 #[tokio::test]
1360 async fn test_get_system_stats_returns_real_uptime() {
1361 use crate::application::handlers::QueryHandlerGat;
1362 use crate::application::handlers::query_handlers::SystemQueryHandler;
1363 use crate::application::queries::GetSystemStatsQuery;
1364 use std::time::{Duration, Instant};
1365
1366 let repository = Arc::new(MockRepository::new());
1367 let started_at = Instant::now() - Duration::from_secs(5);
1369 let handler = SystemQueryHandler::with_start_time(repository, started_at);
1370
1371 let query = GetSystemStatsQuery {
1372 include_historical: false,
1373 };
1374 let result = QueryHandlerGat::handle(&handler, query).await.unwrap();
1375
1376 assert!(
1378 result.uptime_seconds >= 5,
1379 "uptime_seconds should be at least 5, got {}",
1380 result.uptime_seconds
1381 );
1382 assert_ne!(
1384 result.uptime_seconds, 3600,
1385 "uptime_seconds must not be the hard-coded placeholder 3600"
1386 );
1387 }
1388
/// Installing the global recorder twice must succeed both times, the rendered output must
/// not contain a JSON error object, and both handles must render identical text.
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_metrics_endpoint_returns_prometheus_format() {
    use crate::infrastructure::http::metrics::install_global_recorder;

    let first = install_global_recorder().expect("recorder install should succeed");
    let rendered = first.render();
    let is_json_error = rendered.contains("{\"error\"");
    assert!(
        !is_json_error,
        "rendered metrics should not be a JSON error: {rendered}"
    );

    // A repeated install must hand back a usable handle rather than fail.
    let second = install_global_recorder().expect("second call must not fail");
    assert_eq!(
        first.render(),
        second.render(),
        "both handles must render the same metrics"
    );
}
1412
/// With the `metrics` feature enabled, the router must build from the default configuration.
///
/// Only construction is checked here; no request is sent to the router.
#[cfg(feature = "metrics")]
#[test]
fn test_metrics_router_has_metrics_route() {
    let config = HttpServerConfig::default();
    let built =
        create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
            &config,
        );
    let _router = built.expect("router should build successfully with metrics feature");
}
1424
1425 #[tokio::test]
1426 async fn search_sessions_route_returns_ok() {
1427 use axum::http::Request;
1428 use tower::ServiceExt;
1429
1430 let repository = Arc::new(MockRepository::new());
1431 let event_publisher = Arc::new(MockEventPublisher);
1432 let stream_store = Arc::new(MockStreamStore);
1433 let state = PjsAppState::new(repository, event_publisher, stream_store);
1434
1435 let router =
1436 create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1437 &HttpServerConfig::default(),
1438 )
1439 .expect("router should build")
1440 .with_state(state);
1441
1442 let req = Request::builder()
1443 .uri("/pjs/sessions/search")
1444 .body(axum::body::Body::empty())
1445 .unwrap();
1446
1447 let resp = router.oneshot(req).await.unwrap();
1448 assert_eq!(resp.status(), StatusCode::OK);
1449 }
1450}