Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49    crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        assert_eq!(ServerOptions::default().max_body_bytes, 32 * 1024 * 1024);
        // The exported constant and the default must not drift apart.
        assert_eq!(
            ServerOptions::default().max_body_bytes,
            DEFAULT_HTTP_MAX_BODY_BYTES
        );
    }

    #[test]
    fn server_options_default_surface_is_public() {
        // Restricted surfaces (PLAN.md Phase 6.2) must be opted into explicitly.
        assert_eq!(ServerOptions::default().surface, ServerSurface::Public);
    }

    /// The health payload must expose both listener lists *and* the
    /// per-listener fields; length-only checks would still pass if the
    /// entries were empty objects or had their fields swapped.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let mut options = ServerOptions::default();
        options.transport_readiness = TransportReadiness {
            active: vec![TransportListenerState {
                transport: "grpc".to_string(),
                bind_addr: "127.0.0.1:50051".to_string(),
                explicit: true,
            }],
            failed: vec![TransportListenerFailure {
                transport: "http".to_string(),
                bind_addr: "127.0.0.1:5055".to_string(),
                explicit: false,
                reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
            }],
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);

        let Some(JsonValue::Object(listener)) = active.first() else {
            panic!("transport_listeners.active[0] should be an object");
        };
        assert!(matches!(
            listener.get("transport"),
            Some(JsonValue::String(s)) if s == "grpc"
        ));
        assert!(matches!(
            listener.get("bind_addr"),
            Some(JsonValue::String(s)) if s == "127.0.0.1:50051"
        ));
        assert!(matches!(listener.get("explicit"), Some(JsonValue::Bool(true))));
        assert!(
            listener.get("reason").is_none(),
            "active listeners should not carry a failure reason"
        );

        let Some(JsonValue::Object(failure)) = failed.first() else {
            panic!("transport_listeners.failed[0] should be an object");
        };
        assert!(matches!(
            failure.get("transport"),
            Some(JsonValue::String(s)) if s == "http"
        ));
        assert!(matches!(
            failure.get("bind_addr"),
            Some(JsonValue::String(s)) if s == "127.0.0.1:5055"
        ));
        assert!(matches!(failure.get("explicit"), Some(JsonValue::Bool(false))));
        assert!(matches!(
            failure.get("reason"),
            Some(JsonValue::String(s)) if s.contains("address in use")
        ));
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105    crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108pub mod handlers_admin;
109mod handlers_ai;
110mod handlers_ai_model_cache;
111mod handlers_auth;
112mod handlers_backup;
113mod handlers_ec;
114pub(crate) mod handlers_entity;
115mod handlers_geo;
116mod handlers_graph;
117mod handlers_keyed;
118mod handlers_log;
119mod handlers_metrics;
120mod handlers_ops;
121mod handlers_query;
122mod handlers_replication;
123mod handlers_vcs;
124mod handlers_vector;
125pub mod header_escape_guard;
126pub mod http_connection_limiter;
127pub mod http_handler_metrics;
128pub mod http_limits;
129pub mod ingest_pipeline;
130mod patch_support;
131mod request_body;
132mod request_context;
133mod routing;
134mod serverless_support;
135pub mod tls;
136mod transport;
137
138use self::handlers_ai::*;
139use self::handlers_entity::*;
140use self::handlers_graph::*;
141use self::handlers_keyed::*;
142use self::handlers_metrics::*;
143use self::handlers_ops::*;
144use self::handlers_query::*;
145use self::http_connection_limiter::{
146    HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
147};
148use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
149pub use self::http_limits::{
150    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
151};
152use self::patch_support::*;
153use self::request_body::*;
154use self::routing::*;
155use self::serverless_support::*;
156use self::transport::*;
157
/// Which family of routes an HTTP listener may serve (PLAN.md Phase 6.2 —
/// endpoint segregation).
///
/// Listeners default to [`ServerSurface::Public`]. A dedicated operator or
/// scrape port can be narrowed, so that, for example, a loopback-only admin
/// port never hands out DML. The route filter at the top of `route()`
/// consults this value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerSurface {
    /// Every route is served normally. This is the default and matches v0
    /// behaviour.
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`; any other path gets a
    /// 404. Meant for `RED_ADMIN_BIND` operator listeners, which default to
    /// `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Meant for `RED_METRICS_BIND`
    /// Prometheus scrape ports that may be reachable from non-admin networks.
    MetricsOnly,
}
176
177#[derive(Debug, Clone)]
178pub struct ServerOptions {
179    pub bind_addr: String,
180    pub max_body_bytes: usize,
181    pub read_timeout_ms: u64,
182    pub write_timeout_ms: u64,
183    pub max_scan_limit: usize,
184    /// Which subset of paths this listener serves. Defaults to
185    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
186    /// admin / scrape ports (PLAN.md Phase 6.2).
187    pub surface: ServerSurface,
188    pub transport_readiness: crate::service_cli::TransportReadiness,
189}
190
/// Default cap on an HTTP request body: 32 MiB.
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 << 20;
192
193impl Default for ServerOptions {
194    fn default() -> Self {
195        Self {
196            bind_addr: "127.0.0.1:5055".to_string(),
197            max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
198            read_timeout_ms: 5_000,
199            write_timeout_ms: 5_000,
200            max_scan_limit: 1_000,
201            surface: ServerSurface::Public,
202            transport_readiness: crate::service_cli::TransportReadiness::default(),
203        }
204    }
205}
206
207/// Replication state exposed to the HTTP server.
208pub struct ServerReplicationState {
209    pub config: crate::replication::ReplicationConfig,
210    pub primary: Option<crate::replication::primary::PrimaryReplication>,
211}
212
213#[derive(Clone)]
214pub struct RedDBServer {
215    runtime: RedDBRuntime,
216    options: ServerOptions,
217    auth_store: Option<Arc<AuthStore>>,
218    replication: Option<Arc<ServerReplicationState>>,
219    /// Bounded handler-thread admission for the clear-text HTTP accept
220    /// loop (issue #570 slice 1). Cloned with the server; `Clone` of
221    /// `HttpConnectionLimiter` shares an `Arc` so every serve loop on
222    /// the same `RedDBServer` shares one cap.
223    http_limiter: HttpConnectionLimiter,
224    /// Per-handler total-time deadline (issue #570 slice 2). Each
225    /// clear-text handler thread arms a deadline at spawn and bails
226    /// with a best-effort 503 at coarse boundaries between request
227    /// parse, route dispatch, and response write. Hard-coded to 30s
228    /// here; the config knob lands in slice 5.
229    handler_timeout: Duration,
230    /// Monotonic clock the per-handler deadline (issue #621) is armed
231    /// against — the same [`MonotonicClock`] abstraction the limiter
232    /// uses. Production wires [`SystemMonotonicClock`] (real wall time);
233    /// tests inject a fake to drive timeout expiry deterministically
234    /// without `sleep()`. Shared via `Arc` so cloned server handles
235    /// (e.g. `serve_in_background`) read the same clock.
236    handler_clock: Arc<dyn MonotonicClock>,
237    /// Test-only synchronous sleep injected between route dispatch and
238    /// response write so an integration test can simulate a slow
239    /// downstream tripping the deadline. Default 0 (no-op). Shared via
240    /// `Arc` so a cloned `RedDBServer` (e.g. `serve_in_background`)
241    /// observes flips from the originating handle. Production callers
242    /// have no way to set this — the setter is `#[doc(hidden)]`.
243    slow_inject_ms: Arc<AtomicU64>,
244    /// Prometheus metrics for the HTTP handler-thread pool (issue
245    /// #573 slice 4). Records rejections (cap_exhausted /
246    /// handler_timeout) and per-handler duration histograms. Cloned
247    /// with the server via `Arc` so every serve loop on the same
248    /// `RedDBServer` writes to one set of counters.
249    http_metrics: HttpHandlerMetrics,
250    /// `Retry-After` value (seconds) emitted in the limiter's reject
251    /// path (issue #574 slice 5). Pre-rendered into `reject_503_bytes`
252    /// at construction so the hot path is one write+close.
253    retry_after_secs: u64,
254    /// Cached bytes of the limiter's 503 response. Shared via `Arc`
255    /// across cloned server handles so flipping `retry_after_secs`
256    /// via `with_http_limits` propagates to every accept loop.
257    reject_503_bytes: Arc<Vec<u8>>,
258}
259
/// Per-handler total-time budget a new server starts with (issue #571
/// slice 2): 30 seconds, until `with_http_limits` or `with_handler_timeout`
/// replaces it.
///
/// NOTE(review): `http_limits::DEFAULT_HANDLER_TIMEOUT_MS` is re-exported by
/// this module as well; confirm the two defaults stay in agreement.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
262
/// Target of a serverless warmup request.
///
/// NOTE(review): how each scope is warmed is implemented outside this view
/// (presumably `serverless_support`) — confirm before relying on specifics.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ServerlessWarmupScope {
    /// Index structures.
    Indexes,
    /// Graph projections.
    GraphProjections,
    /// Analytics jobs.
    AnalyticsJobs,
    /// Native artifacts.
    NativeArtifacts,
}
270
/// Deployment shape the server reports or tunes for.
///
/// NOTE(review): consumers of this enum are outside this view — confirm
/// what each profile changes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DeploymentProfile {
    /// Embedded deployment.
    Embedded,
    /// Long-running server deployment.
    Server,
    /// Serverless deployment.
    Serverless,
}
277
/// Decode `%XX` percent-escapes in a single URL path segment.
///
/// Every byte other than `%` is copied through untouched (`+` is *not*
/// turned into a space). Hex digits may be upper- or lower-case.
///
/// # Errors
///
/// * `"truncated percent escape"` — a `%` is followed by fewer than two bytes.
/// * `"invalid percent escape"` — a byte after `%` is not a hex digit.
/// * `"path segment is not valid UTF-8"` — the decoded bytes are not UTF-8.
fn percent_decode_path_segment(input: &str) -> Result<String, String> {
    let mut decoded = Vec::with_capacity(input.len());
    let mut remaining = input.as_bytes();
    while let Some((&byte, tail)) = remaining.split_first() {
        if byte != b'%' {
            decoded.push(byte);
            remaining = tail;
            continue;
        }
        let [high, low, rest @ ..] = tail else {
            return Err("truncated percent escape".to_string());
        };
        let (Some(high), Some(low)) = (hex_value(*high), hex_value(*low)) else {
            return Err("invalid percent escape".to_string());
        };
        decoded.push((high << 4) | low);
        remaining = rest;
    }
    String::from_utf8(decoded).map_err(|_| "path segment is not valid UTF-8".to_string())
}

