Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49    crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The default request-body cap must stay at 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        let defaults = ServerOptions::default();
        assert_eq!(defaults.max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must carry a `transport_listeners` object whose
    /// `active` and `failed` arrays mirror the configured readiness.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");

        let grpc_listener = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:50051".to_string(),
            explicit: true,
        };
        let http_failure = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5055".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
        };
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![grpc_listener],
                failed: vec![http_failure],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let root = match server.health_json_with_transport(&HealthReport::healthy()) {
            JsonValue::Object(root) => root,
            _ => panic!("health payload should be an object"),
        };
        let listeners = match root.get("transport_listeners") {
            Some(JsonValue::Object(listeners)) => listeners,
            _ => panic!("health payload should include transport_listeners"),
        };
        let active = match listeners.get("active") {
            Some(JsonValue::Array(active)) => active,
            _ => panic!("transport_listeners.active should be an array"),
        };
        let failed = match listeners.get("failed") {
            Some(JsonValue::Array(failed)) => failed,
            _ => panic!("transport_listeners.failed should be an array"),
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105    crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108pub mod handlers_admin;
109mod handlers_ai;
110mod handlers_auth;
111mod handlers_backup;
112mod handlers_ec;
113pub(crate) mod handlers_entity;
114mod handlers_geo;
115mod handlers_graph;
116mod handlers_keyed;
117mod handlers_log;
118mod handlers_metrics;
119mod handlers_ops;
120mod handlers_query;
121mod handlers_replication;
122mod handlers_vcs;
123mod handlers_vector;
124pub mod header_escape_guard;
125pub mod http_connection_limiter;
126pub mod http_handler_metrics;
127pub mod http_limits;
128pub mod ingest_pipeline;
129mod patch_support;
130mod request_body;
131mod request_context;
132mod routing;
133mod serverless_support;
134pub mod tls;
135mod transport;
136
137use self::handlers_ai::*;
138use self::handlers_entity::*;
139use self::handlers_graph::*;
140use self::handlers_keyed::*;
141use self::handlers_metrics::*;
142use self::handlers_ops::*;
143use self::handlers_query::*;
144use self::http_connection_limiter::{
145    HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
146};
147use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
148pub use self::http_limits::{
149    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
150};
151use self::patch_support::*;
152use self::request_body::*;
153use self::routing::*;
154use self::serverless_support::*;
155use self::transport::*;
156
/// Which slice of the HTTP surface a listener exposes.
///
/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
/// can serve either every public surface (`Public`, default) or a
/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
/// the top of `route()` consults this so a port bound only to
/// loopback for admin work won't accidentally hand out DML.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Everything routed normally (default — matches v0 behaviour).
    /// This is the value `ServerOptions::default()` selects.
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
    /// which default to `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Intended for
    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
    /// exposed to non-admin networks.
    MetricsOnly,
}
175
/// Configuration for a single HTTP(S) listener of [`RedDBServer`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address handed to `TcpListener::bind` by `serve` and `serve_tls`.
    pub bind_addr: String,
    /// Body-size limit passed to `HttpRequest::read_from` for each request.
    pub max_body_bytes: usize,
    /// Per-socket read timeout in milliseconds, applied to every accepted
    /// connection before the request is parsed.
    pub read_timeout_ms: u64,
    /// Per-socket write timeout in milliseconds, applied to every accepted
    /// connection and to the limiter's inline 503 reject path.
    pub write_timeout_ms: u64,
    /// Upper bound on scan page size.
    /// NOTE(review): not referenced in this part of the file; presumably
    /// enforced by the scan/query handlers — confirm.
    pub max_scan_limit: usize,
    /// Which subset of paths this listener serves. Defaults to
    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
    /// admin / scrape ports (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
    /// Active and failed transport listeners, rendered under the
    /// `transport_listeners` key of the health payload.
    pub transport_readiness: crate::service_cli::TransportReadiness,
}
189
/// Default HTTP request-body cap: 32 MiB. Used as
/// `ServerOptions::default().max_body_bytes`.
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 * 1024 * 1024;
191
192impl Default for ServerOptions {
193    fn default() -> Self {
194        Self {
195            bind_addr: "127.0.0.1:5055".to_string(),
196            max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
197            read_timeout_ms: 5_000,
198            write_timeout_ms: 5_000,
199            max_scan_limit: 1_000,
200            surface: ServerSurface::Public,
201            transport_readiness: crate::service_cli::TransportReadiness::default(),
202        }
203    }
204}
205
/// Replication state exposed to the HTTP server. Attached to a
/// [`RedDBServer`] through `with_replication`.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle.
    /// NOTE(review): presumably `None` when this node is not acting as a
    /// primary — confirm against the code that constructs this state.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
