reddb-io-server 1.2.4

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
//! Minimal HTTP server for RedDB management and remote access.

pub(crate) use crate::application::json_input::{
    json_bool_field, json_f32_field, json_string_field, json_usize_field,
};
pub(crate) use crate::application::{
    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
    SearchTextInput, TreeUseCases,
};
use std::collections::{BTreeMap, HashMap};
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use std::sync::Arc;

use crate::api::{RedDBError, RedDBOptions, RedDBResult};
use crate::auth::store::AuthStore;
use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
use crate::health::{HealthProvider, HealthReport, HealthState};
use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
use crate::runtime::{
    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
};
use crate::storage::schema::Value;
use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};

fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
    crate::presentation::admin_json::analytics_job_json(job)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The health payload must mirror `ServerOptions::transport_readiness`
    /// under `transport_listeners`: one object per active listener and one
    /// per failed listener, each carrying that listener's fields.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let mut options = ServerOptions::default();
        options.transport_readiness = TransportReadiness {
            active: vec![TransportListenerState {
                transport: "grpc".to_string(),
                bind_addr: "127.0.0.1:50051".to_string(),
                explicit: true,
            }],
            failed: vec![TransportListenerFailure {
                transport: "http".to_string(),
                bind_addr: "127.0.0.1:5055".to_string(),
                explicit: false,
                reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
            }],
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);

        // Field-level checks: counting entries alone would not catch a
        // missing or mis-keyed field in `transport_readiness_json`.
        let Some(JsonValue::Object(grpc)) = active.first() else {
            panic!("active listener entry should be an object");
        };
        assert!(matches!(grpc.get("transport"), Some(JsonValue::String(s)) if s == "grpc"));
        assert!(
            matches!(grpc.get("bind_addr"), Some(JsonValue::String(s)) if s == "127.0.0.1:50051")
        );
        assert!(matches!(grpc.get("explicit"), Some(JsonValue::Bool(true))));
        assert!(
            grpc.get("reason").is_none(),
            "active listeners should not carry a failure reason"
        );

        let Some(JsonValue::Object(http)) = failed.first() else {
            panic!("failed listener entry should be an object");
        };
        assert!(matches!(http.get("transport"), Some(JsonValue::String(s)) if s == "http"));
        assert!(
            matches!(http.get("bind_addr"), Some(JsonValue::String(s)) if s == "127.0.0.1:5055")
        );
        assert!(matches!(http.get("explicit"), Some(JsonValue::Bool(false))));
        assert!(
            matches!(http.get("reason"), Some(JsonValue::String(s)) if s.contains("address in use"))
        );
    }
}

fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
    crate::presentation::admin_json::graph_projection_json(projection)
}

pub mod handlers_admin;
mod handlers_ai;
pub mod http_connection_limiter;
pub mod http_handler_metrics;
pub mod http_limits;
mod handlers_auth;
mod handlers_backup;
mod handlers_ec;
pub(crate) mod handlers_entity;
mod handlers_geo;
mod handlers_graph;
mod handlers_keyed;
mod handlers_log;
mod handlers_metrics;
mod handlers_ops;
mod handlers_query;
mod handlers_replication;
mod handlers_vcs;
mod handlers_vector;
pub mod header_escape_guard;
pub mod ingest_pipeline;
mod patch_support;
mod request_body;
mod request_context;
mod routing;
mod serverless_support;
pub mod tls;
mod transport;

use self::handlers_ai::*;
use self::http_connection_limiter::HttpConnectionLimiter;
use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
pub use self::http_limits::{
    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
};
use self::handlers_entity::*;
use self::handlers_graph::*;
use self::handlers_keyed::*;
use self::handlers_metrics::*;
use self::handlers_ops::*;
use self::handlers_query::*;
use self::patch_support::*;
use self::request_body::*;
use self::routing::*;
use self::serverless_support::*;
use self::transport::*;

/// Which slice of the HTTP API a listener exposes (PLAN.md Phase 6.2,
/// endpoint segregation).
///
/// `Public` serves everything and is the default. `AdminOnly` and
/// `MetricsOnly` restrict a listener to operational paths. The route filter
/// at the top of `route()` consults this value, so a loopback-only admin
/// port cannot accidentally serve DML.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerSurface {
    /// Full routing table (the default; matches v0 behaviour).
    Public,
    /// `/admin/*`, `/metrics` and `/health/*` only; every other path is a
    /// 404. Meant for `RED_ADMIN_BIND` operator listeners, which default to
    /// `127.0.0.1`.
    AdminOnly,
    /// `/metrics` and `/health/*` only. Meant for `RED_METRICS_BIND`
    /// Prometheus scrape ports that may face non-admin networks.
    MetricsOnly,
}