/// Value of one ASCII hex digit (`0-9`, `a-f`, `A-F`), or `None` for any
/// other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` only yields 0..=15, so narrowing to `u8` is lossless.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
312
313#[derive(Debug, Clone)]
314struct ParsedQueryRequest {
315    query: String,
316    entity_types: Option<Vec<String>>,
317    capabilities: Option<Vec<String>>,
318    /// Optional positional `$N` bind parameters (#358). When `Some`, the
319    /// query handler runs the user_params binder before executing.
320    /// Absence preserves the legacy `query`-only behavior.
321    params: Option<Vec<Value>>,
322}
323
/// Kind of a single patch step.
#[derive(Clone, Copy, Debug)]
enum PatchOperationType {
    /// Set a value.
    Set,
    /// Replace a value.
    Replace,
    /// Remove a value.
    Unset,
}
330
331#[derive(Debug, Clone)]
332struct PatchOperation {
333    op: PatchOperationType,
334    path: Vec<String>,
335    value: Option<JsonValue>,
336}
337
338impl RedDBServer {
339    pub fn new(runtime: RedDBRuntime) -> Self {
340        Self::with_options(runtime, ServerOptions::default())
341    }
342
343    pub fn from_database_options(
344        db_options: RedDBOptions,
345        server_options: ServerOptions,
346    ) -> RedDBResult<Self> {
347        let runtime = RedDBRuntime::with_options(db_options)?;
348        Ok(Self::with_options(runtime, server_options))
349    }
350
351    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
352        Self {
353            runtime,
354            options,
355            auth_store: None,
356            replication: None,
357            http_limiter: HttpConnectionLimiter::with_default_cap(),
358            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
359            handler_clock: Arc::new(SystemMonotonicClock::new()),
360            slow_inject_ms: Arc::new(AtomicU64::new(0)),
361            http_metrics: HttpHandlerMetrics::new(),
362            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
363            reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
364        }
365    }
366
367    #[doc(hidden)]
368    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
369        &self.http_metrics
370    }
371
372    /// Visible for tests. Lets the integration test in
373    /// `tests/http_connection_limiter.rs` saturate the cap and observe
374    /// `503 Service Unavailable` responses without spinning up
375    /// thousands of sockets.
376    #[doc(hidden)]
377    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
378        self.http_limiter = HttpConnectionLimiter::new(cap);
379        self
380    }
381
382    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
383    /// Replaces the limiter cap, the per-handler deadline, and the
384    /// `Retry-After` value used by the limiter's reject path. All
385    /// values are assumed validated by [`http_limits::resolve_http_limits`].
386    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
387        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
388        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
389        self.retry_after_secs = limits.retry_after_secs;
390        self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
391        self
392    }
393
394    #[doc(hidden)]
395    pub fn retry_after_secs(&self) -> u64 {
396        self.retry_after_secs
397    }
398
399    #[doc(hidden)]
400    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
401        &self.http_limiter
402    }
403
404    /// Visible for tests. Override the per-handler total-time deadline
405    /// (issue #570 slice 2). Default 30s.
406    #[doc(hidden)]
407    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
408        self.handler_timeout = timeout;
409        self
410    }
411
412    #[doc(hidden)]
413    pub fn handler_timeout(&self) -> Duration {
414        self.handler_timeout
415    }
416
417    /// Visible for tests. Override the clock the per-handler deadline
418    /// (issue #621) is armed against, so timeout expiry can be driven
419    /// deterministically without real sleeps. Default is the real
420    /// monotonic clock.
421    #[doc(hidden)]
422    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
423        self.handler_clock = clock;
424        self
425    }
426
427    /// Test hook: set a synchronous sleep (in ms) inserted between
428    /// route dispatch and response write. The integration test for
429    /// slice 2 sets a value greater than `handler_timeout` to trip
430    /// the deadline, then resets to 0 to verify recovery. Shared via
431    /// `Arc<AtomicU64>` so cloned server handles see the same flip.
432    #[doc(hidden)]
433    pub fn set_test_slow_inject_ms(&self, ms: u64) {
434        self.slow_inject_ms.store(ms, Ordering::Relaxed);
435    }
436
437    /// Attach an `AuthStore` for HTTP-layer authentication.
438    /// Also injects the store into the runtime so that `Value::Secret`
439    /// auto-encrypt/decrypt can reach the vault AES key.
440    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
441        self.runtime.set_auth_store(Arc::clone(&auth_store));
442        self.auth_store = Some(auth_store);
443        self
444    }
445
446    /// Attach replication state for status and snapshot endpoints.
447    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
448        self.replication = Some(state);
449        self
450    }
451
452    pub fn runtime(&self) -> &RedDBRuntime {
453        &self.runtime
454    }
455
456    pub fn options(&self) -> &ServerOptions {
457        &self.options
458    }
459
460    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
461        QueryUseCases::new(&self.runtime)
462    }
463
464    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
465        AdminUseCases::new(&self.runtime)
466    }
467
468    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
469        EntityUseCases::new(&self.runtime)
470    }
471
472    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
473        CatalogUseCases::new(&self.runtime)
474    }
475
476    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
477        GraphUseCases::new(&self.runtime)
478    }
479
480    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
481        NativeUseCases::new(&self.runtime)
482    }
483
484    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
485        TreeUseCases::new(&self.runtime)
486    }
487
488    fn transport_readiness_json(&self) -> JsonValue {
489        let active = self
490            .options
491            .transport_readiness
492            .active
493            .iter()
494            .map(|listener| {
495                let mut object = Map::new();
496                object.insert(
497                    "transport".to_string(),
498                    JsonValue::String(listener.transport.clone()),
499                );
500                object.insert(
501                    "bind_addr".to_string(),
502                    JsonValue::String(listener.bind_addr.clone()),
503                );
504                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
505                JsonValue::Object(object)
506            })
507            .collect();
508        let failed = self
509            .options
510            .transport_readiness
511            .failed
512            .iter()
513            .map(|listener| {
514                let mut object = Map::new();
515                object.insert(
516                    "transport".to_string(),
517                    JsonValue::String(listener.transport.clone()),
518                );
519                object.insert(
520                    "bind_addr".to_string(),
521                    JsonValue::String(listener.bind_addr.clone()),
522                );
523                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
524                object.insert(
525                    "reason".to_string(),
526                    JsonValue::String(listener.reason.clone()),
527                );
528                JsonValue::Object(object)
529            })
530            .collect();
531
532        let mut object = Map::new();
533        object.insert("active".to_string(), JsonValue::Array(active));
534        object.insert("failed".to_string(), JsonValue::Array(failed));
535        JsonValue::Object(object)
536    }
537
538    fn handle_grpc_discovery(&self) -> HttpResponse {
539        let mut methods = Map::new();
540        methods.insert(
541            "query".to_string(),
542            JsonValue::String("reddb.v1.RedDB/Query".to_string()),
543        );
544        methods.insert(
545            "batch_query".to_string(),
546            JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
547        );
548        methods.insert(
549            "health".to_string(),
550            JsonValue::String("reddb.v1.RedDB/Health".to_string()),
551        );
552        methods.insert(
553            "prepare".to_string(),
554            JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
555        );
556        methods.insert(
557            "execute_prepared".to_string(),
558            JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
559        );
560
561        let mut examples = Map::new();
562        examples.insert(
563            "query".to_string(),
564            JsonValue::String(
565                "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
566                    .to_string(),
567            ),
568        );
569        examples.insert(
570            "query_with_params".to_string(),
571            JsonValue::String(
572                "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
573                    .to_string(),
574            ),
575        );
576        examples.insert(
577            "health".to_string(),
578            JsonValue::String(
579                "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
580            ),
581        );
582
583        let mut object = Map::new();
584        object.insert("ok".to_string(), JsonValue::Bool(true));
585        object.insert(
586            "service".to_string(),
587            JsonValue::String("reddb.v1.RedDB".to_string()),
588        );
589        object.insert(
590            "package".to_string(),
591            JsonValue::String("reddb.v1".to_string()),
592        );
593        object.insert(
594            "proto".to_string(),
595            JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
596        );
597        object.insert("methods".to_string(), JsonValue::Object(methods));
598        object.insert("examples".to_string(), JsonValue::Object(examples));
599        object.insert(
600            "transport_listeners".to_string(),
601            self.transport_readiness_json(),
602        );
603        object.insert(
604            "hint".to_string(),
605            JsonValue::String(
606                "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
607                    .to_string(),
608            ),
609        );
610        json_response(200, JsonValue::Object(object))
611    }
612
613    fn handle_query_contract(&self) -> HttpResponse {
614        let mut examples = Map::new();
615        examples.insert(
616            "raw_sql".to_string(),
617            JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
618        );
619        examples.insert(
620            "json_query".to_string(),
621            JsonValue::String(
622                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
623                    .to_string(),
624            ),
625        );
626        examples.insert(
627            "json_query_with_params".to_string(),
628            JsonValue::String(
629                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
630                    .to_string(),
631            ),
632        );
633
634        let mut request_body = Map::new();
635        request_body.insert(
636            "query".to_string(),
637            JsonValue::String("required string".to_string()),
638        );
639        request_body.insert(
640            "params".to_string(),
641            JsonValue::String("optional array".to_string()),
642        );
643
644        let mut response_shape = Map::new();
645        response_shape.insert(
646            "columns".to_string(),
647            JsonValue::String("projected column names".to_string()),
648        );
649        response_shape.insert(
650            "records[].values".to_string(),
651            JsonValue::String("only projected values".to_string()),
652        );
653        response_shape.insert(
654            "records[].meta".to_string(),
655            JsonValue::String("internal metadata when present".to_string()),
656        );
657
658        let mut object = Map::new();
659        object.insert("ok".to_string(), JsonValue::Bool(false));
660        object.insert(
661            "code".to_string(),
662            JsonValue::String("method_not_allowed".to_string()),
663        );
664        object.insert(
665            "message".to_string(),
666            JsonValue::String("/query accepts POST requests".to_string()),
667        );
668        object.insert(
669            "hint".to_string(),
670            JsonValue::String(
671                "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
672            ),
673        );
674        object.insert("method".to_string(), JsonValue::String("POST".to_string()));
675        object.insert("path".to_string(), JsonValue::String("/query".to_string()));
676        object.insert("request_body".to_string(), JsonValue::Object(request_body));
677        object.insert(
678            "response_shape".to_string(),
679            JsonValue::Object(response_shape),
680        );
681        object.insert("examples".to_string(), JsonValue::Object(examples));
682        object.insert(
683            "docs".to_string(),
684            JsonValue::String("https://reddb.io/docs/query".to_string()),
685        );
686
687        json_response(405, JsonValue::Object(object))
688            .with_header("Allow", http::HeaderValue::from_static("POST"))
689    }
690
691    fn handle_root_discovery(&self) -> HttpResponse {
692        let mut endpoints = Map::new();
693        endpoints.insert(
694            "health".to_string(),
695            JsonValue::String("GET /health".to_string()),
696        );
697        endpoints.insert(
698            "ready".to_string(),
699            JsonValue::String("GET /ready".to_string()),
700        );
701        endpoints.insert(
702            "query".to_string(),
703            JsonValue::String("POST /query".to_string()),
704        );
705        endpoints.insert(
706            "query_readiness".to_string(),
707            JsonValue::String("GET /ready/query".to_string()),
708        );
709        endpoints.insert(
710            "catalog".to_string(),
711            JsonValue::String("GET /catalog".to_string()),
712        );
713        endpoints.insert(
714            "deployment_profiles".to_string(),
715            JsonValue::String("GET /deployment/profiles".to_string()),
716        );
717
718        let mut examples = Map::new();
719        examples.insert(
720            "http_raw_sql".to_string(),
721            JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
722        );
723        examples.insert(
724            "http_json_query".to_string(),
725            JsonValue::String(
726                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
727                    .to_string(),
728            ),
729        );
730        examples.insert(
731            "http_json_query_with_params".to_string(),
732            JsonValue::String(
733                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
734                    .to_string(),
735            ),
736        );
737
738        let mut object = Map::new();
739        object.insert("ok".to_string(), JsonValue::Bool(true));
740        object.insert(
741            "service".to_string(),
742            JsonValue::String("reddb".to_string()),
743        );
744        object.insert(
745            "version".to_string(),
746            JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
747        );
748        object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
749        object.insert("examples".to_string(), JsonValue::Object(examples));
750        object.insert(
751            "docs".to_string(),
752            JsonValue::String("https://reddb.io/docs".to_string()),
753        );
754        object.insert(
755            "transport_listeners".to_string(),
756            self.transport_readiness_json(),
757        );
758        json_response(200, JsonValue::Object(object))
759    }
760
761    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
762        let mut value = crate::presentation::ops_json::health_json(report);
763        if let JsonValue::Object(ref mut object) = value {
764            object.insert(
765                "transport_listeners".to_string(),
766                self.transport_readiness_json(),
767            );
768        }
769        value
770    }
771
772    pub fn serve(&self) -> io::Result<()> {
773        let listener = TcpListener::bind(&self.options.bind_addr)?;
774        self.serve_on(listener)
775    }
776
777    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
778        for stream in listener.incoming() {
779            match stream {
780                Ok(stream) => match self.http_limiter.try_acquire() {
781                    Some(permit) => {
782                        // Spawn a thread per connection for concurrent request handling
783                        let server = self.clone();
784                        thread::spawn(move || {
785                            let _guard = permit; // released on thread exit
786                            let _ = server.handle_connection(stream);
787                        });
788                    }
789                    None => {
790                        // Cap exhausted: write static 503 inline on the
791                        // accept thread, close the socket, and continue.
792                        // No thread spawn, no `HttpRequest::read_from`,
793                        // no runtime call.
794                        self.http_metrics
795                            .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
796                        self.reject_with_503(stream, self.options.write_timeout_ms);
797                    }
798                },
799                Err(err) => return Err(err),
800            }
801        }
802        Ok(())
803    }
804
805    /// 503 response used when the connection limiter is full. The
806    /// payload is pre-rendered into `reject_503_bytes` at construction
807    /// (issue #574 slice 5: `Retry-After` is configurable), so the
808    /// hot path is still one write and a close.
809    fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
810        let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
811        let _ = stream.write_all(&self.reject_503_bytes);
812        let _ = stream.flush();
813        let _ = stream.shutdown(std::net::Shutdown::Both);
814    }
815
816    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
817        let (stream, _) = listener.accept()?;
818        self.handle_connection(stream)
819    }
820
821    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
822        let server = self.clone();
823        thread::spawn(move || server.serve())
824    }
825
826    pub fn serve_in_background_on(
827        &self,
828        listener: TcpListener,
829    ) -> thread::JoinHandle<io::Result<()>> {
830        let server = self.clone();
831        thread::spawn(move || server.serve_on(listener))
832    }
833
834    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
835    /// `tls_config` is shared across all connections (rustls
836    /// `ServerConfig` is `Send + Sync`).
837    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
838        let listener = TcpListener::bind(&self.options.bind_addr)?;
839        self.serve_tls_on(listener, tls_config)
840    }
841
842    pub fn serve_tls_on(
843        &self,
844        listener: TcpListener,
845        tls_config: std::sync::Arc<rustls::ServerConfig>,
846    ) -> io::Result<()> {
847        for stream in listener.incoming() {
848            match stream {
849                Ok(stream) => match self.http_limiter.try_acquire() {
850                    Some(permit) => {
851                        let server = self.clone();
852                        let cfg = tls_config.clone();
853                        thread::spawn(move || {
854                            let _guard = permit; // released on thread exit
855                            let _ = server.handle_tls_connection(stream, cfg);
856                        });
857                    }
858                    None => {
859                        // Issue #572 slice 3: cross-transport cap shared
860                        // with the clear-text limiter. Writing a 503 over
861                        // a non-handshaken TLS socket is not meaningful,
862                        // so reject by closing the raw socket. No TLS
863                        // handshake, no thread spawn, no runtime call.
864                        self.http_metrics
865                            .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
866                        let _ = stream.shutdown(std::net::Shutdown::Both);
867                        drop(stream);
868                    }
869                },
870                Err(err) => return Err(err),
871            }
872        }
873        Ok(())
874    }
875
876    pub fn serve_tls_in_background(
877        &self,
878        tls_config: std::sync::Arc<rustls::ServerConfig>,
879    ) -> thread::JoinHandle<io::Result<()>> {
880        let server = self.clone();
881        thread::spawn(move || server.serve_tls(tls_config))
882    }
883
884    pub fn serve_tls_in_background_on(
885        &self,
886        listener: TcpListener,
887        tls_config: std::sync::Arc<rustls::ServerConfig>,
888    ) -> thread::JoinHandle<io::Result<()>> {
889        let server = self.clone();
890        thread::spawn(move || server.serve_tls_on(listener, tls_config))
891    }
892
893    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
894        let started = Instant::now();
895        let result = self.handle_connection_inner(stream);
896        let elapsed = started.elapsed().as_secs_f64();
897        self.http_metrics
898            .record_duration(HttpTransport::Http, elapsed);
899        result
900    }
901
    /// Serve a single request on a clear-text connection, then return.
    ///
    /// Flow: apply the per-socket read/write timeouts, arm the per-handler
    /// deadline, parse one request, and either let the streaming router
    /// handle it or route it and write the buffered response. The deadline
    /// is checked at two coarse boundaries. If it has expired, a bare 503
    /// is written instead and a `HandlerTimeout` reject is recorded.
    ///
    /// # Errors
    ///
    /// Propagates (via `?`) failures from setting the socket timeouts,
    /// from `HttpRequest::read_from`, from `try_route_streaming`, and from
    /// writing/flushing the response. When request parsing fails, this
    /// function writes nothing back to the peer. Deadline expiry is not an
    /// error: it returns `Ok(())` after the best-effort 503.
    fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
        // NOTE(review): std's `set_read_timeout`/`set_write_timeout` return
        // an error for a zero `Duration`, so a 0 ms option would fail every
        // connection here. Confirm config validation rejects 0.
        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;

        // Issue #570 slice 2 / #621: arm a deadline at handler spawn and
        // check at coarse boundaries. Armed against the injectable
        // monotonic clock (#621) so timeout behaviour is deterministically
        // testable without real sleeps; production wires the real clock,
        // so this tracks wall time. No hard pre-emption — a thread blocked
        // inside a true syscall is still bounded only by the per-socket
        // read/write timeouts.
        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);

        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;

        // Boundary (a): between request parse and route dispatch.
        if deadline.expired() {
            self.http_metrics
                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut stream);
            return Ok(());
        }

        // When the streaming router reports `true`, the request is treated
        // as fully handled: nothing more is written here, and neither the
        // slow-inject hook nor boundary (b) applies to that path.
        if self.try_route_streaming(&request, &mut stream)? {
            return Ok(());
        }
        let response = self.route(request);

        // Test-only injected slow downstream (issue #570 slice 2
        // integration test). Production builds set this to 0, so this
        // is a single relaxed atomic load on the hot path.
        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
        if inject_ms > 0 {
            thread::sleep(Duration::from_millis(inject_ms));
        }

        // Boundary (b): between route dispatch and response write.
        // The routed response is discarded if the budget is already spent.
        if deadline.expired() {
            self.http_metrics
                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut stream);
            return Ok(());
        }

        stream.write_all(&response.to_http_bytes())?;
        stream.flush()?;
        Ok(())
    }