211
/// HTTP front end for a [`RedDBRuntime`].
///
/// The server is `Clone`; the accept loops clone it once per connection
/// thread and once per background serve call.
#[derive(Clone)]
pub struct RedDBServer {
    /// Database runtime that requests are served from.
    runtime: RedDBRuntime,
    /// Listener configuration (bind address, body cap, socket timeouts,
    /// surface, transport readiness).
    options: ServerOptions,
    /// Auth store attached via `with_auth`; `None` until then.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state attached via `with_replication`; `None` until then.
    replication: Option<Arc<ServerReplicationState>>,
    /// Bounded handler-thread admission (issue #570 slice 1), consulted by
    /// both the clear-text and the TLS accept loops. Cloned with the
    /// server; `Clone` of `HttpConnectionLimiter` shares an `Arc` so every
    /// serve loop on the same `RedDBServer` shares one cap.
    http_limiter: HttpConnectionLimiter,
    /// Per-handler total-time deadline (issue #570 slice 2). Each
    /// handler thread arms a deadline at spawn and bails with a
    /// best-effort 503 at coarse boundaries between request parse,
    /// route dispatch, and response write. Starts at
    /// `DEFAULT_HANDLER_TIMEOUT` (30s) and is replaced by
    /// `with_http_limits` or `with_handler_timeout`.
    handler_timeout: Duration,
    /// Monotonic clock the per-handler deadline (issue #621) is armed
    /// against — the same [`MonotonicClock`] abstraction the limiter
    /// uses. Production wires [`SystemMonotonicClock`] (real wall time);
    /// tests inject a fake to drive timeout expiry deterministically
    /// without `sleep()`. Shared via `Arc` so cloned server handles
    /// (e.g. `serve_in_background`) read the same clock.
    handler_clock: Arc<dyn MonotonicClock>,
    /// Test-only synchronous sleep injected between route dispatch and
    /// response write so an integration test can simulate a slow
    /// downstream tripping the deadline. Default 0 (no-op). Shared via
    /// `Arc` so a cloned `RedDBServer` (e.g. `serve_in_background`)
    /// observes flips from the originating handle. The setter is `pub`
    /// but `#[doc(hidden)]`, so it is not part of the documented API.
    slow_inject_ms: Arc<AtomicU64>,
    /// Prometheus metrics for the HTTP handler-thread pool (issue
    /// #573 slice 4). Records rejections (cap_exhausted /
    /// handler_timeout) and per-handler duration histograms. Cloned
    /// with the server via `Arc` so every serve loop on the same
    /// `RedDBServer` writes to one set of counters.
    http_metrics: HttpHandlerMetrics,
    /// `Retry-After` value (seconds) emitted in the limiter's reject
    /// path (issue #574 slice 5). Pre-rendered into `reject_503_bytes`
    /// at construction so the hot path is one write+close.
    retry_after_secs: u64,
    /// Cached bytes of the limiter's 503 response. Held in an `Arc` so
    /// cloning the server does not copy the payload. `with_http_limits`
    /// installs a freshly rendered `Arc` on the handle it is called on;
    /// handles cloned before that call keep the bytes they already had,
    /// so limits must be applied before serving starts.
    reject_503_bytes: Arc<Vec<u8>>,
}
258
/// Default per-handler total-time budget (issue #571 slice 2): 30 seconds.
/// Installed by `RedDBServer::with_options`; `with_http_limits` and
/// `with_handler_timeout` override it.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_millis(30_000);
261
/// Selector for which kind of derived state a serverless warmup covers.
/// NOTE(review): not referenced in this part of the file; presumably
/// consumed by the serverless support handlers — confirm the exact
/// meaning of each variant there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
269
/// Deployment mode label.
/// NOTE(review): not referenced in this part of the file; presumably
/// parsed and used by the serverless/ops handlers — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
276
/// Decodes the `%XX` escapes of a single URL path segment.
///
/// Every byte other than `%` is copied through untouched (so `+` stays a
/// literal plus). Each `%` must be followed by two hexadecimal digits,
/// which are combined into one byte. The decoded bytes must form valid
/// UTF-8.
///
/// # Errors
/// - `"truncated percent escape"` when fewer than two bytes follow a `%`.
/// - `"invalid percent escape"` when either following byte is not a hex digit.
/// - `"path segment is not valid UTF-8"` when the decoded bytes are not UTF-8.
fn percent_decode_path_segment(input: &str) -> Result<String, String> {
    let mut decoded = Vec::with_capacity(input.len());
    let mut remaining = input.as_bytes().iter().copied();

    while let Some(current) = remaining.next() {
        if current != b'%' {
            decoded.push(current);
            continue;
        }
        // The length check comes first: an escape that is both short and
        // malformed (e.g. "%G") is reported as truncated, not invalid.
        let (Some(first), Some(second)) = (remaining.next(), remaining.next()) else {
            return Err("truncated percent escape".to_string());
        };
        let high = match hex_value(first) {
            Some(nibble) => nibble,
            None => return Err("invalid percent escape".to_string()),
        };
        let low = match hex_value(second) {
            Some(nibble) => nibble,
            None => return Err("invalid percent escape".to_string()),
        };
        decoded.push((high << 4) | low);
    }

    match String::from_utf8(decoded) {
        Ok(text) => Ok(text),
        Err(_) => Err("path segment is not valid UTF-8".to_string()),
    }
}

