Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49    crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// Returns `value[key]` when `value` is a JSON object, `None` otherwise.
    fn field<'a>(value: &'a JsonValue, key: &str) -> Option<&'a JsonValue> {
        match value {
            JsonValue::Object(object) => object.get(key),
            _ => None,
        }
    }

    /// True when `value[key]` is a JSON string equal to `expected`.
    fn has_string(value: &JsonValue, key: &str, expected: &str) -> bool {
        matches!(field(value, key), Some(JsonValue::String(actual)) if actual == expected)
    }

    /// One explicit active gRPC listener plus one implicit failed HTTP
    /// listener, shared by the readiness tests below.
    fn sample_readiness() -> TransportReadiness {
        TransportReadiness {
            active: vec![TransportListenerState {
                transport: "grpc".to_string(),
                bind_addr: "127.0.0.1:50051".to_string(),
                explicit: true,
            }],
            failed: vec![TransportListenerFailure {
                transport: "http".to_string(),
                bind_addr: "127.0.0.1:5055".to_string(),
                explicit: false,
                reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
            }],
        }
    }

    /// Builds an in-memory server whose options carry `sample_readiness()`.
    fn server_with_sample_readiness() -> RedDBServer {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let mut options = ServerOptions::default();
        options.transport_readiness = sample_readiness();
        RedDBServer::with_options(runtime, options)
    }

    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        assert_eq!(ServerOptions::default().max_body_bytes, 32 * 1024 * 1024);
        assert_eq!(DEFAULT_HTTP_MAX_BODY_BYTES, 32 * 1024 * 1024);
    }

    #[test]
    fn server_options_default_is_loopback_public_with_websocket_disabled() {
        let options = ServerOptions::default();
        assert_eq!(options.bind_addr, "127.0.0.1:5055");
        assert_eq!(options.surface, ServerSurface::Public);
        // Default-deny: no origins means the `/redwire` route stays unmounted.
        assert!(options.websocket_allowed_origins.is_empty());
    }

    #[test]
    fn health_json_reports_transport_listeners() {
        let server = server_with_sample_readiness();

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }

    #[test]
    fn transport_readiness_json_serializes_every_listener_field() {
        let server = server_with_sample_readiness();
        let payload = server.transport_readiness_json();

        let Some(JsonValue::Array(active)) = field(&payload, "active") else {
            panic!("active should be an array");
        };
        let Some(JsonValue::Array(failed)) = field(&payload, "failed") else {
            panic!("failed should be an array");
        };
        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);

        let grpc = &active[0];
        assert!(has_string(grpc, "transport", "grpc"));
        assert!(has_string(grpc, "bind_addr", "127.0.0.1:50051"));
        assert!(matches!(field(grpc, "explicit"), Some(JsonValue::Bool(true))));
        assert!(
            field(grpc, "reason").is_none(),
            "active listeners must not carry a failure reason"
        );

        let http = &failed[0];
        assert!(has_string(http, "transport", "http"));
        assert!(has_string(http, "bind_addr", "127.0.0.1:5055"));
        assert!(matches!(field(http, "explicit"), Some(JsonValue::Bool(false))));
        assert!(has_string(
            http,
            "reason",
            "http listener bind 127.0.0.1:5055: address in use"
        ));
    }

    #[test]
    fn percent_decode_path_segment_decodes_and_rejects_malformed_input() {
        assert_eq!(percent_decode_path_segment("plain"), Ok("plain".to_string()));
        assert_eq!(percent_decode_path_segment("a%20b"), Ok("a b".to_string()));
        assert_eq!(percent_decode_path_segment("%C3%A9"), Ok("é".to_string()));
        assert_eq!(
            percent_decode_path_segment("%4"),
            Err("truncated percent escape".to_string())
        );
        assert_eq!(
            percent_decode_path_segment("%zz"),
            Err("invalid percent escape".to_string())
        );
        assert_eq!(
            percent_decode_path_segment("%FF"),
            Err("path segment is not valid UTF-8".to_string())
        );
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105    crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108mod axum_edge;
109pub mod handlers_admin;
110mod handlers_admin_metrics;
111mod handlers_admin_status;
112mod handlers_ai;
113mod handlers_ai_model_cache;
114mod handlers_auth;
115mod handlers_backup;
116mod handlers_browser_auth;
117mod handlers_capabilities;
118mod handlers_collection_policy;
119mod handlers_ec;
120pub(crate) mod handlers_entity;
121mod handlers_failover;
122mod handlers_geo;
123mod handlers_graph;
124mod handlers_iam_policy;
125mod handlers_keyed;
126mod handlers_log;
127mod handlers_metrics;
128mod handlers_ops;
129mod handlers_ops_policy;
130mod ws_edge;
131// `pub(crate)` so the RedWire input-stream path (issue #764 / S5)
132// can reuse the canonical S4 INSERT builders / identifier checks
133// (`build_insert_sql`, `is_safe_sql_identifier`) rather than fork
134// the SQL-escaping logic.
135pub(crate) mod handlers_query;
136mod handlers_replication;
137mod handlers_topology;
138mod handlers_vcs;
139mod handlers_vector;
140pub mod header_escape_guard;
141pub mod http_connection_limiter;
142pub mod http_handler_metrics;
143pub mod http_limits;
144pub mod http_principal_limiter;
145pub mod ingest_pipeline;
146pub mod output_stream;
147mod patch_support;
148mod request_body;
149mod request_context;
150mod routing;
151mod serverless_support;
152pub mod tls;
153mod transport;
154
155use self::handlers_ai::*;
156use self::handlers_entity::*;
157use self::handlers_graph::*;
158use self::handlers_keyed::*;
159use self::handlers_metrics::*;
160use self::handlers_ops::*;
161use self::handlers_query::*;
162use self::http_connection_limiter::{
163    HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
164};
165use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
166pub use self::http_limits::{
167    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
168};
169use self::http_principal_limiter::PrincipalConnectionLimiter;
170use self::patch_support::*;
171use self::request_body::*;
172use self::routing::*;
173use self::serverless_support::*;
174use self::transport::*;
175
/// Which slice of the HTTP API a listener exposes (PLAN.md Phase 6.2,
/// endpoint segregation).
///
/// Narrowing a listener lets, for example, a loopback-only admin port avoid
/// ever serving DML. The original notes say the filter is consulted at the
/// top of `route()`. NOTE(review): that code is outside this section, so
/// confirm there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerSurface {
    /// Serve every route normally. This is the default and matches v0
    /// behaviour.
    Public,
    /// Serve only `/admin/*`, `/metrics`, and `/health/*`; every other path
    /// answers 404. Meant for `RED_ADMIN_BIND` operator listeners, which
    /// default to `127.0.0.1`.
    AdminOnly,
    /// Serve only `/metrics` and `/health/*`. Meant for `RED_METRICS_BIND`
    /// Prometheus scrape ports that may be reachable from non-admin networks.
    MetricsOnly,
}
194
195#[derive(Debug, Clone)]
196pub struct ServerOptions {
197    pub bind_addr: String,
198    pub max_body_bytes: usize,
199    pub read_timeout_ms: u64,
200    pub write_timeout_ms: u64,
201    pub max_scan_limit: usize,
202    /// Which subset of paths this listener serves. Defaults to
203    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
204    /// admin / scrape ports (PLAN.md Phase 6.2).
205    pub surface: ServerSurface,
206    pub transport_readiness: crate::service_cli::TransportReadiness,
207    /// Allowed `Origin` values for the RedWire-over-WSS browser endpoint
208    /// (issue #935, ADR 0036). WebSocket is not covered by CORS, so the
209    /// upgrade is gated on an explicit allowlist to block Cross-Site
210    /// WebSocket Hijacking. **Default-deny:** empty means the `/redwire`
211    /// route is not mounted at all — operators opt in by configuring at
212    /// least one origin. Matched exactly (scheme+host+port), e.g.
213    /// `https://app.example.com`.
214    pub websocket_allowed_origins: Vec<String>,
215}
216
/// Default cap on an HTTP request body: 32 MiB (32 × 2^20 bytes).
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 << 20;
218
219impl Default for ServerOptions {
220    fn default() -> Self {
221        Self {
222            bind_addr: "127.0.0.1:5055".to_string(),
223            max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
224            read_timeout_ms: 5_000,
225            write_timeout_ms: 5_000,
226            max_scan_limit: 1_000,
227            surface: ServerSurface::Public,
228            transport_readiness: crate::service_cli::TransportReadiness::default(),
229            websocket_allowed_origins: Vec::new(),
230        }
231    }
232}
233
234/// Replication state exposed to the HTTP server.
235pub struct ServerReplicationState {
236    pub config: crate::replication::ReplicationConfig,
237    pub primary: Option<crate::replication::primary::PrimaryReplication>,
238}
239
240#[derive(Clone)]
241pub struct RedDBServer {
242    runtime: RedDBRuntime,
243    options: ServerOptions,
244    auth_store: Option<Arc<AuthStore>>,
245    replication: Option<Arc<ServerReplicationState>>,
246    /// Bounded handler-thread admission for the clear-text HTTP accept
247    /// loop (issue #570 slice 1). Cloned with the server; `Clone` of
248    /// `HttpConnectionLimiter` shares an `Arc` so every serve loop on
249    /// the same `RedDBServer` shares one cap.
250    http_limiter: HttpConnectionLimiter,
251    /// Per-handler total-time deadline (issue #570 slice 2). Each
252    /// clear-text handler thread arms a deadline at spawn and bails
253    /// with a best-effort 503 at coarse boundaries between request
254    /// parse, route dispatch, and response write. Hard-coded to 30s
255    /// here; the config knob lands in slice 5.
256    handler_timeout: Duration,
257    /// Monotonic clock the per-handler deadline (issue #621) is armed
258    /// against — the same [`MonotonicClock`] abstraction the limiter
259    /// uses. Production wires [`SystemMonotonicClock`] (real wall time);
260    /// tests inject a fake to drive timeout expiry deterministically
261    /// without `sleep()`. Shared via `Arc` so cloned server handles
262    /// (e.g. `serve_in_background`) read the same clock.
263    handler_clock: Arc<dyn MonotonicClock>,
264    /// Test-only synchronous sleep injected between route dispatch and
265    /// response write so an integration test can simulate a slow
266    /// downstream tripping the deadline. Default 0 (no-op). Shared via
267    /// `Arc` so a cloned `RedDBServer` (e.g. `serve_in_background`)
268    /// observes flips from the originating handle. Production callers
269    /// have no way to set this — the setter is `#[doc(hidden)]`.
270    slow_inject_ms: Arc<AtomicU64>,
271    /// Prometheus metrics for the HTTP handler-thread pool (issue
272    /// #573 slice 4). Records rejections (cap_exhausted /
273    /// handler_timeout) and per-handler duration histograms. Cloned
274    /// with the server via `Arc` so every serve loop on the same
275    /// `RedDBServer` writes to one set of counters.
276    http_metrics: HttpHandlerMetrics,
277    /// `Retry-After` value (seconds) emitted in the async edge's
278    /// capacity-reject 503 path (issue #574 slice 5). Read on the reject
279    /// path in `axum_edge`.
280    retry_after_secs: u64,
281    /// Issue #934 / PRD #930 — per-principal concurrent in-flight cap. The
282    /// global `http_limiter` bounds *total* in-flight work (async
283    /// backpressure); this bounds any *single* principal's share so one
284    /// abusive caller can't drain the whole global cap and starve the rest.
285    /// Consulted at the async edge after global admission; a principal over
286    /// its cap gets a structured 429 refusal. Shared via `Arc` (inside the
287    /// limiter) so cloned server handles enforce one set of counters.
288    principal_limiter: PrincipalConnectionLimiter,
289    /// Issue #761 / S2 — process-wide output-stream capacity registry.
290    /// Shared via `Arc` so cloned server handles (e.g.
291    /// `serve_in_background`) all enforce against one set of counters.
292    /// The HTTP NDJSON path acquires through this in
293    /// `try_route_streaming` before invoking the handler; the guard is
294    /// dropped on return so any stream-end path (success / mid-stream
295    /// error / snapshot expiry / panic unwind) releases the slot.
296    pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
297    /// Issue #766 / S7 — resume coordinator ledger. Tracks
298    /// `(snapshot_lsn → opened_at_ms, ttl_ms)` for resume-eligibility
299    /// checks. Shared via `Arc` so cloned server handles see one
300    /// ledger.
301    pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
302    /// Issue #807 / PRD #750 — `/query/stream` cursor registry. Holds the
303    /// opaque token → (snapshot pin, TTL, tenant, principal, query) entries
304    /// that let a client resume or reference a streamed read. Shared via
305    /// `Arc` so cloned server handles see one registry.
306    pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
307}
308
/// Default per-handler total-time budget: 30 seconds (issue #571 slice 2).
/// NOTE(review): `http_limits` also exports `DEFAULT_HANDLER_TIMEOUT_MS`;
/// confirm the two defaults are meant to agree.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
311
/// Category of derived state a serverless warm-up step targets.
/// NOTE(review): this meaning is inferred from the variant names; the
/// dispatch on this enum lives outside this section.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
319
/// How this RedDB instance is deployed: embedded in a host process, as a
/// long-running server, or serverless. NOTE(review): the behaviour tied to
/// each profile is defined outside this section.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
326
/// Percent-decodes one URL path segment into a UTF-8 `String`.
///
/// Each `%XY` triple (hex digits in either case) becomes the byte `0xXY`,
/// and every other byte passes through unchanged. In particular, `+` is
/// *not* turned into a space, and an encoded `/` (`%2F`) decodes to a
/// literal `/`.
///
/// # Errors
/// - `"truncated percent escape"` when a `%` is not followed by two more
///   bytes.
/// - `"invalid percent escape"` when either of those bytes is not a hex
///   digit.
/// - `"path segment is not valid UTF-8"` when the decoded bytes are not
///   valid UTF-8.
///
/// Escape errors are raised while scanning, so they take precedence over
/// the final UTF-8 check.
fn percent_decode_path_segment(input: &str) -> Result<String, String> {
    let raw = input.as_bytes();
    let mut decoded = Vec::with_capacity(raw.len());
    let mut cursor = 0;

    while let Some(&current) = raw.get(cursor) {
        if current != b'%' {
            decoded.push(current);
            cursor += 1;
            continue;
        }

        // Both digit positions must exist before either is validated, so
        // "%G" is reported as truncated rather than invalid.
        let (Some(&high), Some(&low)) = (raw.get(cursor + 1), raw.get(cursor + 2)) else {
            return Err("truncated percent escape".to_string());
        };
        let nibble =
            |digit: u8| hex_value(digit).ok_or_else(|| "invalid percent escape".to_string());
        decoded.push((nibble(high)? << 4) | nibble(low)?);
        cursor += 3;
    }

    String::from_utf8(decoded).map_err(|_| "path segment is not valid UTF-8".to_string())
}

