Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49    crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The health payload must carry a `transport_listeners` object whose
    /// `active` / `failed` arrays mirror the configured readiness.
    #[test]
    fn health_json_reports_transport_listeners() {
        let grpc_up = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:50051".to_string(),
            explicit: true,
        };
        let http_down = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5055".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
        };
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![grpc_up],
                failed: vec![http_down],
            },
            ..ServerOptions::default()
        };

        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let server = RedDBServer::with_options(runtime, options);
        let payload = server.health_json_with_transport(&HealthReport::healthy());

        let root = match payload {
            JsonValue::Object(root) => root,
            _ => panic!("health payload should be an object"),
        };
        let listeners = match root.get("transport_listeners") {
            Some(JsonValue::Object(listeners)) => listeners,
            _ => panic!("health payload should include transport_listeners"),
        };
        let active = match listeners.get("active") {
            Some(JsonValue::Array(active)) => active,
            _ => panic!("transport_listeners.active should be an array"),
        };
        let failed = match listeners.get("failed") {
            Some(JsonValue::Array(failed)) => failed,
            _ => panic!("transport_listeners.failed should be an array"),
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
98
99fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
100    crate::presentation::admin_json::graph_projection_json(projection)
101}
102
103pub mod handlers_admin;
104mod handlers_ai;
105mod handlers_auth;
106mod handlers_backup;
107mod handlers_ec;
108pub(crate) mod handlers_entity;
109mod handlers_geo;
110mod handlers_graph;
111mod handlers_keyed;
112mod handlers_log;
113mod handlers_metrics;
114mod handlers_ops;
115mod handlers_query;
116mod handlers_replication;
117mod handlers_vcs;
118mod handlers_vector;
119pub mod header_escape_guard;
120pub mod http_connection_limiter;
121pub mod http_handler_metrics;
122pub mod http_limits;
123pub mod ingest_pipeline;
124mod patch_support;
125mod request_body;
126mod request_context;
127mod routing;
128mod serverless_support;
129pub mod tls;
130mod transport;
131
132use self::handlers_ai::*;
133use self::handlers_entity::*;
134use self::handlers_graph::*;
135use self::handlers_keyed::*;
136use self::handlers_metrics::*;
137use self::handlers_ops::*;
138use self::handlers_query::*;
139use self::http_connection_limiter::{
140    HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
141};
142use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
143pub use self::http_limits::{
144    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
145};
146use self::patch_support::*;
147use self::request_body::*;
148use self::routing::*;
149use self::serverless_support::*;
150use self::transport::*;
151
/// Which slice of the HTTP surface a listener serves.
///
/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
/// can serve either every public surface (`Public`, default) or a
/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
/// the top of `route()` consults this so a port bound only to
/// loopback for admin work won't accidentally hand out DML.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Everything routed normally (default — matches v0 behaviour).
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
    /// which default to `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Intended for
    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
    /// exposed to non-admin networks.
    MetricsOnly,
}
170
/// Configuration for one HTTP listener. Default values come from the
/// `Default` impl for this type.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address handed to `TcpListener::bind` by `serve` / `serve_tls`.
    pub bind_addr: String,
    /// Body-size cap passed to `HttpRequest::read_from` for each request.
    pub max_body_bytes: usize,
    /// Per-socket read timeout, in milliseconds, set on every accepted
    /// connection.
    pub read_timeout_ms: u64,
    /// Per-socket write timeout, in milliseconds, set on every accepted
    /// connection and on the limiter's 503 reject path.
    pub write_timeout_ms: u64,
    /// Upper bound for scan page sizes.
    /// NOTE(review): enforced outside this section — confirm against the
    /// scan handlers.
    pub max_scan_limit: usize,
    /// Which subset of paths this listener serves. Defaults to
    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
    /// admin / scrape ports (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
    /// Listener bind outcomes (active and failed) that the health
    /// payload reports under `transport_listeners`.
    pub transport_readiness: crate::service_cli::TransportReadiness,
}
184
185impl Default for ServerOptions {
186    fn default() -> Self {
187        Self {
188            bind_addr: "127.0.0.1:5055".to_string(),
189            max_body_bytes: 1024 * 1024,
190            read_timeout_ms: 5_000,
191            write_timeout_ms: 5_000,
192            max_scan_limit: 1_000,
193            surface: ServerSurface::Public,
194            transport_readiness: crate::service_cli::TransportReadiness::default(),
195        }
196    }
197}
198
/// Replication state exposed to the HTTP server. Attached with
/// `RedDBServer::with_replication`.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, when one exists.
    /// NOTE(review): presumably `None` on nodes that are not a primary —
    /// confirm against the replication handlers.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