/// Numeric value (0–15) of an ASCII hexadecimal digit, either case, or
/// `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly 0-9, a-f and A-F; bytes >= 0x80 map to
    // Latin-1 characters, none of which are hex digits. The result is
    // always below 16, so narrowing to `u8` cannot truncate.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
311
/// Fields of a query request after parsing.
/// NOTE(review): constructed and consumed outside this part of the file;
/// presumably built from the HTTP request body by the query handlers —
/// confirm.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional entity-type restriction (semantics defined by the consumer —
    /// TODO confirm).
    entity_types: Option<Vec<String>>,
    /// Optional capability restriction (semantics defined by the consumer —
    /// TODO confirm).
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). When `Some`, the
    /// query handler runs the user_params binder before executing.
    /// Absence preserves the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}
322
/// Kind of change a [`PatchOperation`] applies.
/// NOTE(review): not used in this part of the file; presumably mirrors
/// `PatchEntityOperationType` for the patch-support helpers — confirm.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
329
/// One patch instruction: an operation kind, a target path, and an
/// optional JSON value.
/// NOTE(review): not used in this part of the file; field semantics are
/// defined by the patch-support helpers — confirm there.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Which kind of change to apply.
    op: PatchOperationType,
    /// Target location as a sequence of path segments.
    path: Vec<String>,
    /// Value for the operation; presumably `None` for `Unset` — confirm.
    value: Option<JsonValue>,
}
336
337impl RedDBServer {
    /// Builds a server around `runtime` using `ServerOptions::default()`.
    pub fn new(runtime: RedDBRuntime) -> Self {
        Self::with_options(runtime, ServerOptions::default())
    }
341
342    pub fn from_database_options(
343        db_options: RedDBOptions,
344        server_options: ServerOptions,
345    ) -> RedDBResult<Self> {
346        let runtime = RedDBRuntime::with_options(db_options)?;
347        Ok(Self::with_options(runtime, server_options))
348    }
349
350    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
351        Self {
352            runtime,
353            options,
354            auth_store: None,
355            replication: None,
356            http_limiter: HttpConnectionLimiter::with_default_cap(),
357            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
358            handler_clock: Arc::new(SystemMonotonicClock::new()),
359            slow_inject_ms: Arc::new(AtomicU64::new(0)),
360            http_metrics: HttpHandlerMetrics::new(),
361            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
362            reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
363        }
364    }
365
    /// Borrows the metrics recorder that the accept loops and handlers
    /// report rejections and handler durations to.
    #[doc(hidden)]
    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
        &self.http_metrics
    }
