Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49    crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The default request-body cap must stay at 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        let defaults = ServerOptions::default();
        assert_eq!(defaults.max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must carry a `transport_listeners` object with
    /// one entry per active and per failed listener.
    #[test]
    fn health_json_reports_transport_listeners() {
        let grpc_up = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:5000".to_string(),
            explicit: true,
        };
        let http_down = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5000".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5000: address in use".to_string(),
        };

        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let mut options = ServerOptions::default();
        options.transport_readiness = TransportReadiness {
            active: vec![grpc_up],
            failed: vec![http_down],
        };
        let server = RedDBServer::with_options(runtime, options);

        let root = match server.health_json_with_transport(&HealthReport::healthy()) {
            JsonValue::Object(root) => root,
            _ => panic!("health payload should be an object"),
        };
        let listeners = match root.get("transport_listeners") {
            Some(JsonValue::Object(listeners)) => listeners,
            _ => panic!("health payload should include transport_listeners"),
        };
        let active = match listeners.get("active") {
            Some(JsonValue::Array(active)) => active,
            _ => panic!("transport_listeners.active should be an array"),
        };
        let failed = match listeners.get("failed") {
            Some(JsonValue::Array(failed)) => failed,
            _ => panic!("transport_listeners.failed should be an array"),
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105    crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108mod axum_edge;
109pub mod handlers_admin;
110mod handlers_admin_metrics;
111mod handlers_admin_status;
112mod handlers_ai;
113mod handlers_ai_model_cache;
114mod handlers_auth;
115mod handlers_backup;
116mod handlers_browser_auth;
117mod handlers_capabilities;
118mod handlers_collection_policy;
119mod handlers_ec;
120pub(crate) mod handlers_entity;
121mod handlers_failover;
122mod handlers_geo;
123mod handlers_graph;
124mod handlers_iam_policy;
125mod handlers_keyed;
126mod handlers_log;
127mod handlers_metrics;
128mod handlers_ops;
129mod handlers_ops_policy;
130/// Local `red ui` bridge: serves the UI bundle and fronts the embedded
131/// engine over RedWire-over-WebSocket for `file://` targets (issue #1042,
132/// ADR 0047/0049). Public so the `red ui` CLI command can drive it.
133pub mod ui_bridge;
134mod ws_edge;
135// `red ui` bundle resolver (issue #1043, ADR 0050): pin→URL resolution,
136// HTTPS download, SHA-256 verification, tgz extraction, and local cache
137// management for the pinned red-ui release asset.
138pub mod ui_bundle_resolver;
139// `red ui --token` credential handoff (issue #1048, ADR 0051): one-time
140// loopback nonce channel and served-UI auth-mode wiring.
141pub mod ui_auth;
142// `red server --ui` static bundle surface (issue #1047, ADR 0051): serve
143// the resolved red-ui bundle as inert static assets on the server's HTTP
144// edge so a remote browser can load it and connect back over RedWire-over-WS.
145mod ui_static;
146// `red ui <uri>` deep-link dispatch (issue #1046, ADR 0051): prefer the
147// installed desktop app via the `redui://` scheme, fall back to the served
148// browser bridge when no handler is registered. Decision + canonical
149// deep-link string live behind a testable seam.
150pub mod ui_deeplink;
151// `pub(crate)` so the RedWire input-stream path (issue #764 / S5)
152// can reuse the canonical S4 INSERT builders / identifier checks
153// (`build_insert_sql`, `is_safe_sql_identifier`) rather than fork
154// the SQL-escaping logic.
155pub(crate) mod handlers_query;
156mod handlers_replication;
157mod handlers_topology;
158mod handlers_vcs;
159mod handlers_vector;
160pub mod header_escape_guard;
161pub mod http_connection_limiter;
162pub mod http_handler_metrics;
163pub mod http_limits;
164pub mod http_principal_limiter;
165pub mod http_request_metrics;
166pub mod ingest_pipeline;
167pub mod output_stream;
168mod patch_support;
169mod request_body;
170mod request_context;
171mod route_catalog;
172mod routes;
173mod routing;
174mod serverless_support;
175pub mod tls;
176mod transport;
177
178use self::handlers_ai::*;
179use self::handlers_entity::*;
180use self::handlers_graph::*;
181use self::handlers_keyed::*;
182use self::handlers_metrics::*;
183use self::handlers_ops::*;
184use self::handlers_query::*;
185use self::http_connection_limiter::{
186    HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
187};
188use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
189pub use self::http_limits::{
190    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
191};
192use self::http_principal_limiter::PrincipalConnectionLimiter;
193use self::http_request_metrics::HttpRequestMetrics;
194use self::patch_support::*;
195use self::request_body::*;
196use self::routing::*;
197use self::serverless_support::*;
198use self::transport::*;
199
/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
/// can serve either every public surface (`Public`, default) or a
/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
/// the top of `route()` consults this so a port bound only to
/// loopback for admin work won't accidentally hand out DML.
///
/// `ServerOptions::default()` selects [`ServerSurface::Public`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Everything routed normally (default — matches v0 behaviour).
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
    /// which default to `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Intended for
    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
    /// exposed to non-admin networks.
    MetricsOnly,
}
218
/// Per-listener configuration for [`RedDBServer`].
///
/// `ServerOptions::default()` yields a `Public` listener configured for
/// `127.0.0.1:5000` with a 32 MiB body cap, 5 s read/write timeouts, a
/// scan limit of 1000, no WebSocket origins, and no UI directory.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address (`host:port`) for this listener. Default `127.0.0.1:5000`.
    pub bind_addr: String,
    /// Maximum HTTP request body size in bytes. Defaults to
    /// `DEFAULT_HTTP_MAX_BODY_BYTES` (32 MiB).
    pub max_body_bytes: usize,
    /// Read timeout in milliseconds (default 5000).
    /// NOTE(review): presumably applied to the accepted socket — confirm
    /// in the transport layer.
    pub read_timeout_ms: u64,
    /// Write timeout in milliseconds (default 5000).
    /// NOTE(review): presumably applied to the accepted socket — confirm
    /// in the transport layer.
    pub write_timeout_ms: u64,
    /// Upper bound on scan page sizes (default 1000).
    /// NOTE(review): exact enforcement point is not visible here — confirm
    /// against the scan handlers.
    pub max_scan_limit: usize,
    /// Which subset of paths this listener serves. Defaults to
    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
    /// admin / scrape ports (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
    /// Active and failed transport listeners. Rendered as the
    /// `transport_listeners` object (`active` / `failed` arrays) by
    /// `RedDBServer::transport_readiness_json`.
    pub transport_readiness: crate::service_cli::TransportReadiness,
    /// Allowed `Origin` values for the RedWire-over-WSS browser endpoint
    /// (issue #935, ADR 0036). WebSocket is not covered by CORS, so the
    /// upgrade is gated on an explicit allowlist to block Cross-Site
    /// WebSocket Hijacking. **Default-deny:** empty means the `/redwire`
    /// route is not mounted at all — operators opt in by configuring at
    /// least one origin. Matched exactly (scheme+host+port), e.g.
    /// `https://app.example.com`.
    pub websocket_allowed_origins: Vec<String>,
    /// `red server --ui` (issue #1047, ADR 0051). When `Some`, this
    /// listener also serves the resolved red-ui bundle from this directory
    /// as inert static assets (`/` → `index.html`), alongside the existing
    /// API. The assets carry no credential, so exposing them is safe by
    /// construction — auth lives on the RedWire data endpoint, not the
    /// asset path. Network reach is governed entirely by `bind_addr`
    /// (default localhost), so this never widens reach on its own.
    /// `None` (the default) leaves the listener API-only.
    pub ui_dir: Option<std::path::PathBuf>,
}
249
/// Default cap on HTTP request bodies: 32 MiB (`32 * 2^20` bytes).
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 << 20;
251
252impl Default for ServerOptions {
253    fn default() -> Self {
254        Self {
255            bind_addr: "127.0.0.1:5000".to_string(),
256            max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
257            read_timeout_ms: 5_000,
258            write_timeout_ms: 5_000,
259            max_scan_limit: 1_000,
260            surface: ServerSurface::Public,
261            transport_readiness: crate::service_cli::TransportReadiness::default(),
262            websocket_allowed_origins: Vec::new(),
263            ui_dir: None,
264        }
265    }
266}
267
/// Replication state exposed to the HTTP server.
///
/// Attached to a [`RedDBServer`] via `with_replication`, wrapped in an
/// `Arc` so cloned server handles share it.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, when one exists.
    /// NOTE(review): presumably `None` on replicas — confirm against the
    /// code that constructs this state.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
