Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49    crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The stock `ServerOptions` must keep the HTTP body ceiling at 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        let defaults = ServerOptions::default();
        assert_eq!(defaults.max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must carry a `transport_listeners` object whose
    /// `active` / `failed` arrays mirror the configured readiness state.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");

        // One listener that came up and one that failed to bind.
        let grpc_listener = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:50051".to_string(),
            explicit: true,
        };
        let http_failure = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5055".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
        };
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![grpc_listener],
                failed: vec![http_failure],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let root = match server.health_json_with_transport(&HealthReport::healthy()) {
            JsonValue::Object(root) => root,
            _ => panic!("health payload should be an object"),
        };
        let listeners = match root.get("transport_listeners") {
            Some(JsonValue::Object(listeners)) => listeners,
            _ => panic!("health payload should include transport_listeners"),
        };
        let active = match listeners.get("active") {
            Some(JsonValue::Array(active)) => active,
            _ => panic!("transport_listeners.active should be an array"),
        };
        let failed = match listeners.get("failed") {
            Some(JsonValue::Array(failed)) => failed,
            _ => panic!("transport_listeners.failed should be an array"),
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105    crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108pub mod handlers_admin;
109mod handlers_ai;
110mod handlers_ai_model_cache;
111mod handlers_auth;
112mod handlers_backup;
113mod handlers_collection_policy;
114mod handlers_ec;
115pub(crate) mod handlers_entity;
116mod handlers_geo;
117mod handlers_graph;
118mod handlers_keyed;
119mod handlers_log;
120mod handlers_metrics;
121mod handlers_ops;
122mod handlers_ops_policy;
123// `pub(crate)` so the RedWire input-stream path (issue #764 / S5)
124// can reuse the canonical S4 INSERT builders / identifier checks
125// (`build_insert_sql`, `is_safe_sql_identifier`) rather than fork
126// the SQL-escaping logic.
127pub(crate) mod handlers_query;
128mod handlers_replication;
129mod handlers_topology;
130mod handlers_vcs;
131mod handlers_vector;
132pub mod header_escape_guard;
133pub mod http_connection_limiter;
134pub mod http_handler_metrics;
135pub mod http_limits;
136pub mod ingest_pipeline;
137pub mod output_stream;
138mod patch_support;
139mod request_body;
140mod request_context;
141mod routing;
142mod serverless_support;
143pub mod tls;
144mod transport;
145
146use self::handlers_ai::*;
147use self::handlers_entity::*;
148use self::handlers_graph::*;
149use self::handlers_keyed::*;
150use self::handlers_metrics::*;
151use self::handlers_ops::*;
152use self::handlers_query::*;
153use self::http_connection_limiter::{
154    HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
155};
156use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
157pub use self::http_limits::{
158    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
159};
160use self::patch_support::*;
161use self::request_body::*;
162use self::routing::*;
163use self::serverless_support::*;
164use self::transport::*;
165
/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
/// can serve either every public surface (`Public`, default) or a
/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
/// the top of `route()` consults this so a port bound only to
/// loopback for admin work won't accidentally hand out DML.
///
/// `ServerOptions::default()` selects [`ServerSurface::Public`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Everything routed normally (default — matches v0 behaviour).
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
    /// which default to `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Intended for
    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
    /// exposed to non-admin networks.
    MetricsOnly,
}
184
/// Per-listener configuration for [`RedDBServer`].
///
/// The `Default` impl supplies the stock values quoted on each field.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Socket address handed to `TcpListener::bind` by
    /// `RedDBServer::serve`. Default `127.0.0.1:5055`.
    pub bind_addr: String,
    /// Defaults to [`DEFAULT_HTTP_MAX_BODY_BYTES`] (32 MiB).
    /// NOTE(review): presumably the largest HTTP request body accepted,
    /// in bytes; enforcement is outside this part of the file — confirm.
    pub max_body_bytes: usize,
    /// Default 5000. NOTE(review): presumably the socket read timeout
    /// in milliseconds; where it is applied is not visible here.
    pub read_timeout_ms: u64,
    /// Default 5000. NOTE(review): presumably the socket write timeout
    /// in milliseconds; where it is applied is not visible here.
    pub write_timeout_ms: u64,
    /// Default 1000. NOTE(review): presumably an upper bound on the
    /// page size a scan request may ask for — confirm against the
    /// scan handlers.
    pub max_scan_limit: usize,
    /// Which subset of paths this listener serves. Defaults to
    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
    /// admin / scrape ports (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
    /// Active and failed transport listeners, echoed under
    /// `transport_listeners` in the health and discovery JSON payloads.
    pub transport_readiness: crate::service_cli::TransportReadiness,
}
198
/// Default for `ServerOptions::max_body_bytes`: 32 MiB
/// (`32 * 1024 * 1024` bytes).
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 * 1024 * 1024;
200
201impl Default for ServerOptions {
202    fn default() -> Self {
203        Self {
204            bind_addr: "127.0.0.1:5055".to_string(),
205            max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
206            read_timeout_ms: 5_000,
207            write_timeout_ms: 5_000,
208            max_scan_limit: 1_000,
209            surface: ServerSurface::Public,
210            transport_readiness: crate::service_cli::TransportReadiness::default(),
211        }
212    }
213}
214
/// Replication state exposed to the HTTP server.
///
/// Attached with `RedDBServer::with_replication`, which keeps it behind
/// an `Arc` so cloned server handles share the same state.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle.
    /// NOTE(review): `None` presumably means this node is not acting as
    /// a primary — confirm against the replication handlers.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
