1use axum::{
4 Json, Router,
5 extract::{DefaultBodyLimit, Path as AxumPath, Query, State},
6 http::{
7 HeaderValue, Method, StatusCode,
8 header::{self, AUTHORIZATION, CONTENT_TYPE},
9 },
10 middleware,
11 response::{IntoResponse, Response},
12 routing::{get, post},
13};
14use serde::{Deserialize, Serialize};
15use serde_json::Value as JsonValue;
16use std::{sync::Arc, time::Instant};
17use tower_http::{
18 cors::{AllowOrigin, CorsLayer},
19 trace::TraceLayer,
20};
21
22use crate::{
23 application::{
24 commands::*,
25 dto::PriorityDto,
26 handlers::{
27 CommandHandlerGat, QueryHandlerGat,
28 command_handlers::SessionCommandHandler,
29 query_handlers::{SessionQueryHandler, StreamQueryHandler, SystemQueryHandler},
30 },
31 queries::*,
32 },
33 domain::{
34 aggregates::stream_session::{SessionConfig, SessionHealth},
35 entities::Frame,
36 ports::{
37 DictionaryStore, EventPublisherGat, FrameStoreGat, NoopDictionaryStore,
38 StreamRepositoryGat, StreamStoreGat,
39 },
40 value_objects::{Priority, SessionId, StreamId},
41 },
42 infrastructure::{
43 adapters::InMemoryFrameStore,
44 http::middleware::{RateLimitMiddleware, security_middleware},
45 },
46};
47
48#[derive(Debug, Clone)]
67#[non_exhaustive]
68pub struct HttpServerConfig {
69 pub allowed_origins: Vec<String>,
86}
87
88impl HttpServerConfig {
89 pub fn new(allowed_origins: Vec<String>) -> Self {
104 Self { allowed_origins }
105 }
106}
107
108impl Default for HttpServerConfig {
109 fn default() -> Self {
113 Self {
114 allowed_origins: vec!["http://localhost:3000".to_string()],
115 }
116 }
117}
118
119fn build_cors_layer(config: &HttpServerConfig) -> Result<CorsLayer, PjsError> {
127 let base = CorsLayer::new()
132 .allow_methods([Method::GET, Method::POST])
133 .allow_headers([CONTENT_TYPE, AUTHORIZATION])
134 .max_age(std::time::Duration::from_secs(3600));
135
136 let has_wildcard = config.allowed_origins.iter().any(|o| o == "*");
137 let has_explicit = config.allowed_origins.iter().any(|o| o != "*");
138
139 let layer = match (
140 config.allowed_origins.is_empty(),
141 has_wildcard,
142 has_explicit,
143 ) {
144 (true, _, _) => base.allow_origin(AllowOrigin::list(std::iter::empty::<HeaderValue>())),
145 (_, true, true) => {
146 return Err(PjsError::HttpError(
147 "CORS: wildcard '*' cannot be combined with explicit origins".into(),
148 ));
149 }
150 (_, true, false) => base.allow_origin(tower_http::cors::Any),
151 (_, false, _) => {
152 let origins: Vec<HeaderValue> = config
153 .allowed_origins
154 .iter()
155 .map(|o| {
156 o.parse::<HeaderValue>()
157 .map_err(|e| PjsError::HttpError(format!("invalid CORS origin {o:?}: {e}")))
158 })
159 .collect::<Result<_, _>>()?;
160 base.allow_origin(AllowOrigin::list(origins))
161 }
162 };
163 Ok(layer)
164}
165
166pub struct PjsAppState<R, P, S, F = InMemoryFrameStore>
171where
172 R: StreamRepositoryGat + Send + Sync + 'static,
173 P: EventPublisherGat + Send + Sync + 'static,
174 S: StreamStoreGat + Send + Sync + 'static,
175 F: FrameStoreGat + Send + Sync + 'static,
176{
177 command_handler: Arc<SessionCommandHandler<R, P, F>>,
178 session_query_handler: Arc<SessionQueryHandler<R>>,
179 stream_query_handler: Arc<StreamQueryHandler<R, S, F>>,
180 system_handler: Arc<SystemQueryHandler<R>>,
181 pub(crate) dictionary_store: Arc<dyn DictionaryStore>,
182}
183
184impl<R, P, S, F> Clone for PjsAppState<R, P, S, F>
185where
186 R: StreamRepositoryGat + Send + Sync + 'static,
187 P: EventPublisherGat + Send + Sync + 'static,
188 S: StreamStoreGat + Send + Sync + 'static,
189 F: FrameStoreGat + Send + Sync + 'static,
190{
191 fn clone(&self) -> Self {
192 Self {
193 command_handler: self.command_handler.clone(),
194 session_query_handler: self.session_query_handler.clone(),
195 stream_query_handler: self.stream_query_handler.clone(),
196 system_handler: self.system_handler.clone(),
197 dictionary_store: self.dictionary_store.clone(),
198 }
199 }
200}
201
202impl<R, P, S> PjsAppState<R, P, S, InMemoryFrameStore>
203where
204 R: StreamRepositoryGat + Send + Sync + 'static,
205 P: EventPublisherGat + Send + Sync + 'static,
206 S: StreamStoreGat + Send + Sync + 'static,
207{
208 pub fn new(repository: Arc<R>, event_publisher: Arc<P>, stream_store: Arc<S>) -> Self {
217 Self::with_dictionary_store(
218 repository,
219 event_publisher,
220 stream_store,
221 Arc::new(NoopDictionaryStore),
222 )
223 }
224
225 pub fn with_dictionary_store(
231 repository: Arc<R>,
232 event_publisher: Arc<P>,
233 stream_store: Arc<S>,
234 dictionary_store: Arc<dyn DictionaryStore>,
235 ) -> Self {
236 Self::with_stores(
237 repository,
238 event_publisher,
239 stream_store,
240 dictionary_store,
241 Arc::new(InMemoryFrameStore::new()),
242 )
243 }
244}
245
246impl<R, P, S, F> PjsAppState<R, P, S, F>
247where
248 R: StreamRepositoryGat + Send + Sync + 'static,
249 P: EventPublisherGat + Send + Sync + 'static,
250 S: StreamStoreGat + Send + Sync + 'static,
251 F: FrameStoreGat + Send + Sync + 'static,
252{
253 pub fn with_stores(
256 repository: Arc<R>,
257 event_publisher: Arc<P>,
258 stream_store: Arc<S>,
259 dictionary_store: Arc<dyn DictionaryStore>,
260 frame_store: Arc<F>,
261 ) -> Self {
262 let started_at = Instant::now();
263 Self {
264 command_handler: Arc::new(SessionCommandHandler::with_stores(
265 repository.clone(),
266 event_publisher,
267 dictionary_store.clone(),
268 frame_store.clone(),
269 )),
270 session_query_handler: Arc::new(SessionQueryHandler::new(repository.clone())),
271 stream_query_handler: Arc::new(StreamQueryHandler::new(
272 repository.clone(),
273 stream_store,
274 frame_store,
275 )),
276 system_handler: Arc::new(SystemQueryHandler::with_start_time(repository, started_at)),
277 dictionary_store,
278 }
279 }
280}
281
282#[derive(Debug, Deserialize)]
284pub struct CreateSessionRequest {
285 pub max_concurrent_streams: Option<usize>,
287 pub timeout_seconds: Option<u64>,
289 pub client_info: Option<String>,
291}
292
293#[derive(Debug, Serialize)]
295pub struct CreateSessionResponse {
296 pub session_id: String,
298 pub expires_at: chrono::DateTime<chrono::Utc>,
300}
301
302#[derive(Debug, Deserialize)]
304pub struct StartStreamRequest {
305 pub data: JsonValue,
307 pub priority_threshold: Option<u8>,
309 pub max_frames: Option<usize>,
311}
312
313#[derive(Debug, Deserialize)]
315pub struct StreamParams {
316 pub session_id: String,
318 pub priority: Option<u8>,
320 pub format: Option<String>,
322}
323
324#[derive(Debug, Default, Deserialize)]
332pub struct GenerateFramesRequest {
333 pub priority_threshold: Option<u8>,
335 pub max_frames: Option<usize>,
337}
338
339#[derive(Debug, Serialize)]
346pub struct GenerateFramesResponse {
347 pub frames: Vec<Frame>,
349 pub frame_count: usize,
351}
352
353#[derive(Debug, Serialize)]
355pub struct SessionHealthResponse {
356 pub is_healthy: bool,
358 pub active_streams: usize,
360 pub failed_streams: usize,
362 pub is_expired: bool,
364 pub uptime_seconds: i64,
366}
367
368impl From<SessionHealth> for SessionHealthResponse {
369 fn from(health: SessionHealth) -> Self {
370 Self {
371 is_healthy: health.is_healthy,
372 active_streams: health.active_streams,
373 failed_streams: health.failed_streams,
374 is_expired: health.is_expired,
375 uptime_seconds: health.uptime_seconds,
376 }
377 }
378}
379
380pub fn create_pjs_router<R, P, S>() -> Router<PjsAppState<R, P, S>>
392where
393 R: StreamRepositoryGat + Send + Sync + 'static,
394 P: EventPublisherGat + Send + Sync + 'static,
395 S: StreamStoreGat + Send + Sync + 'static,
396{
397 create_pjs_router_with_config::<R, P, S>(&HttpServerConfig::default())
398 .expect("default HttpServerConfig must always produce a valid CORS layer")
399}
400
401pub fn create_pjs_router_with_config<R, P, S>(
418 config: &HttpServerConfig,
419) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
420where
421 R: StreamRepositoryGat + Send + Sync + 'static,
422 P: EventPublisherGat + Send + Sync + 'static,
423 S: StreamStoreGat + Send + Sync + 'static,
424{
425 let all_routes = public_routes::<R, P, S>().merge(protected_routes::<R, P, S>());
426 apply_common_layers(all_routes, config)
427}
428
429pub fn create_pjs_router_with_rate_limit<R, P, S>(
443 rate_limit_middleware: RateLimitMiddleware,
444) -> Router<PjsAppState<R, P, S>>
445where
446 R: StreamRepositoryGat + Send + Sync + 'static,
447 P: EventPublisherGat + Send + Sync + 'static,
448 S: StreamStoreGat + Send + Sync + 'static,
449{
450 create_pjs_router_with_rate_limit_and_config::<R, P, S>(
451 &HttpServerConfig::default(),
452 rate_limit_middleware,
453 )
454 .expect("default HttpServerConfig must always produce a valid CORS layer")
455}
456
457pub fn create_pjs_router_with_rate_limit_and_config<R, P, S>(
463 config: &HttpServerConfig,
464 rate_limit_middleware: RateLimitMiddleware,
465) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
466where
467 R: StreamRepositoryGat + Send + Sync + 'static,
468 P: EventPublisherGat + Send + Sync + 'static,
469 S: StreamStoreGat + Send + Sync + 'static,
470{
471 let all_routes = public_routes::<R, P, S>()
472 .merge(protected_routes::<R, P, S>())
473 .layer(rate_limit_middleware);
474 apply_common_layers(all_routes, config)
475}
476
477#[cfg(feature = "http-server")]
501pub fn create_pjs_router_with_auth<R, P, S>(
502 config: &HttpServerConfig,
503 auth: crate::infrastructure::http::auth::ApiKeyAuthLayer,
504) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
505where
506 R: StreamRepositoryGat + Send + Sync + 'static,
507 P: EventPublisherGat + Send + Sync + 'static,
508 S: StreamStoreGat + Send + Sync + 'static,
509{
510 let protected = protected_routes::<R, P, S>().layer(auth);
513 let merged = public_routes::<R, P, S>().merge(protected);
514 apply_common_layers(merged, config)
515}
516
517#[cfg(feature = "http-server")]
536pub fn create_pjs_router_with_rate_limit_and_auth<R, P, S>(
537 config: &HttpServerConfig,
538 rate_limit: RateLimitMiddleware,
539 auth: crate::infrastructure::http::auth::ApiKeyAuthLayer,
540) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
541where
542 R: StreamRepositoryGat + Send + Sync + 'static,
543 P: EventPublisherGat + Send + Sync + 'static,
544 S: StreamStoreGat + Send + Sync + 'static,
545{
546 let protected = protected_routes::<R, P, S>().layer(auth);
549 let merged = public_routes::<R, P, S>()
550 .merge(protected)
551 .layer(rate_limit);
552 apply_common_layers(merged, config)
553}
554
555fn public_routes<R, P, S>() -> Router<PjsAppState<R, P, S>>
561where
562 R: StreamRepositoryGat + Send + Sync + 'static,
563 P: EventPublisherGat + Send + Sync + 'static,
564 S: StreamStoreGat + Send + Sync + 'static,
565{
566 let router = Router::new().route("/pjs/health", get(system_health));
567
568 #[cfg(feature = "metrics")]
569 let router = router.route(
570 "/metrics",
571 get(crate::infrastructure::http::metrics::metrics_handler),
572 );
573
574 router
575}
576
577fn protected_routes<R, P, S>() -> Router<PjsAppState<R, P, S>>
579where
580 R: StreamRepositoryGat + Send + Sync + 'static,
581 P: EventPublisherGat + Send + Sync + 'static,
582 S: StreamStoreGat + Send + Sync + 'static,
583{
584 let router = Router::new()
585 .route("/pjs/sessions", post(create_session::<R, P, S>))
586 .route("/pjs/sessions/{session_id}", get(get_session::<R, P, S>))
587 .route(
588 "/pjs/sessions/{session_id}/health",
589 get(session_health::<R, P, S>),
590 )
591 .route(
592 "/pjs/sessions/{session_id}/stats",
593 get(get_session_stats::<R, P, S>),
594 )
595 .route(
596 "/pjs/sessions/{session_id}/streams",
597 post(create_stream::<R, P, S>),
598 )
599 .route(
600 "/pjs/sessions/{session_id}/streams/{stream_id}/start",
601 post(start_stream::<R, P, S>),
602 )
603 .route(
604 "/pjs/sessions/{session_id}/streams/{stream_id}/generate-frames",
605 post(generate_frames::<R, P, S>),
606 )
607 .route(
608 "/pjs/sessions/{session_id}/streams/{stream_id}",
609 get(get_stream::<R, P, S>),
610 )
611 .route(
612 "/pjs/sessions/{session_id}/streams/{stream_id}/frames",
613 get(get_stream_frames::<R, P, S>),
614 )
615 .route("/pjs/sessions/search", get(search_sessions::<R, P, S>))
616 .route("/pjs/sessions", get(list_sessions::<R, P, S>))
617 .route("/pjs/stats", get(get_system_stats::<R, P, S>));
618
619 #[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
620 let router = router.route(
621 "/pjs/sessions/{session_id}/dictionary",
622 get(crate::infrastructure::http::dictionary::get_session_dictionary::<R, P, S>),
623 );
624
625 router
626}
627
628fn apply_common_layers<R, P, S>(
638 router: Router<PjsAppState<R, P, S>>,
639 config: &HttpServerConfig,
640) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
641where
642 R: StreamRepositoryGat + Send + Sync + 'static,
643 P: EventPublisherGat + Send + Sync + 'static,
644 S: StreamStoreGat + Send + Sync + 'static,
645{
646 let cors = build_cors_layer(config)?;
647 Ok(router
648 .layer(middleware::from_fn(security_middleware))
649 .layer(DefaultBodyLimit::max(10 * 1024 * 1024))
650 .layer(cors)
651 .layer(TraceLayer::new_for_http()))
652}
653
654async fn create_session<R, P, S>(
656 State(state): State<PjsAppState<R, P, S>>,
657 headers: axum::http::HeaderMap,
658 Json(request): Json<CreateSessionRequest>,
659) -> Result<Json<CreateSessionResponse>, PjsError>
660where
661 R: StreamRepositoryGat + Send + Sync + 'static,
662 P: EventPublisherGat + Send + Sync + 'static,
663 S: StreamStoreGat + Send + Sync + 'static,
664{
665 let config = SessionConfig {
666 max_concurrent_streams: request.max_concurrent_streams.unwrap_or(10),
667 session_timeout_seconds: request.timeout_seconds.unwrap_or(3600),
668 default_stream_config: Default::default(),
669 enable_compression: true,
670 metadata: Default::default(),
671 };
672
673 let user_agent = headers
674 .get(header::USER_AGENT)
675 .and_then(|h| h.to_str().ok())
676 .map(String::from);
677
678 let command = CreateSessionCommand {
679 config,
680 client_info: request.client_info,
681 user_agent,
682 ip_address: None,
683 };
684
685 let session_id: SessionId = CommandHandlerGat::handle(&*state.command_handler, command)
686 .await
687 .map_err(PjsError::Application)?;
688
689 let expires_at = chrono::Utc::now()
690 + chrono::Duration::seconds(request.timeout_seconds.unwrap_or(3600) as i64);
691
692 Ok(Json(CreateSessionResponse {
693 session_id: session_id.to_string(),
694 expires_at,
695 }))
696}
697
698async fn get_session<R, P, S>(
700 State(state): State<PjsAppState<R, P, S>>,
701 AxumPath(session_id): AxumPath<String>,
702) -> Result<Json<SessionResponse>, PjsError>
703where
704 R: StreamRepositoryGat + Send + Sync + 'static,
705 P: EventPublisherGat + Send + Sync + 'static,
706 S: StreamStoreGat + Send + Sync + 'static,
707{
708 let session_id =
709 SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
710
711 let query = GetSessionQuery {
712 session_id: session_id.into(),
713 };
714
715 let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionQuery>>::handle(
716 &*state.session_query_handler,
717 query,
718 )
719 .await
720 .map_err(PjsError::Application)?;
721
722 Ok(Json(response))
723}
724
725async fn session_health<R, P, S>(
727 State(state): State<PjsAppState<R, P, S>>,
728 AxumPath(session_id): AxumPath<String>,
729) -> Result<Json<SessionHealthResponse>, PjsError>
730where
731 R: StreamRepositoryGat + Send + Sync + 'static,
732 P: EventPublisherGat + Send + Sync + 'static,
733 S: StreamStoreGat + Send + Sync + 'static,
734{
735 let session_id =
736 SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
737
738 let query = GetSessionHealthQuery {
739 session_id: session_id.into(),
740 };
741
742 let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionHealthQuery>>::handle(
743 &*state.session_query_handler,
744 query,
745 )
746 .await
747 .map_err(PjsError::Application)?;
748
749 Ok(Json(SessionHealthResponse::from(response.health)))
750}
751
752async fn create_stream<R, P, S>(
758 State(state): State<PjsAppState<R, P, S>>,
759 AxumPath(session_id): AxumPath<String>,
760 Json(request): Json<StartStreamRequest>,
761) -> Result<Json<serde_json::Value>, PjsError>
762where
763 R: StreamRepositoryGat + Send + Sync + 'static,
764 P: EventPublisherGat + Send + Sync + 'static,
765 S: StreamStoreGat + Send + Sync + 'static,
766{
767 let session_id =
768 SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
769
770 let command = CreateStreamCommand {
771 session_id: session_id.into(),
772 source_data: request.data,
773 config: None,
774 };
775
776 let stream_id: StreamId = CommandHandlerGat::handle(&*state.command_handler, command)
777 .await
778 .map_err(PjsError::Application)?;
779
780 Ok(Json(serde_json::json!({
781 "stream_id": stream_id.to_string(),
782 "status": "created"
783 })))
784}
785
786async fn start_stream<R, P, S>(
788 State(state): State<PjsAppState<R, P, S>>,
789 AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
790) -> Result<Json<serde_json::Value>, PjsError>
791where
792 R: StreamRepositoryGat + Send + Sync + 'static,
793 P: EventPublisherGat + Send + Sync + 'static,
794 S: StreamStoreGat + Send + Sync + 'static,
795{
796 let session_id = SessionId::from_string(&session_id)
797 .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
798 let stream_id =
799 StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
800
801 let command = StartStreamCommand {
802 session_id: session_id.into(),
803 stream_id: stream_id.into(),
804 };
805
806 <SessionCommandHandler<R, P> as CommandHandlerGat<StartStreamCommand>>::handle(
807 &*state.command_handler,
808 command,
809 )
810 .await
811 .map_err(PjsError::Application)?;
812
813 Ok(Json(serde_json::json!({
814 "stream_id": stream_id.to_string(),
815 "status": "started"
816 })))
817}
818
819async fn generate_frames<R, P, S>(
827 State(state): State<PjsAppState<R, P, S>>,
828 AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
829 request: Option<Json<GenerateFramesRequest>>,
830) -> Result<Json<GenerateFramesResponse>, PjsError>
831where
832 R: StreamRepositoryGat + Send + Sync + 'static,
833 P: EventPublisherGat + Send + Sync + 'static,
834 S: StreamStoreGat + Send + Sync + 'static,
835{
836 let session_id = SessionId::from_string(&session_id)
837 .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
838 let stream_id =
839 StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
840
841 let Json(request) = request.unwrap_or_default();
842
843 let priority_value = request
844 .priority_threshold
845 .unwrap_or(Priority::BACKGROUND.value());
846 let priority_threshold =
847 PriorityDto::new(priority_value).map_err(|e| PjsError::InvalidPriority(e.to_string()))?;
848 let max_frames = request.max_frames.unwrap_or(16);
849
850 let command = GenerateFramesCommand {
851 session_id: session_id.into(),
852 stream_id: stream_id.into(),
853 priority_threshold,
854 max_frames,
855 };
856
857 let frames: Vec<Frame> = <SessionCommandHandler<R, P> as CommandHandlerGat<
858 GenerateFramesCommand,
859 >>::handle(&*state.command_handler, command)
860 .await
861 .map_err(PjsError::Application)?;
862
863 let frame_count = frames.len();
864 Ok(Json(GenerateFramesResponse {
865 frames,
866 frame_count,
867 }))
868}
869
870async fn get_stream<R, P, S>(
872 State(state): State<PjsAppState<R, P, S>>,
873 AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
874) -> Result<Json<StreamResponse>, PjsError>
875where
876 R: StreamRepositoryGat + Send + Sync + 'static,
877 P: EventPublisherGat + Send + Sync + 'static,
878 S: StreamStoreGat + Send + Sync + 'static,
879{
880 let session_id = SessionId::from_string(&session_id)
881 .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
882 let stream_id =
883 StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
884
885 let query = GetStreamQuery {
886 session_id: session_id.into(),
887 stream_id: stream_id.into(),
888 };
889
890 let response = <StreamQueryHandler<R, S, InMemoryFrameStore> as QueryHandlerGat<
891 GetStreamQuery,
892 >>::handle(&*state.stream_query_handler, query)
893 .await
894 .map_err(PjsError::Application)?;
895
896 Ok(Json(response))
897}
898
899async fn list_sessions<R, P, S>(
901 State(state): State<PjsAppState<R, P, S>>,
902 Query(params): Query<PaginationParams>,
903) -> Result<Json<SessionsResponse>, PjsError>
904where
905 R: StreamRepositoryGat + Send + Sync + 'static,
906 P: EventPublisherGat + Send + Sync + 'static,
907 S: StreamStoreGat + Send + Sync + 'static,
908{
909 let query = GetActiveSessionsQuery {
910 limit: params.limit,
911 offset: params.offset,
912 };
913
914 let response = <SessionQueryHandler<R> as QueryHandlerGat<GetActiveSessionsQuery>>::handle(
915 &*state.session_query_handler,
916 query,
917 )
918 .await
919 .map_err(PjsError::Application)?;
920
921 Ok(Json(response))
922}
923
924async fn search_sessions<R, P, S>(
926 State(state): State<PjsAppState<R, P, S>>,
927 Query(params): Query<SearchSessionsParams>,
928) -> Result<Json<SessionsResponse>, PjsError>
929where
930 R: StreamRepositoryGat + Send + Sync + 'static,
931 P: EventPublisherGat + Send + Sync + 'static,
932 S: StreamStoreGat + Send + Sync + 'static,
933{
934 let sort_by = params.sort_by.as_deref().and_then(|s| match s {
935 "created_at" => Some(SessionSortField::CreatedAt),
936 "updated_at" => Some(SessionSortField::UpdatedAt),
937 "stream_count" => Some(SessionSortField::StreamCount),
938 "total_bytes" => Some(SessionSortField::TotalBytes),
939 _ => None,
940 });
941 let sort_order = params.sort_order.as_deref().and_then(|s| match s {
942 "ascending" | "asc" => Some(SortOrder::Ascending),
943 "descending" | "desc" => Some(SortOrder::Descending),
944 _ => None,
945 });
946 let query = SearchSessionsQuery {
947 filters: SessionFilters {
948 state: params.state,
949 created_after: None,
950 created_before: None,
951 client_info: None,
952 has_active_streams: None,
953 },
954 sort_by,
955 sort_order,
956 limit: params.limit,
957 offset: params.offset,
958 };
959 let response = <SessionQueryHandler<R> as QueryHandlerGat<SearchSessionsQuery>>::handle(
960 &*state.session_query_handler,
961 query,
962 )
963 .await
964 .map_err(PjsError::Application)?;
965 Ok(Json(response))
966}
967
968#[derive(Debug, Deserialize)]
970pub struct PaginationParams {
971 pub limit: Option<usize>,
973 pub offset: Option<usize>,
975}
976
977#[derive(Debug, Deserialize)]
979pub struct SearchSessionsParams {
980 pub state: Option<String>,
982 pub sort_by: Option<String>,
984 pub sort_order: Option<String>,
986 pub limit: Option<usize>,
988 pub offset: Option<usize>,
990}
991
992async fn system_health() -> Json<serde_json::Value> {
994 Json(serde_json::json!({
995 "status": "healthy",
996 "version": env!("CARGO_PKG_VERSION"),
997 "features": ["pjs_streaming", "axum_integration", "gat_handlers"]
998 }))
999}
1000
1001async fn get_system_stats<R, P, S>(
1003 State(state): State<PjsAppState<R, P, S>>,
1004) -> Result<Json<SystemStatsResponse>, PjsError>
1005where
1006 R: StreamRepositoryGat + Send + Sync + 'static,
1007 P: EventPublisherGat + Send + Sync + 'static,
1008 S: StreamStoreGat + Send + Sync + 'static,
1009{
1010 let query = GetSystemStatsQuery {
1011 include_historical: false,
1012 };
1013
1014 let response = <SystemQueryHandler<R> as QueryHandlerGat<GetSystemStatsQuery>>::handle(
1015 &*state.system_handler,
1016 query,
1017 )
1018 .await
1019 .map_err(PjsError::Application)?;
1020
1021 Ok(Json(response))
1022}
1023
1024#[derive(Debug, Deserialize)]
1026pub struct FrameQueryParams {
1027 pub since_sequence: Option<u64>,
1029 pub priority: Option<u8>,
1031 pub limit: Option<usize>,
1033}
1034
1035async fn get_stream_frames<R, P, S>(
1037 State(state): State<PjsAppState<R, P, S>>,
1038 AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
1039 Query(params): Query<FrameQueryParams>,
1040) -> Result<Json<FramesResponse>, PjsError>
1041where
1042 R: StreamRepositoryGat + Send + Sync + 'static,
1043 P: EventPublisherGat + Send + Sync + 'static,
1044 S: StreamStoreGat + Send + Sync + 'static,
1045{
1046 let session_id = SessionId::from_string(&session_id)
1047 .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
1048 let stream_id =
1049 StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
1050
1051 let priority_filter = params
1052 .priority
1053 .map(|p| Priority::new(p).map(Into::into))
1054 .transpose()
1055 .map_err(|e: crate::domain::DomainError| PjsError::InvalidPriority(e.to_string()))?;
1056
1057 let query = GetStreamFramesQuery {
1058 session_id: session_id.into(),
1059 stream_id: stream_id.into(),
1060 since_sequence: params.since_sequence,
1061 priority_filter,
1062 limit: params.limit,
1063 };
1064
1065 let response = <StreamQueryHandler<R, S, InMemoryFrameStore> as QueryHandlerGat<
1066 GetStreamFramesQuery,
1067 >>::handle(&*state.stream_query_handler, query)
1068 .await
1069 .map_err(PjsError::Application)?;
1070
1071 Ok(Json(response))
1072}
1073
1074async fn get_session_stats<R, P, S>(
1076 State(state): State<PjsAppState<R, P, S>>,
1077 AxumPath(session_id): AxumPath<String>,
1078) -> Result<Json<SessionStatsResponse>, PjsError>
1079where
1080 R: StreamRepositoryGat + Send + Sync + 'static,
1081 P: EventPublisherGat + Send + Sync + 'static,
1082 S: StreamStoreGat + Send + Sync + 'static,
1083{
1084 let session_id =
1085 SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
1086
1087 let query = GetSessionStatsQuery {
1088 session_id: session_id.into(),
1089 };
1090
1091 let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionStatsQuery>>::handle(
1092 &*state.session_query_handler,
1093 query,
1094 )
1095 .await
1096 .map_err(PjsError::Application)?;
1097
1098 Ok(Json(response))
1099}
1100
1101#[derive(Debug, thiserror::Error)]
1124pub enum PjsError {
1125 #[error("Application error: {0}")]
1127 Application(#[from] crate::application::ApplicationError),
1128
1129 #[error("Invalid session ID: {0}")]
1131 InvalidSessionId(String),
1132
1133 #[error("Invalid stream ID: {0}")]
1135 InvalidStreamId(String),
1136
1137 #[error("Invalid priority: {0}")]
1139 InvalidPriority(String),
1140
1141 #[error("HTTP error: {0}")]
1143 HttpError(String),
1144}
1145
1146impl IntoResponse for PjsError {
1147 fn into_response(self) -> Response {
1148 let (status, error_message) = match &self {
1149 PjsError::Application(app_err) => {
1150 use crate::application::ApplicationError;
1151 let status = match app_err {
1152 ApplicationError::NotFound(_) => StatusCode::NOT_FOUND,
1153 ApplicationError::Validation(_) => StatusCode::BAD_REQUEST,
1154 ApplicationError::Authorization(_) => StatusCode::UNAUTHORIZED,
1155 ApplicationError::Concurrency(_) | ApplicationError::Conflict(_) => {
1156 StatusCode::CONFLICT
1157 }
1158 ApplicationError::Domain(_) | ApplicationError::Logic(_) => {
1159 StatusCode::INTERNAL_SERVER_ERROR
1160 }
1161 };
1162 (status, self.to_string())
1163 }
1164 PjsError::InvalidSessionId(_) => (StatusCode::BAD_REQUEST, self.to_string()),
1165 PjsError::InvalidStreamId(_) => (StatusCode::BAD_REQUEST, self.to_string()),
1166 PjsError::InvalidPriority(_) => (StatusCode::BAD_REQUEST, self.to_string()),
1167 PjsError::HttpError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
1168 };
1169
1170 let body = Json(serde_json::json!({
1171 "error": error_message
1172 }));
1173
1174 (status, body).into_response()
1175 }
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180 use super::*;
1181
1182 #[test]
1185 fn cors_empty_origins_denies_all() {
1186 let config = HttpServerConfig {
1187 allowed_origins: vec![],
1188 };
1189 let result = build_cors_layer(&config);
1191 assert!(
1192 result.is_ok(),
1193 "empty origins should return Ok (deny-all layer)"
1194 );
1195 }
1196
1197 #[test]
1198 fn cors_wildcard_only_is_ok() {
1199 let config = HttpServerConfig {
1200 allowed_origins: vec!["*".to_string()],
1201 };
1202 let result = build_cors_layer(&config);
1203 assert!(result.is_ok(), "wildcard-only should return Ok");
1204 }
1205
1206 #[test]
1207 fn cors_mixed_wildcard_and_explicit_is_err() {
1208 let config = HttpServerConfig {
1209 allowed_origins: vec!["*".to_string(), "http://example.com".to_string()],
1210 };
1211 let result = build_cors_layer(&config);
1212 assert!(
1213 result.is_err(),
1214 "mixing wildcard with explicit origins must fail"
1215 );
1216 let msg = result.unwrap_err().to_string();
1217 assert!(
1218 msg.contains("wildcard"),
1219 "error message should mention wildcard: {msg}"
1220 );
1221 }
1222
1223 #[test]
1224 fn cors_valid_single_origin_is_ok() {
1225 let config = HttpServerConfig {
1226 allowed_origins: vec!["http://example.com".to_string()],
1227 };
1228 assert!(build_cors_layer(&config).is_ok());
1229 }
1230
1231 #[test]
1232 fn cors_valid_multiple_origins_is_ok() {
1233 let config = HttpServerConfig {
1234 allowed_origins: vec![
1235 "https://app.example.com".to_string(),
1236 "https://admin.example.com".to_string(),
1237 ],
1238 };
1239 assert!(build_cors_layer(&config).is_ok());
1240 }
1241
1242 #[test]
1243 fn cors_invalid_origin_string_is_err() {
1244 let config = HttpServerConfig {
1245 allowed_origins: vec!["not a\nvalid header".to_string()],
1247 };
1248 let result = build_cors_layer(&config);
1249 assert!(result.is_err(), "invalid origin string must return Err");
1250 }
1251
1252 #[test]
1253 fn default_config_is_valid() {
1254 assert!(
1257 build_cors_layer(&HttpServerConfig::default()).is_ok(),
1258 "default HttpServerConfig must produce a valid CORS layer"
1259 );
1260 }
1261
1262 use crate::domain::{
1265 aggregates::StreamSession,
1266 entities::Stream,
1267 events::DomainEvent,
1268 ports::{
1269 EventPublisherGat, Pagination, PriorityDistribution, SessionHealthSnapshot,
1270 SessionQueryCriteria, SessionQueryResult, StreamFilter, StreamRepositoryGat,
1271 StreamStatistics, StreamStatus, StreamStoreGat,
1272 },
1273 value_objects::{SessionId, StreamId},
1274 };
1275 use chrono::Utc;
1276 use std::collections::HashMap;
1277
1278 struct MockRepository {
1279 sessions: parking_lot::Mutex<HashMap<SessionId, StreamSession>>,
1280 }
1281
1282 impl MockRepository {
1283 fn new() -> Self {
1284 Self {
1285 sessions: parking_lot::Mutex::new(HashMap::new()),
1286 }
1287 }
1288 }
1289
1290 impl StreamRepositoryGat for MockRepository {
1291 type FindSessionFuture<'a>
1292 = impl std::future::Future<Output = crate::domain::DomainResult<Option<StreamSession>>>
1293 + Send
1294 + 'a
1295 where
1296 Self: 'a;
1297
1298 type SaveSessionFuture<'a>
1299 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1300 where
1301 Self: 'a;
1302
1303 type RemoveSessionFuture<'a>
1304 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1305 where
1306 Self: 'a;
1307
1308 type FindActiveSessionsFuture<'a>
1309 = impl std::future::Future<Output = crate::domain::DomainResult<Vec<StreamSession>>>
1310 + Send
1311 + 'a
1312 where
1313 Self: 'a;
1314
1315 type FindSessionsByCriteriaFuture<'a>
1316 = impl std::future::Future<Output = crate::domain::DomainResult<SessionQueryResult>>
1317 + Send
1318 + 'a
1319 where
1320 Self: 'a;
1321
1322 type GetSessionHealthFuture<'a>
1323 = impl std::future::Future<Output = crate::domain::DomainResult<SessionHealthSnapshot>>
1324 + Send
1325 + 'a
1326 where
1327 Self: 'a;
1328
1329 type SessionExistsFuture<'a>
1330 = impl std::future::Future<Output = crate::domain::DomainResult<bool>> + Send + 'a
1331 where
1332 Self: 'a;
1333
1334 fn find_session(&self, session_id: SessionId) -> Self::FindSessionFuture<'_> {
1335 async move { Ok(self.sessions.lock().get(&session_id).cloned()) }
1336 }
1337
1338 fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_> {
1339 async move {
1340 self.sessions.lock().insert(session.id(), session);
1341 Ok(())
1342 }
1343 }
1344
1345 fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_> {
1346 async move {
1347 self.sessions.lock().remove(&session_id);
1348 Ok(())
1349 }
1350 }
1351
1352 fn find_active_sessions(&self) -> Self::FindActiveSessionsFuture<'_> {
1353 async move { Ok(self.sessions.lock().values().cloned().collect()) }
1354 }
1355
1356 fn find_sessions_by_criteria(
1357 &self,
1358 _criteria: SessionQueryCriteria,
1359 pagination: Pagination,
1360 ) -> Self::FindSessionsByCriteriaFuture<'_> {
1361 async move {
1362 let sessions: Vec<_> = self.sessions.lock().values().cloned().collect();
1363 let total_count = sessions.len();
1364 let paginated: Vec<_> = sessions
1365 .into_iter()
1366 .skip(pagination.offset)
1367 .take(pagination.limit)
1368 .collect();
1369 let has_more = pagination.offset + paginated.len() < total_count;
1370 Ok(SessionQueryResult {
1371 sessions: paginated,
1372 total_count,
1373 has_more,
1374 query_duration_ms: 0,
1375 scan_limit_reached: false,
1376 })
1377 }
1378 }
1379
1380 fn get_session_health(&self, session_id: SessionId) -> Self::GetSessionHealthFuture<'_> {
1381 async move {
1382 Ok(SessionHealthSnapshot {
1383 session_id,
1384 is_healthy: true,
1385 active_streams: 0,
1386 total_frames: 0,
1387 last_activity: Utc::now(),
1388 error_rate: 0.0,
1389 metrics: HashMap::new(),
1390 })
1391 }
1392 }
1393
1394 fn session_exists(&self, session_id: SessionId) -> Self::SessionExistsFuture<'_> {
1395 async move { Ok(self.sessions.lock().contains_key(&session_id)) }
1396 }
1397 }
1398
1399 struct MockEventPublisher;
1400
1401 impl EventPublisherGat for MockEventPublisher {
1402 type PublishFuture<'a>
1403 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1404 where
1405 Self: 'a;
1406
1407 type PublishBatchFuture<'a>
1408 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1409 where
1410 Self: 'a;
1411
1412 fn publish(&self, _event: DomainEvent) -> Self::PublishFuture<'_> {
1413 async move { Ok(()) }
1414 }
1415
1416 fn publish_batch(&self, _events: Vec<DomainEvent>) -> Self::PublishBatchFuture<'_> {
1417 async move { Ok(()) }
1418 }
1419 }
1420
1421 struct MockStreamStore;
1422
1423 impl StreamStoreGat for MockStreamStore {
1424 type StoreStreamFuture<'a>
1425 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1426 where
1427 Self: 'a;
1428
1429 type GetStreamFuture<'a>
1430 = impl std::future::Future<Output = crate::domain::DomainResult<Option<Stream>>>
1431 + Send
1432 + 'a
1433 where
1434 Self: 'a;
1435
1436 type DeleteStreamFuture<'a>
1437 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1438 where
1439 Self: 'a;
1440
1441 type ListStreamsForSessionFuture<'a>
1442 =
1443 impl std::future::Future<Output = crate::domain::DomainResult<Vec<Stream>>> + Send + 'a
1444 where
1445 Self: 'a;
1446
1447 type FindStreamsBySessionFuture<'a>
1448 =
1449 impl std::future::Future<Output = crate::domain::DomainResult<Vec<Stream>>> + Send + 'a
1450 where
1451 Self: 'a;
1452
1453 type UpdateStreamStatusFuture<'a>
1454 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1455 where
1456 Self: 'a;
1457
1458 type GetStreamStatisticsFuture<'a>
1459 = impl std::future::Future<Output = crate::domain::DomainResult<StreamStatistics>>
1460 + Send
1461 + 'a
1462 where
1463 Self: 'a;
1464
1465 fn store_stream(&self, _stream: Stream) -> Self::StoreStreamFuture<'_> {
1466 async move { Ok(()) }
1467 }
1468
1469 fn get_stream(&self, _stream_id: StreamId) -> Self::GetStreamFuture<'_> {
1470 async move { Ok(None) }
1471 }
1472
1473 fn delete_stream(&self, _stream_id: StreamId) -> Self::DeleteStreamFuture<'_> {
1474 async move { Ok(()) }
1475 }
1476
1477 fn list_streams_for_session(
1478 &self,
1479 _session_id: SessionId,
1480 ) -> Self::ListStreamsForSessionFuture<'_> {
1481 async move { Ok(vec![]) }
1482 }
1483
1484 fn find_streams_by_session(
1485 &self,
1486 _session_id: SessionId,
1487 _filter: StreamFilter,
1488 ) -> Self::FindStreamsBySessionFuture<'_> {
1489 async move { Ok(vec![]) }
1490 }
1491
1492 fn update_stream_status(
1493 &self,
1494 _stream_id: StreamId,
1495 _status: StreamStatus,
1496 ) -> Self::UpdateStreamStatusFuture<'_> {
1497 async move { Ok(()) }
1498 }
1499
1500 fn get_stream_statistics(
1501 &self,
1502 _stream_id: StreamId,
1503 ) -> Self::GetStreamStatisticsFuture<'_> {
1504 async move {
1505 Ok(StreamStatistics {
1506 total_frames: 0,
1507 total_bytes: 0,
1508 priority_distribution: PriorityDistribution::default(),
1509 avg_frame_size: 0.0,
1510 creation_time: Utc::now(),
1511 completion_time: None,
1512 processing_duration: None,
1513 })
1514 }
1515 }
1516 }
1517
1518 #[tokio::test]
1519 async fn test_system_health() {
1520 let response = system_health().await;
1521 let health_data: serde_json::Value = response.0;
1522
1523 assert_eq!(health_data["status"], "healthy");
1524 assert!(!health_data["features"].as_array().unwrap().is_empty());
1525 }
1526
1527 #[tokio::test]
1528 async fn test_app_state_creation() {
1529 let repository = Arc::new(MockRepository::new());
1530 let event_publisher = Arc::new(MockEventPublisher);
1531 let stream_store = Arc::new(MockStreamStore);
1532
1533 let _state = PjsAppState::new(repository, event_publisher, stream_store);
1534 }
1535
1536 #[tokio::test]
1537 async fn test_get_system_stats_returns_real_uptime() {
1538 use crate::application::handlers::QueryHandlerGat;
1539 use crate::application::handlers::query_handlers::SystemQueryHandler;
1540 use crate::application::queries::GetSystemStatsQuery;
1541 use std::time::{Duration, Instant};
1542
1543 let repository = Arc::new(MockRepository::new());
1544 let started_at = Instant::now() - Duration::from_secs(5);
1546 let handler = SystemQueryHandler::with_start_time(repository, started_at);
1547
1548 let query = GetSystemStatsQuery {
1549 include_historical: false,
1550 };
1551 let result = QueryHandlerGat::handle(&handler, query).await.unwrap();
1552
1553 assert!(
1555 result.uptime_seconds >= 5,
1556 "uptime_seconds should be at least 5, got {}",
1557 result.uptime_seconds
1558 );
1559 assert_ne!(
1561 result.uptime_seconds, 3600,
1562 "uptime_seconds must not be the hard-coded placeholder 3600"
1563 );
1564 }
1565
1566 #[cfg(feature = "metrics")]
1567 #[tokio::test]
1568 async fn test_metrics_endpoint_returns_prometheus_format() {
1569 use crate::infrastructure::http::metrics::install_global_recorder;
1570
1571 let handle = install_global_recorder().expect("recorder install should succeed");
1573 let rendered = handle.render();
1574 assert!(
1577 !rendered.contains("{\"error\""),
1578 "rendered metrics should not be a JSON error: {rendered}"
1579 );
1580
1581 let handle2 = install_global_recorder().expect("second call must not fail");
1583 assert_eq!(
1584 handle.render(),
1585 handle2.render(),
1586 "both handles must render the same metrics"
1587 );
1588 }
1589
1590 #[cfg(feature = "metrics")]
1591 #[test]
1592 fn test_metrics_router_has_metrics_route() {
1593 let _router =
1596 create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1597 &HttpServerConfig::default(),
1598 )
1599 .expect("router should build successfully with metrics feature");
1600 }
1601
1602 #[tokio::test]
1603 async fn search_sessions_route_returns_ok() {
1604 use axum::http::Request;
1605 use tower::ServiceExt;
1606
1607 let repository = Arc::new(MockRepository::new());
1608 let event_publisher = Arc::new(MockEventPublisher);
1609 let stream_store = Arc::new(MockStreamStore);
1610 let state = PjsAppState::new(repository, event_publisher, stream_store);
1611
1612 let router =
1613 create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1614 &HttpServerConfig::default(),
1615 )
1616 .expect("router should build")
1617 .with_state(state);
1618
1619 let req = Request::builder()
1620 .uri("/pjs/sessions/search")
1621 .body(axum::body::Body::empty())
1622 .unwrap();
1623
1624 let resp = router.oneshot(req).await.unwrap();
1625 assert_eq!(resp.status(), StatusCode::OK);
1626 }
1627
1628 #[tokio::test]
1636 async fn generate_frames_route_dispatches_command_end_to_end() {
1637 use axum::body::to_bytes;
1638 use axum::http::{Method, Request};
1639 use tower::ServiceExt;
1640
1641 let repository = Arc::new(MockRepository::new());
1642 let event_publisher = Arc::new(MockEventPublisher);
1643 let stream_store = Arc::new(MockStreamStore);
1644 let state = PjsAppState::new(repository, event_publisher, stream_store);
1645
1646 let router =
1647 create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1648 &HttpServerConfig::default(),
1649 )
1650 .expect("router should build")
1651 .with_state(state);
1652
1653 let create_session = Request::builder()
1654 .method(Method::POST)
1655 .uri("/pjs/sessions")
1656 .header(header::CONTENT_TYPE, "application/json")
1657 .body(axum::body::Body::from("{}"))
1658 .unwrap();
1659 let resp = router.clone().oneshot(create_session).await.unwrap();
1660 assert_eq!(resp.status(), StatusCode::OK);
1661 let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1662 let session: serde_json::Value = serde_json::from_slice(&body).unwrap();
1663 let session_id = session["session_id"].as_str().unwrap().to_string();
1664
1665 let create_stream = Request::builder()
1666 .method(Method::POST)
1667 .uri(format!("/pjs/sessions/{session_id}/streams"))
1668 .header(header::CONTENT_TYPE, "application/json")
1669 .body(axum::body::Body::from(
1670 serde_json::json!({ "data": { "items": [1, 2, 3] } }).to_string(),
1671 ))
1672 .unwrap();
1673 let resp = router.clone().oneshot(create_stream).await.unwrap();
1674 assert_eq!(resp.status(), StatusCode::OK);
1675 let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1676 let stream: serde_json::Value = serde_json::from_slice(&body).unwrap();
1677 let stream_id = stream["stream_id"].as_str().unwrap().to_string();
1678
1679 let start = Request::builder()
1680 .method(Method::POST)
1681 .uri(format!(
1682 "/pjs/sessions/{session_id}/streams/{stream_id}/start"
1683 ))
1684 .body(axum::body::Body::empty())
1685 .unwrap();
1686 let resp = router.clone().oneshot(start).await.unwrap();
1687 assert_eq!(resp.status(), StatusCode::OK);
1688
1689 let generate = Request::builder()
1690 .method(Method::POST)
1691 .uri(format!(
1692 "/pjs/sessions/{session_id}/streams/{stream_id}/generate-frames"
1693 ))
1694 .header(header::CONTENT_TYPE, "application/json")
1695 .body(axum::body::Body::from(
1696 serde_json::json!({ "max_frames": 4 }).to_string(),
1697 ))
1698 .unwrap();
1699 let resp = router.oneshot(generate).await.unwrap();
1700 assert_eq!(
1701 resp.status(),
1702 StatusCode::OK,
1703 "POST .../generate-frames must be reachable end-to-end"
1704 );
1705 let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1706 let payload: serde_json::Value = serde_json::from_slice(&body).unwrap();
1707 assert!(payload["frames"].is_array(), "response must carry frames[]");
1708 let frame_count = payload["frame_count"]
1709 .as_u64()
1710 .expect("response must carry numeric frame_count");
1711 assert!(
1712 frame_count > 0,
1713 "extract_patches must yield at least one patch frame for `{{\"items\": [1,2,3]}}` \
1714 — frame_count was {frame_count}"
1715 );
1716 }
1717
1718 #[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
1723 #[tokio::test]
1724 async fn dictionary_endpoint_becomes_reachable_after_training() {
1725 use crate::compression::zstd::N_TRAIN;
1726 use crate::infrastructure::repositories::InMemoryDictionaryStore;
1727 use crate::security::CompressionBombDetector;
1728 use axum::body::to_bytes;
1729 use axum::http::{Method, Request};
1730 use tower::ServiceExt;
1731
1732 let repository = Arc::new(MockRepository::new());
1733 let event_publisher = Arc::new(MockEventPublisher);
1734 let stream_store = Arc::new(MockStreamStore);
1735 let dictionary_store = Arc::new(InMemoryDictionaryStore::new(
1736 Arc::new(CompressionBombDetector::default()),
1737 64 * 1024,
1738 ));
1739 let state = PjsAppState::with_dictionary_store(
1740 repository,
1741 event_publisher,
1742 stream_store,
1743 dictionary_store,
1744 );
1745
1746 let router =
1747 create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1748 &HttpServerConfig::default(),
1749 )
1750 .expect("router should build")
1751 .with_state(state);
1752
1753 let create_session = Request::builder()
1754 .method(Method::POST)
1755 .uri("/pjs/sessions")
1756 .header(header::CONTENT_TYPE, "application/json")
1757 .body(axum::body::Body::from("{}"))
1758 .unwrap();
1759 let resp = router.clone().oneshot(create_session).await.unwrap();
1760 assert_eq!(resp.status(), StatusCode::OK);
1761 let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1762 let session: serde_json::Value = serde_json::from_slice(&body).unwrap();
1763 let session_id = session["session_id"].as_str().unwrap().to_string();
1764
1765 let mut payload = serde_json::Map::new();
1769 for i in 0..(N_TRAIN + 4) {
1770 payload.insert(
1771 format!("field_{i}"),
1772 serde_json::Value::String(format!("value_{i}")),
1773 );
1774 }
1775 let create_stream = Request::builder()
1776 .method(Method::POST)
1777 .uri(format!("/pjs/sessions/{session_id}/streams"))
1778 .header(header::CONTENT_TYPE, "application/json")
1779 .body(axum::body::Body::from(
1780 serde_json::json!({ "data": serde_json::Value::Object(payload) }).to_string(),
1781 ))
1782 .unwrap();
1783 let resp = router.clone().oneshot(create_stream).await.unwrap();
1784 assert_eq!(resp.status(), StatusCode::OK);
1785 let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1786 let stream: serde_json::Value = serde_json::from_slice(&body).unwrap();
1787 let stream_id = stream["stream_id"].as_str().unwrap().to_string();
1788
1789 let start = Request::builder()
1790 .method(Method::POST)
1791 .uri(format!(
1792 "/pjs/sessions/{session_id}/streams/{stream_id}/start"
1793 ))
1794 .body(axum::body::Body::empty())
1795 .unwrap();
1796 let resp = router.clone().oneshot(start).await.unwrap();
1797 assert_eq!(resp.status(), StatusCode::OK);
1798
1799 let dict_before = Request::builder()
1801 .method(Method::GET)
1802 .uri(format!("/pjs/sessions/{session_id}/dictionary"))
1803 .body(axum::body::Body::empty())
1804 .unwrap();
1805 let resp = router.clone().oneshot(dict_before).await.unwrap();
1806 assert_eq!(
1807 resp.status(),
1808 StatusCode::NOT_FOUND,
1809 "dictionary endpoint must be 404 before N_TRAIN samples accumulate"
1810 );
1811
1812 let max_frames = N_TRAIN + 4;
1815 let generate = Request::builder()
1816 .method(Method::POST)
1817 .uri(format!(
1818 "/pjs/sessions/{session_id}/streams/{stream_id}/generate-frames"
1819 ))
1820 .header(header::CONTENT_TYPE, "application/json")
1821 .body(axum::body::Body::from(
1822 serde_json::json!({ "max_frames": max_frames }).to_string(),
1823 ))
1824 .unwrap();
1825 let resp = router.clone().oneshot(generate).await.unwrap();
1826 assert_eq!(resp.status(), StatusCode::OK);
1827 let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1828 let payload: serde_json::Value = serde_json::from_slice(&body).unwrap();
1829 let frame_count = payload["frame_count"].as_u64().unwrap();
1830 assert!(
1831 frame_count >= N_TRAIN as u64,
1832 "single generate-frames call must yield at least N_TRAIN ({}) frames \
1833 so train_if_ready triggers training; got {frame_count}",
1834 N_TRAIN
1835 );
1836
1837 let dict_after = Request::builder()
1839 .method(Method::GET)
1840 .uri(format!("/pjs/sessions/{session_id}/dictionary"))
1841 .body(axum::body::Body::empty())
1842 .unwrap();
1843 let resp = router.oneshot(dict_after).await.unwrap();
1844 assert_eq!(
1845 resp.status(),
1846 StatusCode::OK,
1847 "dictionary endpoint must transition to 200 OK once N_TRAIN samples have been fed"
1848 );
1849 let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1850 assert!(
1851 !body.is_empty(),
1852 "trained dictionary body must be non-empty"
1853 );
1854 }
1855
1856 #[tokio::test]
1860 async fn generate_frames_route_rejects_invalid_priority() {
1861 use axum::http::{Method, Request};
1862 use tower::ServiceExt;
1863
1864 let repository = Arc::new(MockRepository::new());
1865 let event_publisher = Arc::new(MockEventPublisher);
1866 let stream_store = Arc::new(MockStreamStore);
1867 let state = PjsAppState::new(repository, event_publisher, stream_store);
1868
1869 let router =
1870 create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1871 &HttpServerConfig::default(),
1872 )
1873 .expect("router should build")
1874 .with_state(state);
1875
1876 let sid = SessionId::new();
1877 let stream_id = StreamId::new();
1878 let req = Request::builder()
1879 .method(Method::POST)
1880 .uri(format!(
1881 "/pjs/sessions/{sid}/streams/{stream_id}/generate-frames"
1882 ))
1883 .header(header::CONTENT_TYPE, "application/json")
1884 .body(axum::body::Body::from(
1885 serde_json::json!({ "priority_threshold": 0 }).to_string(),
1886 ))
1887 .unwrap();
1888 let resp = router.oneshot(req).await.unwrap();
1889 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1890 }
1891}