370
    /// Visible for tests. Replaces the connection limiter with a fresh one
    /// capped at `cap`, which lets the integration test in
    /// `tests/http_connection_limiter.rs` saturate the cap and observe
    /// `503 Service Unavailable` responses without spinning up
    /// thousands of sockets.
    #[doc(hidden)]
    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(cap);
        self
    }
380
381    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
382    /// Replaces the limiter cap, the per-handler deadline, and the
383    /// `Retry-After` value used by the limiter's reject path. All
384    /// values are assumed validated by [`http_limits::resolve_http_limits`].
385    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
386        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
387        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
388        self.retry_after_secs = limits.retry_after_secs;
389        self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
390        self
391    }
392
    /// `Retry-After` value, in seconds, currently configured for the
    /// limiter's 503 reject path.
    #[doc(hidden)]
    pub fn retry_after_secs(&self) -> u64 {
        self.retry_after_secs
    }
397
    /// Borrows the connection limiter that the accept loops acquire
    /// handler permits from.
    #[doc(hidden)]
    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
        &self.http_limiter
    }
402
    /// Visible for tests. Overrides the per-handler total-time deadline
    /// (issue #570 slice 2). Without this call the deadline is
    /// `DEFAULT_HANDLER_TIMEOUT` (30s).
    #[doc(hidden)]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = timeout;
        self
    }
410
    /// Per-handler total-time deadline currently in effect.
    #[doc(hidden)]
    pub fn handler_timeout(&self) -> Duration {
        self.handler_timeout
    }
415
    /// Visible for tests. Replaces the clock that the per-handler deadline
    /// (issue #621) is armed against, so timeout expiry can be driven
    /// deterministically without real sleeps. Without this call the
    /// server uses the real monotonic clock installed by `with_options`.
    #[doc(hidden)]
    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
        self.handler_clock = clock;
        self
    }
425
    /// Test hook: set a synchronous sleep (in ms) inserted between
    /// route dispatch and response write; 0 disables it. The integration
    /// test for slice 2 sets a value greater than `handler_timeout` to
    /// trip the deadline, then resets to 0 to verify recovery. Stored in
    /// an `Arc<AtomicU64>` (relaxed ordering) so cloned server handles see
    /// the same flip.
    #[doc(hidden)]
    pub fn set_test_slow_inject_ms(&self, ms: u64) {
        self.slow_inject_ms.store(ms, Ordering::Relaxed);
    }
435
    /// Attach an `AuthStore` for HTTP-layer authentication.
    /// Also injects the store into the runtime so that `Value::Secret`
    /// auto-encrypt/decrypt can reach the vault AES key. The runtime and
    /// the server end up holding handles to the same shared store.
    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
        self.runtime.set_auth_store(Arc::clone(&auth_store));
        self.auth_store = Some(auth_store);
        self
    }
444
    /// Attach replication state for status and snapshot endpoints.
    /// Replaces any state attached earlier.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }
450
    /// Borrows the database runtime this server fronts.
    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }
454
    /// Borrows the listener options this server was built with.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }
458
    /// Query use-case facade borrowing this server's runtime.
    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }
462
    /// Admin use-case facade borrowing this server's runtime.
    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }
