Skip to main content

pjson_rs/infrastructure/http/
axum_adapter.rs

1//! Axum HTTP server adapter for PJS streaming
2
3use axum::{
4    Json, Router,
5    extract::{DefaultBodyLimit, Path as AxumPath, Query, State},
6    http::{
7        HeaderValue, Method, StatusCode,
8        header::{self, AUTHORIZATION, CONTENT_TYPE},
9    },
10    middleware,
11    response::{IntoResponse, Response},
12    routing::{get, post},
13};
14use serde::{Deserialize, Serialize};
15use serde_json::Value as JsonValue;
16use std::{sync::Arc, time::Instant};
17use tower_http::{
18    cors::{AllowOrigin, CorsLayer},
19    trace::TraceLayer,
20};
21
22use crate::{
23    application::{
24        commands::*,
25        dto::PriorityDto,
26        handlers::{
27            CommandHandlerGat, QueryHandlerGat,
28            command_handlers::SessionCommandHandler,
29            query_handlers::{SessionQueryHandler, StreamQueryHandler, SystemQueryHandler},
30        },
31        queries::*,
32    },
33    domain::{
34        aggregates::stream_session::{SessionConfig, SessionHealth},
35        entities::Frame,
36        ports::{
37            DictionaryStore, EventPublisherGat, FrameStoreGat, NoopDictionaryStore,
38            StreamRepositoryGat, StreamStoreGat,
39        },
40        value_objects::{Priority, SessionId, StreamId},
41    },
42    infrastructure::{
43        adapters::InMemoryFrameStore,
44        http::middleware::{RateLimitMiddleware, security_middleware},
45    },
46};
47
/// HTTP server configuration.
///
/// # Production warning
///
/// [`HttpServerConfig::default`] is a **local-development** configuration: it
/// allows exactly one hard-coded origin (`http://localhost:3000`). Production
/// deployments must build an explicit `HttpServerConfig` listing the real
/// allowed origins, or pass `vec![]` to deny all cross-origin requests.
///
/// Use [`create_pjs_router_with_config`] to apply a non-default configuration.
///
/// # Adding fields
///
/// The struct is `#[non_exhaustive]` so that additive fields (e.g.
/// `allow_credentials`, `max_age`) are not breaking changes. External callers
/// therefore cannot use struct-literal syntax; construct a value with
/// [`HttpServerConfig::new`] or [`HttpServerConfig::default`] and then mutate
/// the public fields.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct HttpServerConfig {
    /// List of origins allowed by the CORS layer.
    ///
    /// # Matching semantics
    ///
    /// Origins are matched against the request's `Origin` header by **case-sensitive byte
    /// equality**. This is `tower_http::cors::AllowOrigin::list` behavior; it is not the
    /// case-insensitive scheme/host comparison defined by RFC 6454 §6.
    ///
    /// In practice this matches all real browser traffic, because mainstream browsers
    /// always send lowercase scheme and host. Write your origins in lowercase.
    ///
    /// # Special values
    ///
    /// - `[]` (empty) — deny all cross-origin requests (fail-closed)
    /// - `["*"]` — allow any origin (passes through to `tower_http::cors::Any`)
    /// - Mixing `"*"` with explicit origins is **not** checked when this struct is
    ///   constructed or mutated; it is rejected with an error when the CORS layer
    ///   is built from the configuration (i.e. when a router is created).
    pub allowed_origins: Vec<String>,
}

impl HttpServerConfig {
    /// Construct a configuration with an explicit list of allowed CORS origins.
    ///
    /// Pass `vec![]` to deny all cross-origin requests, or `vec!["*".into()]`
    /// to allow any origin. No validation happens here: an invalid list (for
    /// example `"*"` mixed with explicit origins) is only rejected later, when
    /// the CORS layer is built.
    ///
    /// # Examples
    ///
    /// ```
    /// use pjson_rs::infrastructure::http::HttpServerConfig;
    ///
    /// let config = HttpServerConfig::new(vec!["https://app.example.com".into()]);
    /// assert_eq!(config.allowed_origins.len(), 1);
    /// ```
    pub fn new(allowed_origins: Vec<String>) -> Self {
        Self { allowed_origins }
    }
}