/// Listener-level configuration for a [`RedDBServer`].
#[derive(Clone, Debug)]
pub struct ServerOptions {
    /// Address handed to `TcpListener::bind` by `serve` / `serve_tls`.
    pub bind_addr: String,
    /// Request-body size cap passed to `HttpRequest::read_from`.
    pub max_body_bytes: usize,
    /// Socket read timeout, in milliseconds, applied to every accepted
    /// connection.
    pub read_timeout_ms: u64,
    /// Socket write timeout, in milliseconds, applied to every accepted
    /// connection and to the limiter's 503 reject path.
    pub write_timeout_ms: u64,
    /// Scan limit ceiling; presumably enforced by the scan handlers outside
    /// this file. TODO confirm.
    pub max_scan_limit: usize,
    /// Subset of paths this listener serves. Defaults to `Public`; use
    /// `AdminOnly` / `MetricsOnly` for dedicated admin or scrape ports
    /// (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
    /// Listener bind outcomes reported under `transport_listeners` in the
    /// health JSON.
    pub transport_readiness: crate::service_cli::TransportReadiness,
}

impl Default for ServerOptions {
    /// Loopback listener on port 5055 with:
    /// - a 1 MiB body cap;
    /// - 5 s read and write timeouts;
    /// - `max_scan_limit` of 1 000;
    /// - the full `Public` surface;
    /// - `TransportReadiness::default()`.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        const SOCKET_TIMEOUT_MS: u64 = 5_000;

        Self {
            bind_addr: String::from("127.0.0.1:5055"),
            max_body_bytes: ONE_MIB,
            read_timeout_ms: SOCKET_TIMEOUT_MS,
            write_timeout_ms: SOCKET_TIMEOUT_MS,
            max_scan_limit: 1_000,
            surface: ServerSurface::Public,
            transport_readiness: Default::default(),
        }
    }
}

/// Replication state handed to the HTTP server through
/// `RedDBServer::with_replication`. It backs the replication status and
/// snapshot endpoints.
pub struct ServerReplicationState {
    /// Replication configuration in effect for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, when one is attached.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}

/// HTTP/HTTPS front end over a [`RedDBRuntime`].
///
/// Cheap to clone: each accepted connection runs on its own thread with a
/// clone of the server.
#[derive(Clone)]
pub struct RedDBServer {
    runtime: RedDBRuntime,
    options: ServerOptions,
    auth_store: Option<Arc<AuthStore>>,
    replication: Option<Arc<ServerReplicationState>>,
    /// Bounded handler-thread admission (issue #570 slice 1).
    ///
    /// Shared by both the clear-text (`serve_on`) and TLS (`serve_tls_on`)
    /// accept loops (issue #572 slice 3). `Clone` of `HttpConnectionLimiter`
    /// shares an `Arc`, so every serve loop on the same `RedDBServer` draws
    /// from one cap.
    http_limiter: HttpConnectionLimiter,
    /// Per-handler total-time deadline (issue #570 slice 2).
    ///
    /// Each handler thread, clear-text or TLS, arms a deadline when it
    /// starts. It bails with a best-effort 503 at coarse boundaries: after
    /// request parse and again before response write.
    ///
    /// Starts at `DEFAULT_HANDLER_TIMEOUT` (30s). It is overridden by
    /// `with_http_limits` (issue #574 slice 5) or `with_handler_timeout`.
    handler_timeout: Duration,
    /// Test-only synchronous sleep injected between route dispatch and
    /// response write, so an integration test can simulate a slow
    /// downstream tripping the deadline. Default 0 (no-op).
    ///
    /// Shared via `Arc`, so cloned handles (e.g. `serve_in_background`)
    /// observe flips made through the originating handle. The setter is
    /// `pub` but `#[doc(hidden)]`; production code is not expected to
    /// call it.
    slow_inject_ms: Arc<AtomicU64>,
    /// Prometheus metrics for the HTTP handler-thread pool (issue #573
    /// slice 4).
    ///
    /// Records rejections (cap_exhausted / handler_timeout) and per-handler
    /// duration histograms for both transports. Cloned with the server;
    /// clones are expected to share one set of counters.
    http_metrics: HttpHandlerMetrics,
    /// `Retry-After` value (seconds) emitted in the limiter's reject path
    /// (issue #574 slice 5). It is rendered into `reject_503_bytes`
    /// whenever it is set (construction and `with_http_limits`).
    retry_after_secs: u64,
    /// Cached bytes of the limiter's 503 response, so the reject hot path
    /// is one write plus a close.
    ///
    /// `with_http_limits` installs a freshly built `Arc`. Handles cloned
    /// after that point (every accept loop and handler thread) share the
    /// allocation.
    reject_503_bytes: Arc<Vec<u8>>,
}

/// Per-handler total-time budget used until HTTP limits are applied
/// (issue #571 slice 2): thirty seconds.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);

/// Selectable categories for a serverless warm-up request. Presumably
/// consumed by the serverless support handlers outside this view; confirm
/// there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ServerlessWarmupScope {
    /// Index structures.
    Indexes,
    /// Graph projections.
    GraphProjections,
    /// Analytics jobs.
    AnalyticsJobs,
    /// Native artifacts.
    NativeArtifacts,
}