/// Value of a single ASCII hex digit (`0-9`, `a-f`, `A-F`), or `None` for
/// any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // Bytes >= 0x80 map to non-ASCII chars, which `to_digit` rejects. A
    // base-16 digit is at most 15, so the narrowing cast cannot truncate.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
361
362#[derive(Debug, Clone)]
363struct ParsedQueryRequest {
364    query: String,
365    entity_types: Option<Vec<String>>,
366    capabilities: Option<Vec<String>>,
367    /// Optional positional `$N` bind parameters (#358). When `Some`, the
368    /// query handler runs the user_params binder before executing.
369    /// Absence preserves the legacy `query`-only behavior.
370    params: Option<Vec<Value>>,
371}
372
/// Kind of change a single patch operation applies. NOTE(review): the exact
/// set/replace distinction is implemented in the patch helpers outside this
/// section.
#[derive(Clone, Copy, Debug)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
379
380#[derive(Debug, Clone)]
381struct PatchOperation {
382    op: PatchOperationType,
383    path: Vec<String>,
384    value: Option<JsonValue>,
385}
386
387impl RedDBServer {
388    pub fn new(runtime: RedDBRuntime) -> Self {
389        Self::with_options(runtime, ServerOptions::default())
390    }
391
392    pub fn from_database_options(
393        db_options: RedDBOptions,
394        server_options: ServerOptions,
395    ) -> RedDBResult<Self> {
396        let runtime = RedDBRuntime::with_options(db_options)?;
397        Ok(Self::with_options(runtime, server_options))
398    }
399
400    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
401        Self {
402            runtime,
403            options,
404            auth_store: None,
405            replication: None,
406            http_limiter: HttpConnectionLimiter::with_default_cap(),
407            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
408            handler_clock: Arc::new(SystemMonotonicClock::new()),
409            slow_inject_ms: Arc::new(AtomicU64::new(0)),
410            http_metrics: HttpHandlerMetrics::new(),
411            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
412            principal_limiter: PrincipalConnectionLimiter::new(
413                http_limits::DEFAULT_MAX_INFLIGHT_PER_PRINCIPAL,
414            ),
415            stream_capacity: output_stream::StreamCapacityRegistry::new(),
416            lease_registry: output_stream::LeaseRegistry::new(),
417            cursor_registry: output_stream::CursorRegistry::new(),
418        }
419    }
420
421    #[doc(hidden)]
422    pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
423        &self.stream_capacity
424    }
425
426    #[doc(hidden)]
427    pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
428        &self.lease_registry
429    }
430
431    #[doc(hidden)]
432    pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
433        &self.cursor_registry
434    }
435
436    #[doc(hidden)]
437    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
438        &self.http_metrics
439    }
440
441    /// Visible for tests. Lets the integration test in
442    /// `tests/http_connection_limiter.rs` saturate the cap and observe
443    /// `503 Service Unavailable` responses without spinning up
444    /// thousands of sockets.
445    #[doc(hidden)]
446    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
447        self.http_limiter = HttpConnectionLimiter::new(cap);
448        self
449    }
450
451    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
452    /// Replaces the limiter cap, the per-handler deadline, and the
453    /// `Retry-After` value used by the limiter's reject path. All
454    /// values are assumed validated by [`http_limits::resolve_http_limits`].
455    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
456        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
457        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
458        self.retry_after_secs = limits.retry_after_secs;
459        self.principal_limiter = PrincipalConnectionLimiter::new(limits.max_inflight_per_principal);
460        self
461    }
462
463    /// Visible for tests. Override the per-principal concurrent in-flight
464    /// cap (issue #934) so the integration test can saturate one principal
465    /// and observe the structured 429 refusal without standing up hundreds
466    /// of real sockets. `0` disables the per-principal cap.
467    #[doc(hidden)]
468    pub fn with_principal_inflight_cap(mut self, cap: usize) -> Self {
469        self.principal_limiter = PrincipalConnectionLimiter::new(cap);
470        self
471    }
472
473    #[doc(hidden)]
474    pub fn principal_limiter(&self) -> &PrincipalConnectionLimiter {
475        &self.principal_limiter
476    }
477
478    #[doc(hidden)]
479    pub fn retry_after_secs(&self) -> u64 {
480        self.retry_after_secs
481    }
482
483    #[doc(hidden)]
484    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
485        &self.http_limiter
486    }
487
488    /// Visible for tests. Override the per-handler total-time deadline
489    /// (issue #570 slice 2). Default 30s.
490    #[doc(hidden)]
491    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
492        self.handler_timeout = timeout;
493        self
494    }
495
496    #[doc(hidden)]
497    pub fn handler_timeout(&self) -> Duration {
498        self.handler_timeout
499    }
500
501    /// Visible for tests. Override the clock the per-handler deadline
502    /// (issue #621) is armed against, so timeout expiry can be driven
503    /// deterministically without real sleeps. Default is the real
504    /// monotonic clock.
505    #[doc(hidden)]
506    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
507        self.handler_clock = clock;
508        self
509    }
510
511    /// Test hook: set a synchronous sleep (in ms) inserted between
512    /// route dispatch and response write. The integration test for
513    /// slice 2 sets a value greater than `handler_timeout` to trip
514    /// the deadline, then resets to 0 to verify recovery. Shared via
515    /// `Arc<AtomicU64>` so cloned server handles see the same flip.
516    #[doc(hidden)]
517    pub fn set_test_slow_inject_ms(&self, ms: u64) {
518        self.slow_inject_ms.store(ms, Ordering::Relaxed);
519    }
520
521    /// Attach an `AuthStore` for HTTP-layer authentication.
522    /// Also injects the store into the runtime so that `Value::Secret`
523    /// auto-encrypt/decrypt can reach the vault AES key.
524    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
525        self.runtime.set_auth_store(Arc::clone(&auth_store));
526        self.auth_store = Some(auth_store);
527        self
528    }
529
530    /// Attach replication state for status and snapshot endpoints.
531    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
532        self.replication = Some(state);
533        self
534    }
535
536    /// Set the `Origin` allowlist that enables the RedWire-over-WSS
537    /// browser endpoint (issue #935, ADR 0036). A non-empty list mounts
538    /// the `/redwire` upgrade route on the TLS edge; an empty list leaves
539    /// it unmounted (default-deny).
540    pub fn with_websocket_allowed_origins(mut self, origins: Vec<String>) -> Self {
541        self.options.websocket_allowed_origins = origins;
542        self
543    }
544
545    /// The configured RedWire-over-WSS `Origin` allowlist (issue #935).
546    pub(crate) fn websocket_allowed_origins(&self) -> &[String] {
547        &self.options.websocket_allowed_origins
548    }
549
550    /// Enable the browser credential layer (issue #936, PRD #930) by
551    /// wiring a hybrid-token authority into the runtime. Once set, the
552    /// `/auth/browser/*` HTTP endpoints issue/rotate the access+refresh
553    /// pair and the RedWire WS handshake accepts the access JWT. Left
554    /// unset, the browser flow is inert (default-deny), matching the WS
555    /// endpoint's own opt-in posture.
556    pub fn with_browser_token_authority(
557        self,
558        authority: Arc<crate::auth::browser_token::BrowserTokenAuthority>,
559    ) -> Self {
560        self.runtime.set_browser_token_authority(Some(authority));
561        self
562    }
563
564    pub fn runtime(&self) -> &RedDBRuntime {
565        &self.runtime
566    }
567
568    pub fn options(&self) -> &ServerOptions {
569        &self.options
570    }
571
572    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
573        QueryUseCases::new(&self.runtime)
574    }
575
576    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
577        AdminUseCases::new(&self.runtime)
578    }
579
580    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
581        EntityUseCases::new(&self.runtime)
582    }
583
584    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
585        CatalogUseCases::new(&self.runtime)
586    }
587
588    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
589        GraphUseCases::new(&self.runtime)
590    }
591
592    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
593        NativeUseCases::new(&self.runtime)
594    }
595
596    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
597        TreeUseCases::new(&self.runtime)
598    }
599
600    fn transport_readiness_json(&self) -> JsonValue {
601        let active = self
602            .options
603            .transport_readiness
604            .active
605            .iter()
606            .map(|listener| {
607                let mut object = Map::new();
608                object.insert(
609                    "transport".to_string(),
610                    JsonValue::String(listener.transport.clone()),
611                );
612                object.insert(
613                    "bind_addr".to_string(),
614                    JsonValue::String(listener.bind_addr.clone()),
615                );
616                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
617                JsonValue::Object(object)
618            })
619            .collect();
620        let failed = self
621            .options
622            .transport_readiness
623            .failed
624            .iter()
625            .map(|listener| {
626                let mut object = Map::new();
627                object.insert(
628                    "transport".to_string(),
629                    JsonValue::String(listener.transport.clone()),
630                );
631                object.insert(
632                    "bind_addr".to_string(),
633                    JsonValue::String(listener.bind_addr.clone()),
634                );
635                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
636                object.insert(
637                    "reason".to_string(),
638                    JsonValue::String(listener.reason.clone()),
639                );
640                JsonValue::Object(object)
641            })
642            .collect();
643
644        let mut object = Map::new();
645        object.insert("active".to_string(), JsonValue::Array(active));
646        object.insert("failed".to_string(), JsonValue::Array(failed));
647        JsonValue::Object(object)
648    }
649
650    fn handle_grpc_discovery(&self) -> HttpResponse {
651        let mut methods = Map::new();
652        methods.insert(
653            "query".to_string(),
654            JsonValue::String("reddb.v1.RedDB/Query".to_string()),
655        );
656        methods.insert(
657            "batch_query".to_string(),
658            JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
659        );
660        methods.insert(
661            "health".to_string(),
662            JsonValue::String("reddb.v1.RedDB/Health".to_string()),
663        );
664        methods.insert(
665            "prepare".to_string(),
666            JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
667        );
668        methods.insert(
669            "execute_prepared".to_string(),
670            JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
671        );
672
673        let mut examples = Map::new();
674        examples.insert(
675            "query".to_string(),
676            JsonValue::String(
677                "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
678                    .to_string(),
679            ),
680        );
681        examples.insert(
682            "query_with_params".to_string(),
683            JsonValue::String(
684                "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
685                    .to_string(),
686            ),
687        );
688        examples.insert(
689            "health".to_string(),
690            JsonValue::String(
691                "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
692            ),
693        );
694
695        let mut object = Map::new();
696        object.insert("ok".to_string(), JsonValue::Bool(true));
697        object.insert(
698            "service".to_string(),
699            JsonValue::String("reddb.v1.RedDB".to_string()),
700        );
701        object.insert(
702            "package".to_string(),
703            JsonValue::String("reddb.v1".to_string()),
704        );
705        object.insert(
706            "proto".to_string(),
707            JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
708        );
709        object.insert("methods".to_string(), JsonValue::Object(methods));
710        object.insert("examples".to_string(), JsonValue::Object(examples));
711        object.insert(
712            "transport_listeners".to_string(),
713            self.transport_readiness_json(),
714        );
715        object.insert(
716            "hint".to_string(),
717            JsonValue::String(
718                "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
719                    .to_string(),
720            ),
721        );
722        json_response(200, JsonValue::Object(object))
723    }
724
725    fn handle_query_contract(&self) -> HttpResponse {
726        let mut examples = Map::new();
727        examples.insert(
728            "raw_sql".to_string(),
729            JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
730        );
731        examples.insert(
732            "json_query".to_string(),
733            JsonValue::String(
734                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
735                    .to_string(),
736            ),
737        );
738        examples.insert(
739            "json_query_with_params".to_string(),
740            JsonValue::String(
741                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
742                    .to_string(),
743            ),
744        );
745
746        let mut request_body = Map::new();
747        request_body.insert(
748            "query".to_string(),
749            JsonValue::String("required string".to_string()),
750        );
751        request_body.insert(
752            "params".to_string(),
753            JsonValue::String("optional array".to_string()),
754        );
755
756        let mut response_shape = Map::new();
757        response_shape.insert(
758            "columns".to_string(),
759            JsonValue::String("projected column names".to_string()),
760        );
761        response_shape.insert(
762            "records[].values".to_string(),
763            JsonValue::String("only projected values".to_string()),
764        );
765        response_shape.insert(
766            "records[].meta".to_string(),
767            JsonValue::String("internal metadata when present".to_string()),
768        );
769
770        let mut object = Map::new();
771        object.insert("ok".to_string(), JsonValue::Bool(false));
772        object.insert(
773            "code".to_string(),
774            JsonValue::String("method_not_allowed".to_string()),
775        );
776        object.insert(
777            "message".to_string(),
778            JsonValue::String("/query accepts POST requests".to_string()),
779        );
780        object.insert(
781            "hint".to_string(),
782            JsonValue::String(
783                "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
784            ),
785        );
786        object.insert("method".to_string(), JsonValue::String("POST".to_string()));
787        object.insert("path".to_string(), JsonValue::String("/query".to_string()));
788        object.insert("request_body".to_string(), JsonValue::Object(request_body));
789        object.insert(
790            "response_shape".to_string(),
791            JsonValue::Object(response_shape),
792        );
793        object.insert("examples".to_string(), JsonValue::Object(examples));
794        object.insert(
795            "docs".to_string(),
796            JsonValue::String("https://reddb.io/docs/query".to_string()),
797        );
798
799        json_response(405, JsonValue::Object(object))
800            .with_header("Allow", http::HeaderValue::from_static("POST"))
801    }
802
803    fn handle_root_discovery(&self) -> HttpResponse {
804        let mut endpoints = Map::new();
805        endpoints.insert(
806            "health".to_string(),
807            JsonValue::String("GET /health".to_string()),
808        );
809        endpoints.insert(
810            "ready".to_string(),
811            JsonValue::String("GET /ready".to_string()),
812        );
813        endpoints.insert(
814            "query".to_string(),
815            JsonValue::String("POST /query".to_string()),
816        );
817        endpoints.insert(
818            "query_readiness".to_string(),
819            JsonValue::String("GET /ready/query".to_string()),
820        );
821        endpoints.insert(
822            "catalog".to_string(),
823            JsonValue::String("GET /catalog".to_string()),
824        );
825        endpoints.insert(
826            "deployment_profiles".to_string(),
827            JsonValue::String("GET /deployment/profiles".to_string()),
828        );
829
830        let mut examples = Map::new();
831        examples.insert(
832            "http_raw_sql".to_string(),
833            JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
834        );
835        examples.insert(
836            "http_json_query".to_string(),
837            JsonValue::String(
838                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
839                    .to_string(),
840            ),
841        );
842        examples.insert(
843            "http_json_query_with_params".to_string(),
844            JsonValue::String(
845                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
846                    .to_string(),
847            ),
848        );
849
850        let mut object = Map::new();
851        object.insert("ok".to_string(), JsonValue::Bool(true));
852        object.insert(
853            "service".to_string(),
854            JsonValue::String("reddb".to_string()),
855        );
856        object.insert(
857            "version".to_string(),
858            JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
859        );
860        object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
861        object.insert("examples".to_string(), JsonValue::Object(examples));
862        object.insert(
863            "docs".to_string(),
864            JsonValue::String("https://reddb.io/docs".to_string()),
865        );
866        object.insert(
867            "transport_listeners".to_string(),
868            self.transport_readiness_json(),
869        );
870        json_response(200, JsonValue::Object(object))
871    }
872
873    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
874        let mut value = crate::presentation::ops_json::health_json(report);
875        if let JsonValue::Object(ref mut object) = value {
876            object.insert(
877                "transport_listeners".to_string(),
878                self.transport_readiness_json(),
879            );
880        }
881        value
882    }
883
884    pub fn serve(&self) -> io::Result<()> {
885        let listener = TcpListener::bind(&self.options.bind_addr)?;
886        self.serve_on(listener)
887    }
888
889    /// Serve the async axum/hyper HTTP edge (issue #931) on the given
890    /// listener until it errors fatally. A dedicated multi-threaded tokio
891    /// runtime drives the I/O; the synchronous disk-backed engine is
892    /// reached via `spawn_blocking`. This replaces the retired
893    /// thread-per-connection accept loop and its `(2*num_cpus)` thread
894    /// cap — idle keep-alive connections are now cheap parked tasks.
895    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
896        let runtime = axum_edge::build_edge_runtime()?;
897        runtime.block_on(
898            self.clone()
899                .serve_edge_on_std(listener, HttpTransport::Http),
900        )
901    }
902
903    /// Accept and serve a single connection to completion, then return.
904    /// Used by tests that want a one-shot HTTP server alongside another
905    /// transport.
906    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
907        let runtime = axum_edge::build_background_edge_runtime()?;
908        let server = self.clone();
909        runtime.block_on(async move {
910            listener.set_nonblocking(true)?;
911            let listener = tokio::net::TcpListener::from_std(listener)?;
912            let (stream, _peer) = listener.accept().await?;
913            server.serve_edge_one(stream).await;
914            Ok(())
915        })
916    }
917
918    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
919        let server = self.clone();
920        thread::spawn(move || server.serve())
921    }
922
923    pub fn serve_in_background_on(
924        &self,
925        listener: TcpListener,
926    ) -> thread::JoinHandle<io::Result<()>> {
927        let server = self.clone();
928        thread::spawn(move || {
929            let runtime = axum_edge::build_background_edge_runtime()?;
930            runtime.block_on(server.serve_edge_on_std(listener, HttpTransport::Http))
931        })
932    }
933
934    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
935    /// `tls_config` is shared across all connections (rustls
936    /// `ServerConfig` is `Send + Sync`).
937    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
938        let listener = TcpListener::bind(&self.options.bind_addr)?;
939        self.serve_tls_on(listener, tls_config)
940    }
941
942    pub fn serve_tls_on(
943        &self,
944        listener: TcpListener,
945        tls_config: std::sync::Arc<rustls::ServerConfig>,
946    ) -> io::Result<()> {
947        let runtime = axum_edge::build_edge_runtime()?;
948        let acceptor = axum_edge::tls_acceptor(tls_config);
949        runtime.block_on(self.clone().serve_edge_tls_on_std(
950            listener,
951            acceptor,
952            HttpTransport::Https,
953        ))
954    }
955
956    pub fn serve_tls_in_background(
957        &self,
958        tls_config: std::sync::Arc<rustls::ServerConfig>,
959    ) -> thread::JoinHandle<io::Result<()>> {
960        let server = self.clone();
961        thread::spawn(move || server.serve_tls(tls_config))
962    }
963
964    pub fn serve_tls_in_background_on(
965        &self,
966        listener: TcpListener,
967        tls_config: std::sync::Arc<rustls::ServerConfig>,
968    ) -> thread::JoinHandle<io::Result<()>> {
969        let server = self.clone();
970        thread::spawn(move || {
971            let runtime = axum_edge::build_background_edge_runtime()?;
972            let acceptor = axum_edge::tls_acceptor(tls_config);
973            runtime.block_on(server.serve_edge_tls_on_std(listener, acceptor, HttpTransport::Https))
974        })
975    }
976
977    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
978        let started = Instant::now();
979        let result = self.handle_connection_inner(stream);
980        let elapsed = started.elapsed().as_secs_f64();
981        self.http_metrics
982            .record_duration(HttpTransport::Http, elapsed);
983        result
984    }
985
986    fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
987        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
988        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
989
990        // Issue #570 slice 2 / #621: arm a deadline at handler spawn and
991        // check at coarse boundaries. Armed against the injectable
992        // monotonic clock (#621) so timeout behaviour is deterministically
993        // testable without real sleeps; production wires the real clock,
994        // so this tracks wall time. No hard pre-emption — a thread blocked
995        // inside a true syscall is still bounded only by the per-socket
996        // read/write timeouts.
997        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
998
999        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
1000
1001        // Boundary (a): between request parse and route dispatch.
1002        if deadline.expired() {
1003            self.http_metrics
1004                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1005            Self::write_handler_timeout_503(&mut stream);
1006            return Ok(());
1007        }
1008
1009        if self.try_route_streaming(&request, &mut stream)? {
1010            return Ok(());
1011        }
1012        let response = self.route(request);
1013
1014        // Test-only injected slow downstream (issue #570 slice 2
1015        // integration test). Production builds set this to 0, so this
1016        // is a single relaxed atomic load on the hot path.
1017        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
1018        if inject_ms > 0 {
1019            thread::sleep(Duration::from_millis(inject_ms));
1020        }
1021
1022        // Boundary (b): between route dispatch and response write.
1023        if deadline.expired() {
1024            self.http_metrics
1025                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1026            Self::write_handler_timeout_503(&mut stream);
1027            return Ok(());
1028        }
1029
1030        stream.write_all(&response.to_http_bytes())?;
1031        stream.flush()?;
1032        Ok(())
1033    }
1034
1035    /// Best-effort 503 emitted when the per-handler deadline expires
1036    /// at a coarse boundary. Writes are swallowed — the caller has
1037    /// already exceeded its budget, so we do not propagate write
1038    /// errors. Permit drop happens on the handler thread's normal
1039    /// exit path.
1040    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
1041        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
1042            Connection: close\r\n\
1043            Content-Length: 0\r\n\
1044            \r\n";
1045        let _ = stream.write_all(RESPONSE);
1046        let _ = stream.flush();
1047    }
1048}