impl Default for HttpServerConfig {
    /// Local-development default: allows `http://localhost:3000`.
    ///
    /// **Do not use this in production.** See the type-level docs.
    fn default() -> Self {
        Self::new(vec![String::from("http://localhost:3000")])
    }
}
118
119/// Build a [`CorsLayer`] from an [`HttpServerConfig`].
120///
121/// # Errors
122///
123/// Returns [`PjsError::HttpError`] if:
124/// - `allowed_origins` is a mix of `"*"` and explicit origins
125/// - any origin string fails to parse as a valid `HeaderValue`
126fn build_cors_layer(config: &HttpServerConfig) -> Result<CorsLayer, PjsError> {
127    // We intentionally do NOT call .allow_credentials(true).
128    // PJS does not use cookie-based auth; the Authorization header works without
129    // credentials mode. allow_credentials(true) is incompatible with allow_origin(Any),
130    // which would forbid the `["*"]` config path.
131    let base = CorsLayer::new()
132        .allow_methods([Method::GET, Method::POST])
133        .allow_headers([CONTENT_TYPE, AUTHORIZATION])
134        .max_age(std::time::Duration::from_secs(3600));
135
136    let has_wildcard = config.allowed_origins.iter().any(|o| o == "*");
137    let has_explicit = config.allowed_origins.iter().any(|o| o != "*");
138
139    let layer = match (
140        config.allowed_origins.is_empty(),
141        has_wildcard,
142        has_explicit,
143    ) {
144        (true, _, _) => base.allow_origin(AllowOrigin::list(std::iter::empty::<HeaderValue>())),
145        (_, true, true) => {
146            return Err(PjsError::HttpError(
147                "CORS: wildcard '*' cannot be combined with explicit origins".into(),
148            ));
149        }
150        (_, true, false) => base.allow_origin(tower_http::cors::Any),
151        (_, false, _) => {
152            let origins: Vec<HeaderValue> = config
153                .allowed_origins
154                .iter()
155                .map(|o| {
156                    o.parse::<HeaderValue>()
157                        .map_err(|e| PjsError::HttpError(format!("invalid CORS origin {o:?}: {e}")))
158                })
159                .collect::<Result<_, _>>()?;
160            base.allow_origin(AllowOrigin::list(origins))
161        }
162    };
163    Ok(layer)
164}
165
166/// Axum application state with PJS GAT-based handlers.
167///
168/// The `dictionary_store` field is `pub(crate)` so the dictionary handler can
169/// access it without exposing it as a public API.
170pub struct PjsAppState<R, P, S, F = InMemoryFrameStore>
171where
172    R: StreamRepositoryGat + Send + Sync + 'static,
173    P: EventPublisherGat + Send + Sync + 'static,
174    S: StreamStoreGat + Send + Sync + 'static,
175    F: FrameStoreGat + Send + Sync + 'static,
176{
177    command_handler: Arc<SessionCommandHandler<R, P, F>>,
178    session_query_handler: Arc<SessionQueryHandler<R>>,
179    stream_query_handler: Arc<StreamQueryHandler<R, S, F>>,
180    system_handler: Arc<SystemQueryHandler<R>>,
181    pub(crate) dictionary_store: Arc<dyn DictionaryStore>,
182}
183
184impl<R, P, S, F> Clone for PjsAppState<R, P, S, F>
185where
186    R: StreamRepositoryGat + Send + Sync + 'static,
187    P: EventPublisherGat + Send + Sync + 'static,
188    S: StreamStoreGat + Send + Sync + 'static,
189    F: FrameStoreGat + Send + Sync + 'static,
190{
191    fn clone(&self) -> Self {
192        Self {
193            command_handler: self.command_handler.clone(),
194            session_query_handler: self.session_query_handler.clone(),
195            stream_query_handler: self.stream_query_handler.clone(),
196            system_handler: self.system_handler.clone(),
197            dictionary_store: self.dictionary_store.clone(),
198        }
199    }
200}
201
202impl<R, P, S> PjsAppState<R, P, S, InMemoryFrameStore>
203where
204    R: StreamRepositoryGat + Send + Sync + 'static,
205    P: EventPublisherGat + Send + Sync + 'static,
206    S: StreamStoreGat + Send + Sync + 'static,
207{
208    /// Create a new application state with default [`NoopDictionaryStore`] and
209    /// an in-memory frame store.
210    ///
211    /// The `/pjs/sessions/{id}/dictionary` endpoint will return 404 until
212    /// you upgrade to [`PjsAppState::with_dictionary_store`] with a concrete
213    /// implementation such as [`crate::infrastructure::repositories::InMemoryDictionaryStore`].
214    ///
215    /// Records the current instant as the process start time for uptime reporting.
216    pub fn new(repository: Arc<R>, event_publisher: Arc<P>, stream_store: Arc<S>) -> Self {
217        Self::with_dictionary_store(
218            repository,
219            event_publisher,
220            stream_store,
221            Arc::new(NoopDictionaryStore),
222        )
223    }
224
225    /// Create a new application state with a custom [`DictionaryStore`] and an
226    /// in-memory frame store.
227    ///
228    /// Pass `Arc::new(InMemoryDictionaryStore::new(...))` to enable end-to-end
229    /// dictionary training and serving.
230    pub fn with_dictionary_store(
231        repository: Arc<R>,
232        event_publisher: Arc<P>,
233        stream_store: Arc<S>,
234        dictionary_store: Arc<dyn DictionaryStore>,
235    ) -> Self {
236        Self::with_stores(
237            repository,
238            event_publisher,
239            stream_store,
240            dictionary_store,
241            Arc::new(InMemoryFrameStore::new()),
242        )
243    }
244}
245
246impl<R, P, S, F> PjsAppState<R, P, S, F>
247where
248    R: StreamRepositoryGat + Send + Sync + 'static,
249    P: EventPublisherGat + Send + Sync + 'static,
250    S: StreamStoreGat + Send + Sync + 'static,
251    F: FrameStoreGat + Send + Sync + 'static,
252{
253    /// Create a new application state with custom [`DictionaryStore`] and
254    /// [`FrameStoreGat`] implementations.
255    pub fn with_stores(
256        repository: Arc<R>,
257        event_publisher: Arc<P>,
258        stream_store: Arc<S>,
259        dictionary_store: Arc<dyn DictionaryStore>,
260        frame_store: Arc<F>,
261    ) -> Self {
262        let started_at = Instant::now();
263        Self {
264            command_handler: Arc::new(SessionCommandHandler::with_stores(
265                repository.clone(),
266                event_publisher,
267                dictionary_store.clone(),
268                frame_store.clone(),
269            )),
270            session_query_handler: Arc::new(SessionQueryHandler::new(repository.clone())),
271            stream_query_handler: Arc::new(StreamQueryHandler::new(
272                repository.clone(),
273                stream_store,
274                frame_store,
275            )),
276            system_handler: Arc::new(SystemQueryHandler::with_start_time(repository, started_at)),
277            dictionary_store,
278        }
279    }
280}
281
282/// Request to create a new streaming session
283#[derive(Debug, Deserialize)]
284pub struct CreateSessionRequest {
285    /// Maximum number of streams the session is allowed to host concurrently.
286    pub max_concurrent_streams: Option<usize>,
287    /// Idle timeout for the session, in seconds.
288    pub timeout_seconds: Option<u64>,
289    /// Optional human-readable client identifier.
290    pub client_info: Option<String>,
291}
292
293/// Response for session creation
294#[derive(Debug, Serialize)]
295pub struct CreateSessionResponse {
296    /// Newly assigned session identifier.
297    pub session_id: String,
298    /// Wall-clock instant after which the session expires.
299    pub expires_at: chrono::DateTime<chrono::Utc>,
300}
301
302/// Request to start streaming data
303#[derive(Debug, Deserialize)]
304pub struct StartStreamRequest {
305    /// JSON payload to be decomposed into priority frames.
306    pub data: JsonValue,
307    /// Minimum frame priority to emit; lower-priority frames are dropped.
308    pub priority_threshold: Option<u8>,
309    /// Maximum number of frames to emit before the stream is closed.
310    pub max_frames: Option<usize>,
311}
312
313/// Stream response parameters
314#[derive(Debug, Deserialize)]
315pub struct StreamParams {
316    /// Identifier of the streaming session.
317    pub session_id: String,
318    /// Optional minimum priority filter applied to emitted frames.
319    pub priority: Option<u8>,
320    /// Optional response format selector (for example, `"json"` or `"sse"`).
321    pub format: Option<String>,
322}
323
324/// Request body for generating priority-filtered frames on an existing stream.
325///
326/// Both fields are optional; defaults match the lowest-cost configuration that
327/// still drives the priority pipeline:
328/// - `priority_threshold` defaults to [`Priority::BACKGROUND`] (10) — accepts every frame.
329/// - `max_frames` defaults to 16 — bounded so a single request cannot emit an
330///   unbounded number of frames.
331#[derive(Debug, Default, Deserialize)]
332pub struct GenerateFramesRequest {
333    /// Minimum frame priority to emit; lower-priority frames are dropped.
334    pub priority_threshold: Option<u8>,
335    /// Maximum number of frames to emit in this request.
336    pub max_frames: Option<usize>,
337}
338
339/// Response body for `POST .../streams/{stream_id}/generate-frames`.
340///
341/// Returns the frames produced by the stream's priority extractor, in the
342/// same shape as `GET .../frames` but freshly generated (and fed into the
343/// per-session dictionary training corpus when the `compression` feature
344/// is enabled).
345#[derive(Debug, Serialize)]
346pub struct GenerateFramesResponse {
347    /// Frames produced by the priority extractor in this request.
348    pub frames: Vec<Frame>,
349    /// Number of frames returned (always equal to `frames.len()`).
350    pub frame_count: usize,
351}
352
353/// Session health response
354#[derive(Debug, Serialize)]
355pub struct SessionHealthResponse {
356    /// Aggregate health flag derived from rates and recent activity.
357    pub is_healthy: bool,
358    /// Number of streams currently in an active state.
359    pub active_streams: usize,
360    /// Number of streams that have terminated with an error.
361    pub failed_streams: usize,
362    /// Whether the session has passed its expiry instant.
363    pub is_expired: bool,
364    /// Number of seconds since the session was created.
365    pub uptime_seconds: i64,
366}
367
368impl From<SessionHealth> for SessionHealthResponse {
369    fn from(health: SessionHealth) -> Self {
370        Self {
371            is_healthy: health.is_healthy,
372            active_streams: health.active_streams,
373            failed_streams: health.failed_streams,
374            is_expired: health.is_expired,
375            uptime_seconds: health.uptime_seconds,
376        }
377    }
378}
379
380/// Create PJS-enabled Axum router with the default CORS configuration.
381///
382/// Uses [`HttpServerConfig::default`] which allows `http://localhost:3000`.
383///
384/// # Security Note
385///
386/// This is suitable for local development only. For production, use
387/// [`create_pjs_router_with_config`] with an explicit [`HttpServerConfig`].
388///
389/// TODO: Implement authentication strategy before production deployment.
390/// Options: API keys, JWT tokens, OAuth2/OIDC
391pub fn create_pjs_router<R, P, S>() -> Router<PjsAppState<R, P, S>>
392where
393    R: StreamRepositoryGat + Send + Sync + 'static,
394    P: EventPublisherGat + Send + Sync + 'static,
395    S: StreamStoreGat + Send + Sync + 'static,
396{
397    create_pjs_router_with_config::<R, P, S>(&HttpServerConfig::default())
398        .expect("default HttpServerConfig must always produce a valid CORS layer")
399}
400
401/// Create PJS-enabled Axum router with a custom [`HttpServerConfig`].
402///
403/// # Errors
404///
405/// Returns [`PjsError::HttpError`] if `config` contains invalid CORS origins —
406/// specifically, when `allowed_origins` mixes `"*"` with explicit origins, or
407/// any origin string fails to parse as a valid `HeaderValue`.
408///
409/// # Examples
410///
411/// ```rust,ignore
412/// use pjson_rs::infrastructure::http::{HttpServerConfig, create_pjs_router_with_config};
413///
414/// let config = HttpServerConfig::new(vec!["https://app.example.com".to_string()]);
415/// let router = create_pjs_router_with_config::<R, P, S>(&config)?;
416/// ```
417pub fn create_pjs_router_with_config<R, P, S>(
418    config: &HttpServerConfig,
419) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
420where
421    R: StreamRepositoryGat + Send + Sync + 'static,
422    P: EventPublisherGat + Send + Sync + 'static,
423    S: StreamStoreGat + Send + Sync + 'static,
424{
425    let all_routes = public_routes::<R, P, S>().merge(protected_routes::<R, P, S>());
426    apply_common_layers(all_routes, config)
427}
428
429/// Create PJS-enabled Axum router with rate limiting and the default CORS configuration.
430///
431/// Adds rate limiting middleware to protect against DoS attacks.
432/// Default: 100 requests per minute per IP address.
433///
434/// Uses [`HttpServerConfig::default`] which allows `http://localhost:3000`.
435/// For production, use [`create_pjs_router_with_rate_limit_and_config`].
436///
437/// # Security Note
438///
439/// Rate limiting is applied globally to all endpoints.
440/// Returns 429 Too Many Requests with Retry-After header when limit exceeded.
441/// Adds X-RateLimit-* headers per RFC 6585.
442pub fn create_pjs_router_with_rate_limit<R, P, S>(
443    rate_limit_middleware: RateLimitMiddleware,
444) -> Router<PjsAppState<R, P, S>>
445where
446    R: StreamRepositoryGat + Send + Sync + 'static,
447    P: EventPublisherGat + Send + Sync + 'static,
448    S: StreamStoreGat + Send + Sync + 'static,
449{
450    create_pjs_router_with_rate_limit_and_config::<R, P, S>(
451        &HttpServerConfig::default(),
452        rate_limit_middleware,
453    )
454    .expect("default HttpServerConfig must always produce a valid CORS layer")
455}
456
457/// Create PJS-enabled Axum router with rate limiting and a custom [`HttpServerConfig`].
458///
459/// # Errors
460///
461/// Returns [`PjsError::HttpError`] if `config` contains invalid CORS origins.
462pub fn create_pjs_router_with_rate_limit_and_config<R, P, S>(
463    config: &HttpServerConfig,
464    rate_limit_middleware: RateLimitMiddleware,
465) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
466where
467    R: StreamRepositoryGat + Send + Sync + 'static,
468    P: EventPublisherGat + Send + Sync + 'static,
469    S: StreamStoreGat + Send + Sync + 'static,
470{
471    let all_routes = public_routes::<R, P, S>()
472        .merge(protected_routes::<R, P, S>())
473        .layer(rate_limit_middleware);
474    apply_common_layers(all_routes, config)
475}
476
/// Build the PJS Axum router with API key authentication and a caller-supplied
/// [`HttpServerConfig`].
///
/// The public sub-router (`/pjs/health`, plus `/metrics` when that feature is
/// enabled) is merged in **without** the auth layer. Every route of the
/// protected sub-router requires a valid API key.
///
/// # Errors
///
/// Returns [`PjsError::HttpError`] if `config` contains invalid CORS origins.
///
/// # Examples
///
/// ```rust,ignore
/// use pjson_rs::infrastructure::http::{
///     HttpServerConfig, auth::{ApiKeyConfig, ApiKeyAuthLayer},
///     create_pjs_router_with_auth,
/// };
///
/// let api_config = ApiKeyConfig::new(&["my-api-key"])?;
/// let auth_layer = ApiKeyAuthLayer::new(api_config);
/// let config = HttpServerConfig::default();
/// let router = create_pjs_router_with_auth::<R, P, S>(&config, auth_layer)?;
/// ```
#[cfg(feature = "http-server")]
pub fn create_pjs_router_with_auth<R, P, S>(
    config: &HttpServerConfig,
    auth: crate::infrastructure::http::auth::ApiKeyAuthLayer,
) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
where
    R: StreamRepositoryGat + Send + Sync + 'static,
    P: EventPublisherGat + Send + Sync + 'static,
    S: StreamStoreGat + Send + Sync + 'static,
{
    // The auth layer is attached to the protected sub-router only. Keeping the
    // public routes in their own sub-router means the auth layer never has to
    // compare path strings to decide what to exempt.
    let guarded = protected_routes::<R, P, S>().layer(auth);
    let open = public_routes::<R, P, S>();
    apply_common_layers(open.merge(guarded), config)
}
516
/// Create PJS-enabled Axum router with both rate limiting and API key authentication.
///
/// Layer ordering, outermost first (in axum, a layer added later wraps the
/// layers and routes added before it):
/// ```text
/// rate_limit  ← outermost of the two: wraps both public and protected sub-routers,
///               so every request reaches the rate limiter first
///   public_routes (no auth)
///   protected_routes
///     auth    ← inner: wraps only protected routes and runs after the rate limiter
///     handlers
/// ```
///
/// Consequences of this ordering:
///
/// - Rate limiting is applied to **both** the public and protected sub-routers (DoS
///   protection for `/pjs/health` is still desirable).
/// - Requests to protected paths pass through the rate limiter **before** their API
///   key is checked, so unauthenticated requests are rate limited as well. If
///   authentication must be evaluated first, the rate limiter would have to be
///   attached to each sub-router individually, inside the auth layer.
///
/// The common middleware stack (tracing, CORS, body limit, security headers) is
/// applied around the whole router afterwards.
///
/// # Errors
///
/// Returns [`PjsError::HttpError`] if `config` contains invalid CORS origins.
#[cfg(feature = "http-server")]
pub fn create_pjs_router_with_rate_limit_and_auth<R, P, S>(
    config: &HttpServerConfig,
    rate_limit: RateLimitMiddleware,
    auth: crate::infrastructure::http::auth::ApiKeyAuthLayer,
) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
where
    R: StreamRepositoryGat + Send + Sync + 'static,
    P: EventPublisherGat + Send + Sync + 'static,
    S: StreamStoreGat + Send + Sync + 'static,
{
    // Auth is attached to the protected sub-router only; public routes stay open.
    let protected = protected_routes::<R, P, S>().layer(auth);
    // The rate limiter is layered on after the merge, which makes it the outer
    // layer: it observes every request (public or protected) before auth runs.
    let merged = public_routes::<R, P, S>()
        .merge(protected)
        .layer(rate_limit);
    apply_common_layers(merged, config)
}
554
555// ── Route table helpers ────────────────────────────────────────────────────────────
556
557/// Routes that are always public — no authentication applied.
558///
559/// Currently: `/pjs/health` and (when the `metrics` feature is enabled) `/metrics`.
560fn public_routes<R, P, S>() -> Router<PjsAppState<R, P, S>>
561where
562    R: StreamRepositoryGat + Send + Sync + 'static,
563    P: EventPublisherGat + Send + Sync + 'static,
564    S: StreamStoreGat + Send + Sync + 'static,
565{
566    let router = Router::new().route("/pjs/health", get(system_health));
567
568    #[cfg(feature = "metrics")]
569    let router = router.route(
570        "/metrics",
571        get(crate::infrastructure::http::metrics::metrics_handler),
572    );
573
574    router
575}
576
577/// Routes that require authentication when an auth layer is applied.
578fn protected_routes<R, P, S>() -> Router<PjsAppState<R, P, S>>
579where
580    R: StreamRepositoryGat + Send + Sync + 'static,
581    P: EventPublisherGat + Send + Sync + 'static,
582    S: StreamStoreGat + Send + Sync + 'static,
583{
584    let router = Router::new()
585        .route("/pjs/sessions", post(create_session::<R, P, S>))
586        .route("/pjs/sessions/{session_id}", get(get_session::<R, P, S>))
587        .route(
588            "/pjs/sessions/{session_id}/health",
589            get(session_health::<R, P, S>),
590        )
591        .route(
592            "/pjs/sessions/{session_id}/stats",
593            get(get_session_stats::<R, P, S>),
594        )
595        .route(
596            "/pjs/sessions/{session_id}/streams",
597            post(create_stream::<R, P, S>),
598        )
599        .route(
600            "/pjs/sessions/{session_id}/streams/{stream_id}/start",
601            post(start_stream::<R, P, S>),
602        )
603        .route(
604            "/pjs/sessions/{session_id}/streams/{stream_id}/generate-frames",
605            post(generate_frames::<R, P, S>),
606        )
607        .route(
608            "/pjs/sessions/{session_id}/streams/{stream_id}",
609            get(get_stream::<R, P, S>),
610        )
611        .route(
612            "/pjs/sessions/{session_id}/streams/{stream_id}/frames",
613            get(get_stream_frames::<R, P, S>),
614        )
615        .route("/pjs/sessions/search", get(search_sessions::<R, P, S>))
616        .route("/pjs/sessions", get(list_sessions::<R, P, S>))
617        .route("/pjs/stats", get(get_system_stats::<R, P, S>));
618
619    #[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
620    let router = router.route(
621        "/pjs/sessions/{session_id}/dictionary",
622        get(crate::infrastructure::http::dictionary::get_session_dictionary::<R, P, S>),
623    );
624
625    router
626}
627
628/// Apply the cross-cutting middleware stack shared by all router variants.
629///
630/// Order (Tower applies outer-to-inner):
631/// ```text
632/// security_middleware   ← security headers
633/// DefaultBodyLimit      ← body size guard
634/// CorsLayer             ← CORS (outside auth, so preflight is answered before auth)
635/// TraceLayer            ← distributed tracing
636/// ```
637fn apply_common_layers<R, P, S>(
638    router: Router<PjsAppState<R, P, S>>,
639    config: &HttpServerConfig,
640) -> Result<Router<PjsAppState<R, P, S>>, PjsError>
641where
642    R: StreamRepositoryGat + Send + Sync + 'static,
643    P: EventPublisherGat + Send + Sync + 'static,
644    S: StreamStoreGat + Send + Sync + 'static,
645{
646    let cors = build_cors_layer(config)?;
647    Ok(router
648        .layer(middleware::from_fn(security_middleware))
649        .layer(DefaultBodyLimit::max(10 * 1024 * 1024))
650        .layer(cors)
651        .layer(TraceLayer::new_for_http()))
652}
653
654/// Create a new streaming session
655async fn create_session<R, P, S>(
656    State(state): State<PjsAppState<R, P, S>>,
657    headers: axum::http::HeaderMap,
658    Json(request): Json<CreateSessionRequest>,
659) -> Result<Json<CreateSessionResponse>, PjsError>
660where
661    R: StreamRepositoryGat + Send + Sync + 'static,
662    P: EventPublisherGat + Send + Sync + 'static,
663    S: StreamStoreGat + Send + Sync + 'static,
664{
665    let config = SessionConfig {
666        max_concurrent_streams: request.max_concurrent_streams.unwrap_or(10),
667        session_timeout_seconds: request.timeout_seconds.unwrap_or(3600),
668        default_stream_config: Default::default(),
669        enable_compression: true,
670        metadata: Default::default(),
671    };
672
673    let user_agent = headers
674        .get(header::USER_AGENT)
675        .and_then(|h| h.to_str().ok())
676        .map(String::from);
677
678    let command = CreateSessionCommand {
679        config,
680        client_info: request.client_info,
681        user_agent,
682        ip_address: None,
683    };
684
685    let session_id: SessionId = CommandHandlerGat::handle(&*state.command_handler, command)
686        .await
687        .map_err(PjsError::Application)?;
688
689    let expires_at = chrono::Utc::now()
690        + chrono::Duration::seconds(request.timeout_seconds.unwrap_or(3600) as i64);
691
692    Ok(Json(CreateSessionResponse {
693        session_id: session_id.to_string(),
694        expires_at,
695    }))
696}
697
698/// Get session information
699async fn get_session<R, P, S>(
700    State(state): State<PjsAppState<R, P, S>>,
701    AxumPath(session_id): AxumPath<String>,
702) -> Result<Json<SessionResponse>, PjsError>
703where
704    R: StreamRepositoryGat + Send + Sync + 'static,
705    P: EventPublisherGat + Send + Sync + 'static,
706    S: StreamStoreGat + Send + Sync + 'static,
707{
708    let session_id =
709        SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
710
711    let query = GetSessionQuery {
712        session_id: session_id.into(),
713    };
714
715    let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionQuery>>::handle(
716        &*state.session_query_handler,
717        query,
718    )
719    .await
720    .map_err(PjsError::Application)?;
721
722    Ok(Json(response))
723}
724
725/// Get session health status
726async fn session_health<R, P, S>(
727    State(state): State<PjsAppState<R, P, S>>,
728    AxumPath(session_id): AxumPath<String>,
729) -> Result<Json<SessionHealthResponse>, PjsError>
730where
731    R: StreamRepositoryGat + Send + Sync + 'static,
732    P: EventPublisherGat + Send + Sync + 'static,
733    S: StreamStoreGat + Send + Sync + 'static,
734{
735    let session_id =
736        SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
737
738    let query = GetSessionHealthQuery {
739        session_id: session_id.into(),
740    };
741
742    let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionHealthQuery>>::handle(
743        &*state.session_query_handler,
744        query,
745    )
746    .await
747    .map_err(PjsError::Application)?;
748
749    Ok(Json(SessionHealthResponse::from(response.health)))
750}
751
752/// Create a new stream within a session
753///
754/// TODO(CQ-007): Optimize double JSON processing
755/// Current: serde_json::Value -> JsonDataDto -> JsonData
756/// Optimization: Direct JsonData deserialization or use sonic-rs
757async fn create_stream<R, P, S>(
758    State(state): State<PjsAppState<R, P, S>>,
759    AxumPath(session_id): AxumPath<String>,
760    Json(request): Json<StartStreamRequest>,
761) -> Result<Json<serde_json::Value>, PjsError>
762where
763    R: StreamRepositoryGat + Send + Sync + 'static,
764    P: EventPublisherGat + Send + Sync + 'static,
765    S: StreamStoreGat + Send + Sync + 'static,
766{
767    let session_id =
768        SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
769
770    let command = CreateStreamCommand {
771        session_id: session_id.into(),
772        source_data: request.data,
773        config: None,
774    };
775
776    let stream_id: StreamId = CommandHandlerGat::handle(&*state.command_handler, command)
777        .await
778        .map_err(PjsError::Application)?;
779
780    Ok(Json(serde_json::json!({
781        "stream_id": stream_id.to_string(),
782        "status": "created"
783    })))
784}
785
786/// Start streaming for a specific stream
787async fn start_stream<R, P, S>(
788    State(state): State<PjsAppState<R, P, S>>,
789    AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
790) -> Result<Json<serde_json::Value>, PjsError>
791where
792    R: StreamRepositoryGat + Send + Sync + 'static,
793    P: EventPublisherGat + Send + Sync + 'static,
794    S: StreamStoreGat + Send + Sync + 'static,
795{
796    let session_id = SessionId::from_string(&session_id)
797        .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
798    let stream_id =
799        StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
800
801    let command = StartStreamCommand {
802        session_id: session_id.into(),
803        stream_id: stream_id.into(),
804    };
805
806    <SessionCommandHandler<R, P> as CommandHandlerGat<StartStreamCommand>>::handle(
807        &*state.command_handler,
808        command,
809    )
810    .await
811    .map_err(PjsError::Application)?;
812
813    Ok(Json(serde_json::json!({
814        "stream_id": stream_id.to_string(),
815        "status": "started"
816    })))
817}
818
819/// Generate priority-filtered frames for an existing stream.
820///
821/// Dispatches [`GenerateFramesCommand`] so the produced frames are fed into
822/// the per-session dictionary-training corpus (see
823/// [`SessionCommandHandler::with_dictionary_store`]). Without this route the
824/// `GET /pjs/sessions/{id}/dictionary` endpoint stays at `404 Not Found` for
825/// HTTP-only clients regardless of how many sessions and streams they create.
826async fn generate_frames<R, P, S>(
827    State(state): State<PjsAppState<R, P, S>>,
828    AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
829    request: Option<Json<GenerateFramesRequest>>,
830) -> Result<Json<GenerateFramesResponse>, PjsError>
831where
832    R: StreamRepositoryGat + Send + Sync + 'static,
833    P: EventPublisherGat + Send + Sync + 'static,
834    S: StreamStoreGat + Send + Sync + 'static,
835{
836    let session_id = SessionId::from_string(&session_id)
837        .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
838    let stream_id =
839        StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
840
841    let Json(request) = request.unwrap_or_default();
842
843    let priority_value = request
844        .priority_threshold
845        .unwrap_or(Priority::BACKGROUND.value());
846    let priority_threshold =
847        PriorityDto::new(priority_value).map_err(|e| PjsError::InvalidPriority(e.to_string()))?;
848    let max_frames = request.max_frames.unwrap_or(16);
849
850    let command = GenerateFramesCommand {
851        session_id: session_id.into(),
852        stream_id: stream_id.into(),
853        priority_threshold,
854        max_frames,
855    };
856
857    let frames: Vec<Frame> = <SessionCommandHandler<R, P> as CommandHandlerGat<
858        GenerateFramesCommand,
859    >>::handle(&*state.command_handler, command)
860    .await
861    .map_err(PjsError::Application)?;
862
863    let frame_count = frames.len();
864    Ok(Json(GenerateFramesResponse {
865        frames,
866        frame_count,
867    }))
868}
869
870/// Get stream information
871async fn get_stream<R, P, S>(
872    State(state): State<PjsAppState<R, P, S>>,
873    AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
874) -> Result<Json<StreamResponse>, PjsError>
875where
876    R: StreamRepositoryGat + Send + Sync + 'static,
877    P: EventPublisherGat + Send + Sync + 'static,
878    S: StreamStoreGat + Send + Sync + 'static,
879{
880    let session_id = SessionId::from_string(&session_id)
881        .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
882    let stream_id =
883        StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
884
885    let query = GetStreamQuery {
886        session_id: session_id.into(),
887        stream_id: stream_id.into(),
888    };
889
890    let response = <StreamQueryHandler<R, S, InMemoryFrameStore> as QueryHandlerGat<
891        GetStreamQuery,
892    >>::handle(&*state.stream_query_handler, query)
893    .await
894    .map_err(PjsError::Application)?;
895
896    Ok(Json(response))
897}
898
899/// List active sessions
900async fn list_sessions<R, P, S>(
901    State(state): State<PjsAppState<R, P, S>>,
902    Query(params): Query<PaginationParams>,
903) -> Result<Json<SessionsResponse>, PjsError>
904where
905    R: StreamRepositoryGat + Send + Sync + 'static,
906    P: EventPublisherGat + Send + Sync + 'static,
907    S: StreamStoreGat + Send + Sync + 'static,
908{
909    let query = GetActiveSessionsQuery {
910        limit: params.limit,
911        offset: params.offset,
912    };
913
914    let response = <SessionQueryHandler<R> as QueryHandlerGat<GetActiveSessionsQuery>>::handle(
915        &*state.session_query_handler,
916        query,
917    )
918    .await
919    .map_err(PjsError::Application)?;
920
921    Ok(Json(response))
922}
923
924/// Search sessions with filters and sorting.
925async fn search_sessions<R, P, S>(
926    State(state): State<PjsAppState<R, P, S>>,
927    Query(params): Query<SearchSessionsParams>,
928) -> Result<Json<SessionsResponse>, PjsError>
929where
930    R: StreamRepositoryGat + Send + Sync + 'static,
931    P: EventPublisherGat + Send + Sync + 'static,
932    S: StreamStoreGat + Send + Sync + 'static,
933{
934    let sort_by = params.sort_by.as_deref().and_then(|s| match s {
935        "created_at" => Some(SessionSortField::CreatedAt),
936        "updated_at" => Some(SessionSortField::UpdatedAt),
937        "stream_count" => Some(SessionSortField::StreamCount),
938        "total_bytes" => Some(SessionSortField::TotalBytes),
939        _ => None,
940    });
941    let sort_order = params.sort_order.as_deref().and_then(|s| match s {
942        "ascending" | "asc" => Some(SortOrder::Ascending),
943        "descending" | "desc" => Some(SortOrder::Descending),
944        _ => None,
945    });
946    let query = SearchSessionsQuery {
947        filters: SessionFilters {
948            state: params.state,
949            created_after: None,
950            created_before: None,
951            client_info: None,
952            has_active_streams: None,
953        },
954        sort_by,
955        sort_order,
956        limit: params.limit,
957        offset: params.offset,
958    };
959    let response = <SessionQueryHandler<R> as QueryHandlerGat<SearchSessionsQuery>>::handle(
960        &*state.session_query_handler,
961        query,
962    )
963    .await
964    .map_err(PjsError::Application)?;
965    Ok(Json(response))
966}
967
968/// Pagination parameters
969#[derive(Debug, Deserialize)]
970pub struct PaginationParams {
971    /// Maximum number of items to return.
972    pub limit: Option<usize>,
973    /// Number of items to skip before returning results.
974    pub offset: Option<usize>,
975}
976
977/// Query parameters for session search endpoint.
978#[derive(Debug, Deserialize)]
979pub struct SearchSessionsParams {
980    /// Match sessions whose state equals this value.
981    pub state: Option<String>,
982    /// Field name to sort by; ignored if not in the allowed sort field list.
983    pub sort_by: Option<String>,
984    /// Sort direction (`"asc"`/`"ascending"` or `"desc"`/`"descending"`).
985    pub sort_order: Option<String>,
986    /// Maximum number of sessions to return.
987    pub limit: Option<usize>,
988    /// Number of sessions to skip before returning results.
989    pub offset: Option<usize>,
990}
991
992/// System health endpoint
993async fn system_health() -> Json<serde_json::Value> {
994    Json(serde_json::json!({
995        "status": "healthy",
996        "version": env!("CARGO_PKG_VERSION"),
997        "features": ["pjs_streaming", "axum_integration", "gat_handlers"]
998    }))
999}
1000
1001/// Real-time system statistics: uptime, session counts, frame throughput.
1002async fn get_system_stats<R, P, S>(
1003    State(state): State<PjsAppState<R, P, S>>,
1004) -> Result<Json<SystemStatsResponse>, PjsError>
1005where
1006    R: StreamRepositoryGat + Send + Sync + 'static,
1007    P: EventPublisherGat + Send + Sync + 'static,
1008    S: StreamStoreGat + Send + Sync + 'static,
1009{
1010    let query = GetSystemStatsQuery {
1011        include_historical: false,
1012    };
1013
1014    let response = <SystemQueryHandler<R> as QueryHandlerGat<GetSystemStatsQuery>>::handle(
1015        &*state.system_handler,
1016        query,
1017    )
1018    .await
1019    .map_err(PjsError::Application)?;
1020
1021    Ok(Json(response))
1022}
1023
1024/// Query parameters for frame listing
1025#[derive(Debug, Deserialize)]
1026pub struct FrameQueryParams {
1027    /// Return only frames whose sequence number is greater than this value.
1028    pub since_sequence: Option<u64>,
1029    /// Return only frames whose priority satisfies this filter.
1030    pub priority: Option<u8>,
1031    /// Maximum number of frames to return.
1032    pub limit: Option<usize>,
1033}
1034
1035/// Get frames for a stream (currently returns empty; no persistent frame store exists yet)
1036async fn get_stream_frames<R, P, S>(
1037    State(state): State<PjsAppState<R, P, S>>,
1038    AxumPath((session_id, stream_id)): AxumPath<(String, String)>,
1039    Query(params): Query<FrameQueryParams>,
1040) -> Result<Json<FramesResponse>, PjsError>
1041where
1042    R: StreamRepositoryGat + Send + Sync + 'static,
1043    P: EventPublisherGat + Send + Sync + 'static,
1044    S: StreamStoreGat + Send + Sync + 'static,
1045{
1046    let session_id = SessionId::from_string(&session_id)
1047        .map_err(|_| PjsError::InvalidSessionId(session_id.clone()))?;
1048    let stream_id =
1049        StreamId::from_string(&stream_id).map_err(|_| PjsError::InvalidStreamId(stream_id))?;
1050
1051    let priority_filter = params
1052        .priority
1053        .map(|p| Priority::new(p).map(Into::into))
1054        .transpose()
1055        .map_err(|e: crate::domain::DomainError| PjsError::InvalidPriority(e.to_string()))?;
1056
1057    let query = GetStreamFramesQuery {
1058        session_id: session_id.into(),
1059        stream_id: stream_id.into(),
1060        since_sequence: params.since_sequence,
1061        priority_filter,
1062        limit: params.limit,
1063    };
1064
1065    let response = <StreamQueryHandler<R, S, InMemoryFrameStore> as QueryHandlerGat<
1066        GetStreamFramesQuery,
1067    >>::handle(&*state.stream_query_handler, query)
1068    .await
1069    .map_err(PjsError::Application)?;
1070
1071    Ok(Json(response))
1072}
1073
1074/// Get statistics for a session
1075async fn get_session_stats<R, P, S>(
1076    State(state): State<PjsAppState<R, P, S>>,
1077    AxumPath(session_id): AxumPath<String>,
1078) -> Result<Json<SessionStatsResponse>, PjsError>
1079where
1080    R: StreamRepositoryGat + Send + Sync + 'static,
1081    P: EventPublisherGat + Send + Sync + 'static,
1082    S: StreamStoreGat + Send + Sync + 'static,
1083{
1084    let session_id =
1085        SessionId::from_string(&session_id).map_err(|_| PjsError::InvalidSessionId(session_id))?;
1086
1087    let query = GetSessionStatsQuery {
1088        session_id: session_id.into(),
1089    };
1090
1091    let response = <SessionQueryHandler<R> as QueryHandlerGat<GetSessionStatsQuery>>::handle(
1092        &*state.session_query_handler,
1093        query,
1094    )
1095    .await
1096    .map_err(PjsError::Application)?;
1097
1098    Ok(Json(response))
1099}
1100
1101// TODO(CQ-004): Implement HTTP rate limiting middleware
1102//
1103// Recommended implementation:
1104// - Add Arc<WebSocketRateLimiter> to PjsAppState
1105// - Use 100 requests/minute per IP with burst allowance
1106// - Extract IP from ConnectInfo<SocketAddr>
1107// - Return 429 Too Many Requests on limit exceeded
1108//
1109// Example:
1110// ```ignore
1111// async fn rate_limit_middleware(
1112//     State(limiter): State<Arc<WebSocketRateLimiter>>,
1113//     ConnectInfo(addr): ConnectInfo<SocketAddr>,
1114//     req: Request,
1115//     next: Next,
1116// ) -> Result<Response, StatusCode> {
1117//     limiter.check_request(addr.ip())
1118//         .map_err(|_| StatusCode::TOO_MANY_REQUESTS)?;
1119//     Ok(next.run(req).await)
1120// }
1121// ```
1122/// PJS-specific errors for HTTP endpoints
1123#[derive(Debug, thiserror::Error)]
1124pub enum PjsError {
1125    /// Wraps an application-layer error returned by a CQRS handler.
1126    #[error("Application error: {0}")]
1127    Application(#[from] crate::application::ApplicationError),
1128
1129    /// Provided session identifier is malformed or not a valid UUID.
1130    #[error("Invalid session ID: {0}")]
1131    InvalidSessionId(String),
1132
1133    /// Provided stream identifier is malformed or not a valid UUID.
1134    #[error("Invalid stream ID: {0}")]
1135    InvalidStreamId(String),
1136
1137    /// Priority value is out of range or otherwise invalid.
1138    #[error("Invalid priority: {0}")]
1139    InvalidPriority(String),
1140
1141    /// Generic HTTP-layer error not covered by other variants.
1142    #[error("HTTP error: {0}")]
1143    HttpError(String),
1144}
1145
1146impl IntoResponse for PjsError {
1147    fn into_response(self) -> Response {
1148        let (status, error_message) = match &self {
1149            PjsError::Application(app_err) => {
1150                use crate::application::ApplicationError;
1151                let status = match app_err {
1152                    ApplicationError::NotFound(_) => StatusCode::NOT_FOUND,
1153                    ApplicationError::Validation(_) => StatusCode::BAD_REQUEST,
1154                    ApplicationError::Authorization(_) => StatusCode::UNAUTHORIZED,
1155                    ApplicationError::Concurrency(_) | ApplicationError::Conflict(_) => {
1156                        StatusCode::CONFLICT
1157                    }
1158                    ApplicationError::Domain(_) | ApplicationError::Logic(_) => {
1159                        StatusCode::INTERNAL_SERVER_ERROR
1160                    }
1161                };
1162                (status, self.to_string())
1163            }
1164            PjsError::InvalidSessionId(_) => (StatusCode::BAD_REQUEST, self.to_string()),
1165            PjsError::InvalidStreamId(_) => (StatusCode::BAD_REQUEST, self.to_string()),
1166            PjsError::InvalidPriority(_) => (StatusCode::BAD_REQUEST, self.to_string()),
1167            PjsError::HttpError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
1168        };
1169
1170        let body = Json(serde_json::json!({
1171            "error": error_message
1172        }));
1173
1174        (status, body).into_response()
1175    }
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180    use super::*;
1181
1182    // --- build_cors_layer unit tests ---
1183
1184    #[test]
1185    fn cors_empty_origins_denies_all() {
1186        let config = HttpServerConfig {
1187            allowed_origins: vec![],
1188        };
1189        // Empty list must succeed (returns a layer that denies all origins).
1190        let result = build_cors_layer(&config);
1191        assert!(
1192            result.is_ok(),
1193            "empty origins should return Ok (deny-all layer)"
1194        );
1195    }
1196
1197    #[test]
1198    fn cors_wildcard_only_is_ok() {
1199        let config = HttpServerConfig {
1200            allowed_origins: vec!["*".to_string()],
1201        };
1202        let result = build_cors_layer(&config);
1203        assert!(result.is_ok(), "wildcard-only should return Ok");
1204    }
1205
1206    #[test]
1207    fn cors_mixed_wildcard_and_explicit_is_err() {
1208        let config = HttpServerConfig {
1209            allowed_origins: vec!["*".to_string(), "http://example.com".to_string()],
1210        };
1211        let result = build_cors_layer(&config);
1212        assert!(
1213            result.is_err(),
1214            "mixing wildcard with explicit origins must fail"
1215        );
1216        let msg = result.unwrap_err().to_string();
1217        assert!(
1218            msg.contains("wildcard"),
1219            "error message should mention wildcard: {msg}"
1220        );
1221    }
1222
1223    #[test]
1224    fn cors_valid_single_origin_is_ok() {
1225        let config = HttpServerConfig {
1226            allowed_origins: vec!["http://example.com".to_string()],
1227        };
1228        assert!(build_cors_layer(&config).is_ok());
1229    }
1230
1231    #[test]
1232    fn cors_valid_multiple_origins_is_ok() {
1233        let config = HttpServerConfig {
1234            allowed_origins: vec![
1235                "https://app.example.com".to_string(),
1236                "https://admin.example.com".to_string(),
1237            ],
1238        };
1239        assert!(build_cors_layer(&config).is_ok());
1240    }
1241
1242    #[test]
1243    fn cors_invalid_origin_string_is_err() {
1244        let config = HttpServerConfig {
1245            // HeaderValue rejects strings containing control characters / invalid bytes.
1246            allowed_origins: vec!["not a\nvalid header".to_string()],
1247        };
1248        let result = build_cors_layer(&config);
1249        assert!(result.is_err(), "invalid origin string must return Err");
1250    }
1251
1252    #[test]
1253    fn default_config_is_valid() {
1254        // Guarantees that the expect() in create_pjs_router / create_pjs_router_with_rate_limit
1255        // will never panic at runtime.
1256        assert!(
1257            build_cors_layer(&HttpServerConfig::default()).is_ok(),
1258            "default HttpServerConfig must produce a valid CORS layer"
1259        );
1260    }
1261
1262    // --- existing integration tests ---
1263
1264    use crate::domain::{
1265        aggregates::StreamSession,
1266        entities::Stream,
1267        events::DomainEvent,
1268        ports::{
1269            EventPublisherGat, Pagination, PriorityDistribution, SessionHealthSnapshot,
1270            SessionQueryCriteria, SessionQueryResult, StreamFilter, StreamRepositoryGat,
1271            StreamStatistics, StreamStatus, StreamStoreGat,
1272        },
1273        value_objects::{SessionId, StreamId},
1274    };
1275    use chrono::Utc;
1276    use std::collections::HashMap;
1277
1278    struct MockRepository {
1279        sessions: parking_lot::Mutex<HashMap<SessionId, StreamSession>>,
1280    }
1281
1282    impl MockRepository {
1283        fn new() -> Self {
1284            Self {
1285                sessions: parking_lot::Mutex::new(HashMap::new()),
1286            }
1287        }
1288    }
1289
    // In-memory `StreamRepositoryGat` implementation used by the router tests.
    //
    // Each associated future type is an opaque `impl Future` whose concrete type
    // comes from the `async move` block returned by the matching method below.
    // Every method takes the `sessions` lock only for the duration of a single
    // map operation.
    impl StreamRepositoryGat for MockRepository {
        type FindSessionFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<Option<StreamSession>>>
            + Send
            + 'a
        where
            Self: 'a;

        type SaveSessionFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
        where
            Self: 'a;

        type RemoveSessionFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
        where
            Self: 'a;

        type FindActiveSessionsFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<Vec<StreamSession>>>
            + Send
            + 'a
        where
            Self: 'a;

        type FindSessionsByCriteriaFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<SessionQueryResult>>
            + Send
            + 'a
        where
            Self: 'a;

        type GetSessionHealthFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<SessionHealthSnapshot>>
            + Send
            + 'a
        where
            Self: 'a;

        type SessionExistsFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<bool>> + Send + 'a
        where
            Self: 'a;

        /// Returns a clone of the stored session, or `None` when the id is unknown.
        fn find_session(&self, session_id: SessionId) -> Self::FindSessionFuture<'_> {
            async move { Ok(self.sessions.lock().get(&session_id).cloned()) }
        }

        /// Stores the session under its own id, replacing any previous entry.
        fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_> {
            async move {
                self.sessions.lock().insert(session.id(), session);
                Ok(())
            }
        }

        /// Removes the session if present; removing an unknown id still succeeds.
        fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_> {
            async move {
                self.sessions.lock().remove(&session_id);
                Ok(())
            }
        }

        /// Returns clones of every stored session; no activity filtering is applied.
        fn find_active_sessions(&self) -> Self::FindActiveSessionsFuture<'_> {
            async move { Ok(self.sessions.lock().values().cloned().collect()) }
        }

        /// Returns a page of stored sessions. The criteria are ignored; only
        /// `pagination.offset` / `pagination.limit` affect the result.
        fn find_sessions_by_criteria(
            &self,
            _criteria: SessionQueryCriteria,
            pagination: Pagination,
        ) -> Self::FindSessionsByCriteriaFuture<'_> {
            async move {
                // Snapshot the map first so the lock is released before paging.
                let sessions: Vec<_> = self.sessions.lock().values().cloned().collect();
                let total_count = sessions.len();
                let paginated: Vec<_> = sessions
                    .into_iter()
                    .skip(pagination.offset)
                    .take(pagination.limit)
                    .collect();
                // More results exist when the page ends before the last stored session.
                let has_more = pagination.offset + paginated.len() < total_count;
                Ok(SessionQueryResult {
                    sessions: paginated,
                    total_count,
                    has_more,
                    query_duration_ms: 0,
                    scan_limit_reached: false,
                })
            }
        }

        /// Returns a fixed "healthy" snapshot for any id, without consulting the map.
        fn get_session_health(&self, session_id: SessionId) -> Self::GetSessionHealthFuture<'_> {
            async move {
                Ok(SessionHealthSnapshot {
                    session_id,
                    is_healthy: true,
                    active_streams: 0,
                    total_frames: 0,
                    last_activity: Utc::now(),
                    error_rate: 0.0,
                    metrics: HashMap::new(),
                })
            }
        }

        /// Reports whether a session with this id is currently stored.
        fn session_exists(&self, session_id: SessionId) -> Self::SessionExistsFuture<'_> {
            async move { Ok(self.sessions.lock().contains_key(&session_id)) }
        }
    }