950
951    /// Best-effort 503 emitted when the per-handler deadline expires
952    /// at a coarse boundary. Writes are swallowed — the caller has
953    /// already exceeded its budget, so we do not propagate write
954    /// errors. Permit drop happens on the handler thread's normal
955    /// exit path.
956    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
957        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
958            Connection: close\r\n\
959            Content-Length: 0\r\n\
960            \r\n";
961        let _ = stream.write_all(RESPONSE);
962        let _ = stream.flush();
963    }
964
965    fn handle_tls_connection(
966        &self,
967        tcp: TcpStream,
968        tls_config: std::sync::Arc<rustls::ServerConfig>,
969    ) -> io::Result<()> {
970        let started = Instant::now();
971        let result = self.handle_tls_connection_inner(tcp, tls_config);
972        let elapsed = started.elapsed().as_secs_f64();
973        self.http_metrics
974            .record_duration(HttpTransport::Https, elapsed);
975        result
976    }
977
    /// Serve a single request on a TLS connection, then return.
    ///
    /// Flow: apply the per-socket read/write timeouts to the raw TCP
    /// socket, arm the per-handler deadline, run the TLS handshake, parse
    /// one request, and either let the streaming router handle it or route
    /// it and write the buffered response. The deadline is armed before the
    /// handshake, so handshake time counts against the handler budget. As
    /// on the clear-text path, the deadline is checked at two coarse
    /// boundaries and a bare 503 is written if it has expired.
    ///
    /// # Errors
    ///
    /// Propagates failures from setting the socket timeouts, from the TLS
    /// handshake and from request parsing (both are logged at `warn` under
    /// `reddb::http_tls` first), from `try_route_streaming`, and from
    /// writing/flushing the response. Deadline expiry is not an error: it
    /// returns `Ok(())` after the best-effort 503.
    fn handle_tls_connection_inner(
        &self,
        tcp: TcpStream,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> io::Result<()> {
        // Set on the raw socket before it is moved into the TLS stream.
        // NOTE(review): std rejects a zero `Duration` here, so a 0 ms
        // option would fail every connection. Confirm config validation
        // rejects 0.
        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;

        // Issue #572 slice 3 / #621: per-handler deadline applies to TLS
        // handlers too — same injectable-clock scaffolding as the
        // clear-text path.
        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);

        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    target: "reddb::http_tls",
                    err = %err,
                    "TLS handshake failed"
                );
                return Err(err);
            }
        };
        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
            Ok(req) => req,
            Err(err) => {
                tracing::warn!(
                    target: "reddb::http_tls",
                    err = %err,
                    "TLS request parse failed"
                );
                return Err(err);
            }
        };

        // Boundary (a): between request parse and route dispatch.
        if deadline.expired() {
            self.http_metrics
                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut tls_stream);
            return Ok(());
        }

        // When the streaming router reports `true`, the request is treated
        // as fully handled: nothing more is written here, and neither the
        // slow-inject hook nor boundary (b) applies to that path.
        if self.try_route_streaming(&request, &mut tls_stream)? {
            return Ok(());
        }
        let response = self.route(request);

        // Test-only injected slow downstream (issue #572 slice 3
        // integration test). Production sets this to 0.
        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
        if inject_ms > 0 {
            thread::sleep(Duration::from_millis(inject_ms));
        }

        // Boundary (b): between route dispatch and response write.
        // The routed response is discarded if the budget is already spent.
        if deadline.expired() {
            self.http_metrics
                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut tls_stream);
            return Ok(());
        }

        tls_stream.write_all(&response.to_http_bytes())?;
        tls_stream.flush()?;
        Ok(())
    }
1046}
1047
/// Render the 503 response sent when the HTTP connection limiter is full.
///
/// The response has no body (`Content-Length: 0`), asks the client to
/// close (`Connection: close`), and advertises `Retry-After:
/// retry_after_secs`, a value taken from the resolved HTTP limits (issue
/// #574 slice 5). That value is fixed for the server's lifetime. The bytes
/// are therefore rendered once, at construction, and reused for every
/// rejection.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    const HEAD: &str =
        "HTTP/1.1 503 Service Unavailable\r\nConnection: close\r\nContent-Length: 0\r\n";
    let mut response = String::with_capacity(HEAD.len() + 40);
    response.push_str(HEAD);
    response.push_str("Retry-After: ");
    response.push_str(&retry_after_secs.to_string());
    response.push_str("\r\n\r\n");
    response.into_bytes()
}