466
    /// Entity use-case facade borrowing this server's runtime.
    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }
470
    /// Catalog use-case facade borrowing this server's runtime.
    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }
474
    /// Graph use-case facade borrowing this server's runtime.
    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }
478
    /// Native-artifact use-case facade borrowing this server's runtime.
    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }
482
    /// Tree use-case facade borrowing this server's runtime.
    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }
486
487    fn transport_readiness_json(&self) -> JsonValue {
488        let active = self
489            .options
490            .transport_readiness
491            .active
492            .iter()
493            .map(|listener| {
494                let mut object = Map::new();
495                object.insert(
496                    "transport".to_string(),
497                    JsonValue::String(listener.transport.clone()),
498                );
499                object.insert(
500                    "bind_addr".to_string(),
501                    JsonValue::String(listener.bind_addr.clone()),
502                );
503                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
504                JsonValue::Object(object)
505            })
506            .collect();
507        let failed = self
508            .options
509            .transport_readiness
510            .failed
511            .iter()
512            .map(|listener| {
513                let mut object = Map::new();
514                object.insert(
515                    "transport".to_string(),
516                    JsonValue::String(listener.transport.clone()),
517                );
518                object.insert(
519                    "bind_addr".to_string(),
520                    JsonValue::String(listener.bind_addr.clone()),
521                );
522                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
523                object.insert(
524                    "reason".to_string(),
525                    JsonValue::String(listener.reason.clone()),
526                );
527                JsonValue::Object(object)
528            })
529            .collect();
530
531        let mut object = Map::new();
532        object.insert("active".to_string(), JsonValue::Array(active));
533        object.insert("failed".to_string(), JsonValue::Array(failed));
534        JsonValue::Object(object)
535    }
536
537    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
538        let mut value = crate::presentation::ops_json::health_json(report);
539        if let JsonValue::Object(ref mut object) = value {
540            object.insert(
541                "transport_listeners".to_string(),
542                self.transport_readiness_json(),
543            );
544        }
545        value
546    }
547
548    pub fn serve(&self) -> io::Result<()> {
549        let listener = TcpListener::bind(&self.options.bind_addr)?;
550        self.serve_on(listener)
551    }
552
553    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
554        for stream in listener.incoming() {
555            match stream {
556                Ok(stream) => match self.http_limiter.try_acquire() {
557                    Some(permit) => {
558                        // Spawn a thread per connection for concurrent request handling
559                        let server = self.clone();
560                        thread::spawn(move || {
561                            let _guard = permit; // released on thread exit
562                            let _ = server.handle_connection(stream);
563                        });
564                    }
565                    None => {
566                        // Cap exhausted: write static 503 inline on the
567                        // accept thread, close the socket, and continue.
568                        // No thread spawn, no `HttpRequest::read_from`,
569                        // no runtime call.
570                        self.http_metrics
571                            .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
572                        self.reject_with_503(stream, self.options.write_timeout_ms);
573                    }
574                },
575                Err(err) => return Err(err),
576            }
577        }
578        Ok(())
579    }
580
    /// 503 response used when the connection limiter is full. The
    /// payload is pre-rendered into `reject_503_bytes` at construction
    /// (issue #574 slice 5: `Retry-After` is configurable), so the
    /// hot path is still one write and a close.
    ///
    /// Every step — setting the write timeout (`write_timeout_ms`,
    /// milliseconds), writing, flushing, shutting the socket down — is
    /// best-effort: failures are deliberately ignored because the
    /// connection is being turned away regardless.
    fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
        let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
        let _ = stream.write_all(&self.reject_503_bytes);
        let _ = stream.flush();
        let _ = stream.shutdown(std::net::Shutdown::Both);
    }
591
592    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
593        let (stream, _) = listener.accept()?;
594        self.handle_connection(stream)
595    }
596
    /// Clones the server and runs `serve` on a new thread. The returned
    /// handle yields the accept loop's result when joined.
    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve())
    }