1398
    /// Event publisher test double that accepts every event and discards it.
    struct MockEventPublisher;

    // Both associated future types are opaque `impl Future`s whose concrete type
    // comes from the `async move` block in the matching method.
    impl EventPublisherGat for MockEventPublisher {
        type PublishFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
        where
            Self: 'a;

        type PublishBatchFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
        where
            Self: 'a;

        /// Ignores the event and reports success.
        fn publish(&self, _event: DomainEvent) -> Self::PublishFuture<'_> {
            async move { Ok(()) }
        }

        /// Ignores the whole batch and reports success.
        fn publish_batch(&self, _events: Vec<DomainEvent>) -> Self::PublishBatchFuture<'_> {
            async move { Ok(()) }
        }
    }
1420
    /// Stateless stream store test double: writes are accepted and dropped,
    /// reads always come back empty.
    struct MockStreamStore;

    // Each associated future type is an opaque `impl Future` whose concrete type
    // comes from the `async move` block in the matching method.
    impl StreamStoreGat for MockStreamStore {
        type StoreStreamFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
        where
            Self: 'a;

        type GetStreamFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<Option<Stream>>>
            + Send
            + 'a
        where
            Self: 'a;

        type DeleteStreamFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
        where
            Self: 'a;

        type ListStreamsForSessionFuture<'a>
            =
            impl std::future::Future<Output = crate::domain::DomainResult<Vec<Stream>>> + Send + 'a
        where
            Self: 'a;

        type FindStreamsBySessionFuture<'a>
            =
            impl std::future::Future<Output = crate::domain::DomainResult<Vec<Stream>>> + Send + 'a
        where
            Self: 'a;

        type UpdateStreamStatusFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
        where
            Self: 'a;

        type GetStreamStatisticsFuture<'a>
            = impl std::future::Future<Output = crate::domain::DomainResult<StreamStatistics>>
            + Send
            + 'a
        where
            Self: 'a;

        /// Discards the stream and reports success.
        fn store_stream(&self, _stream: Stream) -> Self::StoreStreamFuture<'_> {
            async move { Ok(()) }
        }

        /// Always answers `None`: nothing is ever stored.
        fn get_stream(&self, _stream_id: StreamId) -> Self::GetStreamFuture<'_> {
            async move { Ok(None) }
        }

        /// No-op that reports success.
        fn delete_stream(&self, _stream_id: StreamId) -> Self::DeleteStreamFuture<'_> {
            async move { Ok(()) }
        }

        /// Always answers with an empty list.
        fn list_streams_for_session(
            &self,
            _session_id: SessionId,
        ) -> Self::ListStreamsForSessionFuture<'_> {
            async move { Ok(vec![]) }
        }

        /// Always answers with an empty list; the filter is ignored.
        fn find_streams_by_session(
            &self,
            _session_id: SessionId,
            _filter: StreamFilter,
        ) -> Self::FindStreamsBySessionFuture<'_> {
            async move { Ok(vec![]) }
        }

        /// No-op that reports success.
        fn update_stream_status(
            &self,
            _stream_id: StreamId,
            _status: StreamStatus,
        ) -> Self::UpdateStreamStatusFuture<'_> {
            async move { Ok(()) }
        }

        /// Returns all-zero statistics stamped with the current time.
        fn get_stream_statistics(
            &self,
            _stream_id: StreamId,
        ) -> Self::GetStreamStatisticsFuture<'_> {
            async move {
                Ok(StreamStatistics {
                    total_frames: 0,
                    total_bytes: 0,
                    priority_distribution: PriorityDistribution::default(),
                    avg_frame_size: 0.0,
                    creation_time: Utc::now(),
                    completion_time: None,
                    processing_duration: None,
                })
            }
        }
    }
