Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
/// Renders a physical analytics job as JSON.
///
/// Thin local alias for `crate::presentation::admin_json::analytics_job_json`
/// so code in this module can call it unqualified.
fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
    crate::presentation::admin_json::analytics_job_json(job)
}
51
// Unit tests for the `ServerOptions` defaults and for the transport-listener
// section of the health payload.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// Pins the default HTTP body limit at 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        assert_eq!(ServerOptions::default().max_body_bytes, 32 * 1024 * 1024);
    }

    /// A server configured with one active and one failed listener must
    /// report both under `transport_listeners.active` / `.failed` in the
    /// health payload.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let mut options = ServerOptions::default();
        // One listener that came up and one that failed to bind.
        options.transport_readiness = TransportReadiness {
            active: vec![TransportListenerState {
                transport: "grpc".to_string(),
                bind_addr: "127.0.0.1:50051".to_string(),
                explicit: true,
            }],
            failed: vec![TransportListenerFailure {
                transport: "http".to_string(),
                bind_addr: "127.0.0.1:5055".to_string(),
                explicit: false,
                reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
            }],
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        // Walk the payload shape step by step so a failure names the missing piece.
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
/// Renders a physical graph projection as JSON.
///
/// Thin local alias for `crate::presentation::admin_json::graph_projection_json`
/// so code in this module can call it unqualified.
fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
    crate::presentation::admin_json::graph_projection_json(projection)
}
107
108mod axum_edge;
109mod ws_edge;
110pub mod handlers_admin;
111mod handlers_ai;
112mod handlers_ai_model_cache;
113mod handlers_auth;
114mod handlers_backup;
115mod handlers_collection_policy;
116mod handlers_ec;
117pub(crate) mod handlers_entity;
118mod handlers_geo;
119mod handlers_graph;
120mod handlers_keyed;
121mod handlers_log;
122mod handlers_metrics;
123mod handlers_ops;
124mod handlers_ops_policy;
125// `pub(crate)` so the RedWire input-stream path (issue #764 / S5)
126// can reuse the canonical S4 INSERT builders / identifier checks
127// (`build_insert_sql`, `is_safe_sql_identifier`) rather than fork
128// the SQL-escaping logic.
129pub(crate) mod handlers_query;
130mod handlers_replication;
131mod handlers_topology;
132mod handlers_vcs;
133mod handlers_vector;
134pub mod header_escape_guard;
135pub mod http_connection_limiter;
136pub mod http_handler_metrics;
137pub mod http_limits;
138pub mod http_principal_limiter;
139pub mod ingest_pipeline;
140pub mod output_stream;
141mod patch_support;
142mod request_body;
143mod request_context;
144mod routing;
145mod serverless_support;
146pub mod tls;
147mod transport;
148
149use self::handlers_ai::*;
150use self::handlers_entity::*;
151use self::handlers_graph::*;
152use self::handlers_keyed::*;
153use self::handlers_metrics::*;
154use self::handlers_ops::*;
155use self::handlers_query::*;
156use self::http_connection_limiter::{
157    HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
158};
159use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
160pub use self::http_limits::{
161    HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
162};
163use self::http_principal_limiter::PrincipalConnectionLimiter;
164use self::patch_support::*;
165use self::request_body::*;
166use self::routing::*;
167use self::serverless_support::*;
168use self::transport::*;
169
/// Which slice of the HTTP route table a listener exposes.
///
/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
/// can serve either every public surface (`Public`, default) or a
/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
/// the top of `route()` consults this so a port bound only to
/// loopback for admin work won't accidentally hand out DML.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Everything routed normally (default — matches v0 behaviour).
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
    /// which default to `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Intended for
    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
    /// exposed to non-admin networks.
    MetricsOnly,
}
188
/// Per-listener configuration for [`RedDBServer`].
///
/// See the `Default` impl for the stock values.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address the listener binds to, as `host:port`
    /// (default `127.0.0.1:5055`).
    pub bind_addr: String,
    /// HTTP body size limit in bytes
    /// (default [`DEFAULT_HTTP_MAX_BODY_BYTES`], 32 MiB).
    pub max_body_bytes: usize,
    /// Read timeout in milliseconds (default 5000).
    /// NOTE(review): presumably applied to the connection socket — the
    /// call site is outside this chunk; confirm.
    pub read_timeout_ms: u64,
    /// Write timeout in milliseconds (default 5000).
    /// NOTE(review): presumably applied to the connection socket — confirm.
    pub write_timeout_ms: u64,
    /// Scan limit (default 1000).
    /// NOTE(review): presumably the largest page size a scan request may
    /// ask for — enforcement is outside this chunk; confirm.
    pub max_scan_limit: usize,
    /// Which subset of paths this listener serves. Defaults to
    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
    /// admin / scrape ports (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
    /// Active and failed transport listeners, rendered verbatim by
    /// `transport_readiness_json` into the discovery / health payloads.
    pub transport_readiness: crate::service_cli::TransportReadiness,
    /// Allowed `Origin` values for the RedWire-over-WSS browser endpoint
    /// (issue #935, ADR 0036). WebSocket is not covered by CORS, so the
    /// upgrade is gated on an explicit allowlist to block Cross-Site
    /// WebSocket Hijacking. **Default-deny:** empty means the `/redwire`
    /// route is not mounted at all — operators opt in by configuring at
    /// least one origin. Matched exactly (scheme+host+port), e.g.
    /// `https://app.example.com`.
    pub websocket_allowed_origins: Vec<String>,
}
210
/// Default value of [`ServerOptions::max_body_bytes`]: 32 MiB.
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 * 1024 * 1024;
212
impl Default for ServerOptions {
    /// Stock listener settings: loopback bind on port 5055, the
    /// [`DEFAULT_HTTP_MAX_BODY_BYTES`] body limit, 5-second read and write
    /// timeouts, a scan limit of 1000, the `Public` surface, default
    /// transport readiness, and an empty WebSocket origin allowlist.
    fn default() -> Self {
        Self {
            bind_addr: "127.0.0.1:5055".to_string(),
            max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
            read_timeout_ms: 5_000,
            write_timeout_ms: 5_000,
            max_scan_limit: 1_000,
            surface: ServerSurface::Public,
            transport_readiness: crate::service_cli::TransportReadiness::default(),
            websocket_allowed_origins: Vec::new(),
        }
    }
}
227
/// Replication state exposed to the HTTP server.
///
/// Attached through [`RedDBServer::with_replication`].
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle.
    /// NOTE(review): presumably `None` when this node is not a primary —
    /// confirm against the replication handlers.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