601
    /// Clones the server and runs `serve_on(listener)` on a new thread.
    /// The returned handle yields the accept loop's result when joined.
    pub fn serve_in_background_on(
        &self,
        listener: TcpListener,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_on(listener))
    }
609
610    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
611    /// `tls_config` is shared across all connections (rustls
612    /// `ServerConfig` is `Send + Sync`).
613    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
614        let listener = TcpListener::bind(&self.options.bind_addr)?;
615        self.serve_tls_on(listener, tls_config)
616    }
617
618    pub fn serve_tls_on(
619        &self,
620        listener: TcpListener,
621        tls_config: std::sync::Arc<rustls::ServerConfig>,
622    ) -> io::Result<()> {
623        for stream in listener.incoming() {
624            match stream {
625                Ok(stream) => match self.http_limiter.try_acquire() {
626                    Some(permit) => {
627                        let server = self.clone();
628                        let cfg = tls_config.clone();
629                        thread::spawn(move || {
630                            let _guard = permit; // released on thread exit
631                            let _ = server.handle_tls_connection(stream, cfg);
632                        });
633                    }
634                    None => {
635                        // Issue #572 slice 3: cross-transport cap shared
636                        // with the clear-text limiter. Writing a 503 over
637                        // a non-handshaken TLS socket is not meaningful,
638                        // so reject by closing the raw socket. No TLS
639                        // handshake, no thread spawn, no runtime call.
640                        self.http_metrics
641                            .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
642                        let _ = stream.shutdown(std::net::Shutdown::Both);
643                        drop(stream);
644                    }
645                },
646                Err(err) => return Err(err),
647            }
648        }
649        Ok(())
650    }
651
    /// Clones the server and runs `serve_tls(tls_config)` on a new thread.
    /// The returned handle yields the accept loop's result when joined.
    pub fn serve_tls_in_background(
        &self,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls(tls_config))
    }
659
    /// Clones the server and runs `serve_tls_on(listener, tls_config)` on a
    /// new thread. The returned handle yields the accept loop's result
    /// when joined.
    pub fn serve_tls_in_background_on(
        &self,
        listener: TcpListener,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls_on(listener, tls_config))
    }
668
669    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
670        let started = Instant::now();
671        let result = self.handle_connection_inner(stream);
672        let elapsed = started.elapsed().as_secs_f64();
673        self.http_metrics
674            .record_duration(HttpTransport::Http, elapsed);
675        result
676    }
677
    /// Serves exactly one request on a clear-text connection.
    ///
    /// Applies the configured socket timeouts, arms the per-handler
    /// deadline, parses the request, and routes it. Streaming routes write
    /// their own response; otherwise the buffered response is written and
    /// flushed. The deadline is checked after parsing and again after
    /// routing; when it has expired the timeout is counted, a bare 503 is
    /// sent best-effort, and the call still returns `Ok(())`.
    ///
    /// # Errors
    /// Propagates I/O errors from setting the timeouts, reading the
    /// request, streaming, or writing the response.
    fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;

        // Issue #570 slice 2 / #621: arm a deadline at handler spawn and
        // check at coarse boundaries. Armed against the injectable
        // monotonic clock (#621) so timeout behaviour is deterministically
        // testable without real sleeps; production wires the real clock,
        // so this tracks wall time. No hard pre-emption — a thread blocked
        // inside a true syscall is still bounded only by the per-socket
        // read/write timeouts.
        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);

        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;

        // Boundary (a): between request parse and route dispatch.
        if deadline.expired() {
            self.http_metrics
                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut stream);
            return Ok(());
        }

        // Streaming routes take over the socket and write their own response.
        if self.try_route_streaming(&request, &mut stream)? {
            return Ok(());
        }
        let response = self.route(request);

        // Test-only injected slow downstream (issue #570 slice 2
        // integration test). Production builds set this to 0, so this
        // is a single relaxed atomic load on the hot path.
        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
        if inject_ms > 0 {
            thread::sleep(Duration::from_millis(inject_ms));
        }

        // Boundary (b): between route dispatch and response write.
        if deadline.expired() {
            self.http_metrics
                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut stream);
            return Ok(());
        }

        stream.write_all(&response.to_http_bytes())?;
        stream.flush()?;
        Ok(())
    }