204
/// HTTP front end over a [`RedDBRuntime`]. The type is `Clone`; the
/// accept loops clone it once per connection thread.
#[derive(Clone)]
pub struct RedDBServer {
    /// Runtime that requests are routed into.
    runtime: RedDBRuntime,
    /// Listener configuration (bind address, body cap, socket timeouts,
    /// surface, transport readiness).
    options: ServerOptions,
    /// Auth store attached through `with_auth`, if any.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state attached through `with_replication`, if any.
    replication: Option<Arc<ServerReplicationState>>,
    /// Bounded handler-thread admission shared by the clear-text and
    /// TLS accept loops (issue #570 slice 1, #572 slice 3). Cloned with
    /// the server; `Clone` of `HttpConnectionLimiter` shares an `Arc`
    /// so every serve loop on the same `RedDBServer` shares one cap.
    http_limiter: HttpConnectionLimiter,
    /// Per-handler total-time deadline (issue #570 slice 2). Each
    /// handler thread — clear-text or TLS — arms a deadline when it
    /// starts and bails with a best-effort 503 at coarse boundaries
    /// between request parse, route dispatch, and response write.
    /// Starts at `DEFAULT_HANDLER_TIMEOUT` (30s) and is overridden by
    /// `with_http_limits` or `with_handler_timeout`.
    handler_timeout: Duration,
    /// Monotonic clock the per-handler deadline (issue #621) is armed
    /// against — the same [`MonotonicClock`] abstraction the limiter
    /// uses. Production wires [`SystemMonotonicClock`]; tests inject a
    /// fake to drive timeout expiry deterministically without
    /// `sleep()`. Shared via `Arc` so cloned server handles
    /// (e.g. `serve_in_background`) read the same clock.
    handler_clock: Arc<dyn MonotonicClock>,
    /// Test-only synchronous sleep injected between route dispatch and
    /// response write so an integration test can simulate a slow
    /// downstream tripping the deadline. Default 0 (no-op). Shared via
    /// `Arc` so a cloned `RedDBServer` (e.g. `serve_in_background`)
    /// observes flips from the originating handle. The setter is
    /// `pub` but `#[doc(hidden)]`; production code is not expected to
    /// call it.
    slow_inject_ms: Arc<AtomicU64>,
    /// Prometheus metrics for the HTTP handler-thread pool (issue
    /// #573 slice 4). Records rejections (cap_exhausted /
    /// handler_timeout) and per-handler duration histograms.
    /// NOTE(review): assumed to share its counters across clones —
    /// confirm in `http_handler_metrics`.
    http_metrics: HttpHandlerMetrics,
    /// `Retry-After` value (seconds) emitted in the limiter's reject
    /// path (issue #574 slice 5). Pre-rendered into `reject_503_bytes`
    /// at construction so the hot path is one write+close.
    retry_after_secs: u64,
    /// Cached bytes of the limiter's 503 response. Held in an `Arc` so
    /// cloning the server does not copy the payload. `with_http_limits`
    /// installs a fresh `Arc` on the handle it consumes, so handles
    /// cloned before that call keep the previous bytes.
    reject_503_bytes: Arc<Vec<u8>>,
}
251
/// Default per-handler total-time budget: 30 seconds (issue #571 slice 2).
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
254
/// Kinds of derived state a serverless warmup request can name.
/// NOTE(review): no consumer is visible in this section (presumably
/// `serverless_support`) — confirm the exact semantics of each variant
/// there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
262
/// Deployment flavour the server distinguishes between.
/// NOTE(review): no consumer is visible in this section — confirm where
/// the profile is resolved and what each variant changes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
269
270fn percent_decode_path_segment(input: &str) -> Result<String, String> {
271    let bytes = input.as_bytes();
272    let mut out = Vec::with_capacity(bytes.len());
273    let mut index = 0;
274    while index < bytes.len() {
275        match bytes[index] {
276            b'%' => {
277                if index + 2 >= bytes.len() {
278                    return Err("truncated percent escape".to_string());
279                }
280                let high = hex_value(bytes[index + 1])
281                    .ok_or_else(|| "invalid percent escape".to_string())?;
282                let low = hex_value(bytes[index + 2])
283                    .ok_or_else(|| "invalid percent escape".to_string())?;
284                out.push((high << 4) | low);
285                index += 3;
286            }
287            byte => {
288                out.push(byte);
289                index += 1;
290            }
291        }
292    }
293    String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
294}
295
/// Numeric value (0–15) of an ASCII hex digit in either case, or `None`
/// for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // Bytes >= 0x80 map to Latin-1 code points, none of which are hex
    // digits, so only ASCII `0-9`, `a-f`, `A-F` are accepted.
    let digit = char::from(byte).to_digit(16)?;
    u8::try_from(digit).ok()
}
304
/// Fields extracted from a query request.
/// NOTE(review): the parser that fills this is outside this section —
/// confirm the accepted shapes there.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional list of entity type names supplied with the request.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names supplied with the request.
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). When `Some`, the
    /// query handler runs the user_params binder before executing.
    /// Absence preserves the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}