233
/// HTTP server handle wrapping a [`RedDBRuntime`].
///
/// The handle is `Clone`; the limiter, metrics, clock, and registries are
/// shared between clones (see the per-field notes), so every clone enforces
/// and reports against the same state.
#[derive(Clone)]
pub struct RedDBServer {
    /// Database runtime every use-case facade borrows from.
    runtime: RedDBRuntime,
    /// Listener configuration supplied at construction.
    options: ServerOptions,
    /// Auth store for HTTP-layer authentication; `None` until
    /// `with_auth` is called.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state for the status and snapshot endpoints; `None`
    /// until `with_replication` is called.
    replication: Option<Arc<ServerReplicationState>>,
    /// Bounded handler-thread admission for the clear-text HTTP accept
    /// loop (issue #570 slice 1). Cloned with the server; `Clone` of
    /// `HttpConnectionLimiter` shares an `Arc` so every serve loop on
    /// the same `RedDBServer` shares one cap.
    http_limiter: HttpConnectionLimiter,
    /// Per-handler total-time deadline (issue #570 slice 2). Each
    /// clear-text handler thread arms a deadline at spawn and bails
    /// with a best-effort 503 at coarse boundaries between request
    /// parse, route dispatch, and response write. Starts at
    /// `DEFAULT_HANDLER_TIMEOUT` (30s); `with_http_limits` and
    /// `with_handler_timeout` override it.
    handler_timeout: Duration,
    /// Monotonic clock the per-handler deadline (issue #621) is armed
    /// against — the same [`MonotonicClock`] abstraction the limiter
    /// uses. Production wires [`SystemMonotonicClock`] (real wall time);
    /// tests inject a fake to drive timeout expiry deterministically
    /// without `sleep()`. Shared via `Arc` so cloned server handles
    /// (e.g. `serve_in_background`) read the same clock.
    handler_clock: Arc<dyn MonotonicClock>,
    /// Test-only synchronous sleep injected between route dispatch and
    /// response write so an integration test can simulate a slow
    /// downstream tripping the deadline. Default 0 (no-op). Shared via
    /// `Arc` so a cloned `RedDBServer` (e.g. `serve_in_background`)
    /// observes flips from the originating handle. The only setter,
    /// `set_test_slow_inject_ms`, is `#[doc(hidden)]` and intended for
    /// tests.
    slow_inject_ms: Arc<AtomicU64>,
    /// Prometheus metrics for the HTTP handler-thread pool (issue
    /// #573 slice 4). Records rejections (cap_exhausted /
    /// handler_timeout) and per-handler duration histograms. Cloned
    /// with the server via `Arc` so every serve loop on the same
    /// `RedDBServer` writes to one set of counters.
    http_metrics: HttpHandlerMetrics,
    /// `Retry-After` value (seconds) emitted in the async edge's
    /// capacity-reject 503 path (issue #574 slice 5). Read on the reject
    /// path in `axum_edge`.
    retry_after_secs: u64,
    /// Issue #934 / PRD #930 — per-principal concurrent in-flight cap. The
    /// global `http_limiter` bounds *total* in-flight work (async
    /// backpressure); this bounds any *single* principal's share so one
    /// abusive caller can't drain the whole global cap and starve the rest.
    /// Consulted at the async edge after global admission; a principal over
    /// its cap gets a structured 429 refusal. Shared via `Arc` (inside the
    /// limiter) so cloned server handles enforce one set of counters.
    principal_limiter: PrincipalConnectionLimiter,
    /// Issue #761 / S2 — process-wide output-stream capacity registry.
    /// Shared via `Arc` so cloned server handles (e.g.
    /// `serve_in_background`) all enforce against one set of counters.
    /// The HTTP NDJSON path acquires through this in
    /// `try_route_streaming` before invoking the handler; the guard is
    /// dropped on return so any stream-end path (success / mid-stream
    /// error / snapshot expiry / panic unwind) releases the slot.
    pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
    /// Issue #766 / S7 — resume coordinator ledger. Tracks
    /// `(snapshot_lsn → opened_at_ms, ttl_ms)` for resume-eligibility
    /// checks. Shared via `Arc` so cloned server handles see one
    /// ledger.
    pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
    /// Issue #807 / PRD #750 — `/query/stream` cursor registry. Holds the
    /// opaque token → (snapshot pin, TTL, tenant, principal, query) entries
    /// that let a client resume or reference a streamed read. Shared via
    /// `Arc` so cloned server handles see one registry.
    pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
}
302
/// Default per-handler total-time budget (issue #571 slice 2): 30 seconds.
///
/// Installed by `RedDBServer::with_options`.
// NOTE(review): other comments in this file cite issue #570 for the same
// slice — confirm which issue number is correct.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_millis(30_000);
305
/// Categories of artifact a serverless warmup can target.
// NOTE(review): no use site is visible in this chunk — presumably consumed
// by `serverless_support`; confirm before relying on variant semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
313
/// Deployment modes the server distinguishes between.
// NOTE(review): no use site is visible in this chunk — presumably backs the
// `GET /deployment/profiles` endpoint advertised by root discovery; confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
320
321fn percent_decode_path_segment(input: &str) -> Result<String, String> {
322    let bytes = input.as_bytes();
323    let mut out = Vec::with_capacity(bytes.len());
324    let mut index = 0;
325    while index < bytes.len() {
326        match bytes[index] {
327            b'%' => {
328                if index + 2 >= bytes.len() {
329                    return Err("truncated percent escape".to_string());
330                }
331                let high = hex_value(bytes[index + 1])
332                    .ok_or_else(|| "invalid percent escape".to_string())?;
333                let low = hex_value(bytes[index + 2])
334                    .ok_or_else(|| "invalid percent escape".to_string())?;
335                out.push((high << 4) | low);
336                index += 3;
337            }
338            byte => {
339                out.push(byte);
340                index += 1;
341            }
342        }
343    }
344    String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
345}
346
/// Returns the numeric value (0–15) of an ASCII hex digit, accepting both
/// upper- and lower-case letters, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `char::from(u8)` maps the byte to U+0000..=U+00FF, and `to_digit(16)`
    // only accepts the ASCII digits and letters a-f / A-F in that range.
    // The resulting digit is always below 16, so narrowing to `u8` is lossless.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