1517
1518    #[tokio::test]
1519    async fn test_system_health() {
1520        let response = system_health().await;
1521        let health_data: serde_json::Value = response.0;
1522
1523        assert_eq!(health_data["status"], "healthy");
1524        assert!(!health_data["features"].as_array().unwrap().is_empty());
1525    }
1526
1527    #[tokio::test]
1528    async fn test_app_state_creation() {
1529        let repository = Arc::new(MockRepository::new());
1530        let event_publisher = Arc::new(MockEventPublisher);
1531        let stream_store = Arc::new(MockStreamStore);
1532
1533        let _state = PjsAppState::new(repository, event_publisher, stream_store);
1534    }
1535
1536    #[tokio::test]
1537    async fn test_get_system_stats_returns_real_uptime() {
1538        use crate::application::handlers::QueryHandlerGat;
1539        use crate::application::handlers::query_handlers::SystemQueryHandler;
1540        use crate::application::queries::GetSystemStatsQuery;
1541        use std::time::{Duration, Instant};
1542
1543        let repository = Arc::new(MockRepository::new());
1544        // Simulate a handler that started 5 seconds ago.
1545        let started_at = Instant::now() - Duration::from_secs(5);
1546        let handler = SystemQueryHandler::with_start_time(repository, started_at);
1547
1548        let query = GetSystemStatsQuery {
1549            include_historical: false,
1550        };
1551        let result = QueryHandlerGat::handle(&handler, query).await.unwrap();
1552
1553        // uptime must reflect the real elapsed time, not a hard-coded value.
1554        assert!(
1555            result.uptime_seconds >= 5,
1556            "uptime_seconds should be at least 5, got {}",
1557            result.uptime_seconds
1558        );
1559        // Must not be the old placeholder value (3600).
1560        assert_ne!(
1561            result.uptime_seconds, 3600,
1562            "uptime_seconds must not be the hard-coded placeholder 3600"
1563        );
1564    }
1565
1566    #[cfg(feature = "metrics")]
1567    #[tokio::test]
1568    async fn test_metrics_endpoint_returns_prometheus_format() {
1569        use crate::infrastructure::http::metrics::install_global_recorder;
1570
1571        // Install the recorder and verify the handle renders text/plain output.
1572        let handle = install_global_recorder().expect("recorder install should succeed");
1573        let rendered = handle.render();
1574        // Prometheus text format: empty registry produces an empty string or
1575        // comment lines; never a JSON error body.
1576        assert!(
1577            !rendered.contains("{\"error\""),
1578            "rendered metrics should not be a JSON error: {rendered}"
1579        );
1580
1581        // Calling again must be idempotent.
1582        let handle2 = install_global_recorder().expect("second call must not fail");
1583        assert_eq!(
1584            handle.render(),
1585            handle2.render(),
1586            "both handles must render the same metrics"
1587        );
1588    }
1589
1590    #[cfg(feature = "metrics")]
1591    #[test]
1592    fn test_metrics_router_has_metrics_route() {
1593        // Verify that the router includes /metrics by exercising the route builder.
1594        // We check this at compile time through the feature-gated code path.
1595        let _router =
1596            create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1597                &HttpServerConfig::default(),
1598            )
1599            .expect("router should build successfully with metrics feature");
1600    }
1601
1602    #[tokio::test]
1603    async fn search_sessions_route_returns_ok() {
1604        use axum::http::Request;
1605        use tower::ServiceExt;
1606
1607        let repository = Arc::new(MockRepository::new());
1608        let event_publisher = Arc::new(MockEventPublisher);
1609        let stream_store = Arc::new(MockStreamStore);
1610        let state = PjsAppState::new(repository, event_publisher, stream_store);
1611
1612        let router =
1613            create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1614                &HttpServerConfig::default(),
1615            )
1616            .expect("router should build")
1617            .with_state(state);
1618
1619        let req = Request::builder()
1620            .uri("/pjs/sessions/search")
1621            .body(axum::body::Body::empty())
1622            .unwrap();
1623
1624        let resp = router.oneshot(req).await.unwrap();
1625        assert_eq!(resp.status(), StatusCode::OK);
1626    }
1627
1628    /// End-to-end HTTP smoke test for the frame-generation route added in issue #230.
1629    ///
1630    /// Drives `create-session → create-stream → start-stream → generate-frames`
1631    /// over the real Axum router and asserts each step succeeds. After issue
1632    /// #232 implemented `Stream::extract_patches` and `batch_patches_into_frames`,
1633    /// the route now produces frames for non-empty source data — the assertion
1634    /// `frame_count > 0` verifies the full chain end-to-end.
1635    #[tokio::test]
1636    async fn generate_frames_route_dispatches_command_end_to_end() {
1637        use axum::body::to_bytes;
1638        use axum::http::{Method, Request};
1639        use tower::ServiceExt;
1640
1641        let repository = Arc::new(MockRepository::new());
1642        let event_publisher = Arc::new(MockEventPublisher);
1643        let stream_store = Arc::new(MockStreamStore);
1644        let state = PjsAppState::new(repository, event_publisher, stream_store);
1645
1646        let router =
1647            create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1648                &HttpServerConfig::default(),
1649            )
1650            .expect("router should build")
1651            .with_state(state);
1652
1653        let create_session = Request::builder()
1654            .method(Method::POST)
1655            .uri("/pjs/sessions")
1656            .header(header::CONTENT_TYPE, "application/json")
1657            .body(axum::body::Body::from("{}"))
1658            .unwrap();
1659        let resp = router.clone().oneshot(create_session).await.unwrap();
1660        assert_eq!(resp.status(), StatusCode::OK);
1661        let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1662        let session: serde_json::Value = serde_json::from_slice(&body).unwrap();
1663        let session_id = session["session_id"].as_str().unwrap().to_string();
1664
1665        let create_stream = Request::builder()
1666            .method(Method::POST)
1667            .uri(format!("/pjs/sessions/{session_id}/streams"))
1668            .header(header::CONTENT_TYPE, "application/json")
1669            .body(axum::body::Body::from(
1670                serde_json::json!({ "data": { "items": [1, 2, 3] } }).to_string(),
1671            ))
1672            .unwrap();
1673        let resp = router.clone().oneshot(create_stream).await.unwrap();
1674        assert_eq!(resp.status(), StatusCode::OK);
1675        let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1676        let stream: serde_json::Value = serde_json::from_slice(&body).unwrap();
1677        let stream_id = stream["stream_id"].as_str().unwrap().to_string();
1678
1679        let start = Request::builder()
1680            .method(Method::POST)
1681            .uri(format!(
1682                "/pjs/sessions/{session_id}/streams/{stream_id}/start"
1683            ))
1684            .body(axum::body::Body::empty())
1685            .unwrap();
1686        let resp = router.clone().oneshot(start).await.unwrap();
1687        assert_eq!(resp.status(), StatusCode::OK);
1688
1689        let generate = Request::builder()
1690            .method(Method::POST)
1691            .uri(format!(
1692                "/pjs/sessions/{session_id}/streams/{stream_id}/generate-frames"
1693            ))
1694            .header(header::CONTENT_TYPE, "application/json")
1695            .body(axum::body::Body::from(
1696                serde_json::json!({ "max_frames": 4 }).to_string(),
1697            ))
1698            .unwrap();
1699        let resp = router.oneshot(generate).await.unwrap();
1700        assert_eq!(
1701            resp.status(),
1702            StatusCode::OK,
1703            "POST .../generate-frames must be reachable end-to-end"
1704        );
1705        let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1706        let payload: serde_json::Value = serde_json::from_slice(&body).unwrap();
1707        assert!(payload["frames"].is_array(), "response must carry frames[]");
1708        let frame_count = payload["frame_count"]
1709            .as_u64()
1710            .expect("response must carry numeric frame_count");
1711        assert!(
1712            frame_count > 0,
1713            "extract_patches must yield at least one patch frame for `{{\"items\": [1,2,3]}}` \
1714             — frame_count was {frame_count}"
1715        );
1716    }
1717
1718    /// End-to-end dictionary path: drive `generate-frames` enough times to
1719    /// cross the `N_TRAIN` threshold, then assert the dictionary endpoint
1720    /// transitions from `404 Not Found` to `200 OK`. This is the chain that
1721    /// issues #224, #230, and #232 together claim to deliver.
1722    #[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
1723    #[tokio::test]
1724    async fn dictionary_endpoint_becomes_reachable_after_training() {
1725        use crate::compression::zstd::N_TRAIN;
1726        use crate::infrastructure::repositories::InMemoryDictionaryStore;
1727        use crate::security::CompressionBombDetector;
1728        use axum::body::to_bytes;
1729        use axum::http::{Method, Request};
1730        use tower::ServiceExt;
1731
1732        let repository = Arc::new(MockRepository::new());
1733        let event_publisher = Arc::new(MockEventPublisher);
1734        let stream_store = Arc::new(MockStreamStore);
1735        let dictionary_store = Arc::new(InMemoryDictionaryStore::new(
1736            Arc::new(CompressionBombDetector::default()),
1737            64 * 1024,
1738        ));
1739        let state = PjsAppState::with_dictionary_store(
1740            repository,
1741            event_publisher,
1742            stream_store,
1743            dictionary_store,
1744        );
1745
1746        let router =
1747            create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1748                &HttpServerConfig::default(),
1749            )
1750            .expect("router should build")
1751            .with_state(state);
1752
1753        let create_session = Request::builder()
1754            .method(Method::POST)
1755            .uri("/pjs/sessions")
1756            .header(header::CONTENT_TYPE, "application/json")
1757            .body(axum::body::Body::from("{}"))
1758            .unwrap();
1759        let resp = router.clone().oneshot(create_session).await.unwrap();
1760        assert_eq!(resp.status(), StatusCode::OK);
1761        let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1762        let session: serde_json::Value = serde_json::from_slice(&body).unwrap();
1763        let session_id = session["session_id"].as_str().unwrap().to_string();
1764
1765        // Source data with N_TRAIN+ leaf patches keeps the test self-contained:
1766        // a single generate-frames call yields enough samples to cross the
1767        // training threshold.
1768        let mut payload = serde_json::Map::new();
1769        for i in 0..(N_TRAIN + 4) {
1770            payload.insert(
1771                format!("field_{i}"),
1772                serde_json::Value::String(format!("value_{i}")),
1773            );
1774        }
1775        let create_stream = Request::builder()
1776            .method(Method::POST)
1777            .uri(format!("/pjs/sessions/{session_id}/streams"))
1778            .header(header::CONTENT_TYPE, "application/json")
1779            .body(axum::body::Body::from(
1780                serde_json::json!({ "data": serde_json::Value::Object(payload) }).to_string(),
1781            ))
1782            .unwrap();
1783        let resp = router.clone().oneshot(create_stream).await.unwrap();
1784        assert_eq!(resp.status(), StatusCode::OK);
1785        let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1786        let stream: serde_json::Value = serde_json::from_slice(&body).unwrap();
1787        let stream_id = stream["stream_id"].as_str().unwrap().to_string();
1788
1789        let start = Request::builder()
1790            .method(Method::POST)
1791            .uri(format!(
1792                "/pjs/sessions/{session_id}/streams/{stream_id}/start"
1793            ))
1794            .body(axum::body::Body::empty())
1795            .unwrap();
1796        let resp = router.clone().oneshot(start).await.unwrap();
1797        assert_eq!(resp.status(), StatusCode::OK);
1798
1799        // Before training: the dictionary endpoint must be 404.
1800        let dict_before = Request::builder()
1801            .method(Method::GET)
1802            .uri(format!("/pjs/sessions/{session_id}/dictionary"))
1803            .body(axum::body::Body::empty())
1804            .unwrap();
1805        let resp = router.clone().oneshot(dict_before).await.unwrap();
1806        assert_eq!(
1807            resp.status(),
1808            StatusCode::NOT_FOUND,
1809            "dictionary endpoint must be 404 before N_TRAIN samples accumulate"
1810        );
1811
1812        // Generate enough frames to cross N_TRAIN. With max_frames at least
1813        // N_TRAIN+4, every leaf patch lands in its own frame.
1814        let max_frames = N_TRAIN + 4;
1815        let generate = Request::builder()
1816            .method(Method::POST)
1817            .uri(format!(
1818                "/pjs/sessions/{session_id}/streams/{stream_id}/generate-frames"
1819            ))
1820            .header(header::CONTENT_TYPE, "application/json")
1821            .body(axum::body::Body::from(
1822                serde_json::json!({ "max_frames": max_frames }).to_string(),
1823            ))
1824            .unwrap();
1825        let resp = router.clone().oneshot(generate).await.unwrap();
1826        assert_eq!(resp.status(), StatusCode::OK);
1827        let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1828        let payload: serde_json::Value = serde_json::from_slice(&body).unwrap();
1829        let frame_count = payload["frame_count"].as_u64().unwrap();
1830        assert!(
1831            frame_count >= N_TRAIN as u64,
1832            "single generate-frames call must yield at least N_TRAIN ({}) frames \
1833             so train_if_ready triggers training; got {frame_count}",
1834            N_TRAIN
1835        );
1836
1837        // After training: the dictionary endpoint must be 200.
1838        let dict_after = Request::builder()
1839            .method(Method::GET)
1840            .uri(format!("/pjs/sessions/{session_id}/dictionary"))
1841            .body(axum::body::Body::empty())
1842            .unwrap();
1843        let resp = router.oneshot(dict_after).await.unwrap();
1844        assert_eq!(
1845            resp.status(),
1846            StatusCode::OK,
1847            "dictionary endpoint must transition to 200 OK once N_TRAIN samples have been fed"
1848        );
1849        let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
1850        assert!(
1851            !body.is_empty(),
1852            "trained dictionary body must be non-empty"
1853        );
1854    }
1855
1856    /// `priority_threshold = 0` is invalid per `Priority::new` — the route
1857    /// must reject the request with `400 Bad Request` rather than reaching
1858    /// the command handler.
1859    #[tokio::test]
1860    async fn generate_frames_route_rejects_invalid_priority() {
1861        use axum::http::{Method, Request};
1862        use tower::ServiceExt;
1863
1864        let repository = Arc::new(MockRepository::new());
1865        let event_publisher = Arc::new(MockEventPublisher);
1866        let stream_store = Arc::new(MockStreamStore);
1867        let state = PjsAppState::new(repository, event_publisher, stream_store);
1868
1869        let router =
1870            create_pjs_router_with_config::<MockRepository, MockEventPublisher, MockStreamStore>(
1871                &HttpServerConfig::default(),
1872            )
1873            .expect("router should build")
1874            .with_state(state);
1875
1876        let sid = SessionId::new();
1877        let stream_id = StreamId::new();
1878        let req = Request::builder()
1879            .method(Method::POST)
1880            .uri(format!(
1881                "/pjs/sessions/{sid}/streams/{stream_id}/generate-frames"
1882            ))
1883            .header(header::CONTENT_TYPE, "application/json")
1884            .body(axum::body::Body::from(
1885                serde_json::json!({ "priority_threshold": 0 }).to_string(),
1886            ))
1887            .unwrap();
1888        let resp = router.oneshot(req).await.unwrap();
1889        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1890    }
1891}