Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49    crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The health payload must expose a `transport_listeners` object whose
    /// `active` / `failed` arrays mirror `ServerOptions::transport_readiness`
    /// entry-for-entry and field-for-field.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let mut options = ServerOptions::default();
        options.transport_readiness = TransportReadiness {
            active: vec![TransportListenerState {
                transport: "grpc".to_string(),
                bind_addr: "127.0.0.1:50051".to_string(),
                explicit: true,
            }],
            failed: vec![TransportListenerFailure {
                transport: "http".to_string(),
                bind_addr: "127.0.0.1:5055".to_string(),
                explicit: false,
                reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
            }],
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);

        // Counts alone would not catch swapped or dropped fields, so pin
        // every key of each rendered listener entry.
        let Some(JsonValue::Object(active_entry)) = active.first() else {
            panic!("transport_listeners.active[0] should be an object");
        };
        assert!(matches!(
            active_entry.get("transport"),
            Some(JsonValue::String(value)) if value == "grpc"
        ));
        assert!(matches!(
            active_entry.get("bind_addr"),
            Some(JsonValue::String(value)) if value == "127.0.0.1:50051"
        ));
        assert!(matches!(
            active_entry.get("explicit"),
            Some(JsonValue::Bool(true))
        ));
        assert!(
            active_entry.get("reason").is_none(),
            "active listeners should not carry a failure reason"
        );

        let Some(JsonValue::Object(failed_entry)) = failed.first() else {
            panic!("transport_listeners.failed[0] should be an object");
        };
        assert!(matches!(
            failed_entry.get("transport"),
            Some(JsonValue::String(value)) if value == "http"
        ));
        assert!(matches!(
            failed_entry.get("bind_addr"),
            Some(JsonValue::String(value)) if value == "127.0.0.1:5055"
        ));
        assert!(matches!(
            failed_entry.get("explicit"),
            Some(JsonValue::Bool(false))
        ));
        assert!(matches!(
            failed_entry.get("reason"),
            Some(JsonValue::String(value))
                if value == "http listener bind 127.0.0.1:5055: address in use"
        ));
    }
}
98
99fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
100    crate::presentation::admin_json::graph_projection_json(projection)
101}
102
103pub mod handlers_admin;
104mod handlers_ai;
105pub mod http_connection_limiter;
106pub mod http_handler_metrics;
107pub mod http_limits;
108mod handlers_auth;
109mod handlers_backup;
110mod handlers_ec;
111pub(crate) mod handlers_entity;
112mod handlers_geo;
113mod handlers_graph;
114mod handlers_keyed;
115mod handlers_log;
116mod handlers_metrics;
117mod handlers_ops;
118mod handlers_query;
119mod handlers_replication;
120mod handlers_vcs;
121mod handlers_vector;
122pub mod header_escape_guard;
123pub mod ingest_pipeline;
124mod patch_support;
125mod request_body;
126mod request_context;
127mod routing;
128mod serverless_support;
129pub mod tls;
130mod transport;
131
132use self::handlers_ai::*;
133use self::http_connection_limiter::HttpConnectionLimiter;
134use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
135pub use self::http_limits::{
136    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
137};
138use self::handlers_entity::*;
139use self::handlers_graph::*;
140use self::handlers_keyed::*;
141use self::handlers_metrics::*;
142use self::handlers_ops::*;
143use self::handlers_query::*;
144use self::patch_support::*;
145use self::request_body::*;
146use self::routing::*;
147use self::serverless_support::*;
148use self::transport::*;
149
/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
/// can serve either every public surface (`Public`, default) or a
/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
/// the top of `route()` consults this so a port bound only to
/// loopback for admin work won't accidentally hand out DML.
///
/// Chosen per listener through [`ServerOptions::surface`];
/// `ServerOptions::default()` selects `Public`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Everything routed normally (default — matches v0 behaviour).
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
    /// which default to `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Intended for
    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
    /// exposed to non-admin networks.
    MetricsOnly,
}
168
/// Listener configuration for a [`RedDBServer`].
///
/// See the `Default` impl for the out-of-the-box values.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address handed to `TcpListener::bind` by `serve` and `serve_tls`.
    pub bind_addr: String,
    /// Request body size cap, passed to `HttpRequest::read_from`.
    pub max_body_bytes: usize,
    /// Socket read timeout in milliseconds, applied to each accepted
    /// connection before its request is read.
    pub read_timeout_ms: u64,
    /// Socket write timeout in milliseconds, applied to each accepted
    /// connection and to the limiter's 503 reject path.
    pub write_timeout_ms: u64,
    /// NOTE(review): not read in this part of the file; presumably caps
    /// the page size of scan endpoints in the handler modules — confirm.
    pub max_scan_limit: usize,
    /// Which subset of paths this listener serves. Defaults to
    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
    /// admin / scrape ports (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
    /// Bind outcome of each transport listener (active and failed),
    /// reported in the health payload under `transport_listeners`.
    pub transport_readiness: crate::service_cli::TransportReadiness,
}
182
183impl Default for ServerOptions {
184    fn default() -> Self {
185        Self {
186            bind_addr: "127.0.0.1:5055".to_string(),
187            max_body_bytes: 1024 * 1024,
188            read_timeout_ms: 5_000,
189            write_timeout_ms: 5_000,
190            max_scan_limit: 1_000,
191            surface: ServerSurface::Public,
192            transport_readiness: crate::service_cli::TransportReadiness::default(),
193        }
194    }
195}
196
/// Replication state exposed to the HTTP server.
///
/// Attached through `RedDBServer::with_replication` and held as an
/// `Arc`, so every cloned server handle shares the same instance.
pub struct ServerReplicationState {
    /// Replication settings for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, if any. NOTE(review): presumably
    /// `None` when this node is not acting as a primary — confirm.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
202
/// HTTP/HTTPS front end over a [`RedDBRuntime`].
///
/// The accept loops (`serve_on`, `serve_tls_on`) clone the server once per
/// admitted connection and move the clone onto the handler thread; state
/// that must be shared across those clones (limiter, metrics, test hooks,
/// cached reject bytes) sits behind `Arc`s.
#[derive(Clone)]
pub struct RedDBServer {
    /// Database runtime every route dispatches into.
    runtime: RedDBRuntime,
    /// Listener configuration (bind address, body cap, socket timeouts,
    /// surface, transport readiness).
    options: ServerOptions,
    /// HTTP-layer authentication store, set by `with_auth`.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state for status and snapshot endpoints, set by
    /// `with_replication`.
    replication: Option<Arc<ServerReplicationState>>,
    /// Bounded handler-thread admission for the clear-text HTTP accept
    /// loop (issue #570 slice 1). Cloned with the server; `Clone` of
    /// `HttpConnectionLimiter` shares an `Arc` so every serve loop on
    /// the same `RedDBServer` shares one cap.
    http_limiter: HttpConnectionLimiter,
    /// Per-handler total-time deadline (issue #570 slice 2). Each
    /// handler thread (clear-text and TLS) arms a deadline at spawn and
    /// bails with a best-effort 503 at coarse boundaries between request
    /// parse, route dispatch, and response write. Starts at
    /// `DEFAULT_HANDLER_TIMEOUT`; overridden by `with_handler_timeout`
    /// and `with_http_limits`.
    handler_timeout: Duration,
    /// Test-only synchronous sleep injected between route dispatch and
    /// response write so an integration test can simulate a slow
    /// downstream tripping the deadline. Default 0 (no-op). Shared via
    /// `Arc` so a cloned `RedDBServer` (e.g. `serve_in_background`)
    /// observes flips from the originating handle. Set only through
    /// `set_test_slow_inject_ms`, which is `pub` but `#[doc(hidden)]`:
    /// not intended for production callers.
    slow_inject_ms: Arc<AtomicU64>,
    /// Prometheus metrics for the HTTP handler-thread pool (issue
    /// #573 slice 4). Records rejections (cap_exhausted /
    /// handler_timeout) and per-handler duration histograms. Cloned
    /// with the server via `Arc` so every serve loop on the same
    /// `RedDBServer` writes to one set of counters.
    http_metrics: HttpHandlerMetrics,
    /// `Retry-After` value (seconds) emitted in the limiter's reject
    /// path (issue #574 slice 5). Pre-rendered into `reject_503_bytes`
    /// at construction so the hot path is one write+close.
    retry_after_secs: u64,
    /// Cached bytes of the limiter's 503 response, built by
    /// `build_reject_503_bytes`. Shared via `Arc` so clones of a
    /// configured server reuse one buffer. `with_http_limits` installs a
    /// fresh `Arc` rather than mutating this one, so handles cloned
    /// before that call keep the old bytes — configure limits before
    /// serving.
    reject_503_bytes: Arc<Vec<u8>>,
}
242
/// Default per-handler total-time budget: 30 seconds.
///
/// Armed by every HTTP/HTTPS handler thread; override per server with
/// `RedDBServer::with_handler_timeout` or `RedDBServer::with_http_limits`.
/// NOTE(review): `http_limits::DEFAULT_HANDLER_TIMEOUT_MS` is re-exported
/// by this module as well; presumably the two should agree — confirm.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
245
/// Scope a serverless warm-up can be asked to cover: indexes, graph
/// projections, analytics jobs, or native artifacts.
///
/// NOTE(review): not referenced in this part of the file; presumably
/// consumed by `serverless_support` — confirm the exact warm-up semantics
/// of each scope there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
253
/// Deployment shape of this RedDB instance: `Embedded`, `Server`, or
/// `Serverless`.
///
/// NOTE(review): not referenced in this part of the file; the meaning of
/// each profile is inferred from its name — confirm against its consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
260
261fn percent_decode_path_segment(input: &str) -> Result<String, String> {
262    let bytes = input.as_bytes();
263    let mut out = Vec::with_capacity(bytes.len());
264    let mut index = 0;
265    while index < bytes.len() {
266        match bytes[index] {
267            b'%' => {
268                if index + 2 >= bytes.len() {
269                    return Err("truncated percent escape".to_string());
270                }
271                let high = hex_value(bytes[index + 1])
272                    .ok_or_else(|| "invalid percent escape".to_string())?;
273                let low = hex_value(bytes[index + 2])
274                    .ok_or_else(|| "invalid percent escape".to_string())?;
275                out.push((high << 4) | low);
276                index += 3;
277            }
278            byte => {
279                out.push(byte);
280                index += 1;
281            }
282        }
283    }
284    String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
285}
286
/// Numeric value of one ASCII hexadecimal digit (`0-9`, `a-f`, `A-F`),
/// or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `char::to_digit(16)` recognises only ASCII hex digits, so bytes
    // >= 0x80 (mapped to U+0080..=U+00FF by the conversion) yield `None`.
    // The digit is always < 16, so narrowing it to `u8` is lossless.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
295
/// A query request after its body has been parsed.
///
/// NOTE(review): the parser and consuming handler are not in this part of
/// the file; presumably built from the JSON body of the query endpoint —
/// confirm.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional list of entity types; `None` when not supplied.
    /// NOTE(review): presumably narrows which entity kinds are returned.
    entity_types: Option<Vec<String>>,
    /// Optional list of capabilities; `None` when not supplied.
    /// NOTE(review): presumably a capability filter — confirm.
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). When `Some`, the
    /// query handler runs the user_params binder before executing.
    /// Absence preserves the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}
