1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49 crate::presentation::admin_json::analytics_job_json(job)
50}
51
52#[cfg(test)]
53mod tests {
54 use super::*;
55 use crate::api::RedDBOptions;
56 use crate::health::HealthReport;
57 use crate::service_cli::{
58 TransportListenerFailure, TransportListenerState, TransportReadiness,
59 };
60
61 #[test]
62 fn server_options_default_http_body_limit_is_32_mib() {
63 assert_eq!(ServerOptions::default().max_body_bytes, 32 * 1024 * 1024);
64 }
65
66 #[test]
67 fn health_json_reports_transport_listeners() {
68 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
69 let mut options = ServerOptions::default();
70 options.transport_readiness = TransportReadiness {
71 active: vec![TransportListenerState {
72 transport: "grpc".to_string(),
73 bind_addr: "127.0.0.1:5000".to_string(),
74 explicit: true,
75 }],
76 failed: vec![TransportListenerFailure {
77 transport: "http".to_string(),
78 bind_addr: "127.0.0.1:5000".to_string(),
79 explicit: false,
80 reason: "http listener bind 127.0.0.1:5000: address in use".to_string(),
81 }],
82 };
83 let server = RedDBServer::with_options(runtime, options);
84
85 let payload = server.health_json_with_transport(&HealthReport::healthy());
86 let JsonValue::Object(root) = payload else {
87 panic!("health payload should be an object");
88 };
89 let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
90 panic!("health payload should include transport_listeners");
91 };
92 let Some(JsonValue::Array(active)) = listeners.get("active") else {
93 panic!("transport_listeners.active should be an array");
94 };
95 let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
96 panic!("transport_listeners.failed should be an array");
97 };
98
99 assert_eq!(active.len(), 1);
100 assert_eq!(failed.len(), 1);
101 }
102}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105 crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108mod axum_edge;
109pub mod handlers_admin;
110mod handlers_admin_metrics;
111mod handlers_admin_status;
112mod handlers_ai;
113mod handlers_ai_model_cache;
114mod handlers_auth;
115mod handlers_backup;
116mod handlers_browser_auth;
117mod handlers_capabilities;
118mod handlers_collection_policy;
119mod handlers_ec;
120pub(crate) mod handlers_entity;
121mod handlers_failover;
122mod handlers_geo;
123mod handlers_graph;
124mod handlers_iam_policy;
125mod handlers_keyed;
126mod handlers_log;
127mod handlers_metrics;
128mod handlers_ops;
129mod handlers_ops_policy;
130pub mod ui_bridge;
134mod ws_edge;
135pub mod ui_bundle_resolver;
139pub mod ui_auth;
142mod ui_static;
146pub mod ui_deeplink;
151pub(crate) mod handlers_query;
156mod handlers_replication;
157mod handlers_topology;
158mod handlers_vcs;
159mod handlers_vector;
160pub mod header_escape_guard;
161pub mod http_connection_limiter;
162pub mod http_handler_metrics;
163pub mod http_limits;
164pub mod http_principal_limiter;
165pub mod http_request_metrics;
166pub mod ingest_pipeline;
167pub mod output_stream;
168mod patch_support;
169mod request_body;
170mod request_context;
171mod route_catalog;
172mod routes;
173mod routing;
174mod serverless_support;
175pub mod tls;
176mod transport;
177
178use self::handlers_ai::*;
179use self::handlers_entity::*;
180use self::handlers_graph::*;
181use self::handlers_keyed::*;
182use self::handlers_metrics::*;
183use self::handlers_ops::*;
184use self::handlers_query::*;
185use self::http_connection_limiter::{
186 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
187};
188use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
189pub use self::http_limits::{
190 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
191};
192use self::http_principal_limiter::PrincipalConnectionLimiter;
193use self::http_request_metrics::HttpRequestMetrics;
194use self::patch_support::*;
195use self::request_body::*;
196use self::routing::*;
197use self::serverless_support::*;
198use self::transport::*;
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq)]
206pub enum ServerSurface {
207 Public,
209 AdminOnly,
213 MetricsOnly,
217}
218
219#[derive(Debug, Clone)]
220pub struct ServerOptions {
221 pub bind_addr: String,
222 pub max_body_bytes: usize,
223 pub read_timeout_ms: u64,
224 pub write_timeout_ms: u64,
225 pub max_scan_limit: usize,
226 pub surface: ServerSurface,
230 pub transport_readiness: crate::service_cli::TransportReadiness,
231 pub websocket_allowed_origins: Vec<String>,
239 pub ui_dir: Option<std::path::PathBuf>,
248}
249
250pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 * 1024 * 1024;
251
252impl Default for ServerOptions {
253 fn default() -> Self {
254 Self {
255 bind_addr: "127.0.0.1:5000".to_string(),
256 max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
257 read_timeout_ms: 5_000,
258 write_timeout_ms: 5_000,
259 max_scan_limit: 1_000,
260 surface: ServerSurface::Public,
261 transport_readiness: crate::service_cli::TransportReadiness::default(),
262 websocket_allowed_origins: Vec::new(),
263 ui_dir: None,
264 }
265 }
266}
267
268pub struct ServerReplicationState {
270 pub config: crate::replication::ReplicationConfig,
271 pub primary: Option<crate::replication::primary::PrimaryReplication>,
272}
273
274#[derive(Clone)]
275pub struct RedDBServer {
276 runtime: RedDBRuntime,
277 options: ServerOptions,
278 auth_store: Option<Arc<AuthStore>>,
279 replication: Option<Arc<ServerReplicationState>>,
280 http_limiter: HttpConnectionLimiter,
285 handler_timeout: Duration,
291 handler_clock: Arc<dyn MonotonicClock>,
298 slow_inject_ms: Arc<AtomicU64>,
305 http_metrics: HttpHandlerMetrics,
311 http_request_metrics: HttpRequestMetrics,
317 retry_after_secs: u64,
321 principal_limiter: PrincipalConnectionLimiter,
329 pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
337 pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
342 pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
347}
348
349const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_millis(30_000);
351
352#[derive(Debug, Clone, Copy, PartialEq, Eq)]
353enum ServerlessWarmupScope {
354 Indexes,
355 GraphProjections,
356 AnalyticsJobs,
357 NativeArtifacts,
358}
359
360#[derive(Debug, Clone, Copy, PartialEq, Eq)]
361enum DeploymentProfile {
362 Embedded,
363 Server,
364 Serverless,
365}
366
367fn percent_decode_path_segment(input: &str) -> Result<String, String> {
368 let bytes = input.as_bytes();
369 let mut out = Vec::with_capacity(bytes.len());
370 let mut index = 0;
371 while index < bytes.len() {
372 match bytes[index] {
373 b'%' => {
374 if index + 2 >= bytes.len() {
375 return Err("truncated percent escape".to_string());
376 }
377 let high = hex_value(bytes[index + 1])
378 .ok_or_else(|| "invalid percent escape".to_string())?;
379 let low = hex_value(bytes[index + 2])
380 .ok_or_else(|| "invalid percent escape".to_string())?;
381 out.push((high << 4) | low);
382 index += 3;
383 }
384 byte => {
385 out.push(byte);
386 index += 1;
387 }
388 }
389 }
390 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
391}
392
393fn hex_value(byte: u8) -> Option<u8> {
394 match byte {
395 b'0'..=b'9' => Some(byte - b'0'),
396 b'a'..=b'f' => Some(byte - b'a' + 10),
397 b'A'..=b'F' => Some(byte - b'A' + 10),
398 _ => None,
399 }
400}
401
402#[derive(Debug, Clone)]
403struct ParsedQueryRequest {
404 query: String,
405 entity_types: Option<Vec<String>>,
406 capabilities: Option<Vec<String>>,
407 params: Option<Vec<Value>>,
411}
412
413#[derive(Debug, Clone, Copy)]
414enum PatchOperationType {
415 Set,
416 Replace,
417 Unset,
418}
419
420#[derive(Debug, Clone)]
421struct PatchOperation {
422 op: PatchOperationType,
423 path: Vec<String>,
424 value: Option<JsonValue>,
425}
426
427impl RedDBServer {
428 pub fn new(runtime: RedDBRuntime) -> Self {
429 Self::with_options(runtime, ServerOptions::default())
430 }
431
432 pub fn from_database_options(
433 db_options: RedDBOptions,
434 server_options: ServerOptions,
435 ) -> RedDBResult<Self> {
436 let runtime = RedDBRuntime::with_options(db_options)?;
437 Ok(Self::with_options(runtime, server_options))
438 }
439
440 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
441 Self {
442 runtime,
443 options,
444 auth_store: None,
445 replication: None,
446 http_limiter: HttpConnectionLimiter::with_default_cap(),
447 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
448 handler_clock: Arc::new(SystemMonotonicClock::new()),
449 slow_inject_ms: Arc::new(AtomicU64::new(0)),
450 http_metrics: HttpHandlerMetrics::new(),
451 http_request_metrics: HttpRequestMetrics::new(),
452 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
453 principal_limiter: PrincipalConnectionLimiter::new(
454 http_limits::DEFAULT_MAX_INFLIGHT_PER_PRINCIPAL,
455 ),
456 stream_capacity: output_stream::StreamCapacityRegistry::new(),
457 lease_registry: output_stream::LeaseRegistry::new(),
458 cursor_registry: output_stream::CursorRegistry::new(),
459 }
460 }
461
462 #[doc(hidden)]
463 pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
464 &self.stream_capacity
465 }
466
467 #[doc(hidden)]
468 pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
469 &self.lease_registry
470 }
471
472 #[doc(hidden)]
473 pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
474 &self.cursor_registry
475 }
476
477 #[doc(hidden)]
478 pub fn http_metrics(&self) -> &HttpHandlerMetrics {
479 &self.http_metrics
480 }
481
482 #[doc(hidden)]
485 pub fn http_request_metrics(&self) -> &HttpRequestMetrics {
486 &self.http_request_metrics
487 }
488
489 #[doc(hidden)]
494 pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
495 self.http_limiter = HttpConnectionLimiter::new(cap);
496 self
497 }
498
499 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
504 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
505 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
506 self.retry_after_secs = limits.retry_after_secs;
507 self.principal_limiter = PrincipalConnectionLimiter::new(limits.max_inflight_per_principal);
508 self
509 }
510
511 #[doc(hidden)]
516 pub fn with_principal_inflight_cap(mut self, cap: usize) -> Self {
517 self.principal_limiter = PrincipalConnectionLimiter::new(cap);
518 self
519 }
520
521 #[doc(hidden)]
522 pub fn principal_limiter(&self) -> &PrincipalConnectionLimiter {
523 &self.principal_limiter
524 }
525
526 #[doc(hidden)]
527 pub fn retry_after_secs(&self) -> u64 {
528 self.retry_after_secs
529 }
530
531 #[doc(hidden)]
532 pub fn http_limiter(&self) -> &HttpConnectionLimiter {
533 &self.http_limiter
534 }
535
536 #[doc(hidden)]
539 pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
540 self.handler_timeout = timeout;
541 self
542 }
543
544 #[doc(hidden)]
545 pub fn handler_timeout(&self) -> Duration {
546 self.handler_timeout
547 }
548
549 #[doc(hidden)]
554 pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
555 self.handler_clock = clock;
556 self
557 }
558
559 #[doc(hidden)]
565 pub fn set_test_slow_inject_ms(&self, ms: u64) {
566 self.slow_inject_ms.store(ms, Ordering::Relaxed);
567 }
568
569 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
573 self.runtime.set_auth_store(Arc::clone(&auth_store));
574 self.auth_store = Some(auth_store);
575 self
576 }
577
578 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
580 self.replication = Some(state);
581 self
582 }
583
584 pub fn with_websocket_allowed_origins(mut self, origins: Vec<String>) -> Self {
589 self.options.websocket_allowed_origins = origins;
590 self
591 }
592
593 pub(crate) fn websocket_allowed_origins(&self) -> &[String] {
595 &self.options.websocket_allowed_origins
596 }
597
598 pub fn with_ui_dir(mut self, dir: std::path::PathBuf) -> Self {
604 self.options.ui_dir = Some(dir);
605 self
606 }
607
608 pub(crate) fn ui_dir(&self) -> Option<&std::path::Path> {
611 self.options.ui_dir.as_deref()
612 }
613
614 pub fn with_browser_token_authority(
621 self,
622 authority: Arc<crate::auth::browser_token::BrowserTokenAuthority>,
623 ) -> Self {
624 self.runtime.set_browser_token_authority(Some(authority));
625 self
626 }
627
628 pub fn runtime(&self) -> &RedDBRuntime {
629 &self.runtime
630 }
631
632 pub fn options(&self) -> &ServerOptions {
633 &self.options
634 }
635
636 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
637 QueryUseCases::new(&self.runtime)
638 }
639
640 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
641 AdminUseCases::new(&self.runtime)
642 }
643
644 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
645 EntityUseCases::new(&self.runtime)
646 }
647
648 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
649 CatalogUseCases::new(&self.runtime)
650 }
651
652 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
653 GraphUseCases::new(&self.runtime)
654 }
655
656 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
657 NativeUseCases::new(&self.runtime)
658 }
659
660 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
661 TreeUseCases::new(&self.runtime)
662 }
663
664 fn transport_readiness_json(&self) -> JsonValue {
665 let active = self
666 .options
667 .transport_readiness
668 .active
669 .iter()
670 .map(|listener| {
671 let mut object = Map::new();
672 object.insert(
673 "transport".to_string(),
674 JsonValue::String(listener.transport.clone()),
675 );
676 object.insert(
677 "bind_addr".to_string(),
678 JsonValue::String(listener.bind_addr.clone()),
679 );
680 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
681 JsonValue::Object(object)
682 })
683 .collect();
684 let failed = self
685 .options
686 .transport_readiness
687 .failed
688 .iter()
689 .map(|listener| {
690 let mut object = Map::new();
691 object.insert(
692 "transport".to_string(),
693 JsonValue::String(listener.transport.clone()),
694 );
695 object.insert(
696 "bind_addr".to_string(),
697 JsonValue::String(listener.bind_addr.clone()),
698 );
699 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
700 object.insert(
701 "reason".to_string(),
702 JsonValue::String(listener.reason.clone()),
703 );
704 JsonValue::Object(object)
705 })
706 .collect();
707
708 let mut object = Map::new();
709 object.insert("active".to_string(), JsonValue::Array(active));
710 object.insert("failed".to_string(), JsonValue::Array(failed));
711 JsonValue::Object(object)
712 }
713
714 fn handle_grpc_discovery(&self) -> HttpResponse {
715 let mut methods = Map::new();
716 methods.insert(
717 "query".to_string(),
718 JsonValue::String("reddb.v1.RedDB/Query".to_string()),
719 );
720 methods.insert(
721 "batch_query".to_string(),
722 JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
723 );
724 methods.insert(
725 "health".to_string(),
726 JsonValue::String("reddb.v1.RedDB/Health".to_string()),
727 );
728 methods.insert(
729 "prepare".to_string(),
730 JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
731 );
732 methods.insert(
733 "execute_prepared".to_string(),
734 JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
735 );
736
737 let mut examples = Map::new();
738 examples.insert(
739 "query".to_string(),
740 JsonValue::String(
741 "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:5000 reddb.v1.RedDB/Query"
742 .to_string(),
743 ),
744 );
745 examples.insert(
746 "query_with_params".to_string(),
747 JsonValue::String(
748 "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:5000 reddb.v1.RedDB/Query"
749 .to_string(),
750 ),
751 );
752 examples.insert(
753 "health".to_string(),
754 JsonValue::String(
755 "grpcurl -plaintext -d '{}' 127.0.0.1:5000 reddb.v1.RedDB/Health".to_string(),
756 ),
757 );
758
759 let mut object = Map::new();
760 object.insert("ok".to_string(), JsonValue::Bool(true));
761 object.insert(
762 "service".to_string(),
763 JsonValue::String("reddb.v1.RedDB".to_string()),
764 );
765 object.insert(
766 "package".to_string(),
767 JsonValue::String("reddb.v1".to_string()),
768 );
769 object.insert(
770 "proto".to_string(),
771 JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
772 );
773 object.insert("methods".to_string(), JsonValue::Object(methods));
774 object.insert("examples".to_string(), JsonValue::Object(examples));
775 object.insert(
776 "transport_listeners".to_string(),
777 self.transport_readiness_json(),
778 );
779 object.insert(
780 "hint".to_string(),
781 JsonValue::String(
782 "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
783 .to_string(),
784 ),
785 );
786 json_response(200, JsonValue::Object(object))
787 }
788
789 fn handle_query_contract(&self) -> HttpResponse {
790 let mut examples = Map::new();
791 examples.insert(
792 "raw_sql".to_string(),
793 JsonValue::String("curl -sS http://127.0.0.1:5000/query -d 'SELECT 1'".to_string()),
794 );
795 examples.insert(
796 "json_query".to_string(),
797 JsonValue::String(
798 "curl -sS http://127.0.0.1:5000/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
799 .to_string(),
800 ),
801 );
802 examples.insert(
803 "json_query_with_params".to_string(),
804 JsonValue::String(
805 "curl -sS http://127.0.0.1:5000/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
806 .to_string(),
807 ),
808 );
809
810 let mut request_body = Map::new();
811 request_body.insert(
812 "query".to_string(),
813 JsonValue::String("required string".to_string()),
814 );
815 request_body.insert(
816 "params".to_string(),
817 JsonValue::String("optional array".to_string()),
818 );
819
820 let mut response_shape = Map::new();
821 response_shape.insert(
822 "columns".to_string(),
823 JsonValue::String("projected column names".to_string()),
824 );
825 response_shape.insert(
826 "records[].values".to_string(),
827 JsonValue::String("only projected values".to_string()),
828 );
829 response_shape.insert(
830 "records[].meta".to_string(),
831 JsonValue::String("internal metadata when present".to_string()),
832 );
833
834 let mut object = Map::new();
835 object.insert("ok".to_string(), JsonValue::Bool(false));
836 object.insert(
837 "code".to_string(),
838 JsonValue::String("method_not_allowed".to_string()),
839 );
840 object.insert(
841 "message".to_string(),
842 JsonValue::String("/query accepts POST requests".to_string()),
843 );
844 object.insert(
845 "hint".to_string(),
846 JsonValue::String(
847 "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
848 ),
849 );
850 object.insert("method".to_string(), JsonValue::String("POST".to_string()));
851 object.insert("path".to_string(), JsonValue::String("/query".to_string()));
852 object.insert("request_body".to_string(), JsonValue::Object(request_body));
853 object.insert(
854 "response_shape".to_string(),
855 JsonValue::Object(response_shape),
856 );
857 object.insert("examples".to_string(), JsonValue::Object(examples));
858 object.insert(
859 "docs".to_string(),
860 JsonValue::String("https://reddb.io/docs/query".to_string()),
861 );
862
863 json_response(405, JsonValue::Object(object))
864 .with_header("Allow", http::HeaderValue::from_static("POST"))
865 }
866
867 fn handle_root_discovery(&self) -> HttpResponse {
868 let mut endpoints = Map::new();
869 endpoints.insert(
870 "health".to_string(),
871 JsonValue::String("GET /health".to_string()),
872 );
873 endpoints.insert(
874 "ready".to_string(),
875 JsonValue::String("GET /ready".to_string()),
876 );
877 endpoints.insert(
878 "query".to_string(),
879 JsonValue::String("POST /query".to_string()),
880 );
881 endpoints.insert(
882 "query_readiness".to_string(),
883 JsonValue::String("GET /ready/query".to_string()),
884 );
885 endpoints.insert(
886 "catalog".to_string(),
887 JsonValue::String("GET /catalog".to_string()),
888 );
889 endpoints.insert(
890 "deployment_profiles".to_string(),
891 JsonValue::String("GET /deployment/profiles".to_string()),
892 );
893
894 let mut examples = Map::new();
895 examples.insert(
896 "http_raw_sql".to_string(),
897 JsonValue::String("curl -sS http://127.0.0.1:5000/query -d 'SELECT 1'".to_string()),
898 );
899 examples.insert(
900 "http_json_query".to_string(),
901 JsonValue::String(
902 "curl -sS http://127.0.0.1:5000/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
903 .to_string(),
904 ),
905 );
906 examples.insert(
907 "http_json_query_with_params".to_string(),
908 JsonValue::String(
909 "curl -sS http://127.0.0.1:5000/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
910 .to_string(),
911 ),
912 );
913
914 let mut object = Map::new();
915 object.insert("ok".to_string(), JsonValue::Bool(true));
916 object.insert(
917 "service".to_string(),
918 JsonValue::String("reddb".to_string()),
919 );
920 object.insert(
921 "version".to_string(),
922 JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
923 );
924 object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
925 object.insert("examples".to_string(), JsonValue::Object(examples));
926 object.insert(
927 "docs".to_string(),
928 JsonValue::String("https://reddb.io/docs".to_string()),
929 );
930 object.insert(
931 "transport_listeners".to_string(),
932 self.transport_readiness_json(),
933 );
934 json_response(200, JsonValue::Object(object))
935 }
936
937 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
938 let mut value = crate::presentation::ops_json::health_json(report);
939 if let JsonValue::Object(ref mut object) = value {
940 object.insert(
941 "transport_listeners".to_string(),
942 self.transport_readiness_json(),
943 );
944 }
945 value
946 }
947
948 pub fn serve(&self) -> io::Result<()> {
949 let listener = TcpListener::bind(&self.options.bind_addr)?;
950 self.serve_on(listener)
951 }
952
953 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
960 let runtime = axum_edge::build_edge_runtime()?;
961 runtime.block_on(
962 self.clone()
963 .serve_edge_on_std(listener, HttpTransport::Http),
964 )
965 }
966
967 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
971 let runtime = axum_edge::build_background_edge_runtime()?;
972 let server = self.clone();
973 runtime.block_on(async move {
974 listener.set_nonblocking(true)?;
975 let listener = tokio::net::TcpListener::from_std(listener)?;
976 let (stream, _peer) = listener.accept().await?;
977 server.serve_edge_one(stream).await;
978 Ok(())
979 })
980 }
981
982 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
983 let server = self.clone();
984 thread::spawn(move || server.serve())
985 }
986
987 pub fn serve_in_background_on(
988 &self,
989 listener: TcpListener,
990 ) -> thread::JoinHandle<io::Result<()>> {
991 let server = self.clone();
992 thread::spawn(move || {
993 let runtime = axum_edge::build_background_edge_runtime()?;
994 runtime.block_on(server.serve_edge_on_std(listener, HttpTransport::Http))
995 })
996 }
997
998 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
1002 let listener = TcpListener::bind(&self.options.bind_addr)?;
1003 self.serve_tls_on(listener, tls_config)
1004 }
1005
1006 pub fn serve_tls_on(
1007 &self,
1008 listener: TcpListener,
1009 tls_config: std::sync::Arc<rustls::ServerConfig>,
1010 ) -> io::Result<()> {
1011 let runtime = axum_edge::build_edge_runtime()?;
1012 let acceptor = axum_edge::tls_acceptor(tls_config);
1013 runtime.block_on(self.clone().serve_edge_tls_on_std(
1014 listener,
1015 acceptor,
1016 HttpTransport::Https,
1017 ))
1018 }
1019
1020 pub fn serve_tls_in_background(
1021 &self,
1022 tls_config: std::sync::Arc<rustls::ServerConfig>,
1023 ) -> thread::JoinHandle<io::Result<()>> {
1024 let server = self.clone();
1025 thread::spawn(move || server.serve_tls(tls_config))
1026 }
1027
1028 pub fn serve_tls_in_background_on(
1029 &self,
1030 listener: TcpListener,
1031 tls_config: std::sync::Arc<rustls::ServerConfig>,
1032 ) -> thread::JoinHandle<io::Result<()>> {
1033 let server = self.clone();
1034 thread::spawn(move || {
1035 let runtime = axum_edge::build_background_edge_runtime()?;
1036 let acceptor = axum_edge::tls_acceptor(tls_config);
1037 runtime.block_on(server.serve_edge_tls_on_std(listener, acceptor, HttpTransport::Https))
1038 })
1039 }
1040
1041 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
1042 let started = Instant::now();
1043 let result = self.handle_connection_inner(stream);
1044 let elapsed = started.elapsed().as_secs_f64();
1045 self.http_metrics
1046 .record_duration(HttpTransport::Http, elapsed);
1047 result
1048 }
1049
1050 fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
1051 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
1052 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
1053
1054 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
1062
1063 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
1064
1065 if deadline.expired() {
1067 self.http_metrics
1068 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1069 Self::write_handler_timeout_503(&mut stream);
1070 return Ok(());
1071 }
1072
1073 if self.try_route_streaming(&request, &mut stream)? {
1074 return Ok(());
1075 }
1076 let response = self.route(request);
1077
1078 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
1082 if inject_ms > 0 {
1083 thread::sleep(Duration::from_millis(inject_ms));
1084 }
1085
1086 if deadline.expired() {
1088 self.http_metrics
1089 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1090 Self::write_handler_timeout_503(&mut stream);
1091 return Ok(());
1092 }
1093
1094 stream.write_all(&response.to_http_bytes())?;
1095 stream.flush()?;
1096 Ok(())
1097 }
1098
1099 fn write_handler_timeout_503<S: Write>(stream: &mut S) {
1105 const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
1106 Connection: close\r\n\
1107 Content-Length: 0\r\n\
1108 \r\n";
1109 let _ = stream.write_all(RESPONSE);
1110 let _ = stream.flush();
1111 }
1112}