220
/// HTTP server handle around a [`RedDBRuntime`].
///
/// The type derives `Clone`; the accept loop clones the handle for each
/// connection it hands to a worker thread. Fields held in an `Arc`
/// (or documented as sharing one internally) are therefore shared
/// between clones, while plain fields are copied.
#[derive(Clone)]
pub struct RedDBServer {
    /// Database runtime every use-case facade borrows from.
    runtime: RedDBRuntime,
    /// Listener configuration supplied at construction.
    options: ServerOptions,
    /// Optional auth store; `None` until `with_auth` is called.
    auth_store: Option<Arc<AuthStore>>,
    /// Optional replication state; `None` until `with_replication` is
    /// called.
    replication: Option<Arc<ServerReplicationState>>,
    /// Bounded handler-thread admission for the clear-text HTTP accept
    /// loop (issue #570 slice 1). Cloned with the server; `Clone` of
    /// `HttpConnectionLimiter` shares an `Arc` so every serve loop on
    /// the same `RedDBServer` shares one cap.
    http_limiter: HttpConnectionLimiter,
    /// Per-handler total-time deadline (issue #570 slice 2). Each
    /// clear-text handler thread arms a deadline at spawn and bails
    /// with a best-effort 503 at coarse boundaries between request
    /// parse, route dispatch, and response write. Starts at
    /// `DEFAULT_HANDLER_TIMEOUT` (30s) and is overridden by
    /// `with_http_limits` or `with_handler_timeout`.
    handler_timeout: Duration,
    /// Monotonic clock the per-handler deadline (issue #621) is armed
    /// against — the same [`MonotonicClock`] abstraction the limiter
    /// uses. Production wires [`SystemMonotonicClock`] (real wall time);
    /// tests inject a fake to drive timeout expiry deterministically
    /// without `sleep()`. Shared via `Arc` so cloned server handles
    /// (e.g. `serve_in_background`) read the same clock.
    handler_clock: Arc<dyn MonotonicClock>,
    /// Test-only synchronous sleep injected between route dispatch and
    /// response write so an integration test can simulate a slow
    /// downstream tripping the deadline. Default 0 (no-op). Shared via
    /// `Arc` so a cloned `RedDBServer` (e.g. `serve_in_background`)
    /// observes flips from the originating handle. Production callers
    /// have no way to set this — the setter is `#[doc(hidden)]`.
    slow_inject_ms: Arc<AtomicU64>,
    /// Prometheus metrics for the HTTP handler-thread pool (issue
    /// #573 slice 4). Records rejections (cap_exhausted /
    /// handler_timeout) and per-handler duration histograms. Cloned
    /// with the server via `Arc` so every serve loop on the same
    /// `RedDBServer` writes to one set of counters.
    http_metrics: HttpHandlerMetrics,
    /// `Retry-After` value (seconds) emitted in the limiter's reject
    /// path (issue #574 slice 5). Pre-rendered into `reject_503_bytes`
    /// at construction so the hot path is one write+close.
    retry_after_secs: u64,
    /// Cached bytes of the limiter's 503 response. Held in an `Arc` so
    /// cloning the server shares the buffer instead of copying it.
    /// Note that `with_http_limits` installs a *new* `Arc` on the
    /// handle it consumes, so clones taken before that call keep the
    /// previous bytes — apply limits before cloning / serving.
    reject_503_bytes: Arc<Vec<u8>>,
    /// Issue #761 / S2 — process-wide output-stream capacity registry.
    /// Shared via `Arc` so cloned server handles (e.g.
    /// `serve_in_background`) all enforce against one set of counters.
    /// The HTTP NDJSON path acquires through this in
    /// `try_route_streaming` before invoking the handler; the guard is
    /// dropped on return so any stream-end path (success / mid-stream
    /// error / snapshot expiry / panic unwind) releases the slot.
    pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
    /// Issue #766 / S7 — resume coordinator ledger. Tracks
    /// `(snapshot_lsn → opened_at_ms, ttl_ms)` for resume-eligibility
    /// checks. Shared via `Arc` so cloned server handles see one
    /// ledger.
    pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
    /// Issue #807 / PRD #750 — `/query/stream` cursor registry. Holds the
    /// opaque token → (snapshot pin, TTL, tenant, principal, query) entries
    /// that let a client resume or reference a streamed read. Shared via
    /// `Arc` so cloned server handles see one registry.
    pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
}
285
/// Default per-handler total-time budget: 30 seconds (issue #571 slice 2).
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
288
/// Categories of state a serverless warmup can be scoped to.
///
/// NOTE(review): the code that consumes this enum is outside this part
/// of the file (presumably `serverless_support`); variant meanings are
/// taken from their names only — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
296
/// Deployment shapes the server distinguishes between.
///
/// NOTE(review): consumers are outside this part of the file
/// (presumably the `/deployment/profiles` handler) — confirm how each
/// variant is used there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
303
304fn percent_decode_path_segment(input: &str) -> Result<String, String> {
305    let bytes = input.as_bytes();
306    let mut out = Vec::with_capacity(bytes.len());
307    let mut index = 0;
308    while index < bytes.len() {
309        match bytes[index] {
310            b'%' => {
311                if index + 2 >= bytes.len() {
312                    return Err("truncated percent escape".to_string());
313                }
314                let high = hex_value(bytes[index + 1])
315                    .ok_or_else(|| "invalid percent escape".to_string())?;
316                let low = hex_value(bytes[index + 2])
317                    .ok_or_else(|| "invalid percent escape".to_string())?;
318                out.push((high << 4) | low);
319                index += 3;
320            }
321            byte => {
322                out.push(byte);
323                index += 1;
324            }
325        }
326    }
327    String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
328}
329
/// Returns the numeric value (0–15) of an ASCII hexadecimal digit, or
/// `None` when `byte` is not one of `0-9`, `a-f`, `A-F`.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly the ASCII hex digits in either case;
    // the resulting value is below 16, so the narrowing never fails.
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
338
/// Fields extracted from a query request body.
///
/// NOTE(review): the parser that fills this struct is outside this part
/// of the file; the `Option` fields presumably distinguish "absent" from
/// "present but empty" — confirm there.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional list of entity type names supplied with the request.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names supplied with the request.
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). When `Some`, the
    /// query handler runs the user_params binder before executing.
    /// Absence preserves the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}
