Skip to main content

pjson_rs/infrastructure/http/
axum_adapter.rs

1//! Axum HTTP server adapter for PJS streaming
2
3use axum::{
4    Json, Router,
5    extract::{DefaultBodyLimit, Path as AxumPath, Query, State},
6    http::{
7        HeaderValue, Method, StatusCode,
8        header::{self, AUTHORIZATION, CONTENT_TYPE},
9    },
10    middleware,
11    response::{IntoResponse, Response},
12    routing::{get, post},
13};
14use serde::{Deserialize, Serialize};
15use serde_json::Value as JsonValue;
16use std::{sync::Arc, time::Instant};
17use tower_http::{
18    cors::{AllowOrigin, CorsLayer},
19    trace::TraceLayer,
20};
21
22use crate::{
23    application::{
24        commands::*,
25        handlers::{
26            CommandHandlerGat, QueryHandlerGat,
27            command_handlers::SessionCommandHandler,
28            query_handlers::{SessionQueryHandler, StreamQueryHandler, SystemQueryHandler},
29        },
30        queries::*,
31    },
32    domain::{
33        aggregates::stream_session::{SessionConfig, SessionHealth},
34        ports::{
35            DictionaryStore, EventPublisherGat, NoopDictionaryStore, StreamRepositoryGat,
36            StreamStoreGat,
37        },
38        value_objects::{Priority, SessionId, StreamId},
39    },
40    infrastructure::http::middleware::{RateLimitMiddleware, security_middleware},
41};
42
43/// HTTP server configuration.
44///
45/// # Production warning
46///
47/// `HttpServerConfig::default()` returns a configuration suitable for **local development
48/// only** — it allows a single hard-coded origin (`http://localhost:3000`). Production
49/// deployments must construct an explicit `HttpServerConfig` with the actual list of
50/// allowed origins, or pass `vec![]` to deny all cross-origin requests.
51///
52/// Use [`create_pjs_router_with_config`] to apply a non-default configuration.
53// TODO(critic): Mark with #[non_exhaustive] or migrate to builder before 1.0
54// to avoid breaking changes when adding fields like allow_credentials/max_age.
55#[derive(Debug, Clone)]
56pub struct HttpServerConfig {
57    /// List of origins allowed by the CORS layer.
58    ///
59    /// # Matching semantics
60    ///
61    /// Origins are matched against the request's `Origin` header by **case-sensitive byte
62    /// equality**. This is `tower_http::cors::AllowOrigin::list` behavior; it is not the
63    /// case-insensitive scheme/host comparison defined by RFC 6454 §6.
64    ///
65    /// In practice this matches all real browser traffic, because mainstream browsers
66    /// always send lowercase scheme and host. Write your origins in lowercase.
67    ///
68    /// # Special values
69    ///
70    /// - `[]` (empty) — deny all cross-origin requests (fail-closed)
71    /// - `["*"]` — allow any origin (passes through to `tower_http::cors::Any`)
72    /// - Mixing `"*"` with explicit origins is rejected at construction time
73    pub allowed_origins: Vec<String>,
74}
75
76impl Default for HttpServerConfig {
77    /// Local-development default: allows `http://localhost:3000`.
78    ///
79    /// **Do not use this in production.** See the type-level docs.
80    fn default() -> Self {
81        Self {
82            allowed_origins: vec!["http://localhost:3000".to_string()],
83        }
84    }
85}
86
87/// Build a [`CorsLayer`] from an [`HttpServerConfig`].
88///
89/// # Errors
90///
91/// Returns [`PjsError::HttpError`] if:
92/// - `allowed_origins` is a mix of `"*"` and explicit origins
93/// - any origin string fails to parse as a valid `HeaderValue`
94fn build_cors_layer(config: &HttpServerConfig) -> Result<CorsLayer, PjsError> {
95    // We intentionally do NOT call .allow_credentials(true).
96    // PJS does not use cookie-based auth; the Authorization header works without
97    // credentials mode. allow_credentials(true) is incompatible with allow_origin(Any),
98    // which would forbid the `["*"]` config path.
99    let base = CorsLayer::new()
100        .allow_methods([Method::GET, Method::POST])
101        .allow_headers([CONTENT_TYPE, AUTHORIZATION])
102        .max_age(std::time::Duration::from_secs(3600));
103
104    let has_wildcard = config.allowed_origins.iter().any(|o| o == "*");
105    let has_explicit = config.allowed_origins.iter().any(|o| o != "*");
106
107    let layer = match (
108        config.allowed_origins.is_empty(),
109        has_wildcard,
110        has_explicit,
111    ) {
112        (true, _, _) => base.allow_origin(AllowOrigin::list(std::iter::empty::<HeaderValue>())),
113        (_, true, true) => {
114            return Err(PjsError::HttpError(
115                "CORS: wildcard '*' cannot be combined with explicit origins".into(),
116            ));
117        }
118        (_, true, false) => base.allow_origin(tower_http::cors::Any),
119        (_, false, _) => {
120            let origins: Vec<HeaderValue> = config
121                .allowed_origins
122                .iter()
123                .map(|o| {
124                    o.parse::<HeaderValue>()
125                        .map_err(|e| PjsError::HttpError(format!("invalid CORS origin {o:?}: {e}")))
126                })
127                .collect::<Result<_, _>>()?;
128            base.allow_origin(AllowOrigin::list(origins))
129        }
130    };
131    Ok(layer)
132}
133
134/// Axum application state with PJS GAT-based handlers.
135///
136/// The `dictionary_store` field is `pub(crate)` so the dictionary handler can
137/// access it without exposing it as a public API.
138pub struct PjsAppState<R, P, S>
139where
140    R: StreamRepositoryGat + Send + Sync + 'static,
141    P: EventPublisherGat + Send + Sync + 'static,
142    S: StreamStoreGat + Send + Sync + 'static,
143{
144    command_handler: Arc<SessionCommandHandler<R, P>>,
145    session_query_handler: Arc<SessionQueryHandler<R>>,
146    stream_query_handler: Arc<StreamQueryHandler<R, S>>,
147    system_handler: Arc<SystemQueryHandler<R>>,
148    pub(crate) dictionary_store: Arc<dyn DictionaryStore>,
149}
150
151impl<R, P, S> Clone for PjsAppState<R, P, S>
152where
153    R: StreamRepositoryGat + Send + Sync + 'static,
154    P: EventPublisherGat + Send + Sync + 'static,
155    S: StreamStoreGat + Send + Sync + 'static,
156{
157    fn clone(&self) -> Self {
158        Self {
159            command_handler: self.command_handler.clone(),
160            session_query_handler: self.session_query_handler.clone(),
161            stream_query_handler: self.stream_query_handler.clone(),
162            system_handler: self.system_handler.clone(),
163            dictionary_store: self.dictionary_store.clone(),
164        }
165    }
166}
167
168impl<R, P, S> PjsAppState<R, P, S>
169where
170    R: StreamRepositoryGat + Send + Sync + 'static,
171    P: EventPublisherGat + Send + Sync + 'static,
172    S: StreamStoreGat + Send + Sync + 'static,
173{
174    /// Create a new application state with default [`NoopDictionaryStore`].
175    ///
176    /// The `/pjs/sessions/{id}/dictionary` endpoint will return 404 until
177    /// you upgrade to [`PjsAppState::with_dictionary_store`] with a concrete
178    /// implementation such as [`crate::infrastructure::repositories::InMemoryDictionaryStore`].
179    ///
180    /// Records the current instant as the process start time for uptime reporting.
181    pub fn new(repository: Arc<R>, event_publisher: Arc<P>, stream_store: Arc<S>) -> Self {
182        Self::with_dictionary_store(
183            repository,
184            event_publisher,
185            stream_store,
186            Arc::new(NoopDictionaryStore),
187        )
188    }
189
190    /// Create a new application state with a custom [`DictionaryStore`].
191    ///
192    /// Pass `Arc::new(InMemoryDictionaryStore::new(...))` to enable end-to-end
193    /// dictionary training and serving.
194    pub fn with_dictionary_store(
195        repository: Arc<R>,
196        event_publisher: Arc<P>,
197        stream_store: Arc<S>,
198        dictionary_store: Arc<dyn DictionaryStore>,
199    ) -> Self {
200        let started_at = Instant::now();
201        Self {
202            command_handler: Arc::new(SessionCommandHandler::new(
203                repository.clone(),
204                event_publisher,
205            )),
206            session_query_handler: Arc::new(SessionQueryHandler::new(repository.clone())),
207            stream_query_handler: Arc::new(StreamQueryHandler::new(
208                repository.clone(),
209                stream_store,
210            )),
211            system_handler: Arc::new(SystemQueryHandler::with_start_time(repository, started_at)),
212            dictionary_store,
213        }
214    }
215}
216
217/// Request to create a new streaming session
218#[derive(Debug, Deserialize)]
219pub struct CreateSessionRequest {
220    pub max_concurrent_streams: Option<usize>,
221    pub timeout_seconds: Option<u64>,
222    pub client_info: Option<String>,
223}
224
225/// Response for session creation
226#[derive(Debug, Serialize)]
227pub struct CreateSessionResponse {
228    pub session_id: String,
229    pub expires_at: chrono::DateTime<chrono::Utc>,
230}
231
232/// Request to start streaming data
233#[derive(Debug, Deserialize)]
234pub struct StartStreamRequest {
235    pub data: JsonValue,
236    pub priority_threshold: Option<u8>,
237    pub max_frames: Option<usize>,
238}
239
240/// Stream response parameters
241#[derive(Debug, Deserialize)]
242pub struct StreamParams {
243    pub session_id: String,
244    pub priority: Option<u8>,
245    pub format: Option<String>,
246}
247
248/// Session health response
249#[derive(Debug, Serialize)]
250pub struct SessionHealthResponse {
251    pub is_healthy: bool,
252    pub active_streams: usize,
253    pub failed_streams: usize,
254    pub is_expired: bool,
255    pub uptime_seconds: i64,
256}
257
258impl From<SessionHealth> for SessionHealthResponse {
259    fn from(health: SessionHealth) -> Self {
260        Self {
261            is_healthy: health.is_healthy,
262            active_streams: health.active_streams,
263            failed_streams: health.failed_streams,
264            is_expired: health.is_expired,
265            uptime_seconds: health.uptime_seconds,
266        }
267    }
268}
269
270/// Create PJS-enabled Axum router with the default CORS configuration.
271///
272/// Uses [`HttpServerConfig::default`] which allows `http://localhost:3000`.
273///
274/// # Security Note
275///
276/// This is suitable for local development only. For production, use
277/// [`create_pjs_router_with_config`] with an explicit [`HttpServerConfig`].
278///
279/// TODO: Implement authentication strategy before production deployment.
280/// Options: API keys, JWT tokens, OAuth2/OIDC
281pub fn create_pjs_router<R, P, S>() -> Router<PjsAppState<R, P, S>>
282where
283    R: StreamRepositoryGat + Send + Sync + 'static,
284    P: EventPublisherGat + Send + Sync + 'static,
285    S: StreamStoreGat + Send + Sync + 'static,
286{
287    create_pjs_router_with_config::<R, P, S>(&HttpServerConfig::default())
288        .expect("default HttpServerConfig must always produce a valid CORS layer")
289}
290
291/// Create PJS-enabled Axum router with a custom [`HttpServerConfig`].
292///
293/// # Errors
294///
295/// Returns [`PjsError::HttpError`] if `config` contains invalid CORS origins
296/// (see [`build_cors_layer`] for the full list of failure conditions).
297///
298/// # Examples
299///
300/// ```rust,ignore
301/// use pjson_rs::infrastructure::http::{HttpServerConfig, create_pjs_router_with_config};
302///
303/// let config = HttpServerConfig {
304///     allowed_origins: vec!["https://app.example.com".to_string()],
305/// };
306/// let router = create_pjs_router_with_config::<R, P, S>(&config)?;
307/// ```
308pub fn create_pjs_router_with_config<R, P, S>(
309    config: &HttpServerConfig,
310) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
311where
312    R: StreamRepositoryGat + Send + Sync + 'static,
313    P: EventPublisherGat + Send + Sync + 'static,
314    S: StreamStoreGat + Send + Sync + 'static,
315{
316    let all_routes = public_routes::<R, P, S>().merge(protected_routes::<R, P, S>());
317    apply_common_layers(all_routes, config)
318}
319
320/// Create PJS-enabled Axum router with rate limiting and the default CORS configuration.
321///
322/// Adds rate limiting middleware to protect against DoS attacks.
323/// Default: 100 requests per minute per IP address.
324///
325/// Uses [`HttpServerConfig::default`] which allows `http://localhost:3000`.
326/// For production, use [`create_pjs_router_with_rate_limit_and_config`].
327///
328/// # Security Note
329///
330/// Rate limiting is applied globally to all endpoints.
331/// Returns 429 Too Many Requests with Retry-After header when limit exceeded.
332/// Adds X-RateLimit-* headers per RFC 6585.
333pub fn create_pjs_router_with_rate_limit<R, P, S>(
334    rate_limit_middleware: RateLimitMiddleware,
335) -> Router<PjsAppState<R, P, S>>
336where
337    R: StreamRepositoryGat + Send + Sync + 'static,
338    P: EventPublisherGat + Send + Sync + 'static,
339    S: StreamStoreGat + Send + Sync + 'static,
340{
341    create_pjs_router_with_rate_limit_and_config::<R, P, S>(
342        &HttpServerConfig::default(),
343        rate_limit_middleware,
344    )
345    .expect("default HttpServerConfig must always produce a valid CORS layer")
346}
347
348/// Create PJS-enabled Axum router with rate limiting and a custom [`HttpServerConfig`].
349///
350/// # Errors
351///
352/// Returns [`PjsError::HttpError`] if `config` contains invalid CORS origins.
353pub fn create_pjs_router_with_rate_limit_and_config<R, P, S>(
354    config: &HttpServerConfig,
355    rate_limit_middleware: RateLimitMiddleware,
356) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
357where
358    R: StreamRepositoryGat + Send + Sync + 'static,
359    P: EventPublisherGat + Send + Sync + 'static,
360    S: StreamStoreGat + Send + Sync + 'static,
361{
362    let all_routes = public_routes::<R, P, S>()
363        .merge(protected_routes::<R, P, S>())
364        .layer(rate_limit_middleware);
365    apply_common_layers(all_routes, config)
366}
367
368/// Create PJS-enabled Axum router with API key authentication and a custom [`HttpServerConfig`].
369///
370/// The health endpoint (`/pjs/health`) is **not** protected by auth — it lives in a
371/// separate public sub-router that is merged without the auth layer. All other routes
372/// require a valid API key.
373///
374/// # Errors
375///
376/// Returns [`PjsError::HttpError`] if `config` contains invalid CORS origins.
377///
378/// # Examples
379///
380/// ```rust,ignore
381/// use pjson_rs::infrastructure::http::{
382///     HttpServerConfig, auth::{ApiKeyConfig, ApiKeyAuthLayer},
383///     create_pjs_router_with_auth,
384/// };
385///
386/// let api_config = ApiKeyConfig::new(&["my-api-key"])?;
387/// let auth_layer = ApiKeyAuthLayer::new(api_config);
388/// let config = HttpServerConfig::default();
389/// let router = create_pjs_router_with_auth::<R, P, S>(&config, auth_layer)?;
390/// ```
391#[cfg(feature = "http-server")]
392pub fn create_pjs_router_with_auth<R, P, S>(
393    config: &HttpServerConfig,
394    auth: crate::infrastructure::http::auth::ApiKeyAuthLayer,
395) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
396where
397    R: StreamRepositoryGat + Send + Sync + 'static,
398    P: EventPublisherGat + Send + Sync + 'static,
399    S: StreamStoreGat + Send + Sync + 'static,
400{
401    // Auth wraps only the protected sub-router. Public routes (health, metrics) are
402    // merged separately so there is zero path-string comparison logic in the auth layer.
403    let protected = protected_routes::<R, P, S>().layer(auth);
404    let merged = public_routes::<R, P, S>().merge(protected);
405    apply_common_layers(merged, config)
406}
407
408/// Create PJS-enabled Axum router with both rate limiting and API key authentication.
409///
410/// Layer ordering (Tower applies layers outer-to-inner):
411/// ```text
412/// rate_limit  ← outermost: wraps both public and protected sub-routers
413///   public_routes (no auth)
414///   protected_routes
415///     auth    ← inner: wraps only protected routes; unauthenticated requests are
416///               rejected before consuming rate-limit quota for protected paths
417///     handlers
418/// ```
419///
420/// Rate limiting is applied to **both** the public and protected sub-routers (DoS
421/// protection for `/pjs/health` is still desirable).
422///
423/// # Errors
424///
425/// Returns [`PjsError::HttpError`] if `config` contains invalid CORS origins.
426#[cfg(feature = "http-server")]
427pub fn create_pjs_router_with_rate_limit_and_auth<R, P, S>(
428    config: &HttpServerConfig,
429    rate_limit: RateLimitMiddleware,
430    auth: crate::infrastructure::http::auth::ApiKeyAuthLayer,
431) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
432where
433    R: StreamRepositoryGat + Send + Sync + 'static,
434    P: EventPublisherGat + Send + Sync + 'static,
435    S: StreamStoreGat + Send + Sync + 'static,
436{
437    // Auth runs before rate limit on the protected sub-router so that unauthenticated
438    // traffic does not consume rate-limit quota and cannot starve legitimate clients.
439    let protected = protected_routes::<R, P, S>().layer(auth);
440    let merged = public_routes::<R, P, S>()
441        .merge(protected)
442        .layer(rate_limit);
443    apply_common_layers(merged, config)
444}
445
446// ── Route table helpers ────────────────────────────────────────────────────────────
447
448/// Routes that are always public — no authentication applied.
449///
450/// Currently: `/pjs/health` and (when the `metrics` feature is enabled) `/metrics`.
451fn public_routes<R, P, S>() -> Router<PjsAppState<R, P, S>>
452where
453    R: StreamRepositoryGat + Send + Sync + 'static,
454    P: EventPublisherGat + Send + Sync + 'static,
455    S: StreamStoreGat + Send + Sync + 'static,
456{
457    let router = Router::new().route("/pjs/health", get(system_health));
458
459    #[cfg(feature = "metrics")]
460    let router = router.route(
461        "/metrics",
462        get(crate::infrastructure::http::metrics::metrics_handler),
463    );
464
465    router
466}
467
468/// Routes that require authentication when an auth layer is applied.
469fn protected_routes<R, P, S>() -> Router<PjsAppState<R, P, S>>
470where
471    R: StreamRepositoryGat + Send + Sync + 'static,
472    P: EventPublisherGat + Send + Sync + 'static,
473    S: StreamStoreGat + Send + Sync + 'static,
474{
475    let router = Router::new()
476        .route("/pjs/sessions", post(create_session::<R, P, S>))
477        .route("/pjs/sessions/{session_id}", get(get_session::<R, P, S>))
478        .route(
479            "/pjs/sessions/{session_id}/health",
480            get(session_health::<R, P, S>),
481        )
482        .route(
483            "/pjs/sessions/{session_id}/stats",
484            get(get_session_stats::<R, P, S>),
485        )
486        .route(
487            "/pjs/sessions/{session_id}/streams",
488            post(create_stream::<R, P, S>),
489        )
490        .route(
491            "/pjs/sessions/{session_id}/streams/{stream_id}/start",
492            post(start_stream::<R, P, S>),
493        )
494        .route(
495            "/pjs/sessions/{session_id}/streams/{stream_id}",
496            get(get_stream::<R, P, S>),
497        )
498        .route(
499            "/pjs/sessions/{session_id}/streams/{stream_id}/frames",
500            get(get_stream_frames::<R, P, S>),
501        )
502        .route("/pjs/sessions/search", get(search_sessions::<R, P, S>))
503        .route("/pjs/sessions", get(list_sessions::<R, P, S>))
504        .route("/pjs/stats", get(get_system_stats::<R, P, S>));
505
506    #[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
507    let router = router.route(
508        "/pjs/sessions/{session_id}/dictionary",
509        get(crate::infrastructure::http::dictionary::get_session_dictionary::<R, P, S>),
510    );
511
512    router
513}
514
515/// Apply the cross-cutting middleware stack shared by all router variants.
516///
517/// Order (Tower applies outer-to-inner):
518/// ```text
519/// security_middleware   ← security headers
520/// DefaultBodyLimit      ← body size guard
521/// CorsLayer             ← CORS (outside auth, so preflight is answered before auth)
522/// TraceLayer            ← distributed tracing
523/// ```
524fn apply_common_layers<R, P, S>(
525    router: Router<PjsAppState<R, P, S>>,
526    config: &HttpServerConfig,
527) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
528where
529    R: StreamRepositoryGat + Send + Sync + 'static,
530    P: EventPublisherGat + Send + Sync + 'static,
531    S: StreamStoreGat + Send + Sync + 'static,
532{
533    let cors = build_cors_layer(config)?;
534    Ok(router
535        .layer(middleware::from_fn(security_middleware))
536        .layer(DefaultBodyLimit::max(10 * 1024 * 1024))
537        .layer(cors)
538        .layer(TraceLayer::new_for_http()))
539}
540
541/// Create a new streaming session
542async fn create_session<R, P, S>(
543    State(state): State<PjsAppState<R, P, S>>,
544    headers: axum::http::HeaderMap,
545    Json(request): Json<CreateSessionRequest>,
546) -> Result<Json<CreateSessionResponse>, PjsError>
547where
548    R: StreamRepositoryGat + Send + Sync + 'static,
549    P: EventPublisherGat + Send + Sync + 'static,
550    S: StreamStoreGat + Send + Sync + 'static,
551{
552    let config = SessionConfig {
553        max_concurrent_streams: request.max_concurrent_streams.unwrap_or(10),
554        session_timeout_seconds: request.timeout_seconds.unwrap_or(3600),
555        default_stream_config: Default::default(),
556        enable_compression: true,
557        metadata: Default::default(),
558    };
559
560    let user_agent = headers
561        .get(header::USER_AGENT)
562        .and_then(|h| h.to_str().ok())
563        .map(String::from);
564
565    let command = CreateSessionCommand {
566        config,
567        client_info: request.client_info,
568        user_agent,
569        ip_address: None,
570    };
571
572    let session_id: SessionId = CommandHandlerGat::handle(&*state.command_handler, command)
573        .await
574        .map_err(PjsError::Application)?;
575
576    let expires_at = chrono::Utc::now()
577        + chrono::Duration::seconds(request.timeout_seconds.unwrap_or(3600) as i64);
578
579    Ok(Json(CreateSessionResponse {
580        session_id: session_id.to_string(),
581        expires_at,
582    }))
583}
584
585/// Get session information
586async fn get_session<R, P, S>(
587    State(state): State<PjsAppState<R, P, S>>,
588    AxumPath(session_id): AxumPath<String>,
589) -> Result<Json<SessionResponse>, PjsError>
590where
591    R: StreamRepositoryGat + Send + Sync + 'static,
592    P: EventPublisherGat + Send + Sync + 'static,
593    S: StreamStoreGat + Send + Sync + 'static,
594{
595    let session_id =
596        SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
597
598    let query = GetSessionQuery {
599        session_id: session_id.into(),
600    };
601
602    let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionQuery>>::handle(
603        &*state.session_query_handler,
604        query,
605    )
606    .await
607    .map_err(PjsError::Application)?;
608
609    Ok(Json(response))
610}
611
612/// Get session health status
613async fn session_health<R, P, S>(
614    State(state): State<PjsAppState<R, P, S>>,
615    AxumPath(session_id): AxumPath<String>,
616) -> Result<Json<SessionHealthResponse>, PjsError>
617where
618    R: StreamRepositoryGat + Send + Sync + 'static,
619    P: EventPublisherGat + Send + Sync + 'static,
620    S: StreamStoreGat + Send + Sync + 'static,
621{
622    let session_id =
623        SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
624
625    let query = GetSessionHealthQuery {
626        session_id: session_id.into(),
627    };
628
629    let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionHealthQuery>>::handle(
630        &*state.session_query_handler,
631        query,
632    )
633    .await
634    .map_err(PjsError::Application)?;
635
636    Ok(Json(SessionHealthResponse::from(response.health)))
637}
638
639/// Create a new stream within a session
640///
641/// TODO(CQ-007): Optimize double JSON processing
642/// Current: serde_json::Value -> JsonDataDto -> JsonData
643/// Optimization: Direct JsonData deserialization or use sonic-rs
644async fn create_stream<R, P, S>(
645    State(state): State<PjsAppState<R, P, S>>,
646    AxumPath(session_id): AxumPath<String>,
647    Json(request): Json<StartStreamRequest>,
648) -> Result<Json<serde_json::Value>, PjsError>
649where
650    R: StreamRepositoryGat + Send + Sync + 'static,
651    P: EventPublisherGat + Send + Sync + 'static,
652    S: StreamStoreGat + Send + Sync + 'static,
653{
654    let session_id =
655        SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
656
657    let command = CreateStreamCommand {
658        session_id: session_id.into(),
659        source_data: request.data,
660        config: None,
661    };
662
663    let stream_id: StreamId = CommandHandlerGat::handle(&*state.command_handler, command)
664        .await
665        .map_err(PjsError::Application)?;
666
667    Ok(Json(serde_json::json!({
668        "stream_id": stream_id.to_string(),
669        "status": "created"
670    })))
671}
672
673/// Start streaming for a specific stream
674async fn start_stream<R, P, S>(
675    State(state): State<PjsAppState<R, P, S>>,
676    AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
677) -> Result<Json<serde_json::Value>, PjsError>
678where
679    R: StreamRepositoryGat + Send + Sync + 'static,
680    P: EventPublisherGat + Send + Sync + 'static,
681    S: StreamStoreGat + Send + Sync + 'static,
682{
683    let session_id = SessionId::from_string(&session_id)
684        .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
685    let stream_id =
686        StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
687
688    let command = StartStreamCommand {
689        session_id: session_id.into(),
690        stream_id: stream_id.into(),
691    };
692
693    <SessionCommandHandler<R, P> as CommandHandlerGat<StartStreamCommand>>::handle(
694        &*state.command_handler,
695        command,
696    )
697    .await
698    .map_err(PjsError::Application)?;
699
700    Ok(Json(serde_json::json!({
701        "stream_id": stream_id.to_string(),
702        "status": "started"
703    })))
704}
705
706/// Get stream information
707async fn get_stream<R, P, S>(
708    State(state): State<PjsAppState<R, P, S>>,
709    AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
710) -> Result<Json<StreamResponse>, PjsError>
711where
712    R: StreamRepositoryGat + Send + Sync + 'static,
713    P: EventPublisherGat + Send + Sync + 'static,
714    S: StreamStoreGat + Send + Sync + 'static,
715{
716    let session_id = SessionId::from_string(&session_id)
717        .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
718    let stream_id =
719        StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
720
721    let query = GetStreamQuery {
722        session_id: session_id.into(),
723        stream_id: stream_id.into(),
724    };
725
726    let response = <StreamQueryHandler<R, S> as QueryHandlerGat<GetStreamQuery>>::handle(
727        &*state.stream_query_handler,
728        query,
729    )
730    .await
731    .map_err(PjsError::Application)?;
732
733    Ok(Json(response))
734}
735
736/// List active sessions
737async fn list_sessions<R, P, S>(
738    State(state): State<PjsAppState<R, P, S>>,
739    Query(params): Query<PaginationParams>,
740) -> Result<Json<SessionsResponse>, PjsError>
741where
742    R: StreamRepositoryGat + Send + Sync + 'static,
743    P: EventPublisherGat + Send + Sync + 'static,
744    S: StreamStoreGat + Send + Sync + 'static,
745{
746    let query = GetActiveSessionsQuery {
747        limit: params.limit,
748        offset: params.offset,
749    };
750
751    let response = <SessionQueryHandler<R> as QueryHandlerGat<GetActiveSessionsQuery>>::handle(
752        &*state.session_query_handler,
753        query,
754    )
755    .await
756    .map_err(PjsError::Application)?;
757
758    Ok(Json(response))
759}
760
761/// Search sessions with filters and sorting.
762async fn search_sessions<R, P, S>(
763    State(state): State<PjsAppState<R, P, S>>,
764    Query(params): Query<SearchSessionsParams>,
765) -> Result<Json<SessionsResponse>, PjsError>
766where
767    R: StreamRepositoryGat + Send + Sync + 'static,
768    P: EventPublisherGat + Send + Sync + 'static,
769    S: StreamStoreGat + Send + Sync + 'static,
770{
771    let sort_by = params.sort_by.as_deref().and_then(|s| match s {
772        "created_at" => Some(SessionSortField::CreatedAt),
773        "updated_at" => Some(SessionSortField::UpdatedAt),
774        "stream_count" => Some(SessionSortField::StreamCount),
775        "total_bytes" => Some(SessionSortField::TotalBytes),
776        _ => None,
777    });
778    let sort_order = params.sort_order.as_deref().and_then(|s| match s {
779        "ascending" | "asc" => Some(SortOrder::Ascending),
780        "descending" | "desc" => Some(SortOrder::Descending),
781        _ => None,
782    });
783    let query = SearchSessionsQuery {
784        filters: SessionFilters {
785            state: params.state,
786            created_after: None,
787            created_before: None,
788            client_info: None,
789            has_active_streams: None,
790        },
791        sort_by,
792        sort_order,
793        limit: params.limit,
794        offset: params.offset,
795    };
796    let response = <SessionQueryHandler<R> as QueryHandlerGat<SearchSessionsQuery>>::handle(
797        &*state.session_query_handler,
798        query,
799    )
800    .await
801    .map_err(PjsError::Application)?;
802    Ok(Json(response))
803}
804
805/// Pagination parameters
806#[derive(Debug, Deserialize)]
807pub struct PaginationParams {
808    pub limit: Option<usize>,
809    pub offset: Option<usize>,
810}
811
812/// Query parameters for session search endpoint.
813#[derive(Debug, Deserialize)]
814pub struct SearchSessionsParams {
815    pub state: Option<String>,
816    pub sort_by: Option<String>,
817    pub sort_order: Option<String>,
818    pub limit: Option<usize>,
819    pub offset: Option<usize>,
820}
821
822/// System health endpoint
823async fn system_health() -> Json<serde_json::Value> {
824    Json(serde_json::json!({
825        "status": "healthy",
826        "version": env!("CARGO_PKG_VERSION"),
827        "features": ["pjs_streaming", "axum_integration", "gat_handlers"]
828    }))
829}
830
831/// Real-time system statistics: uptime, session counts, frame throughput.
832async fn get_system_stats<R, P, S>(
833    State(state): State<PjsAppState<R, P, S>>,
834) -> Result<Json<SystemStatsResponse>, PjsError>
835where
836    R: StreamRepositoryGat + Send + Sync + 'static,
837    P: EventPublisherGat + Send + Sync + 'static,
838    S: StreamStoreGat + Send + Sync + 'static,
839{
840    let query = GetSystemStatsQuery {
841        include_historical: false,
842    };
843
844    let response = <SystemQueryHandler<R> as QueryHandlerGat<GetSystemStatsQuery>>::handle(
845        &*state.system_handler,
846        query,
847    )
848    .await
849    .map_err(PjsError::Application)?;
850
851    Ok(Json(response))
852}
853
854/// Query parameters for frame listing
855#[derive(Debug, Deserialize)]
856pub struct FrameQueryParams {
857    pub since_sequence: Option<u64>,
858    pub priority: Option<u8>,
859    pub limit: Option<usize>,
860}
861
862/// Get frames for a stream (currently returns empty; no persistent frame store exists yet)
863async fn get_stream_frames<R, P, S>(
864    State(state): State<PjsAppState<R, P, S>>,
865    AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
866    Query(params): Query<FrameQueryParams>,
867) -> Result<Json<FramesResponse>, PjsError>
868where
869    R: StreamRepositoryGat + Send + Sync + 'static,
870    P: EventPublisherGat + Send + Sync + 'static,
871    S: StreamStoreGat + Send + Sync + 'static,
872{
873    let session_id = SessionId::from_string(&session_id)
874        .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
875    let stream_id =
876        StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
877
878    let priority_filter = params
879        .priority
880        .map(|p| Priority::new(p).map(Into::into))
881        .transpose()
882        .map_err(|e: crate::domain::DomainError| PjsError::InvalidPriority(e.to_string()))?;
883
884    let query = GetStreamFramesQuery {
885        session_id: session_id.into(),
886        stream_id: stream_id.into(),
887        since_sequence: params.since_sequence,
888        priority_filter,
889        limit: params.limit,
890    };
891
892    let response = <StreamQueryHandler<R, S> as QueryHandlerGat<GetStreamFramesQuery>>::handle(
893        &*state.stream_query_handler,
894        query,
895    )
896    .await
897    .map_err(PjsError::Application)?;
898
899    Ok(Json(response))
900}
901
902/// Get statistics for a session
903async fn get_session_stats<R, P, S>(
904    State(state): State<PjsAppState<R, P, S>>,
905    AxumPath(session_id): AxumPath<String>,
906) -> Result<Json<SessionStatsResponse>, PjsError>
907where
908    R: StreamRepositoryGat + Send + Sync + 'static,
909    P: EventPublisherGat + Send + Sync + 'static,
910    S: StreamStoreGat + Send + Sync + 'static,
911{
912    let session_id =
913        SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
914
915    let query = GetSessionStatsQuery {
916        session_id: session_id.into(),
917    };
918
919    let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionStatsQuery>>::handle(
920        &*state.session_query_handler,
921        query,
922    )
923    .await
924    .map_err(PjsError::Application)?;
925
926    Ok(Json(response))
927}
928
929// TODO(CQ-004): Implement HTTP rate limiting middleware
930//
931// Recommended implementation:
932// - Add Arc<WebSocketRateLimiter> to PjsAppState
933// - Use 100 requests/minute per IP with burst allowance
934// - Extract IP from ConnectInfo<SocketAddr>
935// - Return 429 Too Many Requests on limit exceeded
936//
937// Example:
938// ```ignore
939// async fn rate_limit_middleware(
940//     State(limiter): State<Arc<WebSocketRateLimiter>>,
941//     ConnectInfo(addr): ConnectInfo<SocketAddr>,
942//     req: Request,
943//     next: Next,
944// ) -> Result<Response, StatusCode> {
945//     limiter.check_request(addr.ip())
946//         .map_err(|_| StatusCode::TOO_MANY_REQUESTS)?;
947//     Ok(next.run(req).await)
948// }
949// ```
950/// PJS-specific errors for HTTP endpoints
951#[derive(Debug, thiserror::Error)]
952pub enum PjsError {
953    #[error("Application error: {0}")]
954    Application(#[from] crate::application::ApplicationError),
955
956    #[error("Invalid session ID: {0}")]
957    InvalidSessionId(String),
958
959    #[error("Invalid stream ID: {0}")]
960    InvalidStreamId(String),
961
962    #[error("Invalid priority: {0}")]
963    InvalidPriority(String),
964
965    #[error("HTTP error: {0}")]
966    HttpError(String),
967}
968
969impl IntoResponse for PjsError {
970    fn into_response(self) -> Response {
971        let (status, error_message) = match &self {
972            PjsError::Application(app_err) => {
973                use crate::application::ApplicationError;
974                let status = match app_err {
975                    ApplicationError::NotFound(_) => StatusCode::NOT_FOUND,
976                    ApplicationError::Validation(_) => StatusCode::BAD_REQUEST,
977                    ApplicationError::Authorization(_) => StatusCode::UNAUTHORIZED,
978                    ApplicationError::Concurrency(_) | ApplicationError::Conflict(_) => {
979                        StatusCode::CONFLICT
980                    }
981                    ApplicationError::Domain(_) | ApplicationError::Logic(_) => {
982                        StatusCode::INTERNAL_SERVER_ERROR
983                    }
984                };
985                (status, self.to_string())
986            }
987            PjsError::InvalidSessionId(_) => (StatusCode::BAD_REQUEST, self.to_string()),
988            PjsError::InvalidStreamId(_) => (StatusCode::BAD_REQUEST, self.to_string()),
989            PjsError::InvalidPriority(_) => (StatusCode::BAD_REQUEST, self.to_string()),
990            PjsError::HttpError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
991        };
992
993        let body = Json(serde_json::json!({
994            "error": error_message
995        }));
996
997        (status, body).into_response()
998    }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003    use super::*;
1004
1005    // --- build_cors_layer unit tests ---
1006
1007    #[test]
1008    fn cors_empty_origins_denies_all() {
1009        let config = HttpServerConfig {
1010            allowed_origins: vec![],
1011        };
1012        // Empty list must succeed (returns a layer that denies all origins).
1013        let result = build_cors_layer(&config);
1014        assert!(
1015            result.is_ok(),
1016            "empty origins should return Ok (deny-all layer)"
1017        );
1018    }
1019
1020    #[test]
1021    fn cors_wildcard_only_is_ok() {
1022        let config = HttpServerConfig {
1023            allowed_origins: vec!["*".to_string()],
1024        };
1025        let result = build_cors_layer(&config);
1026        assert!(result.is_ok(), "wildcard-only should return Ok");
1027    }
1028
1029    #[test]
1030    fn cors_mixed_wildcard_and_explicit_is_err() {
1031        let config = HttpServerConfig {
1032            allowed_origins: vec!["*".to_string(), "http://example.com".to_string()],
1033        };
1034        let result = build_cors_layer(&config);
1035        assert!(
1036            result.is_err(),
1037            "mixing wildcard with explicit origins must fail"
1038        );
1039        let msg = result.unwrap_err().to_string();
1040        assert!(
1041            msg.contains("wildcard"),
1042            "error message should mention wildcard: {msg}"
1043        );
1044    }
1045
1046    #[test]
1047    fn cors_valid_single_origin_is_ok() {
1048        let config = HttpServerConfig {
1049            allowed_origins: vec!["http://example.com".to_string()],
1050        };
1051        assert!(build_cors_layer(&config).is_ok());
1052    }
1053
1054    #[test]
1055    fn cors_valid_multiple_origins_is_ok() {
1056        let config = HttpServerConfig {
1057            allowed_origins: vec![
1058                "https://app.example.com".to_string(),
1059                "https://admin.example.com".to_string(),
1060            ],
1061        };
1062        assert!(build_cors_layer(&config).is_ok());
1063    }
1064
1065    #[test]
1066    fn cors_invalid_origin_string_is_err() {
1067        let config = HttpServerConfig {
1068            // HeaderValue rejects strings containing control characters / invalid bytes.
1069            allowed_origins: vec!["not a\nvalid header".to_string()],
1070        };
1071        let result = build_cors_layer(&config);
1072        assert!(result.is_err(), "invalid origin string must return Err");
1073    }
1074
1075    #[test]
1076    fn default_config_is_valid() {
1077        // Guarantees that the expect() in create_pjs_router / create_pjs_router_with_rate_limit
1078        // will never panic at runtime.
1079        assert!(
1080            build_cors_layer(&HttpServerConfig::default()).is_ok(),
1081            "default HttpServerConfig must produce a valid CORS layer"
1082        );
1083    }
1084
1085    // --- existing integration tests ---
1086
1087    use crate::domain::{
1088        aggregates::StreamSession,
1089        entities::Stream,
1090        events::DomainEvent,
1091        ports::{
1092            EventPublisherGat, Pagination, PriorityDistribution, SessionHealthSnapshot,
1093            SessionQueryCriteria, SessionQueryResult, StreamFilter, StreamRepositoryGat,
1094            StreamStatistics, StreamStatus, StreamStoreGat,
1095        },
1096        value_objects::{SessionId, StreamId},
1097    };
1098    use chrono::Utc;
1099    use std::collections::HashMap;
1100
1101    struct MockRepository {
1102        sessions: parking_lot::Mutex<HashMap<SessionId, StreamSession>>,
1103    }
1104
1105    impl MockRepository {
1106        fn new() -> Self {
1107            Self {
1108                sessions: parking_lot::Mutex::new(HashMap::new()),
1109            }
1110        }
1111    }
1112
1113    impl StreamRepositoryGat for MockRepository {
1114        type FindSessionFuture<'a>
1115            = impl std::future::Future<Output = crate::domain::DomainResult<Option<StreamSession>>>
1116            + Send
1117            + 'a
1118        where
1119            Self: 'a;
1120
1121        type SaveSessionFuture<'a>
1122            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1123        where
1124            Self: 'a;
1125
1126        type RemoveSessionFuture<'a>
1127            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1128        where
1129            Self: 'a;
1130
1131        type FindActiveSessionsFuture<'a>
1132            = impl std::future::Future<Output = crate::domain::DomainResult<Vec<StreamSession>>>
1133            + Send
1134            + 'a
1135        where
1136            Self: 'a;
1137
1138        type FindSessionsByCriteriaFuture<'a>
1139            = impl std::future::Future<Output = crate::domain::DomainResult<SessionQueryResult>>
1140            + Send
1141            + 'a
1142        where
1143            Self: 'a;
1144
1145        type GetSessionHealthFuture<'a>
1146            = impl std::future::Future<Output = crate::domain::DomainResult<SessionHealthSnapshot>>
1147            + Send
1148            + 'a
1149        where
1150            Self: 'a;
1151
1152        type SessionExistsFuture<'a>
1153            = impl std::future::Future<Output = crate::domain::DomainResult<bool>> + Send + 'a
1154        where
1155            Self: 'a;
1156
1157        fn find_session(&self, session_id: SessionId) -> Self::FindSessionFuture<'_> {
1158            async move { Ok(self.sessions.lock().get(&session_id).cloned()) }
1159        }
1160
1161        fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_> {
1162            async move {
1163                self.sessions.lock().insert(session.id(), session);
1164                Ok(())
1165            }
1166        }
1167
1168        fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_> {
1169            async move {
1170                self.sessions.lock().remove(&session_id);
1171                Ok(())
1172            }
1173        }
1174
1175        fn find_active_sessions(&self) -> Self::FindActiveSessionsFuture<'_> {
1176            async move { Ok(self.sessions.lock().values().cloned().collect()) }
1177        }
1178
1179        fn find_sessions_by_criteria(
1180            &self,
1181            _criteria: SessionQueryCriteria,
1182            pagination: Pagination,
1183        ) -> Self::FindSessionsByCriteriaFuture<'_> {
1184            async move {
1185                let sessions: Vec<_> = self.sessions.lock().values().cloned().collect();
1186                let total_count = sessions.len();
1187                let paginated: Vec<_> = sessions
1188                    .into_iter()
1189                    .skip(pagination.offset)
1190                    .take(pagination.limit)
1191                    .collect();
1192                let has_more = pagination.offset + paginated.len() < total_count;
1193                Ok(SessionQueryResult {
1194                    sessions: paginated,
1195                    total_count,
1196                    has_more,
1197                    query_duration_ms: 0,
1198                    scan_limit_reached: false,
1199                })
1200            }
1201        }
1202
1203        fn get_session_health(&self, session_id: SessionId) -> Self::GetSessionHealthFuture<'_> {
1204            async move {
1205                Ok(SessionHealthSnapshot {
1206                    session_id,
1207                    is_healthy: true,
1208                    active_streams: 0,
1209                    total_frames: 0,
1210                    last_activity: Utc::now(),
1211                    error_rate: 0.0,
1212                    metrics: HashMap::new(),
1213                })
1214            }
1215        }
1216
1217        fn session_exists(&self, session_id: SessionId) -> Self::SessionExistsFuture<'_> {
1218            async move { Ok(self.sessions.lock().contains_key(&session_id)) }
1219        }
1220    }
1221
1222    struct MockEventPublisher;
1223
1224    impl EventPublisherGat for MockEventPublisher {
1225        type PublishFuture<'a>
1226            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1227        where
1228            Self: 'a;
1229
1230        type PublishBatchFuture<'a>
1231            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1232        where
1233            Self: 'a;
1234
1235        fn publish(&self, _event: DomainEvent) -> Self::PublishFuture<'_> {
1236            async move { Ok(()) }
1237        }
1238
1239        fn publish_batch(&self, _events: Vec<DomainEvent>) -> Self::PublishBatchFuture<'_> {
1240            async move { Ok(()) }
1241        }
1242    }
1243
1244    struct MockStreamStore;
1245
1246    impl StreamStoreGat for MockStreamStore {
1247        type StoreStreamFuture<'a>
1248            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1249        where
1250            Self: 'a;
1251
1252        type GetStreamFuture<'a>
1253            = impl std::future::Future<Output = crate::domain::DomainResult<Option<Stream>>>
1254            + Send
1255            + 'a
1256        where
1257            Self: 'a;
1258
1259        type DeleteStreamFuture<'a>
1260            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1261        where
1262            Self: 'a;
1263
1264        type ListStreamsForSessionFuture<'a>
1265            =
1266            impl std::future::Future<Output = crate::domain::DomainResult<Vec<Stream>>> + Send + 'a
1267        where
1268            Self: 'a;
1269
1270        type FindStreamsBySessionFuture<'a>
1271            =
1272            impl std::future::Future<Output = crate::domain::DomainResult<Vec<Stream>>> + Send + 'a
1273        where
1274            Self: 'a;
1275
1276        type UpdateStreamStatusFuture<'a>
1277            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
1278        where
1279            Self: 'a;
1280
1281        type GetStreamStatisticsFuture<'a>
1282            = impl std::future::Future<Output = crate::domain::DomainResult<StreamStatistics>>
1283            + Send
1284            + 'a
1285        where
1286            Self: 'a;
1287
1288        fn store_stream(&self, _stream: Stream) -> Self::StoreStreamFuture<'_> {
1289            async move { Ok(()) }
1290        }
1291
1292        fn get_stream(&self, _stream_id: StreamId) -> Self::GetStreamFuture<'_> {
1293            async move { Ok(None) }
1294        }
1295
1296        fn delete_stream(&self, _stream_id: StreamId) -> Self::DeleteStreamFuture<'_> {
1297            async move { Ok(()) }
1298        }
1299
1300        fn list_streams_for_session(
1301            &self,
1302            _session_id: SessionId,
1303        ) -> Self::ListStreamsForSessionFuture<'_> {
1304            async move { Ok(vec![]) }
1305        }
1306
1307        fn find_streams_by_session(
1308            &self,
1309            _session_id: SessionId,
1310            _filter: StreamFilter,
1311        ) -> Self::FindStreamsBySessionFuture<'_> {
1312            async move { Ok(vec![]) }
1313        }
1314
1315        fn update_stream_status(
1316            &self,
1317            _stream_id: StreamId,
1318            _status: StreamStatus,
1319        ) -> Self::UpdateStreamStatusFuture<'_> {
1320            async move { Ok(()) }
1321        }
1322
1323        fn get_stream_statistics(
1324            &self,
1325            _stream_id: StreamId,
1326        ) -> Self::GetStreamStatisticsFuture<'_> {
1327            async move {
1328                Ok(StreamStatistics {
1329                    total_frames: 0,
1330                    total_bytes: 0,
1331                    priority_distribution: PriorityDistribution::default(),
1332                    avg_frame_size: 0.0,
1333                    creation_time: Utc::now(),
1334                    completion_time: None,
1335                    processing_duration: None,
1336                })
1337            }
1338        }
1339    }
1340
1341    #[tokio::test]
1342    async fn test_system_health() {
1343        let response = system_health().await;
1344        let health_data: serde_json::Value = response.0;
1345
1346        assert_eq!(health_data["status"], "healthy");
1347        assert!(!health_data["features"].as_array().unwrap().is_empty());
1348    }
1349
1350    #[tokio::test]
1351    async fn test_app_state_creation() {
1352        let repository = Arc::new(MockRepository::new());
1353        let event_publisher = Arc::new(MockEventPublisher);
1354        let stream_store = Arc::new(MockStreamStore);
1355
1356        let _state = PjsAppState::new(repository, event_publisher, stream_store);
1357    }
1358
1359    #[tokio::test]
1360    async fn test_get_system_stats_returns_real_uptime() {
1361        use crate::application::handlers::QueryHandlerGat;
1362        use crate::application::handlers::query_handlers::SystemQueryHandler;
1363        use crate::application::queries::GetSystemStatsQuery;
1364        use std::time::{Duration, Instant};
1365
1366        let repository = Arc::new(MockRepository::new());
1367        // Simulate a handler that started 5 seconds ago.
1368        let started_at = Instant::now() - Duration::from_secs(5);
1369        let handler = SystemQueryHandler::with_start_time(repository, started_at);
1370
1371        let query = GetSystemStatsQuery {
1372            include_historical: false,
1373        };
1374        let result = QueryHandlerGat::handle(&handler, query).await.unwrap();
1375
1376        // uptime must reflect the real elapsed time, not a hard-coded value.
1377        assert!(
1378            result.uptime_seconds >= 5,
1379            "uptime_seconds should be at least 5, got {}",
1380            result.uptime_seconds
1381        );
1382        // Must not be the old placeholder value (3600).
1383        assert_ne!(
1384            result.uptime_seconds, 3600,
1385            "uptime_seconds must not be the hard-coded placeholder 3600"
1386        );
1387    }
1388
1389    #[cfg(feature = "metrics")]
1390    #[tokio::test]
1391    async fn test_metrics_endpoint_returns_prometheus_format() {
1392        use crate::infrastructure::http::metrics::install_global_recorder;
1393
1394        // Install the recorder and verify the handle renders text/plain output.
1395        let handle = install_global_recorder().expect("recorder install should succeed");
1396        let rendered = handle.render();
1397        // Prometheus text format: empty registry produces an empty string or
1398        // comment lines; never a JSON error body.
1399        assert!(
1400            !rendered.contains("{\"error\""),
1401            "rendered metrics should not be a JSON error: {rendered}"
1402        );
1403
1404        // Calling again must be idempotent.
1405        let handle2 = install_global_recorder().expect("second call must not fail");
1406        assert_eq!(
1407            handle.render(),
1408            handle2.render(),
1409            "both handles must render the same metrics"
1410        );
1411    }
1412
1413    #[cfg(feature = "metrics")]
1414    #[test]
1415    fn test_metrics_router_has_metrics_route() {
1416        // Verify that the router includes /metrics by exercising the route builder.
1417        // We check this at compile time through the feature-gated code path.
1418        let _router =
1419            create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1420                &HttpServerConfig::default(),
1421            )
1422            .expect("router should build successfully with metrics feature");
1423    }
1424
1425    #[tokio::test]
1426    async fn search_sessions_route_returns_ok() {
1427        use axum::http::Request;
1428        use tower::ServiceExt;
1429
1430        let repository = Arc::new(MockRepository::new());
1431        let event_publisher = Arc::new(MockEventPublisher);
1432        let stream_store = Arc::new(MockStreamStore);
1433        let state = PjsAppState::new(repository, event_publisher, stream_store);
1434
1435        let router =
1436            create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1437                &HttpServerConfig::default(),
1438            )
1439            .expect("router should build")
1440            .with_state(state);
1441
1442        let req = Request::builder()
1443            .uri("/pjs/sessions/search")
1444            .body(axum::body::Body::empty())
1445            .unwrap();
1446
1447        let resp = router.oneshot(req).await.unwrap();
1448        assert_eq!(resp.status(), StatusCode::OK);
1449    }
1450}