/// How this RedDB instance is deployed. Consumers live outside this view;
/// confirm semantics there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DeploymentProfile {
    /// In-process embedded deployment.
    Embedded,
    /// Long-running server deployment.
    Server,
    /// Serverless deployment.
    Serverless,
}

/// Decode `%XX` escapes in a single URL path segment.
///
/// Bytes other than `%` are copied through unchanged; `+` is not treated
/// as a space. The decoded bytes must form valid UTF-8.
///
/// # Errors
///
/// - `"truncated percent escape"` when a `%` is not followed by two bytes.
/// - `"invalid percent escape"` when either of those bytes is not a hex
///   digit.
/// - `"path segment is not valid UTF-8"` when the decoded bytes are not
///   UTF-8.
fn percent_decode_path_segment(input: &str) -> Result<String, String> {
    let raw = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(raw.len());
    let mut pos = 0;

    while let Some(&byte) = raw.get(pos) {
        if byte != b'%' {
            decoded.push(byte);
            pos += 1;
            continue;
        }

        // `get` yields `None` exactly when fewer than two bytes follow the '%'.
        let Some(&[hi, lo]) = raw.get(pos + 1..pos + 3) else {
            return Err("truncated percent escape".to_string());
        };
        let high = hex_value(hi).ok_or_else(|| "invalid percent escape".to_string())?;
        let low = hex_value(lo).ok_or_else(|| "invalid percent escape".to_string())?;
        decoded.push((high << 4) | low);
        pos += 3;
    }

    String::from_utf8(decoded).map_err(|_| "path segment is not valid UTF-8".to_string())
}

/// Numeric value of one ASCII hexadecimal digit (`0-9`, `a-f`, `A-F`).
/// Any other byte, including every non-ASCII byte, yields `None`.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit` only recognizes ASCII digits/letters, and the result is
    // always < 16, so narrowing to `u8` cannot truncate.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}

/// Parsed form of a query request: the query text plus optional filters and
/// bind parameters.
#[derive(Clone, Debug)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional entity-type list supplied with the request; how it is applied
    /// is up to the query handler (confirm there).
    entity_types: Option<Vec<String>>,
    /// Optional capability list supplied with the request; semantics live in
    /// the query handler (confirm there).
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). `Some` makes the
    /// query handler run the user_params binder before executing; `None`
    /// keeps the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}

/// Kind of edit a single patch operation performs.
#[derive(Clone, Copy, Debug)]
enum PatchOperationType {
    /// Set a value at the path.
    Set,
    /// Replace the value at the path.
    Replace,
    /// Remove the value at the path.
    Unset,
}

/// One patch step: an operation applied at a field path, optionally carrying
/// a JSON value.
#[derive(Clone, Debug)]
struct PatchOperation {
    /// Which kind of edit to perform.
    op: PatchOperationType,
    /// Path segments addressing the target field.
    path: Vec<String>,
    /// Value to apply; presumably `None` for `Unset` (confirm in the parser).
    value: Option<JsonValue>,
}