349
/// Kind of change a single [`PatchOperation`] applies.
///
/// NOTE(review): exact semantics (for example how `Set` differs from
/// `Replace`) are defined by the patch handling code outside this part
/// of the file — confirm there.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
356
/// One step of an entity patch request.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// What to do at `path`.
    op: PatchOperationType,
    /// Target location, one string per path component.
    path: Vec<String>,
    /// JSON payload for the operation, when one is supplied.
    /// NOTE(review): presumably `None` for `Unset` — confirm against
    /// the patch parser.
    value: Option<JsonValue>,
}
363
364impl RedDBServer {
365    pub fn new(runtime: RedDBRuntime) -> Self {
366        Self::with_options(runtime, ServerOptions::default())
367    }
368
369    pub fn from_database_options(
370        db_options: RedDBOptions,
371        server_options: ServerOptions,
372    ) -> RedDBResult<Self> {
373        let runtime = RedDBRuntime::with_options(db_options)?;
374        Ok(Self::with_options(runtime, server_options))
375    }
376
377    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
378        Self {
379            runtime,
380            options,
381            auth_store: None,
382            replication: None,
383            http_limiter: HttpConnectionLimiter::with_default_cap(),
384            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
385            handler_clock: Arc::new(SystemMonotonicClock::new()),
386            slow_inject_ms: Arc::new(AtomicU64::new(0)),
387            http_metrics: HttpHandlerMetrics::new(),
388            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
389            reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
390            stream_capacity: output_stream::StreamCapacityRegistry::new(),
391            lease_registry: output_stream::LeaseRegistry::new(),
392            cursor_registry: output_stream::CursorRegistry::new(),
393        }
394    }
395
    /// Borrows the shared output-stream capacity registry.
    #[doc(hidden)]
    pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
        &self.stream_capacity
    }