315
/// Kind of change a single patch operation requests.
/// NOTE(review): looks like the HTTP-layer counterpart of
/// `PatchEntityOperationType` — confirm the mapping in `patch_support`.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
322
/// One parsed patch operation: what to do, where, and with which value.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Kind of change to apply.
    op: PatchOperationType,
    /// Target location as a list of path segments.
    path: Vec<String>,
    /// Value carried by the operation, when one was supplied.
    /// NOTE(review): presumably absent for `Unset` — confirm in
    /// `patch_support`.
    value: Option<JsonValue>,
}
329
330impl RedDBServer {
331    pub fn new(runtime: RedDBRuntime) -> Self {
332        Self::with_options(runtime, ServerOptions::default())
333    }
334
335    pub fn from_database_options(
336        db_options: RedDBOptions,
337        server_options: ServerOptions,
338    ) -> RedDBResult<Self> {
339        let runtime = RedDBRuntime::with_options(db_options)?;
340        Ok(Self::with_options(runtime, server_options))
341    }
342
343    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
344        Self {
345            runtime,
346            options,
347            auth_store: None,
348            replication: None,
349            http_limiter: HttpConnectionLimiter::with_default_cap(),
350            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
351            handler_clock: Arc::new(SystemMonotonicClock::new()),
352            slow_inject_ms: Arc::new(AtomicU64::new(0)),
353            http_metrics: HttpHandlerMetrics::new(),
354            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
355            reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
356        }
357    }
358
    /// Borrow the handler-pool metrics recorder used by the accept loops
    /// and connection handlers.
    #[doc(hidden)]
    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
        &self.http_metrics
    }
363
    /// Visible for tests. Replaces the connection limiter with a fresh
    /// one capped at `cap`, which lets the integration test in
    /// `tests/http_connection_limiter.rs` saturate the cap and observe
    /// `503 Service Unavailable` responses without spinning up
    /// thousands of sockets.
    #[doc(hidden)]
    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(cap);
        self
    }
373
    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
    /// Replaces the limiter cap, the per-handler deadline, and the
    /// `Retry-After` value used by the limiter's reject path, and
    /// re-renders the cached 503 bytes to match. All values are assumed
    /// validated by [`http_limits::resolve_http_limits`].
    ///
    /// Only the handle this is called on changes: the limiter and the
    /// cached 503 bytes are replaced with fresh values, so handles
    /// cloned earlier keep the old ones.
    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
        self.retry_after_secs = limits.retry_after_secs;
        self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
        self
    }
385
    /// `Retry-After` value, in seconds, currently configured for the
    /// limiter's 503 reject path.
    #[doc(hidden)]
    pub fn retry_after_secs(&self) -> u64 {
        self.retry_after_secs
    }
390
    /// Borrow the connection limiter that gates handler-thread admission.
    #[doc(hidden)]
    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
        &self.http_limiter
    }
395
    /// Visible for tests. Override the per-handler total-time deadline
    /// (issue #570 slice 2). The default is `DEFAULT_HANDLER_TIMEOUT`
    /// (30s).
    #[doc(hidden)]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = timeout;
        self
    }
403
    /// Per-handler total-time deadline currently configured on this handle.
    #[doc(hidden)]
    pub fn handler_timeout(&self) -> Duration {
        self.handler_timeout
    }