355
/// A query request after body parsing.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional list of entity-type names.
    /// NOTE(review): presumably restricts which entity kinds the query
    /// may return — the consumer is outside this chunk; confirm.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names.
    /// NOTE(review): semantics are defined by the query handler, which is
    /// outside this chunk; confirm.
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). When `Some`, the
    /// query handler runs the user_params binder before executing.
    /// Absence preserves the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}
366
/// Kind of change a single [`PatchOperation`] applies.
// NOTE(review): the exact difference between `Set` and `Replace` is decided
// where patches are applied (presumably `patch_support`) — confirm there.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
373
/// One parsed patch instruction: what to do, where, and with which value.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Kind of change to apply.
    op: PatchOperationType,
    /// Location of the target, as a sequence of string segments.
    /// NOTE(review): presumably one segment per nesting level — confirm
    /// against the patch parser.
    path: Vec<String>,
    /// Payload for the operation, when one is supplied.
    /// NOTE(review): presumably `None` for `Unset` — confirm.
    value: Option<JsonValue>,
}
380
381impl RedDBServer {
    /// Builds a server around `runtime` with [`ServerOptions::default`].
    pub fn new(runtime: RedDBRuntime) -> Self {
        Self::with_options(runtime, ServerOptions::default())
    }
385
    /// Opens a runtime from `db_options` and wraps it in a server configured
    /// with `server_options`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `RedDBRuntime::with_options`.
    pub fn from_database_options(
        db_options: RedDBOptions,
        server_options: ServerOptions,
    ) -> RedDBResult<Self> {
        let runtime = RedDBRuntime::with_options(db_options)?;
        Ok(Self::with_options(runtime, server_options))
    }
393
    /// Builds a server around `runtime` with explicit listener `options`.
    ///
    /// Everything else starts at its default: no auth store, no replication
    /// state, the default limiter cap, [`DEFAULT_HANDLER_TIMEOUT`], the
    /// system monotonic clock, slow-inject disabled (0 ms), fresh metrics,
    /// [`DEFAULT_RETRY_AFTER_SECS`], the default per-principal in-flight
    /// cap, and new stream-capacity, lease, and cursor registries. Use the
    /// `with_*` builders to override.
    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
        Self {
            runtime,
            options,
            auth_store: None,
            replication: None,
            http_limiter: HttpConnectionLimiter::with_default_cap(),
            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
            handler_clock: Arc::new(SystemMonotonicClock::new()),
            slow_inject_ms: Arc::new(AtomicU64::new(0)),
            http_metrics: HttpHandlerMetrics::new(),
            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
            principal_limiter: PrincipalConnectionLimiter::new(
                http_limits::DEFAULT_MAX_INFLIGHT_PER_PRINCIPAL,
            ),
            stream_capacity: output_stream::StreamCapacityRegistry::new(),
            lease_registry: output_stream::LeaseRegistry::new(),
            cursor_registry: output_stream::CursorRegistry::new(),
        }
    }