400
    /// Borrows the shared resume-lease registry.
    #[doc(hidden)]
    pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
        &self.lease_registry
    }
405
    /// Borrows the shared `/query/stream` cursor registry.
    #[doc(hidden)]
    pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
        &self.cursor_registry
    }
410
    /// Borrows the HTTP handler metrics recorder.
    #[doc(hidden)]
    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
        &self.http_metrics
    }
415
    /// Visible for tests. Lets the integration test in
    /// `tests/http_connection_limiter.rs` saturate the cap and observe
    /// `503 Service Unavailable` responses without spinning up
    /// thousands of sockets.
    ///
    /// Replaces the limiter with a fresh one built from `cap`; the
    /// previous limiter on this handle is dropped.
    #[doc(hidden)]
    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(cap);
        self
    }
425
426    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
427    /// Replaces the limiter cap, the per-handler deadline, and the
428    /// `Retry-After` value used by the limiter's reject path. All
429    /// values are assumed validated by [`http_limits::resolve_http_limits`].
430    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
431        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
432        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
433        self.retry_after_secs = limits.retry_after_secs;
434        self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
435        self
436    }
437
    /// Returns the `Retry-After` value, in seconds, currently configured
    /// for the limiter's reject path.
    #[doc(hidden)]
    pub fn retry_after_secs(&self) -> u64 {
        self.retry_after_secs
    }
442
    /// Borrows the connection limiter used by the accept loop.
    #[doc(hidden)]
    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
        &self.http_limiter
    }
447
    /// Visible for tests. Override the per-handler total-time deadline
    /// (issue #570 slice 2). The value set at construction is
    /// `DEFAULT_HANDLER_TIMEOUT` (30s).
    #[doc(hidden)]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = timeout;
        self
    }
455
    /// Returns the per-handler total-time deadline currently configured.
    #[doc(hidden)]
    pub fn handler_timeout(&self) -> Duration {
        self.handler_timeout
    }
460
    /// Visible for tests. Override the clock the per-handler deadline
    /// (issue #621) is armed against, so timeout expiry can be driven
    /// deterministically without real sleeps. The clock installed at
    /// construction is a `SystemMonotonicClock`.
    #[doc(hidden)]
    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
        self.handler_clock = clock;
        self
    }
470
    /// Test hook: set a synchronous sleep (in ms) inserted between
    /// route dispatch and response write. The integration test for
    /// slice 2 sets a value greater than `handler_timeout` to trip
    /// the deadline, then resets to 0 to verify recovery. Shared via
    /// `Arc<AtomicU64>` so cloned server handles see the same flip.
    ///
    /// Takes `&self`: the value is written with a relaxed atomic store,
    /// so no exclusive access to the server is needed.
    #[doc(hidden)]
    pub fn set_test_slow_inject_ms(&self, ms: u64) {
        self.slow_inject_ms.store(ms, Ordering::Relaxed);
    }
480
481    /// Attach an `AuthStore` for HTTP-layer authentication.
482    /// Also injects the store into the runtime so that `Value::Secret`
483    /// auto-encrypt/decrypt can reach the vault AES key.
484    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
485        self.runtime.set_auth_store(Arc::clone(&auth_store));
486        self.auth_store = Some(auth_store);
487        self
488    }
489
    /// Attach replication state for status and snapshot endpoints.
    ///
    /// Overwrites any replication state previously attached to this
    /// handle.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }
495
    /// Borrows the database runtime this server fronts.
    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }
499
    /// Borrows the listener options this server was built with.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }
503
    /// Creates a query use-case facade borrowing this server's runtime.
    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }
507
    /// Creates an admin use-case facade borrowing this server's runtime.
    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }
511
    /// Creates an entity use-case facade borrowing this server's runtime.
    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }
515
    /// Creates a catalog use-case facade borrowing this server's runtime.
    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }
519
    /// Creates a graph use-case facade borrowing this server's runtime.
    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }
523
    /// Creates a native use-case facade borrowing this server's runtime.
    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }
527
    /// Creates a tree use-case facade borrowing this server's runtime.
    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }
531
532    fn transport_readiness_json(&self) -> JsonValue {
533        let active = self
534            .options
535            .transport_readiness
536            .active
537            .iter()
538            .map(|listener| {
539                let mut object = Map::new();
540                object.insert(
541                    "transport".to_string(),
542                    JsonValue::String(listener.transport.clone()),
543                );
544                object.insert(
545                    "bind_addr".to_string(),
546                    JsonValue::String(listener.bind_addr.clone()),
547                );
548                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
549                JsonValue::Object(object)
550            })
551            .collect();
552        let failed = self
553            .options
554            .transport_readiness
555            .failed
556            .iter()
557            .map(|listener| {
558                let mut object = Map::new();
559                object.insert(
560                    "transport".to_string(),
561                    JsonValue::String(listener.transport.clone()),
562                );
563                object.insert(
564                    "bind_addr".to_string(),
565                    JsonValue::String(listener.bind_addr.clone()),
566                );
567                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
568                object.insert(
569                    "reason".to_string(),
570                    JsonValue::String(listener.reason.clone()),
571                );
572                JsonValue::Object(object)
573            })
574            .collect();
575
576        let mut object = Map::new();
577        object.insert("active".to_string(), JsonValue::Array(active));
578        object.insert("failed".to_string(), JsonValue::Array(failed));
579        JsonValue::Object(object)
580    }
581
582    fn handle_grpc_discovery(&self) -> HttpResponse {
583        let mut methods = Map::new();
584        methods.insert(
585            "query".to_string(),
586            JsonValue::String("reddb.v1.RedDB/Query".to_string()),
587        );
588        methods.insert(
589            "batch_query".to_string(),
590            JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
591        );
592        methods.insert(
593            "health".to_string(),
594            JsonValue::String("reddb.v1.RedDB/Health".to_string()),
595        );
596        methods.insert(
597            "prepare".to_string(),
598            JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
599        );
600        methods.insert(
601            "execute_prepared".to_string(),
602            JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
603        );
604
605        let mut examples = Map::new();
606        examples.insert(
607            "query".to_string(),
608            JsonValue::String(
609                "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
610                    .to_string(),
611            ),
612        );
613        examples.insert(
614            "query_with_params".to_string(),
615            JsonValue::String(
616                "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
617                    .to_string(),
618            ),
619        );
620        examples.insert(
621            "health".to_string(),
622            JsonValue::String(
623                "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
624            ),
625        );
626
627        let mut object = Map::new();
628        object.insert("ok".to_string(), JsonValue::Bool(true));
629        object.insert(
630            "service".to_string(),
631            JsonValue::String("reddb.v1.RedDB".to_string()),
632        );
633        object.insert(
634            "package".to_string(),
635            JsonValue::String("reddb.v1".to_string()),
636        );
637        object.insert(
638            "proto".to_string(),
639            JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
640        );
641        object.insert("methods".to_string(), JsonValue::Object(methods));
642        object.insert("examples".to_string(), JsonValue::Object(examples));
643        object.insert(
644            "transport_listeners".to_string(),
645            self.transport_readiness_json(),
646        );
647        object.insert(
648            "hint".to_string(),
649            JsonValue::String(
650                "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
651                    .to_string(),
652            ),
653        );
654        json_response(200, JsonValue::Object(object))
655    }
656
657    fn handle_query_contract(&self) -> HttpResponse {
658        let mut examples = Map::new();
659        examples.insert(
660            "raw_sql".to_string(),
661            JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
662        );
663        examples.insert(
664            "json_query".to_string(),
665            JsonValue::String(
666                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
667                    .to_string(),
668            ),
669        );
670        examples.insert(
671            "json_query_with_params".to_string(),
672            JsonValue::String(
673                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
674                    .to_string(),
675            ),
676        );
677
678        let mut request_body = Map::new();
679        request_body.insert(
680            "query".to_string(),
681            JsonValue::String("required string".to_string()),
682        );
683        request_body.insert(
684            "params".to_string(),
685            JsonValue::String("optional array".to_string()),
686        );
687
688        let mut response_shape = Map::new();
689        response_shape.insert(
690            "columns".to_string(),
691            JsonValue::String("projected column names".to_string()),
692        );
693        response_shape.insert(
694            "records[].values".to_string(),
695            JsonValue::String("only projected values".to_string()),
696        );
697        response_shape.insert(
698            "records[].meta".to_string(),
699            JsonValue::String("internal metadata when present".to_string()),
700        );
701
702        let mut object = Map::new();
703        object.insert("ok".to_string(), JsonValue::Bool(false));
704        object.insert(
705            "code".to_string(),
706            JsonValue::String("method_not_allowed".to_string()),
707        );
708        object.insert(
709            "message".to_string(),
710            JsonValue::String("/query accepts POST requests".to_string()),
711        );
712        object.insert(
713            "hint".to_string(),
714            JsonValue::String(
715                "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
716            ),
717        );
718        object.insert("method".to_string(), JsonValue::String("POST".to_string()));
719        object.insert("path".to_string(), JsonValue::String("/query".to_string()));
720        object.insert("request_body".to_string(), JsonValue::Object(request_body));
721        object.insert(
722            "response_shape".to_string(),
723            JsonValue::Object(response_shape),
724        );
725        object.insert("examples".to_string(), JsonValue::Object(examples));
726        object.insert(
727            "docs".to_string(),
728            JsonValue::String("https://reddb.io/docs/query".to_string()),
729        );
730
731        json_response(405, JsonValue::Object(object))
732            .with_header("Allow", http::HeaderValue::from_static("POST"))
733    }
734
735    fn handle_root_discovery(&self) -> HttpResponse {
736        let mut endpoints = Map::new();
737        endpoints.insert(
738            "health".to_string(),
739            JsonValue::String("GET /health".to_string()),
740        );
741        endpoints.insert(
742            "ready".to_string(),
743            JsonValue::String("GET /ready".to_string()),
744        );
745        endpoints.insert(
746            "query".to_string(),
747            JsonValue::String("POST /query".to_string()),
748        );
749        endpoints.insert(
750            "query_readiness".to_string(),
751            JsonValue::String("GET /ready/query".to_string()),
752        );
753        endpoints.insert(
754            "catalog".to_string(),
755            JsonValue::String("GET /catalog".to_string()),
756        );
757        endpoints.insert(
758            "deployment_profiles".to_string(),
759            JsonValue::String("GET /deployment/profiles".to_string()),
760        );
761
762        let mut examples = Map::new();
763        examples.insert(
764            "http_raw_sql".to_string(),
765            JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
766        );
767        examples.insert(
768            "http_json_query".to_string(),
769            JsonValue::String(
770                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
771                    .to_string(),
772            ),
773        );
774        examples.insert(
775            "http_json_query_with_params".to_string(),
776            JsonValue::String(
777                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
778                    .to_string(),
779            ),
780        );
781
782        let mut object = Map::new();
783        object.insert("ok".to_string(), JsonValue::Bool(true));
784        object.insert(
785            "service".to_string(),
786            JsonValue::String("reddb".to_string()),
787        );
788        object.insert(
789            "version".to_string(),
790            JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
791        );
792        object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
793        object.insert("examples".to_string(), JsonValue::Object(examples));
794        object.insert(
795            "docs".to_string(),
796            JsonValue::String("https://reddb.io/docs".to_string()),
797        );
798        object.insert(
799            "transport_listeners".to_string(),
800            self.transport_readiness_json(),
801        );
802        json_response(200, JsonValue::Object(object))
803    }
804
805    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
806        let mut value = crate::presentation::ops_json::health_json(report);
807        if let JsonValue::Object(ref mut object) = value {
808            object.insert(
809                "transport_listeners".to_string(),
810                self.transport_readiness_json(),
811            );
812        }
813        value
814    }
815
816    pub fn serve(&self) -> io::Result<()> {
817        let listener = TcpListener::bind(&self.options.bind_addr)?;
818        self.serve_on(listener)
819    }
820
821    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
822        for stream in listener.incoming() {
823            match stream {
824                Ok(stream) => match self.http_limiter.try_acquire() {
825                    Some(permit) => {
826                        // Spawn a thread per connection for concurrent request handling
827                        let server = self.clone();
828                        thread::spawn(move || {
829                            let _guard = permit; // released on thread exit
830                            let _ = server.handle_connection(stream);
831                        });
832                    }
833                    None => {
834                        // Cap exhausted: write static 503 inline on the
835                        // accept thread, close the socket, and continue.
836                        // No thread spawn, no `HttpRequest::read_from`,
837                        // no runtime call.
838                        self.http_metrics
839                            .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
840                        self.reject_with_503(stream, self.options.write_timeout_ms);
841                    }
842                },
843                Err(err) => return Err(err),
844            }
845        }
846        Ok(())
847    }
848
849    /// 503 response used when the connection limiter is full. The
850    /// payload is pre-rendered into `reject_503_bytes` at construction
851    /// (issue #574 slice 5: `Retry-After` is configurable), so the
852    /// hot path is still one write and a close.
853    fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
854        let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
855        let _ = stream.write_all(&self.reject_503_bytes);
856        let _ = stream.flush();
857        let _ = stream.shutdown(std::net::Shutdown::Both);
858    }
859
860    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
861        let (stream, _) = listener.accept()?;
862        self.handle_connection(stream)
863    }
864
865    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
866        let server = self.clone();
867        thread::spawn(move || server.serve())
868    }
869
870    pub fn serve_in_background_on(
871        &self,
872        listener: TcpListener,
873    ) -> thread::JoinHandle<io::Result<()>> {
874        let server = self.clone();
875        thread::spawn(move || server.serve_on(listener))
876    }
877
878    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
879    /// `tls_config` is shared across all connections (rustls
880    /// `ServerConfig` is `Send + Sync`).
881    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
882        let listener = TcpListener::bind(&self.options.bind_addr)?;
883        self.serve_tls_on(listener, tls_config)
884    }
885
886    pub fn serve_tls_on(
887        &self,
888        listener: TcpListener,
889        tls_config: std::sync::Arc<rustls::ServerConfig>,
890    ) -> io::Result<()> {
891        for stream in listener.incoming() {
892            match stream {
893                Ok(stream) => match self.http_limiter.try_acquire() {
894                    Some(permit) => {
895                        let server = self.clone();
896                        let cfg = tls_config.clone();
897                        thread::spawn(move || {
898                            let _guard = permit; // released on thread exit
899                            let _ = server.handle_tls_connection(stream, cfg);
900                        });
901                    }
902                    None => {
903                        // Issue #572 slice 3: cross-transport cap shared
904                        // with the clear-text limiter. Writing a 503 over
905                        // a non-handshaken TLS socket is not meaningful,
906                        // so reject by closing the raw socket. No TLS
907                        // handshake, no thread spawn, no runtime call.
908                        self.http_metrics
909                            .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
910                        let _ = stream.shutdown(std::net::Shutdown::Both);
911                        drop(stream);
912                    }
913                },
914                Err(err) => return Err(err),
915            }
916        }
917        Ok(())
918    }
919
920    pub fn serve_tls_in_background(
921        &self,
922        tls_config: std::sync::Arc<rustls::ServerConfig>,
923    ) -> thread::JoinHandle<io::Result<()>> {
924        let server = self.clone();
925        thread::spawn(move || server.serve_tls(tls_config))
926    }
927
928    pub fn serve_tls_in_background_on(
929        &self,
930        listener: TcpListener,
931        tls_config: std::sync::Arc<rustls::ServerConfig>,
932    ) -> thread::JoinHandle<io::Result<()>> {
933        let server = self.clone();
934        thread::spawn(move || server.serve_tls_on(listener, tls_config))
935    }
936
937    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
938        let started = Instant::now();
939        let result = self.handle_connection_inner(stream);
940        let elapsed = started.elapsed().as_secs_f64();
941        self.http_metrics
942            .record_duration(HttpTransport::Http, elapsed);
943        result
944    }
945
946    fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
947        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
948        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
949
950        // Issue #570 slice 2 / #621: arm a deadline at handler spawn and
951        // check at coarse boundaries. Armed against the injectable
952        // monotonic clock (#621) so timeout behaviour is deterministically
953        // testable without real sleeps; production wires the real clock,
954        // so this tracks wall time. No hard pre-emption — a thread blocked
955        // inside a true syscall is still bounded only by the per-socket
956        // read/write timeouts.
957        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
958
959        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
960
961        // Boundary (a): between request parse and route dispatch.
962        if deadline.expired() {
963            self.http_metrics
964                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
965            Self::write_handler_timeout_503(&mut stream);
966            return Ok(());
967        }
968
969        if self.try_route_streaming(&request, &mut stream)? {
970            return Ok(());
971        }
972        let response = self.route(request);
973
974        // Test-only injected slow downstream (issue #570 slice 2
975        // integration test). Production builds set this to 0, so this
976        // is a single relaxed atomic load on the hot path.
977        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
978        if inject_ms > 0 {
979            thread::sleep(Duration::from_millis(inject_ms));
980        }
981
982        // Boundary (b): between route dispatch and response write.
983        if deadline.expired() {
984            self.http_metrics
985                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
986            Self::write_handler_timeout_503(&mut stream);
987            return Ok(());
988        }
989
990        stream.write_all(&response.to_http_bytes())?;
991        stream.flush()?;
992        Ok(())
993    }
994
995    /// Best-effort 503 emitted when the per-handler deadline expires
996    /// at a coarse boundary. Writes are swallowed — the caller has
997    /// already exceeded its budget, so we do not propagate write
998    /// errors. Permit drop happens on the handler thread's normal
999    /// exit path.
1000    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
1001        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
1002            Connection: close\r\n\
1003            Content-Length: 0\r\n\
1004            \r\n";
1005        let _ = stream.write_all(RESPONSE);
1006        let _ = stream.flush();
1007    }
1008
1009    fn handle_tls_connection(
1010        &self,
1011        tcp: TcpStream,
1012        tls_config: std::sync::Arc<rustls::ServerConfig>,
1013    ) -> io::Result<()> {
1014        let started = Instant::now();
1015        let result = self.handle_tls_connection_inner(tcp, tls_config);
1016        let elapsed = started.elapsed().as_secs_f64();
1017        self.http_metrics
1018            .record_duration(HttpTransport::Https, elapsed);
1019        result
1020    }
1021
1022    fn handle_tls_connection_inner(
1023        &self,
1024        tcp: TcpStream,
1025        tls_config: std::sync::Arc<rustls::ServerConfig>,
1026    ) -> io::Result<()> {
1027        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
1028        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
1029
1030        // Issue #572 slice 3 / #621: per-handler deadline applies to TLS
1031        // handlers too — same injectable-clock scaffolding as the
1032        // clear-text path.
1033        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
1034
1035        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
1036            Ok(s) => s,
1037            Err(err) => {
1038                tracing::warn!(
1039                    target: "reddb::http_tls",
1040                    err = %err,
1041                    "TLS handshake failed"
1042                );
1043                return Err(err);
1044            }
1045        };
1046        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
1047            Ok(req) => req,
1048            Err(err) => {
1049                tracing::warn!(
1050                    target: "reddb::http_tls",
1051                    err = %err,
1052                    "TLS request parse failed"
1053                );
1054                return Err(err);
1055            }
1056        };
1057
1058        // Boundary (a): between request parse and route dispatch.
1059        if deadline.expired() {
1060            self.http_metrics
1061                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
1062            Self::write_handler_timeout_503(&mut tls_stream);
1063            return Ok(());
1064        }
1065
1066        if self.try_route_streaming(&request, &mut tls_stream)? {
1067            return Ok(());
1068        }
1069        let response = self.route(request);
1070
1071        // Test-only injected slow downstream (issue #572 slice 3
1072        // integration test). Production sets this to 0.
1073        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
1074        if inject_ms > 0 {
1075            thread::sleep(Duration::from_millis(inject_ms));
1076        }
1077
1078        // Boundary (b): between route dispatch and response write.
1079        if deadline.expired() {
1080            self.http_metrics
1081                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
1082            Self::write_handler_timeout_503(&mut tls_stream);
1083            return Ok(());
1084        }
1085
1086        tls_stream.write_all(&response.to_http_bytes())?;
1087        tls_stream.flush()?;
1088        Ok(())
1089    }
1090}
1091
/// Render the limiter-reject 503 response once, up front.
///
/// `retry_after_secs` is written verbatim into the `Retry-After`
/// header; it comes from the resolved HTTP limits (issue #574 slice 5)
/// and does not change for the lifetime of the server, so the bytes
/// can be built a single time and reused by every accept loop.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    let mut response = String::from("HTTP/1.1 503 Service Unavailable\r\n");
    response.push_str("Connection: close\r\n");
    response.push_str("Content-Length: 0\r\n");
    response.push_str("Retry-After: ");
    response.push_str(&retry_after_secs.to_string());
    // End of the Retry-After line, then the blank line closing the headers.
    response.push_str("\r\n\r\n");
    response.into_bytes()
}