273
/// HTTP server handle wrapping a [`RedDBRuntime`] plus the per-listener
/// options, limiters, metrics, and stream registries it needs.
///
/// The type is `Clone`; the field docs below note which members are
/// shared between clones.
#[derive(Clone)]
pub struct RedDBServer {
    /// The database runtime every request is served against.
    runtime: RedDBRuntime,
    /// Listener configuration (bind address, limits, surface, …).
    options: ServerOptions,
    /// Auth store for HTTP-layer authentication; `None` until
    /// `with_auth` is called.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state for status/snapshot endpoints; `None` until
    /// `with_replication` is called.
    replication: Option<Arc<ServerReplicationState>>,
    /// Bounded handler-thread admission for the clear-text HTTP accept
    /// loop (issue #570 slice 1). Cloned with the server; `Clone` of
    /// `HttpConnectionLimiter` shares an `Arc` so every serve loop on
    /// the same `RedDBServer` shares one cap.
    http_limiter: HttpConnectionLimiter,
    /// Per-handler total-time deadline (issue #570 slice 2). Each
    /// clear-text handler thread arms a deadline at spawn and bails
    /// with a best-effort 503 at coarse boundaries between request
    /// parse, route dispatch, and response write. Defaults to
    /// `DEFAULT_HANDLER_TIMEOUT` (30s); replaced by `with_http_limits`
    /// (from `handler_timeout_ms`) or `with_handler_timeout`.
    handler_timeout: Duration,
    /// Monotonic clock the per-handler deadline (issue #621) is armed
    /// against — the same [`MonotonicClock`] abstraction the limiter
    /// uses. Production wires [`SystemMonotonicClock`] (real wall time);
    /// tests inject a fake to drive timeout expiry deterministically
    /// without `sleep()`. Shared via `Arc` so cloned server handles
    /// (e.g. `serve_in_background`) read the same clock.
    handler_clock: Arc<dyn MonotonicClock>,
    /// Test-only synchronous sleep injected between route dispatch and
    /// response write so an integration test can simulate a slow
    /// downstream tripping the deadline. Default 0 (no-op). Shared via
    /// `Arc` so a cloned `RedDBServer` (e.g. `serve_in_background`)
    /// observes flips from the originating handle. The setter
    /// (`set_test_slow_inject_ms`) is public but `#[doc(hidden)]`; it is
    /// a test hook and production callers are not expected to use it.
    slow_inject_ms: Arc<AtomicU64>,
    /// Prometheus metrics for the HTTP handler-thread pool (issue
    /// #573 slice 4). Records rejections (cap_exhausted /
    /// handler_timeout) and per-handler duration histograms. Cloned
    /// with the server via `Arc` so every serve loop on the same
    /// `RedDBServer` writes to one set of counters.
    http_metrics: HttpHandlerMetrics,
    /// Issue #1239 / PRD #1237 — `reddb_http_requests_total` counters by
    /// `method`, matched route template, and status class. Recorded once
    /// per handled request in `route()`; read by `/metrics` and the
    /// operational telemetry read model. Cloned with the server via `Arc`
    /// so every serve loop on the same `RedDBServer` shares one counter set.
    http_request_metrics: HttpRequestMetrics,
    /// `Retry-After` value (seconds) emitted in the async edge's
    /// capacity-reject 503 path (issue #574 slice 5). Read on the reject
    /// path in `axum_edge`. Defaults to `DEFAULT_RETRY_AFTER_SECS`.
    retry_after_secs: u64,
    /// Issue #934 / PRD #930 — per-principal concurrent in-flight cap. The
    /// global `http_limiter` bounds *total* in-flight work (async
    /// backpressure); this bounds any *single* principal's share so one
    /// abusive caller can't drain the whole global cap and starve the rest.
    /// Consulted at the async edge after global admission; a principal over
    /// its cap gets a structured 429 refusal. Shared via `Arc` (inside the
    /// limiter) so cloned server handles enforce one set of counters.
    principal_limiter: PrincipalConnectionLimiter,
    /// Issue #761 / S2 — process-wide output-stream capacity registry.
    /// Shared via `Arc` so cloned server handles (e.g.
    /// `serve_in_background`) all enforce against one set of counters.
    /// The HTTP NDJSON path acquires through this in
    /// `try_route_streaming` before invoking the handler; the guard is
    /// dropped on return so any stream-end path (success / mid-stream
    /// error / snapshot expiry / panic unwind) releases the slot.
    pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
    /// Issue #766 / S7 — resume coordinator ledger. Tracks
    /// `(snapshot_lsn → opened_at_ms, ttl_ms)` for resume-eligibility
    /// checks. Shared via `Arc` so cloned server handles see one
    /// ledger.
    pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
    /// Issue #807 / PRD #750 — `/query/stream` cursor registry. Holds the
    /// opaque token → (snapshot pin, TTL, tenant, principal, query) entries
    /// that let a client resume or reference a streamed read. Shared via
    /// `Arc` so cloned server handles see one registry.
    pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
}
348
/// Default per-handler total-time budget (issue #571 slice 2): 30 seconds.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
351
/// Categories of state a serverless warmup can target.
/// NOTE(review): semantics inferred from the variant names; the consumers
/// (presumably in `serverless_support`) are not visible here — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    /// Index structures.
    Indexes,
    /// Graph projections.
    GraphProjections,
    /// Analytics jobs.
    AnalyticsJobs,
    /// Native artifacts.
    NativeArtifacts,
}
359
/// Deployment shape the server reports or adapts to.
/// NOTE(review): how each profile is selected and used is not visible in
/// this part of the file — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    /// Engine embedded in a host process.
    Embedded,
    /// Long-running server process.
    Server,
    /// Serverless deployment.
    Serverless,
}
366
/// Decodes `%XX` escapes in a single URL path segment.
///
/// Bytes other than `%` are copied through unchanged. The decoded byte
/// sequence must be valid UTF-8.
///
/// # Errors
///
/// * `"truncated percent escape"` — a `%` is followed by fewer than two bytes.
/// * `"invalid percent escape"` — either byte after `%` is not a hex digit.
/// * `"path segment is not valid UTF-8"` — the decoded bytes are not UTF-8.
fn percent_decode_path_segment(input: &str) -> Result<String, String> {
    let mut remaining = input.as_bytes();
    let mut decoded = Vec::with_capacity(remaining.len());

    loop {
        match remaining {
            [] => break,
            // A complete escape: '%' plus two following bytes.
            [b'%', high, low, tail @ ..] => {
                let high = hex_value(*high).ok_or_else(|| "invalid percent escape".to_string())?;
                let low = hex_value(*low).ok_or_else(|| "invalid percent escape".to_string())?;
                decoded.push((high << 4) | low);
                remaining = tail;
            }
            // '%' with fewer than two bytes left after it.
            [b'%', ..] => return Err("truncated percent escape".to_string()),
            [literal, tail @ ..] => {
                decoded.push(*literal);
                remaining = tail;
            }
        }
    }

    String::from_utf8(decoded).map_err(|_| "path segment is not valid UTF-8".to_string())
}