414
    /// Shared output-stream capacity registry (issue #761).
    #[doc(hidden)]
    pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
        &self.stream_capacity
    }
419
    /// Shared resume-coordinator lease ledger (issue #766).
    #[doc(hidden)]
    pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
        &self.lease_registry
    }
424
    /// Shared `/query/stream` cursor registry (issue #807).
    #[doc(hidden)]
    pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
        &self.cursor_registry
    }
429
    /// Metrics handle for the HTTP handler pool (issue #573).
    #[doc(hidden)]
    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
        &self.http_metrics
    }
434
    /// Visible for tests. Lets the integration test in
    /// `tests/http_connection_limiter.rs` saturate the cap and observe
    /// `503 Service Unavailable` responses without spinning up
    /// thousands of sockets.
    ///
    /// Installs a new `HttpConnectionLimiter::new(cap)` in place of the
    /// current limiter; only the limiter is touched.
    #[doc(hidden)]
    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(cap);
        self
    }
444
    /// Stamp resolved HTTP limits onto the server (issue #574 slice 5).
    /// Replaces the limiter cap, the per-handler deadline, the
    /// `Retry-After` value used by the limiter's reject path, and the
    /// per-principal in-flight cap (issue #934). All values are assumed
    /// validated by [`http_limits::resolve_http_limits`].
    ///
    /// Both limiters are rebuilt with `::new(..)` rather than adjusted in
    /// place.
    pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
        self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
        self.retry_after_secs = limits.retry_after_secs;
        self.principal_limiter = PrincipalConnectionLimiter::new(limits.max_inflight_per_principal);
        self
    }
456
    /// Visible for tests. Override the per-principal concurrent in-flight
    /// cap (issue #934) so the integration test can saturate one principal
    /// and observe the structured 429 refusal without standing up hundreds
    /// of real sockets. `0` disables the per-principal cap.
    ///
    /// Installs a new `PrincipalConnectionLimiter::new(cap)` in place of
    /// the current one; the global limiter is left as is.
    #[doc(hidden)]
    pub fn with_principal_inflight_cap(mut self, cap: usize) -> Self {
        self.principal_limiter = PrincipalConnectionLimiter::new(cap);
        self
    }