408
    /// Visible for tests. Override the clock the per-handler deadline
    /// (issue #621) is armed against, so timeout expiry can be driven
    /// deterministically without real sleeps. The default, installed by
    /// `with_options`, is `SystemMonotonicClock`.
    #[doc(hidden)]
    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
        self.handler_clock = clock;
        self
    }
418
    /// Test hook: set a synchronous sleep (in ms) inserted between
    /// route dispatch and response write. The integration test for
    /// slice 2 sets a value greater than `handler_timeout` to trip
    /// the deadline, then resets to 0 to verify recovery. Shared via
    /// `Arc<AtomicU64>` so cloned server handles see the same flip.
    /// Stored with `Ordering::Relaxed`, matching the relaxed load in
    /// the connection handlers.
    #[doc(hidden)]
    pub fn set_test_slow_inject_ms(&self, ms: u64) {
        self.slow_inject_ms.store(ms, Ordering::Relaxed);
    }
428
    /// Attach an `AuthStore` for HTTP-layer authentication.
    /// Also injects the store into the runtime so that `Value::Secret`
    /// auto-encrypt/decrypt can reach the vault AES key.
    ///
    /// The runtime and the server each hold their own `Arc` handle to
    /// the same store.
    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
        self.runtime.set_auth_store(Arc::clone(&auth_store));
        self.auth_store = Some(auth_store);
        self
    }
437
    /// Attach replication state for status and snapshot endpoints.
    /// Replaces any state attached earlier on this handle.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }
443
    /// Borrow the runtime this server routes requests into.
    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }
447
    /// Borrow the listener options this server was built with.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }
451
    /// Query use-case facade borrowing this server's runtime.
    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }
455
    /// Admin use-case facade borrowing this server's runtime.
    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }
459
    /// Entity use-case facade borrowing this server's runtime.
    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }
463
    /// Catalog use-case facade borrowing this server's runtime.
    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }
467
    /// Graph use-case facade borrowing this server's runtime.
    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }
471
    /// Native-artifact use-case facade borrowing this server's runtime.
    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }
475
    /// Tree use-case facade borrowing this server's runtime.
    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }
479
480    fn transport_readiness_json(&self) -> JsonValue {
481        let active = self
482            .options
483            .transport_readiness
484            .active
485            .iter()
486            .map(|listener| {
487                let mut object = Map::new();
488                object.insert(
489                    "transport".to_string(),
490                    JsonValue::String(listener.transport.clone()),
491                );
492                object.insert(
493                    "bind_addr".to_string(),
494                    JsonValue::String(listener.bind_addr.clone()),
495                );
496                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
497                JsonValue::Object(object)
498            })
499            .collect();
500        let failed = self
501            .options
502            .transport_readiness
503            .failed
504            .iter()
505            .map(|listener| {
506                let mut object = Map::new();
507                object.insert(
508                    "transport".to_string(),
509                    JsonValue::String(listener.transport.clone()),
510                );
511                object.insert(
512                    "bind_addr".to_string(),
513                    JsonValue::String(listener.bind_addr.clone()),
514                );
515                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
516                object.insert(
517                    "reason".to_string(),
518                    JsonValue::String(listener.reason.clone()),
519                );
520                JsonValue::Object(object)
521            })
522            .collect();
523
524        let mut object = Map::new();
525        object.insert("active".to_string(), JsonValue::Array(active));
526        object.insert("failed".to_string(), JsonValue::Array(failed));
527        JsonValue::Object(object)
528    }
529
530    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
531        let mut value = crate::presentation::ops_json::health_json(report);
532        if let JsonValue::Object(ref mut object) = value {
533            object.insert(
534                "transport_listeners".to_string(),
535                self.transport_readiness_json(),
536            );
537        }
538        value
539    }
540
541    pub fn serve(&self) -> io::Result<()> {
542        let listener = TcpListener::bind(&self.options.bind_addr)?;
543        self.serve_on(listener)
544    }
545
546    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
547        for stream in listener.incoming() {
548            match stream {
549                Ok(stream) => match self.http_limiter.try_acquire() {
550                    Some(permit) => {
551                        // Spawn a thread per connection for concurrent request handling
552                        let server = self.clone();
553                        thread::spawn(move || {
554                            let _guard = permit; // released on thread exit
555                            let _ = server.handle_connection(stream);
556                        });
557                    }
558                    None => {
559                        // Cap exhausted: write static 503 inline on the
560                        // accept thread, close the socket, and continue.
561                        // No thread spawn, no `HttpRequest::read_from`,
562                        // no runtime call.
563                        self.http_metrics
564                            .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
565                        self.reject_with_503(stream, self.options.write_timeout_ms);
566                    }
567                },
568                Err(err) => return Err(err),
569            }
570        }
571        Ok(())
572    }
573
    /// 503 response used when the connection limiter is full. The
    /// payload is pre-rendered into `reject_503_bytes` at construction
    /// (issue #574 slice 5: `Retry-After` is configurable), so the
    /// hot path is still one write and a close.
    ///
    /// Every step is best-effort: failures to set the write timeout,
    /// write, flush, or shut down are ignored, because the connection is
    /// being discarded either way.
    fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
        let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
        let _ = stream.write_all(&self.reject_503_bytes);
        let _ = stream.flush();
        let _ = stream.shutdown(std::net::Shutdown::Both);
    }