/// Maps an ASCII hex digit (`0-9`, `a-f`, `A-F`) to its numeric value;
/// returns `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // Folding to lowercase only touches `A-Z`, so one letter arm covers both cases.
    match byte.to_ascii_lowercase() {
        digit @ b'0'..=b'9' => Some(digit - b'0'),
        letter @ b'a'..=b'f' => Some(letter - b'a' + 10),
        _ => None,
    }
}
401
/// Parsed form of a query request.
/// NOTE(review): the parser that builds this is not visible here; the
/// meanings of `entity_types` and `capabilities` below are inferred from
/// their names — confirm against the request-body parser.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// The query text to execute.
    query: String,
    /// Optional list of entity types supplied with the request.
    entity_types: Option<Vec<String>>,
    /// Optional list of capabilities supplied with the request.
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). When `Some`, the
    /// query handler runs the user_params binder before executing.
    /// Absence preserves the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}
412
/// Kind of mutation carried by a [`PatchOperation`].
/// NOTE(review): the exact semantics of each variant are implemented
/// elsewhere (presumably `patch_support`) — confirm there.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    /// Set a value at the target path.
    Set,
    /// Replace the value at the target path.
    Replace,
    /// Remove the value at the target path.
    Unset,
}
419
/// A single patch instruction: an operation kind, a target path, and an
/// optional JSON payload.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Which kind of mutation to apply.
    op: PatchOperationType,
    /// Target location as a list of path segments.
    path: Vec<String>,
    /// JSON payload for the operation, if any.
    /// NOTE(review): presumably `None` for `Unset` — confirm in the parser.
    value: Option<JsonValue>,
}
426
427impl RedDBServer {
428    pub fn new(runtime: RedDBRuntime) -> Self {
429        Self::with_options(runtime, ServerOptions::default())
430    }
431
432    pub fn from_database_options(
433        db_options: RedDBOptions,
434        server_options: ServerOptions,
435    ) -> RedDBResult<Self> {
436        let runtime = RedDBRuntime::with_options(db_options)?;
437        Ok(Self::with_options(runtime, server_options))
438    }
439
440    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
441        Self {
442            runtime,
443            options,
444            auth_store: None,
445            replication: None,
446            http_limiter: HttpConnectionLimiter::with_default_cap(),
447            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
448            handler_clock: Arc::new(SystemMonotonicClock::new()),
449            slow_inject_ms: Arc::new(AtomicU64::new(0)),
450            http_metrics: HttpHandlerMetrics::new(),
451            http_request_metrics: HttpRequestMetrics::new(),
452            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
453            principal_limiter: PrincipalConnectionLimiter::new(
454                http_limits::DEFAULT_MAX_INFLIGHT_PER_PRINCIPAL,
455            ),
456            stream_capacity: output_stream::StreamCapacityRegistry::new(),
457            lease_registry: output_stream::LeaseRegistry::new(),
458            cursor_registry: output_stream::CursorRegistry::new(),
459        }
460    }
461
462    #[doc(hidden)]
463    pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
464        &self.stream_capacity
465    }
466
467    #[doc(hidden)]
468    pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
469        &self.lease_registry
470    }
471
472    #[doc(hidden)]
473    pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
474        &self.cursor_registry
475    }
476
477    #[doc(hidden)]
478    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
479        &self.http_metrics
480    }
481
482    /// HTTP request/error volume counters (`reddb_http_requests_total`).
483    /// Read by `/metrics` and the operational telemetry read model.
484    #[doc(hidden)]
485    pub fn http_request_metrics(&self) -> &HttpRequestMetrics {
486        &self.http_request_metrics
487    }
488
489    /// Visible for tests. Lets the integration test in
490    /// `tests/http_connection_limiter.rs` saturate the cap and observe
491    /// `503 Service Unavailable` responses without spinning up
492    /// thousands of sockets.
493    #[doc(hidden)]
494    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
495        self.http_limiter = HttpConnectionLimiter::new(cap);
496        self
497    }
498
499    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
500    /// Replaces the limiter cap, the per-handler deadline, and the
501    /// `Retry-After` value used by the limiter's reject path. All
502    /// values are assumed validated by [`http_limits::resolve_http_limits`].
503    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
504        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
505        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
506        self.retry_after_secs = limits.retry_after_secs;
507        self.principal_limiter = PrincipalConnectionLimiter::new(limits.max_inflight_per_principal);
508        self
509    }
510
511    /// Visible for tests. Override the per-principal concurrent in-flight
512    /// cap (issue #934) so the integration test can saturate one principal
513    /// and observe the structured 429 refusal without standing up hundreds
514    /// of real sockets. `0` disables the per-principal cap.
515    #[doc(hidden)]
516    pub fn with_principal_inflight_cap(mut self, cap: usize) -> Self {
517        self.principal_limiter = PrincipalConnectionLimiter::new(cap);
518        self
519    }
520
521    #[doc(hidden)]
522    pub fn principal_limiter(&self) -> &PrincipalConnectionLimiter {
523        &self.principal_limiter
524    }
525
526    #[doc(hidden)]
527    pub fn retry_after_secs(&self) -> u64 {
528        self.retry_after_secs
529    }
530
531    #[doc(hidden)]
532    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
533        &self.http_limiter
534    }
535
536    /// Visible for tests. Override the per-handler total-time deadline
537    /// (issue #570 slice 2). Default 30s.
538    #[doc(hidden)]
539    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
540        self.handler_timeout = timeout;
541        self
542    }
543
544    #[doc(hidden)]
545    pub fn handler_timeout(&self) -> Duration {
546        self.handler_timeout
547    }
548
549    /// Visible for tests. Override the clock the per-handler deadline
550    /// (issue #621) is armed against, so timeout expiry can be driven
551    /// deterministically without real sleeps. Default is the real
552    /// monotonic clock.
553    #[doc(hidden)]
554    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
555        self.handler_clock = clock;
556        self
557    }
558
559    /// Test hook: set a synchronous sleep (in ms) inserted between
560    /// route dispatch and response write. The integration test for
561    /// slice 2 sets a value greater than `handler_timeout` to trip
562    /// the deadline, then resets to 0 to verify recovery. Shared via
563    /// `Arc<AtomicU64>` so cloned server handles see the same flip.
564    #[doc(hidden)]
565    pub fn set_test_slow_inject_ms(&self, ms: u64) {
566        self.slow_inject_ms.store(ms, Ordering::Relaxed);
567    }
568
569    /// Attach an `AuthStore` for HTTP-layer authentication.
570    /// Also injects the store into the runtime so that `Value::Secret`
571    /// auto-encrypt/decrypt can reach the vault AES key.
572    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
573        self.runtime.set_auth_store(Arc::clone(&auth_store));
574        self.auth_store = Some(auth_store);
575        self
576    }
577
578    /// Attach replication state for status and snapshot endpoints.
579    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
580        self.replication = Some(state);
581        self
582    }
583
584    /// Set the `Origin` allowlist that enables the RedWire-over-WSS
585    /// browser endpoint (issue #935, ADR 0036). A non-empty list mounts
586    /// the `/redwire` upgrade route on the TLS edge; an empty list leaves
587    /// it unmounted (default-deny).
588    pub fn with_websocket_allowed_origins(mut self, origins: Vec<String>) -> Self {
589        self.options.websocket_allowed_origins = origins;
590        self
591    }
592
593    /// The configured RedWire-over-WSS `Origin` allowlist (issue #935).
594    pub(crate) fn websocket_allowed_origins(&self) -> &[String] {
595        &self.options.websocket_allowed_origins
596    }
597
598    /// Serve the resolved red-ui bundle from `dir` as inert static assets
599    /// on this listener (`red server --ui`, issue #1047, ADR 0051). The
600    /// bundle is served as a fallback after the API routing table, so it
601    /// can never shadow a real endpoint; `GET /` resolves to the bundle's
602    /// `index.html`.
603    pub fn with_ui_dir(mut self, dir: std::path::PathBuf) -> Self {
604        self.options.ui_dir = Some(dir);
605        self
606    }
607
608    /// The directory this listener serves the red-ui bundle from, if
609    /// `--ui` is active (issue #1047).
610    pub(crate) fn ui_dir(&self) -> Option<&std::path::Path> {
611        self.options.ui_dir.as_deref()
612    }
613
614    /// Enable the browser credential layer (issue #936, PRD #930) by
615    /// wiring a hybrid-token authority into the runtime. Once set, the
616    /// `/auth/browser/*` HTTP endpoints issue/rotate the access+refresh
617    /// pair and the RedWire WS handshake accepts the access JWT. Left
618    /// unset, the browser flow is inert (default-deny), matching the WS
619    /// endpoint's own opt-in posture.
620    pub fn with_browser_token_authority(
621        self,
622        authority: Arc<crate::auth::browser_token::BrowserTokenAuthority>,
623    ) -> Self {
624        self.runtime.set_browser_token_authority(Some(authority));
625        self
626    }
627
628    pub fn runtime(&self) -> &RedDBRuntime {
629        &self.runtime
630    }
631
632    pub fn options(&self) -> &ServerOptions {
633        &self.options
634    }
635
636    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
637        QueryUseCases::new(&self.runtime)
638    }
639
640    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
641        AdminUseCases::new(&self.runtime)
642    }
643
644    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
645        EntityUseCases::new(&self.runtime)
646    }
647
648    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
649        CatalogUseCases::new(&self.runtime)
650    }
651
652    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
653        GraphUseCases::new(&self.runtime)
654    }
655
656    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
657        NativeUseCases::new(&self.runtime)
658    }
659
660    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
661        TreeUseCases::new(&self.runtime)
662    }
663
664    fn transport_readiness_json(&self) -> JsonValue {
665        let active = self
666            .options
667            .transport_readiness
668            .active
669            .iter()
670            .map(|listener| {
671                let mut object = Map::new();
672                object.insert(
673                    "transport".to_string(),
674                    JsonValue::String(listener.transport.clone()),
675                );
676                object.insert(
677                    "bind_addr".to_string(),
678                    JsonValue::String(listener.bind_addr.clone()),
679                );
680                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
681                JsonValue::Object(object)
682            })
683            .collect();
684        let failed = self
685            .options
686            .transport_readiness
687            .failed
688            .iter()
689            .map(|listener| {
690                let mut object = Map::new();
691                object.insert(
692                    "transport".to_string(),
693                    JsonValue::String(listener.transport.clone()),
694                );
695                object.insert(
696                    "bind_addr".to_string(),
697                    JsonValue::String(listener.bind_addr.clone()),
698                );
699                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
700                object.insert(
701                    "reason".to_string(),
702                    JsonValue::String(listener.reason.clone()),
703                );
704                JsonValue::Object(object)
705            })
706            .collect();
707
708        let mut object = Map::new();
709        object.insert("active".to_string(), JsonValue::Array(active));
710        object.insert("failed".to_string(), JsonValue::Array(failed));
711        JsonValue::Object(object)
712    }
713
714    fn handle_grpc_discovery(&self) -> HttpResponse {
715        let mut methods = Map::new();
716        methods.insert(
717            "query".to_string(),
718            JsonValue::String("reddb.v1.RedDB/Query".to_string()),
719        );
720        methods.insert(
721            "batch_query".to_string(),
722            JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
723        );
724        methods.insert(
725            "health".to_string(),
726            JsonValue::String("reddb.v1.RedDB/Health".to_string()),
727        );
728        methods.insert(
729            "prepare".to_string(),
730            JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
731        );
732        methods.insert(
733            "execute_prepared".to_string(),
734            JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
735        );
736
737        let mut examples = Map::new();
738        examples.insert(
739            "query".to_string(),
740            JsonValue::String(
741                "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:5000 reddb.v1.RedDB/Query"
742                    .to_string(),
743            ),
744        );
745        examples.insert(
746            "query_with_params".to_string(),
747            JsonValue::String(
748                "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:5000 reddb.v1.RedDB/Query"
749                    .to_string(),
750            ),
751        );
752        examples.insert(
753            "health".to_string(),
754            JsonValue::String(
755                "grpcurl -plaintext -d '{}' 127.0.0.1:5000 reddb.v1.RedDB/Health".to_string(),
756            ),
757        );
758
759        let mut object = Map::new();
760        object.insert("ok".to_string(), JsonValue::Bool(true));
761        object.insert(
762            "service".to_string(),
763            JsonValue::String("reddb.v1.RedDB".to_string()),
764        );
765        object.insert(
766            "package".to_string(),
767            JsonValue::String("reddb.v1".to_string()),
768        );
769        object.insert(
770            "proto".to_string(),
771            JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
772        );
773        object.insert("methods".to_string(), JsonValue::Object(methods));
774        object.insert("examples".to_string(), JsonValue::Object(examples));
775        object.insert(
776            "transport_listeners".to_string(),
777            self.transport_readiness_json(),
778        );
779        object.insert(
780            "hint".to_string(),
781            JsonValue::String(
782                "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
783                    .to_string(),
784            ),
785        );
786        json_response(200, JsonValue::Object(object))
787    }
788
789    fn handle_query_contract(&self) -> HttpResponse {
790        let mut examples = Map::new();
791        examples.insert(
792            "raw_sql".to_string(),
793            JsonValue::String("curl -sS http://127.0.0.1:5000/query -d 'SELECT 1'".to_string()),
794        );
795        examples.insert(
796            "json_query".to_string(),
797            JsonValue::String(
798                "curl -sS http://127.0.0.1:5000/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
799                    .to_string(),
800            ),
801        );
802        examples.insert(
803            "json_query_with_params".to_string(),
804            JsonValue::String(
805                "curl -sS http://127.0.0.1:5000/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
806                    .to_string(),
807            ),
808        );
809
810        let mut request_body = Map::new();
811        request_body.insert(
812            "query".to_string(),
813            JsonValue::String("required string".to_string()),
814        );
815        request_body.insert(
816            "params".to_string(),
817            JsonValue::String("optional array".to_string()),
818        );
819
820        let mut response_shape = Map::new();
821        response_shape.insert(
822            "columns".to_string(),
823            JsonValue::String("projected column names".to_string()),
824        );
825        response_shape.insert(
826            "records[].values".to_string(),
827            JsonValue::String("only projected values".to_string()),
828        );
829        response_shape.insert(
830            "records[].meta".to_string(),
831            JsonValue::String("internal metadata when present".to_string()),
832        );
833
834        let mut object = Map::new();
835        object.insert("ok".to_string(), JsonValue::Bool(false));
836        object.insert(
837            "code".to_string(),
838            JsonValue::String("method_not_allowed".to_string()),
839        );
840        object.insert(
841            "message".to_string(),
842            JsonValue::String("/query accepts POST requests".to_string()),
843        );
844        object.insert(
845            "hint".to_string(),
846            JsonValue::String(
847                "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
848            ),
849        );
850        object.insert("method".to_string(), JsonValue::String("POST".to_string()));
851        object.insert("path".to_string(), JsonValue::String("/query".to_string()));
852        object.insert("request_body".to_string(), JsonValue::Object(request_body));
853        object.insert(
854            "response_shape".to_string(),
855            JsonValue::Object(response_shape),
856        );
857        object.insert("examples".to_string(), JsonValue::Object(examples));
858        object.insert(
859            "docs".to_string(),
860            JsonValue::String("https://reddb.io/docs/query".to_string()),
861        );
862
863        json_response(405, JsonValue::Object(object))
864            .with_header("Allow", http::HeaderValue::from_static("POST"))
865    }
866
867    fn handle_root_discovery(&self) -> HttpResponse {
868        let mut endpoints = Map::new();
869        endpoints.insert(
870            "health".to_string(),
871            JsonValue::String("GET /health".to_string()),
872        );
873        endpoints.insert(
874            "ready".to_string(),
875            JsonValue::String("GET /ready".to_string()),
876        );
877        endpoints.insert(
878            "query".to_string(),
879            JsonValue::String("POST /query".to_string()),
880        );
881        endpoints.insert(
882            "query_readiness".to_string(),
883            JsonValue::String("GET /ready/query".to_string()),
884        );
885        endpoints.insert(
886            "catalog".to_string(),
887            JsonValue::String("GET /catalog".to_string()),
888        );
889        endpoints.insert(
890            "deployment_profiles".to_string(),
891            JsonValue::String("GET /deployment/profiles".to_string()),
892        );
893
894        let mut examples = Map::new();
895        examples.insert(
896            "http_raw_sql".to_string(),
897            JsonValue::String("curl -sS http://127.0.0.1:5000/query -d 'SELECT 1'".to_string()),
898        );
899        examples.insert(
900            "http_json_query".to_string(),
901            JsonValue::String(
902                "curl -sS http://127.0.0.1:5000/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
903                    .to_string(),
904            ),
905        );
906        examples.insert(
907            "http_json_query_with_params".to_string(),
908            JsonValue::String(
909                "curl -sS http://127.0.0.1:5000/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
910                    .to_string(),
911            ),
912        );
913
914        let mut object = Map::new();
915        object.insert("ok".to_string(), JsonValue::Bool(true));
916        object.insert(
917            "service".to_string(),
918            JsonValue::String("reddb".to_string()),
919        );
920        object.insert(
921            "version".to_string(),
922            JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
923        );
924        object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
925        object.insert("examples".to_string(), JsonValue::Object(examples));
926        object.insert(
927            "docs".to_string(),
928            JsonValue::String("https://reddb.io/docs".to_string()),
929        );
930        object.insert(
931            "transport_listeners".to_string(),
932            self.transport_readiness_json(),
933        );
934        json_response(200, JsonValue::Object(object))
935    }
936
937    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
938        let mut value = crate::presentation::ops_json::health_json(report);
939        if let JsonValue::Object(ref mut object) = value {
940            object.insert(
941                "transport_listeners".to_string(),
942                self.transport_readiness_json(),
943            );
944        }
945        value
946    }
947
948    pub fn serve(&self) -> io::Result<()> {
949        let listener = TcpListener::bind(&self.options.bind_addr)?;
950        self.serve_on(listener)
951    }
952
953    /// Serve the async axum/hyper HTTP edge (issue #931) on the given
954    /// listener until it errors fatally. A dedicated multi-threaded tokio
955    /// runtime drives the I/O; the synchronous disk-backed engine is
956    /// reached via `spawn_blocking`. This replaces the retired
957    /// thread-per-connection accept loop and its `(2*num_cpus)` thread
958    /// cap — idle keep-alive connections are now cheap parked tasks.
959    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
960        let runtime = axum_edge::build_edge_runtime()?;
961        runtime.block_on(
962            self.clone()
963                .serve_edge_on_std(listener, HttpTransport::Http),
964        )
965    }
966
967    /// Accept and serve a single connection to completion, then return.
968    /// Used by tests that want a one-shot HTTP server alongside another
969    /// transport.
970    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
971        let runtime = axum_edge::build_background_edge_runtime()?;
972        let server = self.clone();
973        runtime.block_on(async move {
974            listener.set_nonblocking(true)?;
975            let listener = tokio::net::TcpListener::from_std(listener)?;
976            let (stream, _peer) = listener.accept().await?;
977            server.serve_edge_one(stream).await;
978            Ok(())
979        })
980    }
981
982    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
983        let server = self.clone();
984        thread::spawn(move || server.serve())
985    }
986
987    pub fn serve_in_background_on(
988        &self,
989        listener: TcpListener,
990    ) -> thread::JoinHandle<io::Result<()>> {
991        let server = self.clone();
992        thread::spawn(move || {
993            let runtime = axum_edge::build_background_edge_runtime()?;
994            runtime.block_on(server.serve_edge_on_std(listener, HttpTransport::Http))
995        })
996    }
997
998    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
999    /// `tls_config` is shared across all connections (rustls
1000    /// `ServerConfig` is `Send + Sync`).
1001    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
1002        let listener = TcpListener::bind(&self.options.bind_addr)?;
1003        self.serve_tls_on(listener, tls_config)
1004    }
1005
1006    pub fn serve_tls_on(
1007        &self,
1008        listener: TcpListener,
1009        tls_config: std::sync::Arc<rustls::ServerConfig>,
1010    ) -> io::Result<()> {
1011        let runtime = axum_edge::build_edge_runtime()?;
1012        let acceptor = axum_edge::tls_acceptor(tls_config);
1013        runtime.block_on(self.clone().serve_edge_tls_on_std(
1014            listener,
1015            acceptor,
1016            HttpTransport::Https,
1017        ))
1018    }
1019
1020    pub fn serve_tls_in_background(
1021        &self,
1022        tls_config: std::sync::Arc<rustls::ServerConfig>,
1023    ) -> thread::JoinHandle<io::Result<()>> {
1024        let server = self.clone();
1025        thread::spawn(move || server.serve_tls(tls_config))
1026    }
1027
1028    pub fn serve_tls_in_background_on(
1029        &self,
1030        listener: TcpListener,
1031        tls_config: std::sync::Arc<rustls::ServerConfig>,
1032    ) -> thread::JoinHandle<io::Result<()>> {
1033        let server = self.clone();
1034        thread::spawn(move || {
1035            let runtime = axum_edge::build_background_edge_runtime()?;
1036            let acceptor = axum_edge::tls_acceptor(tls_config);
1037            runtime.block_on(server.serve_edge_tls_on_std(listener, acceptor, HttpTransport::Https))
1038        })
1039    }
1040
1041    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
1042        let started = Instant::now();
1043        let result = self.handle_connection_inner(stream);
1044        let elapsed = started.elapsed().as_secs_f64();
1045        self.http_metrics
1046            .record_duration(HttpTransport::Http, elapsed);
1047        result
1048    }
1049
1050    fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
1051        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
1052        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
1053
1054        // Issue #570 slice 2 / #621: arm a deadline at handler spawn and
1055        // check at coarse boundaries. Armed against the injectable
1056        // monotonic clock (#621) so timeout behaviour is deterministically
1057        // testable without real sleeps; production wires the real clock,
1058        // so this tracks wall time. No hard pre-emption — a thread blocked
1059        // inside a true syscall is still bounded only by the per-socket
1060        // read/write timeouts.
1061        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
1062
1063        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
1064
1065        // Boundary (a): between request parse and route dispatch.
1066        if deadline.expired() {
1067            self.http_metrics
1068                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1069            Self::write_handler_timeout_503(&mut stream);
1070            return Ok(());
1071        }
1072
1073        if self.try_route_streaming(&request, &mut stream)? {
1074            return Ok(());
1075        }
1076        let response = self.route(request);
1077
1078        // Test-only injected slow downstream (issue #570 slice 2
1079        // integration test). Production builds set this to 0, so this
1080        // is a single relaxed atomic load on the hot path.
1081        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
1082        if inject_ms > 0 {
1083            thread::sleep(Duration::from_millis(inject_ms));
1084        }
1085
1086        // Boundary (b): between route dispatch and response write.
1087        if deadline.expired() {
1088            self.http_metrics
1089                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1090            Self::write_handler_timeout_503(&mut stream);
1091            return Ok(());
1092        }
1093
1094        stream.write_all(&response.to_http_bytes())?;
1095        stream.flush()?;
1096        Ok(())
1097    }
1098
1099    /// Best-effort 503 emitted when the per-handler deadline expires
1100    /// at a coarse boundary. Writes are swallowed — the caller has
1101    /// already exceeded its budget, so we do not propagate write
1102    /// errors. Permit drop happens on the handler thread's normal
1103    /// exit path.
1104    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
1105        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
1106            Connection: close\r\n\
1107            Content-Length: 0\r\n\
1108            \r\n";
1109        let _ = stream.write_all(RESPONSE);
1110        let _ = stream.flush();
1111    }
1112}