impl RedDBServer {
    /// Build a server around `runtime` using [`ServerOptions::default`].
    pub fn new(runtime: RedDBRuntime) -> Self {
        Self::with_options(runtime, ServerOptions::default())
    }

    /// Open a runtime from `db_options` and wrap it with `server_options`.
    ///
    /// # Errors
    ///
    /// Returns whatever `RedDBRuntime::with_options` fails with.
    pub fn from_database_options(
        db_options: RedDBOptions,
        server_options: ServerOptions,
    ) -> RedDBResult<Self> {
        let runtime = RedDBRuntime::with_options(db_options)?;
        Ok(Self::with_options(runtime, server_options))
    }

    /// Build a server from an existing runtime and explicit listener options.
    ///
    /// Auth and replication start detached. The connection cap, handler
    /// deadline and `Retry-After` start at their defaults
    /// (`HttpConnectionLimiter::with_default_cap`, `DEFAULT_HANDLER_TIMEOUT`,
    /// `DEFAULT_RETRY_AFTER_SECS`).
    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
        Self {
            runtime,
            options,
            auth_store: None,
            replication: None,
            http_limiter: HttpConnectionLimiter::with_default_cap(),
            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
            slow_inject_ms: Arc::new(AtomicU64::new(0)),
            http_metrics: HttpHandlerMetrics::new(),
            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
            reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
        }
    }

    /// Handler-pool metrics recorded by the accept loops and handler
    /// threads.
    #[doc(hidden)]
    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
        &self.http_metrics
    }

    /// Visible for tests. Lets the integration test in
    /// `tests/http_connection_limiter.rs` saturate the cap and observe
    /// `503 Service Unavailable` responses without spinning up
    /// thousands of sockets.
    #[doc(hidden)]
    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(cap);
        self
    }

    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
    ///
    /// Replaces the limiter cap, the per-handler deadline, and the
    /// `Retry-After` value used by the limiter's reject path. The cached 503
    /// bytes are rebuilt. All values are assumed validated by
    /// [`http_limits::resolve_http_limits`].
    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
        self.retry_after_secs = limits.retry_after_secs;
        self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
        self
    }

    /// `Retry-After` seconds currently emitted by the limiter's 503.
    #[doc(hidden)]
    pub fn retry_after_secs(&self) -> u64 {
        self.retry_after_secs
    }

    /// Connection limiter shared by the HTTP and HTTPS accept loops.
    #[doc(hidden)]
    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
        &self.http_limiter
    }

    /// Visible for tests. Override the per-handler total-time deadline
    /// (issue #570 slice 2). Default 30s.
    #[doc(hidden)]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = timeout;
        self
    }

    /// Per-handler total-time deadline currently in effect.
    #[doc(hidden)]
    pub fn handler_timeout(&self) -> Duration {
        self.handler_timeout
    }

    /// Test hook: set a synchronous sleep (in ms) inserted between route
    /// dispatch and response write.
    ///
    /// The integration test for slice 2 sets a value greater than
    /// `handler_timeout` to trip the deadline, then resets it to 0 to verify
    /// recovery. Shared via `Arc<AtomicU64>`, so cloned server handles see the
    /// same flip.
    #[doc(hidden)]
    pub fn set_test_slow_inject_ms(&self, ms: u64) {
        self.slow_inject_ms.store(ms, Ordering::Relaxed);
    }

    /// Attach an `AuthStore` for HTTP-layer authentication.
    ///
    /// Also injects the store into the runtime so that `Value::Secret`
    /// auto-encrypt/decrypt can reach the vault AES key.
    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
        self.runtime.set_auth_store(Arc::clone(&auth_store));
        self.auth_store = Some(auth_store);
        self
    }

    /// Attach replication state for status and snapshot endpoints.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }

    /// Runtime this server dispatches to.
    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }

    /// Listener options this server was built with.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }

    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }

    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }

    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }

    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }

    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }

    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }

    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }

    /// Render `options.transport_readiness` as
    /// `{"active": [...], "failed": [...]}`.
    ///
    /// Each entry carries `transport`, `bind_addr` and `explicit`; failed
    /// entries additionally carry `reason`.
    fn transport_readiness_json(&self) -> JsonValue {
        // Fields common to active and failed entries, inserted in a fixed order.
        let listener_fields = |transport: &str, bind_addr: &str, explicit: bool| {
            let mut object = Map::new();
            object.insert(
                "transport".to_string(),
                JsonValue::String(transport.to_string()),
            );
            object.insert(
                "bind_addr".to_string(),
                JsonValue::String(bind_addr.to_string()),
            );
            object.insert("explicit".to_string(), JsonValue::Bool(explicit));
            object
        };

        let readiness = &self.options.transport_readiness;
        let active = readiness
            .active
            .iter()
            .map(|listener| {
                JsonValue::Object(listener_fields(
                    listener.transport.as_str(),
                    listener.bind_addr.as_str(),
                    listener.explicit,
                ))
            })
            .collect();
        let failed = readiness
            .failed
            .iter()
            .map(|listener| {
                let mut object = listener_fields(
                    listener.transport.as_str(),
                    listener.bind_addr.as_str(),
                    listener.explicit,
                );
                object.insert(
                    "reason".to_string(),
                    JsonValue::String(listener.reason.clone()),
                );
                JsonValue::Object(object)
            })
            .collect();

        let mut object = Map::new();
        object.insert("active".to_string(), JsonValue::Array(active));
        object.insert("failed".to_string(), JsonValue::Array(failed));
        JsonValue::Object(object)
    }

    /// Health JSON for `report`, extended with a `transport_listeners`
    /// section when the base payload is an object.
    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
        let mut value = crate::presentation::ops_json::health_json(report);
        if let JsonValue::Object(ref mut object) = value {
            object.insert(
                "transport_listeners".to_string(),
                self.transport_readiness_json(),
            );
        }
        value
    }

    /// Bind `options.bind_addr` and serve clear-text HTTP on the calling
    /// thread. See [`RedDBServer::serve_on`].
    pub fn serve(&self) -> io::Result<()> {
        let listener = TcpListener::bind(&self.options.bind_addr)?;
        self.serve_on(listener)
    }

    /// Accept clear-text HTTP connections on `listener`, one handler thread
    /// per admitted connection.
    ///
    /// When the limiter cap is exhausted, a static 503 is written inline on
    /// the accept thread.
    ///
    /// # Errors
    ///
    /// Returns the first non-transient accept error. Per-connection failures
    /// (peer aborted/reset before accept completed) are skipped so that one
    /// misbehaving client cannot stop the listener.
    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
        for stream in listener.incoming() {
            let stream = match stream {
                Ok(stream) => stream,
                Err(err) if Self::is_transient_accept_error(&err) => {
                    tracing::debug!(
                        target: "reddb::http",
                        err = %err,
                        "transient accept error; continuing"
                    );
                    continue;
                }
                Err(err) => return Err(err),
            };

            let Some(permit) = self.http_limiter.try_acquire() else {
                // Cap exhausted: write static 503 inline on the accept
                // thread, close the socket, and continue. No thread spawn,
                // no `HttpRequest::read_from`, no runtime call.
                self.http_metrics
                    .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
                self.reject_with_503(stream, self.options.write_timeout_ms);
                continue;
            };

            // Spawn a thread per connection for concurrent request handling.
            let server = self.clone();
            let spawned = thread::Builder::new().spawn(move || {
                let _guard = permit; // released on thread exit
                let _ = server.handle_connection(stream);
            });
            if let Err(err) = spawned {
                // `thread::spawn` would panic here and kill the accept loop.
                // Instead, the rejected closure is dropped, which closes the
                // socket and releases the permit.
                tracing::warn!(
                    target: "reddb::http",
                    err = %err,
                    "failed to spawn HTTP handler thread"
                );
            }
        }
        Ok(())
    }

    /// Accept failures that concern only the connection being accepted.
    ///
    /// This covers a peer that aborted or reset before `accept` returned,
    /// and an interrupted call. The listening socket is still usable after
    /// these, so the accept loops keep going.
    fn is_transient_accept_error(err: &io::Error) -> bool {
        matches!(
            err.kind(),
            io::ErrorKind::ConnectionAborted
                | io::ErrorKind::ConnectionReset
                | io::ErrorKind::Interrupted
        )
    }

    /// 503 response used when the connection limiter is full.
    ///
    /// The payload is pre-rendered into `reject_503_bytes` (issue #574
    /// slice 5: `Retry-After` is configurable), so the hot path is still one
    /// write and a close. All I/O errors are ignored: the client is being
    /// turned away regardless.
    fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
        let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
        let _ = stream.write_all(&self.reject_503_bytes);
        let _ = stream.flush();
        let _ = stream.shutdown(std::net::Shutdown::Both);
    }

    /// Accept and fully handle exactly one connection on the calling thread.
    ///
    /// Bypasses the connection limiter.
    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
        let (stream, _) = listener.accept()?;
        self.handle_connection(stream)
    }

    /// Run [`RedDBServer::serve`] on a new thread with a clone of this
    /// server.
    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve())
    }

    /// Run [`RedDBServer::serve_on`] for `listener` on a new thread with a
    /// clone of this server.
    pub fn serve_in_background_on(
        &self,
        listener: TcpListener,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_on(listener))
    }

    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`.
    ///
    /// The `tls_config` is shared across all connections (rustls
    /// `ServerConfig` is `Send + Sync`).
    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
        let listener = TcpListener::bind(&self.options.bind_addr)?;
        self.serve_tls_on(listener, tls_config)
    }

    /// Accept HTTPS connections on `listener`, one handler thread per
    /// admitted connection.
    ///
    /// Uses the same limiter as the clear-text loop.
    ///
    /// # Errors
    ///
    /// Returns the first non-transient accept error; per-connection accept
    /// failures are skipped.
    pub fn serve_tls_on(
        &self,
        listener: TcpListener,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> io::Result<()> {
        for stream in listener.incoming() {
            let stream = match stream {
                Ok(stream) => stream,
                Err(err) if Self::is_transient_accept_error(&err) => {
                    tracing::debug!(
                        target: "reddb::http_tls",
                        err = %err,
                        "transient accept error; continuing"
                    );
                    continue;
                }
                Err(err) => return Err(err),
            };

            let Some(permit) = self.http_limiter.try_acquire() else {
                // Issue #572 slice 3: cross-transport cap shared with the
                // clear-text limiter. Writing a 503 over a non-handshaken TLS
                // socket is not meaningful, so reject by closing the raw
                // socket (dropped at the end of this iteration). No TLS
                // handshake, no thread spawn, no runtime call.
                self.http_metrics
                    .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
                let _ = stream.shutdown(std::net::Shutdown::Both);
                continue;
            };

            let server = self.clone();
            let cfg = tls_config.clone();
            let spawned = thread::Builder::new().spawn(move || {
                let _guard = permit; // released on thread exit
                let _ = server.handle_tls_connection(stream, cfg);
            });
            if let Err(err) = spawned {
                // Dropping the rejected closure closes the socket and
                // releases the permit; keep accepting.
                tracing::warn!(
                    target: "reddb::http_tls",
                    err = %err,
                    "failed to spawn HTTPS handler thread"
                );
            }
        }
        Ok(())
    }

    /// Run [`RedDBServer::serve_tls`] on a new thread with a clone of this
    /// server.
    pub fn serve_tls_in_background(
        &self,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls(tls_config))
    }

    /// Run [`RedDBServer::serve_tls_on`] for `listener` on a new thread with
    /// a clone of this server.
    pub fn serve_tls_in_background_on(
        &self,
        listener: TcpListener,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls_on(listener, tls_config))
    }

    /// Handle one clear-text connection.
    ///
    /// The wall-clock duration is recorded in `http_metrics` whether or not
    /// handling succeeded.
    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
        let started = Instant::now();
        let result = self.handle_connection_inner(stream, started);
        let elapsed = started.elapsed().as_secs_f64();
        self.http_metrics.record_duration(HttpTransport::Http, elapsed);
        result
    }

    /// Read one request from `stream`, route it, and write the response,
    /// enforcing the handler deadline at the coarse boundaries.
    fn handle_connection_inner(
        &self,
        mut stream: TcpStream,
        started: Instant,
    ) -> io::Result<()> {
        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;

        // Issue #570 slice 2: arm a deadline at handler spawn and
        // check at coarse boundaries. No hard pre-emption — a thread
        // blocked inside a true syscall is still bounded only by the
        // per-socket read/write timeouts.
        let deadline = started + self.handler_timeout;

        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;

        // Boundary (a): between request parse and route dispatch.
        if Instant::now() >= deadline {
            self.http_metrics
                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut stream);
            return Ok(());
        }

        if self.try_route_streaming(&request, &mut stream)? {
            return Ok(());
        }
        let response = self.route(request);

        // Test-only injected slow downstream (issue #570 slice 2
        // integration test). Production builds set this to 0, so this
        // is a single relaxed atomic load on the hot path.
        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
        if inject_ms > 0 {
            thread::sleep(Duration::from_millis(inject_ms));
        }

        // Boundary (b): between route dispatch and response write.
        if Instant::now() >= deadline {
            self.http_metrics
                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut stream);
            return Ok(());
        }

        stream.write_all(&response.to_http_bytes())?;
        stream.flush()?;
        Ok(())
    }

    /// Best-effort 503 emitted when the per-handler deadline expires at a
    /// coarse boundary.
    ///
    /// Writes are swallowed — the caller has already exceeded its budget, so
    /// we do not propagate write errors. Permit drop happens on the handler
    /// thread's normal exit path.
    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
            Connection: close\r\n\
            Content-Length: 0\r\n\
            \r\n";
        let _ = stream.write_all(RESPONSE);
        let _ = stream.flush();
    }

    /// Handle one HTTPS connection.
    ///
    /// The wall-clock duration is recorded in `http_metrics` whether or not
    /// handling succeeded.
    fn handle_tls_connection(
        &self,
        tcp: TcpStream,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> io::Result<()> {
        let started = Instant::now();
        let result = self.handle_tls_connection_inner(tcp, tls_config, started);
        let elapsed = started.elapsed().as_secs_f64();
        self.http_metrics.record_duration(HttpTransport::Https, elapsed);
        result
    }

    /// Perform the TLS handshake, then read, route and answer one request
    /// under the same deadline discipline as the clear-text path.
    ///
    /// Handshake and parse failures are logged under `reddb::http_tls`.
    fn handle_tls_connection_inner(
        &self,
        tcp: TcpStream,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
        started: Instant,
    ) -> io::Result<()> {
        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;

        // Issue #572 slice 3: per-handler deadline applies to TLS
        // handlers too — same handler-thread scaffolding as slice 2.
        let deadline = started + self.handler_timeout;

        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    target: "reddb::http_tls",
                    err = %err,
                    "TLS handshake failed"
                );
                return Err(err);
            }
        };
        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
            Ok(req) => req,
            Err(err) => {
                tracing::warn!(
                    target: "reddb::http_tls",
                    err = %err,
                    "TLS request parse failed"
                );
                return Err(err);
            }
        };

        // Boundary (a): between request parse and route dispatch.
        if Instant::now() >= deadline {
            self.http_metrics
                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut tls_stream);
            return Ok(());
        }

        if self.try_route_streaming(&request, &mut tls_stream)? {
            return Ok(());
        }
        let response = self.route(request);

        // Test-only injected slow downstream (issue #572 slice 3
        // integration test). Production sets this to 0.
        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
        if inject_ms > 0 {
            thread::sleep(Duration::from_millis(inject_ms));
        }

        // Boundary (b): between route dispatch and response write.
        if Instant::now() >= deadline {
            self.http_metrics
                .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut tls_stream);
            return Ok(());
        }

        tls_stream.write_all(&response.to_http_bytes())?;
        tls_stream.flush()?;
        Ok(())
    }
}

/// Pre-render the limiter-reject 503 response. The `Retry-After`
/// value comes from the resolved HTTP limits (issue #574 slice 5)
/// and is fixed for the lifetime of the server, so we build the
/// bytes once and hand a shared `Arc<Vec<u8>>` to every accept loop.
/// Render the limiter-reject 503 response once.
///
/// The response is a header-only `HTTP/1.1 503` with `Connection: close`,
/// `Content-Length: 0` and the given `Retry-After` (issue #574 slice 5). The
/// value is fixed for the server's lifetime, so callers cache the bytes and
/// share them across accept loops.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    let mut response = String::from("HTTP/1.1 503 Service Unavailable\r\n");
    response.push_str("Connection: close\r\n");
    response.push_str("Content-Length: 0\r\n");
    response.push_str(&format!("Retry-After: {retry_after_secs}\r\n"));
    // Blank line terminating the header section.
    response.push_str("\r\n");
    response.into_bytes()
}