726
727    /// Best-effort 503 emitted when the per-handler deadline expires
728    /// at a coarse boundary. Writes are swallowed — the caller has
729    /// already exceeded its budget, so we do not propagate write
730    /// errors. Permit drop happens on the handler thread's normal
731    /// exit path.
732    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
733        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
734            Connection: close\r\n\
735            Content-Length: 0\r\n\
736            \r\n";
737        let _ = stream.write_all(RESPONSE);
738        let _ = stream.flush();
739    }
740
741    fn handle_tls_connection(
742        &self,
743        tcp: TcpStream,
744        tls_config: std::sync::Arc<rustls::ServerConfig>,
745    ) -> io::Result<()> {
746        let started = Instant::now();
747        let result = self.handle_tls_connection_inner(tcp, tls_config);
748        let elapsed = started.elapsed().as_secs_f64();
749        self.http_metrics
750            .record_duration(HttpTransport::Https, elapsed);
751        result
752    }
753
754    fn handle_tls_connection_inner(
755        &self,
756        tcp: TcpStream,
757        tls_config: std::sync::Arc<rustls::ServerConfig>,
758    ) -> io::Result<()> {
759        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
760        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
761
762        // Issue #572 slice 3 / #621: per-handler deadline applies to TLS
763        // handlers too — same injectable-clock scaffolding as the
764        // clear-text path.
765        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
766
767        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
768            Ok(s) => s,
769            Err(err) => {
770                tracing::warn!(
771                    target: "reddb::http_tls",
772                    err = %err,
773                    "TLS handshake failed"
774                );
775                return Err(err);
776            }
777        };
778        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
779            Ok(req) => req,
780            Err(err) => {
781                tracing::warn!(
782                    target: "reddb::http_tls",
783                    err = %err,
784                    "TLS request parse failed"
785                );
786                return Err(err);
787            }
788        };
789
790        // Boundary (a): between request parse and route dispatch.
791        if deadline.expired() {
792            self.http_metrics
793                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
794            Self::write_handler_timeout_503(&mut tls_stream);
795            return Ok(());
796        }
797
798        if self.try_route_streaming(&request, &mut tls_stream)? {
799            return Ok(());
800        }
801        let response = self.route(request);
802
803        // Test-only injected slow downstream (issue #572 slice 3
804        // integration test). Production sets this to 0.
805        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
806        if inject_ms > 0 {
807            thread::sleep(Duration::from_millis(inject_ms));
808        }
809
810        // Boundary (b): between route dispatch and response write.
811        if deadline.expired() {
812            self.http_metrics
813                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
814            Self::write_handler_timeout_503(&mut tls_stream);
815            return Ok(());
816        }
817
818        tls_stream.write_all(&response.to_http_bytes())?;
819        tls_stream.flush()?;
820        Ok(())
821    }
822}
823
/// Renders the limiter's `503 Service Unavailable` reject response as raw
/// HTTP/1.1 bytes: status line, `Connection: close`, `Content-Length: 0`,
/// and `Retry-After: <retry_after_secs>`, followed by the blank line that
/// ends the header block. There is no body.
///
/// The `Retry-After` value comes from the resolved HTTP limits (issue #574
/// slice 5); callers render the payload once and reuse the bytes for every
/// rejected connection.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    let header_lines = [
        String::from("HTTP/1.1 503 Service Unavailable"),
        String::from("Connection: close"),
        String::from("Content-Length: 0"),
        format!("Retry-After: {}", retry_after_secs),
    ];

    let mut response = header_lines.join("\r\n");
    // Terminate the last header line, then the header block.
    response.push_str("\r\n\r\n");
    response.into_bytes()
}