466
    /// Per-principal in-flight limiter (issue #934).
    #[doc(hidden)]
    pub fn principal_limiter(&self) -> &PrincipalConnectionLimiter {
        &self.principal_limiter
    }
471
    /// Configured `Retry-After` value, in seconds.
    #[doc(hidden)]
    pub fn retry_after_secs(&self) -> u64 {
        self.retry_after_secs
    }
476
    /// Global handler-admission limiter.
    #[doc(hidden)]
    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
        &self.http_limiter
    }
481
    /// Visible for tests. Override the per-handler total-time deadline
    /// (issue #570 slice 2). Default 30s (`DEFAULT_HANDLER_TIMEOUT`).
    ///
    /// Only the deadline changes; limiters and `Retry-After` are untouched.
    #[doc(hidden)]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = timeout;
        self
    }
489
    /// Configured per-handler total-time deadline.
    #[doc(hidden)]
    pub fn handler_timeout(&self) -> Duration {
        self.handler_timeout
    }
494
    /// Visible for tests. Override the clock the per-handler deadline
    /// (issue #621) is armed against, so timeout expiry can be driven
    /// deterministically without real sleeps. Default is the real
    /// monotonic clock (`SystemMonotonicClock`, installed by
    /// `with_options`).
    #[doc(hidden)]
    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
        self.handler_clock = clock;
        self
    }
504
    /// Test hook: set a synchronous sleep (in ms) inserted between
    /// route dispatch and response write. The integration test for
    /// slice 2 sets a value greater than `handler_timeout` to trip
    /// the deadline, then resets to 0 to verify recovery. Shared via
    /// `Arc<AtomicU64>` so cloned server handles see the same flip.
    ///
    /// Takes `&self` because the value lives in an atomic; the store uses
    /// `Ordering::Relaxed`.
    #[doc(hidden)]
    pub fn set_test_slow_inject_ms(&self, ms: u64) {
        self.slow_inject_ms.store(ms, Ordering::Relaxed);
    }
514
    /// Attach an `AuthStore` for HTTP-layer authentication.
    /// Also injects the store into the runtime so that `Value::Secret`
    /// auto-encrypt/decrypt can reach the vault AES key.
    ///
    /// The runtime receives its own `Arc` clone of the store; the server
    /// keeps the handle that was passed in.
    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
        self.runtime.set_auth_store(Arc::clone(&auth_store));
        self.auth_store = Some(auth_store);
        self
    }
523
    /// Attach replication state for status and snapshot endpoints.
    ///
    /// Replaces any previously attached state.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }
529
    /// Set the `Origin` allowlist that enables the RedWire-over-WSS
    /// browser endpoint (issue #935, ADR 0036). A non-empty list mounts
    /// the `/redwire` upgrade route on the TLS edge; an empty list leaves
    /// it unmounted (default-deny).
    ///
    /// Overwrites `options.websocket_allowed_origins`; the list is replaced,
    /// not merged.
    pub fn with_websocket_allowed_origins(mut self, origins: Vec<String>) -> Self {
        self.options.websocket_allowed_origins = origins;
        self
    }
538
    /// The configured RedWire-over-WSS `Origin` allowlist (issue #935),
    /// borrowed from `options`. Empty means the route is not mounted.
    pub(crate) fn websocket_allowed_origins(&self) -> &[String] {
        &self.options.websocket_allowed_origins
    }
543
    /// The database runtime this server fronts.
    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }
547
    /// The listener options this server was configured with.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }
551
    /// Query use-case facade borrowing this server's runtime.
    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }
555
    /// Admin use-case facade borrowing this server's runtime.
    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }
559
    /// Entity use-case facade borrowing this server's runtime.
    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }
563
    /// Catalog use-case facade borrowing this server's runtime.
    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }
567
    /// Graph use-case facade borrowing this server's runtime.
    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }
571
    /// Native-artifact use-case facade borrowing this server's runtime.
    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }
575
    /// Tree use-case facade borrowing this server's runtime.
    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }
579
580    fn transport_readiness_json(&self) -> JsonValue {
581        let active = self
582            .options
583            .transport_readiness
584            .active
585            .iter()
586            .map(|listener| {
587                let mut object = Map::new();
588                object.insert(
589                    "transport".to_string(),
590                    JsonValue::String(listener.transport.clone()),
591                );
592                object.insert(
593                    "bind_addr".to_string(),
594                    JsonValue::String(listener.bind_addr.clone()),
595                );
596                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
597                JsonValue::Object(object)
598            })
599            .collect();
600        let failed = self
601            .options
602            .transport_readiness
603            .failed
604            .iter()
605            .map(|listener| {
606                let mut object = Map::new();
607                object.insert(
608                    "transport".to_string(),
609                    JsonValue::String(listener.transport.clone()),
610                );
611                object.insert(
612                    "bind_addr".to_string(),
613                    JsonValue::String(listener.bind_addr.clone()),
614                );
615                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
616                object.insert(
617                    "reason".to_string(),
618                    JsonValue::String(listener.reason.clone()),
619                );
620                JsonValue::Object(object)
621            })
622            .collect();
623
624        let mut object = Map::new();
625        object.insert("active".to_string(), JsonValue::Array(active));
626        object.insert("failed".to_string(), JsonValue::Array(failed));
627        JsonValue::Object(object)
628    }
629
630    fn handle_grpc_discovery(&self) -> HttpResponse {
631        let mut methods = Map::new();
632        methods.insert(
633            "query".to_string(),
634            JsonValue::String("reddb.v1.RedDB/Query".to_string()),
635        );
636        methods.insert(
637            "batch_query".to_string(),
638            JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
639        );
640        methods.insert(
641            "health".to_string(),
642            JsonValue::String("reddb.v1.RedDB/Health".to_string()),
643        );
644        methods.insert(
645            "prepare".to_string(),
646            JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
647        );
648        methods.insert(
649            "execute_prepared".to_string(),
650            JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
651        );
652
653        let mut examples = Map::new();
654        examples.insert(
655            "query".to_string(),
656            JsonValue::String(
657                "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
658                    .to_string(),
659            ),
660        );
661        examples.insert(
662            "query_with_params".to_string(),
663            JsonValue::String(
664                "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
665                    .to_string(),
666            ),
667        );
668        examples.insert(
669            "health".to_string(),
670            JsonValue::String(
671                "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
672            ),
673        );
674
675        let mut object = Map::new();
676        object.insert("ok".to_string(), JsonValue::Bool(true));
677        object.insert(
678            "service".to_string(),
679            JsonValue::String("reddb.v1.RedDB".to_string()),
680        );
681        object.insert(
682            "package".to_string(),
683            JsonValue::String("reddb.v1".to_string()),
684        );
685        object.insert(
686            "proto".to_string(),
687            JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
688        );
689        object.insert("methods".to_string(), JsonValue::Object(methods));
690        object.insert("examples".to_string(), JsonValue::Object(examples));
691        object.insert(
692            "transport_listeners".to_string(),
693            self.transport_readiness_json(),
694        );
695        object.insert(
696            "hint".to_string(),
697            JsonValue::String(
698                "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
699                    .to_string(),
700            ),
701        );
702        json_response(200, JsonValue::Object(object))
703    }
704
705    fn handle_query_contract(&self) -> HttpResponse {
706        let mut examples = Map::new();
707        examples.insert(
708            "raw_sql".to_string(),
709            JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
710        );
711        examples.insert(
712            "json_query".to_string(),
713            JsonValue::String(
714                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
715                    .to_string(),
716            ),
717        );
718        examples.insert(
719            "json_query_with_params".to_string(),
720            JsonValue::String(
721                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
722                    .to_string(),
723            ),
724        );
725
726        let mut request_body = Map::new();
727        request_body.insert(
728            "query".to_string(),
729            JsonValue::String("required string".to_string()),
730        );
731        request_body.insert(
732            "params".to_string(),
733            JsonValue::String("optional array".to_string()),
734        );
735
736        let mut response_shape = Map::new();
737        response_shape.insert(
738            "columns".to_string(),
739            JsonValue::String("projected column names".to_string()),
740        );
741        response_shape.insert(
742            "records[].values".to_string(),
743            JsonValue::String("only projected values".to_string()),
744        );
745        response_shape.insert(
746            "records[].meta".to_string(),
747            JsonValue::String("internal metadata when present".to_string()),
748        );
749
750        let mut object = Map::new();
751        object.insert("ok".to_string(), JsonValue::Bool(false));
752        object.insert(
753            "code".to_string(),
754            JsonValue::String("method_not_allowed".to_string()),
755        );
756        object.insert(
757            "message".to_string(),
758            JsonValue::String("/query accepts POST requests".to_string()),
759        );
760        object.insert(
761            "hint".to_string(),
762            JsonValue::String(
763                "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
764            ),
765        );
766        object.insert("method".to_string(), JsonValue::String("POST".to_string()));
767        object.insert("path".to_string(), JsonValue::String("/query".to_string()));
768        object.insert("request_body".to_string(), JsonValue::Object(request_body));
769        object.insert(
770            "response_shape".to_string(),
771            JsonValue::Object(response_shape),
772        );
773        object.insert("examples".to_string(), JsonValue::Object(examples));
774        object.insert(
775            "docs".to_string(),
776            JsonValue::String("https://reddb.io/docs/query".to_string()),
777        );
778
779        json_response(405, JsonValue::Object(object))
780            .with_header("Allow", http::HeaderValue::from_static("POST"))
781    }
782
783    fn handle_root_discovery(&self) -> HttpResponse {
784        let mut endpoints = Map::new();
785        endpoints.insert(
786            "health".to_string(),
787            JsonValue::String("GET /health".to_string()),
788        );
789        endpoints.insert(
790            "ready".to_string(),
791            JsonValue::String("GET /ready".to_string()),
792        );
793        endpoints.insert(
794            "query".to_string(),
795            JsonValue::String("POST /query".to_string()),
796        );
797        endpoints.insert(
798            "query_readiness".to_string(),
799            JsonValue::String("GET /ready/query".to_string()),
800        );
801        endpoints.insert(
802            "catalog".to_string(),
803            JsonValue::String("GET /catalog".to_string()),
804        );
805        endpoints.insert(
806            "deployment_profiles".to_string(),
807            JsonValue::String("GET /deployment/profiles".to_string()),
808        );
809
810        let mut examples = Map::new();
811        examples.insert(
812            "http_raw_sql".to_string(),
813            JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
814        );
815        examples.insert(
816            "http_json_query".to_string(),
817            JsonValue::String(
818                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
819                    .to_string(),
820            ),
821        );
822        examples.insert(
823            "http_json_query_with_params".to_string(),
824            JsonValue::String(
825                "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
826                    .to_string(),
827            ),
828        );
829
830        let mut object = Map::new();
831        object.insert("ok".to_string(), JsonValue::Bool(true));
832        object.insert(
833            "service".to_string(),
834            JsonValue::String("reddb".to_string()),
835        );
836        object.insert(
837            "version".to_string(),
838            JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
839        );
840        object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
841        object.insert("examples".to_string(), JsonValue::Object(examples));
842        object.insert(
843            "docs".to_string(),
844            JsonValue::String("https://reddb.io/docs".to_string()),
845        );
846        object.insert(
847            "transport_listeners".to_string(),
848            self.transport_readiness_json(),
849        );
850        json_response(200, JsonValue::Object(object))
851    }
852
853    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
854        let mut value = crate::presentation::ops_json::health_json(report);
855        if let JsonValue::Object(ref mut object) = value {
856            object.insert(
857                "transport_listeners".to_string(),
858                self.transport_readiness_json(),
859            );
860        }
861        value
862    }
863
864    pub fn serve(&self) -> io::Result<()> {
865        let listener = TcpListener::bind(&self.options.bind_addr)?;
866        self.serve_on(listener)
867    }
868
869    /// Serve the async axum/hyper HTTP edge (issue #931) on the given
870    /// listener until it errors fatally. A dedicated multi-threaded tokio
871    /// runtime drives the I/O; the synchronous disk-backed engine is
872    /// reached via `spawn_blocking`. This replaces the retired
873    /// thread-per-connection accept loop and its `(2*num_cpus)` thread
874    /// cap — idle keep-alive connections are now cheap parked tasks.
875    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
876        let runtime = axum_edge::build_edge_runtime()?;
877        runtime.block_on(
878            self.clone()
879                .serve_edge_on_std(listener, HttpTransport::Http),
880        )
881    }
882
883    /// Accept and serve a single connection to completion, then return.
884    /// Used by tests that want a one-shot HTTP server alongside another
885    /// transport.
886    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
887        let runtime = axum_edge::build_background_edge_runtime()?;
888        let server = self.clone();
889        runtime.block_on(async move {
890            listener.set_nonblocking(true)?;
891            let listener = tokio::net::TcpListener::from_std(listener)?;
892            let (stream, _peer) = listener.accept().await?;
893            server.serve_edge_one(stream).await;
894            Ok(())
895        })
896    }
897
898    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
899        let server = self.clone();
900        thread::spawn(move || server.serve())
901    }
902
903    pub fn serve_in_background_on(
904        &self,
905        listener: TcpListener,
906    ) -> thread::JoinHandle<io::Result<()>> {
907        let server = self.clone();
908        thread::spawn(move || {
909            let runtime = axum_edge::build_background_edge_runtime()?;
910            runtime.block_on(server.serve_edge_on_std(listener, HttpTransport::Http))
911        })
912    }
913
914    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
915    /// `tls_config` is shared across all connections (rustls
916    /// `ServerConfig` is `Send + Sync`).
917    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
918        let listener = TcpListener::bind(&self.options.bind_addr)?;
919        self.serve_tls_on(listener, tls_config)
920    }
921
922    pub fn serve_tls_on(
923        &self,
924        listener: TcpListener,
925        tls_config: std::sync::Arc<rustls::ServerConfig>,
926    ) -> io::Result<()> {
927        let runtime = axum_edge::build_edge_runtime()?;
928        let acceptor = axum_edge::tls_acceptor(tls_config);
929        runtime.block_on(self.clone().serve_edge_tls_on_std(
930            listener,
931            acceptor,
932            HttpTransport::Https,
933        ))
934    }
935
936    pub fn serve_tls_in_background(
937        &self,
938        tls_config: std::sync::Arc<rustls::ServerConfig>,
939    ) -> thread::JoinHandle<io::Result<()>> {
940        let server = self.clone();
941        thread::spawn(move || server.serve_tls(tls_config))
942    }
943
944    pub fn serve_tls_in_background_on(
945        &self,
946        listener: TcpListener,
947        tls_config: std::sync::Arc<rustls::ServerConfig>,
948    ) -> thread::JoinHandle<io::Result<()>> {
949        let server = self.clone();
950        thread::spawn(move || {
951            let runtime = axum_edge::build_background_edge_runtime()?;
952            let acceptor = axum_edge::tls_acceptor(tls_config);
953            runtime.block_on(server.serve_edge_tls_on_std(listener, acceptor, HttpTransport::Https))
954        })
955    }
956
957    fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
958        let started = Instant::now();
959        let result = self.handle_connection_inner(stream);
960        let elapsed = started.elapsed().as_secs_f64();
961        self.http_metrics
962            .record_duration(HttpTransport::Http, elapsed);
963        result
964    }
965
966    fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
967        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
968        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
969
970        // Issue #570 slice 2 / #621: arm a deadline at handler spawn and
971        // check at coarse boundaries. Armed against the injectable
972        // monotonic clock (#621) so timeout behaviour is deterministically
973        // testable without real sleeps; production wires the real clock,
974        // so this tracks wall time. No hard pre-emption — a thread blocked
975        // inside a true syscall is still bounded only by the per-socket
976        // read/write timeouts.
977        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
978
979        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
980
981        // Boundary (a): between request parse and route dispatch.
982        if deadline.expired() {
983            self.http_metrics
984                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
985            Self::write_handler_timeout_503(&mut stream);
986            return Ok(());
987        }
988
989        if self.try_route_streaming(&request, &mut stream)? {
990            return Ok(());
991        }
992        let response = self.route(request);
993
994        // Test-only injected slow downstream (issue #570 slice 2
995        // integration test). Production builds set this to 0, so this
996        // is a single relaxed atomic load on the hot path.
997        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
998        if inject_ms > 0 {
999            thread::sleep(Duration::from_millis(inject_ms));
1000        }
1001
1002        // Boundary (b): between route dispatch and response write.
1003        if deadline.expired() {
1004            self.http_metrics
1005                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1006            Self::write_handler_timeout_503(&mut stream);
1007            return Ok(());
1008        }
1009
1010        stream.write_all(&response.to_http_bytes())?;
1011        stream.flush()?;
1012        Ok(())
1013    }
1014
1015    /// Best-effort 503 emitted when the per-handler deadline expires
1016    /// at a coarse boundary. Writes are swallowed — the caller has
1017    /// already exceeded its budget, so we do not propagate write
1018    /// errors. Permit drop happens on the handler thread's normal
1019    /// exit path.
1020    fn write_handler_timeout_503<S: Write>(stream: &mut S) {
1021        const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
1022            Connection: close\r\n\
1023            Content-Length: 0\r\n\
1024            \r\n";
1025        let _ = stream.write_all(RESPONSE);
1026        let _ = stream.flush();
1027    }
1028}