306
/// Kind of a single `PatchOperation`.
///
/// NOTE(review): the precise difference between `Set` and `Replace` is
/// implemented outside this part of the file (presumably `patch_support`) —
/// confirm there before relying on it.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
313
/// One operation of a patch request: what to do, where, and with which
/// value.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Operation kind.
    op: PatchOperationType,
    /// Target location, as a sequence of path segments.
    path: Vec<String>,
    /// Operand value, if any. NOTE(review): presumably `None` for `Unset`,
    /// which needs no value — confirm.
    value: Option<JsonValue>,
}
320
321impl RedDBServer {
322    pub fn new(runtime: RedDBRuntime) -> Self {
323        Self::with_options(runtime, ServerOptions::default())
324    }
325
326    pub fn from_database_options(
327        db_options: RedDBOptions,
328        server_options: ServerOptions,
329    ) -> RedDBResult<Self> {
330        let runtime = RedDBRuntime::with_options(db_options)?;
331        Ok(Self::with_options(runtime, server_options))
332    }
333
334    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
335        Self {
336            runtime,
337            options,
338            auth_store: None,
339            replication: None,
340            http_limiter: HttpConnectionLimiter::with_default_cap(),
341            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
342            slow_inject_ms: Arc::new(AtomicU64::new(0)),
343            http_metrics: HttpHandlerMetrics::new(),
344            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
345            reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
346        }
347    }
348
349    #[doc(hidden)]
350    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
351        &self.http_metrics
352    }
353
354    /// Visible for tests. Lets the integration test in
355    /// `tests/http_connection_limiter.rs` saturate the cap and observe
356    /// `503 Service Unavailable` responses without spinning up
357    /// thousands of sockets.
358    #[doc(hidden)]
359    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
360        self.http_limiter = HttpConnectionLimiter::new(cap);
361        self
362    }
363
364    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
365    /// Replaces the limiter cap, the per-handler deadline, and the
366    /// `Retry-After` value used by the limiter's reject path. All
367    /// values are assumed validated by [`http_limits::resolve_http_limits`].
368    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
369        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
370        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
371        self.retry_after_secs = limits.retry_after_secs;
372        self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
373        self
374    }
375
376    #[doc(hidden)]
377    pub fn retry_after_secs(&self) -> u64 {
378        self.retry_after_secs
379    }
380
381    #[doc(hidden)]
382    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
383        &self.http_limiter
384    }
385
386    /// Visible for tests. Override the per-handler total-time deadline
387    /// (issue #570 slice 2). Default 30s.
388    #[doc(hidden)]
389    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
390        self.handler_timeout = timeout;
391        self
392    }
393
394    #[doc(hidden)]
395    pub fn handler_timeout(&self) -> Duration {
396        self.handler_timeout
397    }
398
399    /// Test hook: set a synchronous sleep (in ms) inserted between
400    /// route dispatch and response write. The integration test for
401    /// slice 2 sets a value greater than `handler_timeout` to trip
402    /// the deadline, then resets to 0 to verify recovery. Shared via
403    /// `Arc<AtomicU64>` so cloned server handles see the same flip.
404    #[doc(hidden)]
405    pub fn set_test_slow_inject_ms(&self, ms: u64) {
406        self.slow_inject_ms.store(ms, Ordering::Relaxed);
407    }
408
409    /// Attach an `AuthStore` for HTTP-layer authentication.
410    /// Also injects the store into the runtime so that `Value::Secret`
411    /// auto-encrypt/decrypt can reach the vault AES key.
412    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
413        self.runtime.set_auth_store(Arc::clone(&auth_store));
414        self.auth_store = Some(auth_store);
415        self
416    }
417
418    /// Attach replication state for status and snapshot endpoints.
419    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
420        self.replication = Some(state);
421        self
422    }
423
424    pub fn runtime(&self) -> &RedDBRuntime {
425        &self.runtime
426    }
427
428    pub fn options(&self) -> &ServerOptions {
429        &self.options
430    }
431
432    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
433        QueryUseCases::new(&self.runtime)
434    }
435
436    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
437        AdminUseCases::new(&self.runtime)
438    }
439
440    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
441        EntityUseCases::new(&self.runtime)
442    }
443
444    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
445        CatalogUseCases::new(&self.runtime)
446    }
447
448    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
449        GraphUseCases::new(&self.runtime)
450    }
451
452    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
453        NativeUseCases::new(&self.runtime)
454    }
455
456    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
457        TreeUseCases::new(&self.runtime)
458    }
459
460    fn transport_readiness_json(&self) -> JsonValue {
461        let active = self
462            .options
463            .transport_readiness
464            .active
465            .iter()
466            .map(|listener| {
467                let mut object = Map::new();
468                object.insert(
469                    "transport".to_string(),
470                    JsonValue::String(listener.transport.clone()),
471                );
472                object.insert(
473                    "bind_addr".to_string(),
474                    JsonValue::String(listener.bind_addr.clone()),
475                );
476                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
477                JsonValue::Object(object)
478            })
479            .collect();
480        let failed = self
481            .options
482            .transport_readiness
483            .failed
484            .iter()
485            .map(|listener| {
486                let mut object = Map::new();
487                object.insert(
488                    "transport".to_string(),
489                    JsonValue::String(listener.transport.clone()),
490                );
491                object.insert(
492                    "bind_addr".to_string(),
493                    JsonValue::String(listener.bind_addr.clone()),
494                );
495                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
496                object.insert(
497                    "reason".to_string(),
498                    JsonValue::String(listener.reason.clone()),
499                );
500                JsonValue::Object(object)
501            })
502            .collect();
503
504        let mut object = Map::new();
505        object.insert("active".to_string(), JsonValue::Array(active));
506        object.insert("failed".to_string(), JsonValue::Array(failed));
507        JsonValue::Object(object)
508    }
509
510    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
511        let mut value = crate::presentation::ops_json::health_json(report);
512        if let JsonValue::Object(ref mut object) = value {
513            object.insert(
514                "transport_listeners".to_string(),
515                self.transport_readiness_json(),
516            );
517        }
518        value
519    }
520
521    pub fn serve(&self) -> io::Result<()> {
522        let listener = TcpListener::bind(&self.options.bind_addr)?;
523        self.serve_on(listener)
524    }
525
526    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
527        for stream in listener.incoming() {
528            match stream {
529                Ok(stream) => match self.http_limiter.try_acquire() {
530                    Some(permit) => {
531                        // Spawn a thread per connection for concurrent request handling
532                        let server = self.clone();
533                        thread::spawn(move || {
534                            let _guard = permit; // released on thread exit
535                            let _ = server.handle_connection(stream);
536                        });
537                    }
538                    None => {
539                        // Cap exhausted: write static 503 inline on the
540                        // accept thread, close the socket, and continue.
541                        // No thread spawn, no `HttpRequest::read_from`,
542                        // no runtime call.
543                        self.http_metrics
544                            .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
545                        self.reject_with_503(stream, self.options.write_timeout_ms);
546                    }
547                },
548                Err(err) => return Err(err),
549            }
550        }
551        Ok(())
552    }
553
554    /// 503 response used when the connection limiter is full. The
555    /// payload is pre-rendered into `reject_503_bytes` at construction
556    /// (issue #574 slice 5: `Retry-After` is configurable), so the
557    /// hot path is still one write and a close.
558    fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
559        let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
560        let _ = stream.write_all(&self.reject_503_bytes);
561        let _ = stream.flush();
562        let _ = stream.shutdown(std::net::Shutdown::Both);
563    }
564
565    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
566        let (stream, _) = listener.accept()?;
567        self.handle_connection(stream)
568    }
569
570    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
571        let server = self.clone();
572        thread::spawn(move || server.serve())
573    }
574
575    pub fn serve_in_background_on(
576        &self,
577        listener: TcpListener,
578    ) -> thread::JoinHandle<io::Result<()>> {
579        let server = self.clone();
580        thread::spawn(move || server.serve_on(listener))
581    }
582
583    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
584    /// `tls_config` is shared across all connections (rustls
585    /// `ServerConfig` is `Send + Sync`).
586    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
587        let listener = TcpListener::bind(&self.options.bind_addr)?;
588        self.serve_tls_on(listener, tls_config)
589    }
590
591    pub fn serve_tls_on(
592        &self,
593        listener: TcpListener,
594        tls_config: std::sync::Arc<rustls::ServerConfig>,
595    ) -> io::Result<()> {
596        for stream in listener.incoming() {
597            match stream {
598                Ok(stream) => match self.http_limiter.try_acquire() {
599                    Some(permit) => {
600                        let server = self.clone();
601                        let cfg = tls_config.clone();
602                        thread::spawn(move || {
603                            let _guard = permit; // released on thread exit
604                            let _ = server.handle_tls_connection(stream, cfg);
605                        });
606                    }
607                    None => {
608                        // Issue #572 slice 3: cross-transport cap shared
609                        // with the clear-text limiter. Writing a 503 over
610                        // a non-handshaken TLS socket is not meaningful,
611                        // so reject by closing the raw socket. No TLS
612                        // handshake, no thread spawn, no runtime call.
613                        self.http_metrics
614                            .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
615                        let _ = stream.shutdown(std::net::Shutdown::Both);
616                        drop(stream);
617                    }
618                },
619                Err(err) => return Err(err),
620            }
621        }
622        Ok(())
623    }
624
625    pub fn serve_tls_in_background(
626        &self,
627        tls_config: std::sync::Arc<rustls::ServerConfig>,
628    ) -> thread::JoinHandle<io::Result<()>> {
629        let server = self.clone();
630        thread::spawn(move || server.serve_tls(tls_config))
631    }
632
633    pub fn serve_tls_in_background_on(
634        &self,
635        listener: TcpListener,
636        tls_config: std::sync::Arc<rustls::ServerConfig>,
637    ) -> thread::JoinHandle<io::Result<()>> {
638        let server = self.clone();
639        thread::spawn(move || server.serve_tls_on(listener, tls_config))
640    }
641
642    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
643        let started = Instant::now();
644        let result = self.handle_connection_inner(stream, started);
645        let elapsed = started.elapsed().as_secs_f64();
646        self.http_metrics
647            .record_duration(HttpTransport::Http, elapsed);
648        result
649    }
650
651    fn handle_connection_inner(
652        &self,
653        mut stream: TcpStream,
654        started: Instant,
655    ) -> io::Result<()> {
656        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
657        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
658
659        // Issue #570 slice 2: arm a deadline at handler spawn and
660        // check at coarse boundaries. No hard pre-emption — a thread
661        // blocked inside a true syscall is still bounded only by the
662        // per-socket read/write timeouts.
663        let deadline = started + self.handler_timeout;
664
665        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
666
667        // Boundary (a): between request parse and route dispatch.
668        if Instant::now() >= deadline {
669            self.http_metrics
670                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
671            Self::write_handler_timeout_503(&mut stream);
672            return Ok(());
673        }
674
675        if self.try_route_streaming(&request, &mut stream)? {
676            return Ok(());
677        }
678        let response = self.route(request);
679
680        // Test-only injected slow downstream (issue #570 slice 2
681        // integration test). Production builds set this to 0, so this
682        // is a single relaxed atomic load on the hot path.
683        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
684        if inject_ms > 0 {
685            thread::sleep(Duration::from_millis(inject_ms));
686        }
687
688        // Boundary (b): between route dispatch and response write.
689        if Instant::now() >= deadline {
690            self.http_metrics
691                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
692            Self::write_handler_timeout_503(&mut stream);
693            return Ok(());
694        }
695
696        stream.write_all(&response.to_http_bytes())?;
697        stream.flush()?;
698        Ok(())
699    }
700
701    /// Best-effort 503 emitted when the per-handler deadline expires
702    /// at a coarse boundary. Writes are swallowed — the caller has
703    /// already exceeded its budget, so we do not propagate write
704    /// errors. Permit drop happens on the handler thread's normal
705    /// exit path.
706    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
707        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
708            Connection: close\r\n\
709            Content-Length: 0\r\n\
710            \r\n";
711        let _ = stream.write_all(RESPONSE);
712        let _ = stream.flush();
713    }
714
715    fn handle_tls_connection(
716        &self,
717        tcp: TcpStream,
718        tls_config: std::sync::Arc<rustls::ServerConfig>,
719    ) -> io::Result<()> {
720        let started = Instant::now();
721        let result = self.handle_tls_connection_inner(tcp, tls_config, started);
722        let elapsed = started.elapsed().as_secs_f64();
723        self.http_metrics
724            .record_duration(HttpTransport::Https, elapsed);
725        result
726    }
727
728    fn handle_tls_connection_inner(
729        &self,
730        tcp: TcpStream,
731        tls_config: std::sync::Arc<rustls::ServerConfig>,
732        started: Instant,
733    ) -> io::Result<()> {
734        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
735        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
736
737        // Issue #572 slice 3: per-handler deadline applies to TLS
738        // handlers too — same handler-thread scaffolding as slice 2.
739        let deadline = started + self.handler_timeout;
740
741        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
742            Ok(s) => s,
743            Err(err) => {
744                tracing::warn!(
745                    target: "reddb::http_tls",
746                    err = %err,
747                    "TLS handshake failed"
748                );
749                return Err(err);
750            }
751        };
752        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
753            Ok(req) => req,
754            Err(err) => {
755                tracing::warn!(
756                    target: "reddb::http_tls",
757                    err = %err,
758                    "TLS request parse failed"
759                );
760                return Err(err);
761            }
762        };
763
764        // Boundary (a): between request parse and route dispatch.
765        if Instant::now() >= deadline {
766            self.http_metrics
767                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
768            Self::write_handler_timeout_503(&mut tls_stream);
769            return Ok(());
770        }
771
772        if self.try_route_streaming(&request, &mut tls_stream)? {
773            return Ok(());
774        }
775        let response = self.route(request);
776
777        // Test-only injected slow downstream (issue #572 slice 3
778        // integration test). Production sets this to 0.
779        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
780        if inject_ms > 0 {
781            thread::sleep(Duration::from_millis(inject_ms));
782        }
783
784        // Boundary (b): between route dispatch and response write.
785        if Instant::now() >= deadline {
786            self.http_metrics
787                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
788            Self::write_handler_timeout_503(&mut tls_stream);
789            return Ok(());
790        }
791
792        tls_stream.write_all(&response.to_http_bytes())?;
793        tls_stream.flush()?;
794        Ok(())
795    }
796}
797
/// Pre-render the limiter-reject 503 response.
///
/// The `Retry-After` value comes from the resolved HTTP limits (issue #574
/// slice 5) and is fixed once the server is configured, so the response is
/// built once and every accept loop writes the same cached bytes. The
/// response has an empty body and tells the client the connection closes.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    const STATUS_AND_FIXED_HEADERS: &str = "HTTP/1.1 503 Service Unavailable\r\n\
                                            Connection: close\r\n\
                                            Content-Length: 0\r\n";
    let retry_after_header = format!("Retry-After: {retry_after_secs}\r\n");

    // The trailing blank line terminates the header section.
    [STATUS_AND_FIXED_HEADERS, retry_after_header.as_str(), "\r\n"]
        .concat()
        .into_bytes()
}