584
585    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
586        let (stream, _) = listener.accept()?;
587        self.handle_connection(stream)
588    }
589
590    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
591        let server = self.clone();
592        thread::spawn(move || server.serve())
593    }
594
595    pub fn serve_in_background_on(
596        &self,
597        listener: TcpListener,
598    ) -> thread::JoinHandle<io::Result<()>> {
599        let server = self.clone();
600        thread::spawn(move || server.serve_on(listener))
601    }
602
603    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
604    /// `tls_config` is shared across all connections (rustls
605    /// `ServerConfig` is `Send + Sync`).
606    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
607        let listener = TcpListener::bind(&self.options.bind_addr)?;
608        self.serve_tls_on(listener, tls_config)
609    }
610
611    pub fn serve_tls_on(
612        &self,
613        listener: TcpListener,
614        tls_config: std::sync::Arc<rustls::ServerConfig>,
615    ) -> io::Result<()> {
616        for stream in listener.incoming() {
617            match stream {
618                Ok(stream) => match self.http_limiter.try_acquire() {
619                    Some(permit) => {
620                        let server = self.clone();
621                        let cfg = tls_config.clone();
622                        thread::spawn(move || {
623                            let _guard = permit; // released on thread exit
624                            let _ = server.handle_tls_connection(stream, cfg);
625                        });
626                    }
627                    None => {
628                        // Issue #572 slice 3: cross-transport cap shared
629                        // with the clear-text limiter. Writing a 503 over
630                        // a non-handshaken TLS socket is not meaningful,
631                        // so reject by closing the raw socket. No TLS
632                        // handshake, no thread spawn, no runtime call.
633                        self.http_metrics
634                            .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
635                        let _ = stream.shutdown(std::net::Shutdown::Both);
636                        drop(stream);
637                    }
638                },
639                Err(err) => return Err(err),
640            }
641        }
642        Ok(())
643    }
644
645    pub fn serve_tls_in_background(
646        &self,
647        tls_config: std::sync::Arc<rustls::ServerConfig>,
648    ) -> thread::JoinHandle<io::Result<()>> {
649        let server = self.clone();
650        thread::spawn(move || server.serve_tls(tls_config))
651    }
652
653    pub fn serve_tls_in_background_on(
654        &self,
655        listener: TcpListener,
656        tls_config: std::sync::Arc<rustls::ServerConfig>,
657    ) -> thread::JoinHandle<io::Result<()>> {
658        let server = self.clone();
659        thread::spawn(move || server.serve_tls_on(listener, tls_config))
660    }
661
662    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
663        let started = Instant::now();
664        let result = self.handle_connection_inner(stream);
665        let elapsed = started.elapsed().as_secs_f64();
666        self.http_metrics
667            .record_duration(HttpTransport::Http, elapsed);
668        result
669    }
670
671    fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
672        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
673        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
674
675        // Issue #570 slice 2 / #621: arm a deadline at handler spawn and
676        // check at coarse boundaries. Armed against the injectable
677        // monotonic clock (#621) so timeout behaviour is deterministically
678        // testable without real sleeps; production wires the real clock,
679        // so this tracks wall time. No hard pre-emption — a thread blocked
680        // inside a true syscall is still bounded only by the per-socket
681        // read/write timeouts.
682        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
683
684        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
685
686        // Boundary (a): between request parse and route dispatch.
687        if deadline.expired() {
688            self.http_metrics
689                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
690            Self::write_handler_timeout_503(&mut stream);
691            return Ok(());
692        }
693
694        if self.try_route_streaming(&request, &mut stream)? {
695            return Ok(());
696        }
697        let response = self.route(request);
698
699        // Test-only injected slow downstream (issue #570 slice 2
700        // integration test). Production builds set this to 0, so this
701        // is a single relaxed atomic load on the hot path.
702        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
703        if inject_ms > 0 {
704            thread::sleep(Duration::from_millis(inject_ms));
705        }
706
707        // Boundary (b): between route dispatch and response write.
708        if deadline.expired() {
709            self.http_metrics
710                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
711            Self::write_handler_timeout_503(&mut stream);
712            return Ok(());
713        }
714
715        stream.write_all(&response.to_http_bytes())?;
716        stream.flush()?;
717        Ok(())
718    }
719
720    /// Best-effort 503 emitted when the per-handler deadline expires
721    /// at a coarse boundary. Writes are swallowed — the caller has
722    /// already exceeded its budget, so we do not propagate write
723    /// errors. Permit drop happens on the handler thread's normal
724    /// exit path.
725    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
726        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
727            Connection: close\r\n\
728            Content-Length: 0\r\n\
729            \r\n";
730        let _ = stream.write_all(RESPONSE);
731        let _ = stream.flush();
732    }
733
734    fn handle_tls_connection(
735        &self,
736        tcp: TcpStream,
737        tls_config: std::sync::Arc<rustls::ServerConfig>,
738    ) -> io::Result<()> {
739        let started = Instant::now();
740        let result = self.handle_tls_connection_inner(tcp, tls_config);
741        let elapsed = started.elapsed().as_secs_f64();
742        self.http_metrics
743            .record_duration(HttpTransport::Https, elapsed);
744        result
745    }
746
747    fn handle_tls_connection_inner(
748        &self,
749        tcp: TcpStream,
750        tls_config: std::sync::Arc<rustls::ServerConfig>,
751    ) -> io::Result<()> {
752        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
753        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
754
755        // Issue #572 slice 3 / #621: per-handler deadline applies to TLS
756        // handlers too — same injectable-clock scaffolding as the
757        // clear-text path.
758        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
759
760        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
761            Ok(s) => s,
762            Err(err) => {
763                tracing::warn!(
764                    target: "reddb::http_tls",
765                    err = %err,
766                    "TLS handshake failed"
767                );
768                return Err(err);
769            }
770        };
771        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
772            Ok(req) => req,
773            Err(err) => {
774                tracing::warn!(
775                    target: "reddb::http_tls",
776                    err = %err,
777                    "TLS request parse failed"
778                );
779                return Err(err);
780            }
781        };
782
783        // Boundary (a): between request parse and route dispatch.
784        if deadline.expired() {
785            self.http_metrics
786                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
787            Self::write_handler_timeout_503(&mut tls_stream);
788            return Ok(());
789        }
790
791        if self.try_route_streaming(&request, &mut tls_stream)? {
792            return Ok(());
793        }
794        let response = self.route(request);
795
796        // Test-only injected slow downstream (issue #572 slice 3
797        // integration test). Production sets this to 0.
798        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
799        if inject_ms > 0 {
800            thread::sleep(Duration::from_millis(inject_ms));
801        }
802
803        // Boundary (b): between route dispatch and response write.
804        if deadline.expired() {
805            self.http_metrics
806                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
807            Self::write_handler_timeout_503(&mut tls_stream);
808            return Ok(());
809        }
810
811        tls_stream.write_all(&response.to_http_bytes())?;
812        tls_stream.flush()?;
813        Ok(())
814    }
815}
816
/// Render the limiter-reject response: a body-less
/// `503 Service Unavailable` with `Connection: close` and a
/// `Retry-After` header of `retry_after_secs` seconds.
///
/// The value comes from the resolved HTTP limits (issue #574 slice 5).
/// Callers render once and share the bytes behind an `Arc`, so the
/// reject path never formats anything.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    let mut response = String::from("HTTP/1.1 503 Service Unavailable\r\n");
    response.push_str("Connection: close\r\n");
    response.push_str("Content-Length: 0\r\n");
    response.push_str("Retry-After: ");
    response.push_str(&retry_after_secs.to_string());
    response.push_str("\r\n\r\n");
    response.into_bytes()
}