1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49 crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The default HTTP body cap must stay at exactly 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        let defaults = ServerOptions::default();
        assert_eq!(defaults.max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must carry a `transport_listeners` object whose
    /// `active` and `failed` arrays mirror the configured readiness.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");

        let grpc_listener = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:50051".to_string(),
            explicit: true,
        };
        let http_failure = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5055".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
        };
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![grpc_listener],
                failed: vec![http_failure],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let root = match server.health_json_with_transport(&HealthReport::healthy()) {
            JsonValue::Object(root) => root,
            _ => panic!("health payload should be an object"),
        };
        let listeners = match root.get("transport_listeners") {
            Some(JsonValue::Object(listeners)) => listeners,
            _ => panic!("health payload should include transport_listeners"),
        };
        let active = match listeners.get("active") {
            Some(JsonValue::Array(active)) => active,
            _ => panic!("transport_listeners.active should be an array"),
        };
        let failed = match listeners.get("failed") {
            Some(JsonValue::Array(failed)) => failed,
            _ => panic!("transport_listeners.failed should be an array"),
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105 crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108mod axum_edge;
109pub mod handlers_admin;
110mod handlers_admin_metrics;
111mod handlers_admin_status;
112mod handlers_ai;
113mod handlers_ai_model_cache;
114mod handlers_auth;
115mod handlers_backup;
116mod handlers_browser_auth;
117mod handlers_capabilities;
118mod handlers_collection_policy;
119mod handlers_ec;
120pub(crate) mod handlers_entity;
121mod handlers_failover;
122mod handlers_geo;
123mod handlers_graph;
124mod handlers_iam_policy;
125mod handlers_keyed;
126mod handlers_log;
127mod handlers_metrics;
128mod handlers_ops;
129mod handlers_ops_policy;
130mod ws_edge;
131pub mod ui_bridge;
135pub mod ui_bundle_resolver;
139pub mod ui_auth;
142mod ui_static;
146pub mod ui_deeplink;
151pub(crate) mod handlers_query;
156mod handlers_replication;
157mod handlers_topology;
158mod handlers_vcs;
159mod handlers_vector;
160pub mod header_escape_guard;
161pub mod http_connection_limiter;
162pub mod http_handler_metrics;
163pub mod http_limits;
164pub mod http_principal_limiter;
165pub mod ingest_pipeline;
166pub mod output_stream;
167mod patch_support;
168mod request_body;
169mod request_context;
170mod route_catalog;
171mod routes;
172mod routing;
173mod serverless_support;
174pub mod tls;
175mod transport;
176
177use self::handlers_ai::*;
178use self::handlers_entity::*;
179use self::handlers_graph::*;
180use self::handlers_keyed::*;
181use self::handlers_metrics::*;
182use self::handlers_ops::*;
183use self::handlers_query::*;
184use self::http_connection_limiter::{
185 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
186};
187use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
188pub use self::http_limits::{
189 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
190};
191use self::http_principal_limiter::PrincipalConnectionLimiter;
192use self::patch_support::*;
193use self::request_body::*;
194use self::routing::*;
195use self::serverless_support::*;
196use self::transport::*;
197
/// Selects which surface a server instance exposes.
///
/// `ServerOptions::default()` uses [`ServerSurface::Public`].
// NOTE(review): the routing rules for each variant live outside this chunk;
// the per-variant notes below are inferred from the names — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerSurface {
    /// The regular, public-facing surface (the default).
    Public,
    /// Presumably restricts the listener to administrative routes.
    AdminOnly,
    /// Presumably restricts the listener to metrics routes.
    MetricsOnly,
}
216
217#[derive(Debug, Clone)]
218pub struct ServerOptions {
219 pub bind_addr: String,
220 pub max_body_bytes: usize,
221 pub read_timeout_ms: u64,
222 pub write_timeout_ms: u64,
223 pub max_scan_limit: usize,
224 pub surface: ServerSurface,
228 pub transport_readiness: crate::service_cli::TransportReadiness,
229 pub websocket_allowed_origins: Vec<String>,
237 pub ui_dir: Option<std::path::PathBuf>,
246}
247
/// Default cap on an HTTP request body: 32 MiB (`32 * 2^20` bytes).
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 << 20;
249
250impl Default for ServerOptions {
251 fn default() -> Self {
252 Self {
253 bind_addr: "127.0.0.1:5055".to_string(),
254 max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
255 read_timeout_ms: 5_000,
256 write_timeout_ms: 5_000,
257 max_scan_limit: 1_000,
258 surface: ServerSurface::Public,
259 transport_readiness: crate::service_cli::TransportReadiness::default(),
260 websocket_allowed_origins: Vec::new(),
261 ui_dir: None,
262 }
263 }
264}
265
266pub struct ServerReplicationState {
268 pub config: crate::replication::ReplicationConfig,
269 pub primary: Option<crate::replication::primary::PrimaryReplication>,
270}
271
272#[derive(Clone)]
273pub struct RedDBServer {
274 runtime: RedDBRuntime,
275 options: ServerOptions,
276 auth_store: Option<Arc<AuthStore>>,
277 replication: Option<Arc<ServerReplicationState>>,
278 http_limiter: HttpConnectionLimiter,
283 handler_timeout: Duration,
289 handler_clock: Arc<dyn MonotonicClock>,
296 slow_inject_ms: Arc<AtomicU64>,
303 http_metrics: HttpHandlerMetrics,
309 retry_after_secs: u64,
313 principal_limiter: PrincipalConnectionLimiter,
321 pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
329 pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
334 pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
339}
340
/// Handler budget used by `RedDBServer::with_options` until overridden: 30 seconds.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
343
/// Categories a serverless warm-up request can target.
// NOTE(review): the consumers of this enum are outside this chunk; the
// description above is inferred from the type and variant names — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
351
/// Deployment flavours the server distinguishes between.
// NOTE(review): how each profile is selected and used is outside this chunk.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
358
359fn percent_decode_path_segment(input: &str) -> Result<String, String> {
360 let bytes = input.as_bytes();
361 let mut out = Vec::with_capacity(bytes.len());
362 let mut index = 0;
363 while index < bytes.len() {
364 match bytes[index] {
365 b'%' => {
366 if index + 2 >= bytes.len() {
367 return Err("truncated percent escape".to_string());
368 }
369 let high = hex_value(bytes[index + 1])
370 .ok_or_else(|| "invalid percent escape".to_string())?;
371 let low = hex_value(bytes[index + 2])
372 .ok_or_else(|| "invalid percent escape".to_string())?;
373 out.push((high << 4) | low);
374 index += 3;
375 }
376 byte => {
377 out.push(byte);
378 index += 1;
379 }
380 }
381 }
382 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
383}
384
/// Returns the numeric value (0–15) of an ASCII hex digit, accepting both
/// lowercase and uppercase letters, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // Each arm yields the amount to subtract so the first digit of the range
    // lands on its numeric value ('0' -> 0, 'a' / 'A' -> 10).
    let offset = match byte {
        b'0'..=b'9' => b'0',
        b'a'..=b'f' => b'a' - 10,
        b'A'..=b'F' => b'A' - 10,
        _ => return None,
    };
    Some(byte - offset)
}
393
394#[derive(Debug, Clone)]
395struct ParsedQueryRequest {
396 query: String,
397 entity_types: Option<Vec<String>>,
398 capabilities: Option<Vec<String>>,
399 params: Option<Vec<Value>>,
403}
404
/// Kind of change a [`PatchOperation`] applies at its path.
// NOTE(review): the exact difference between `Set` and `Replace` is decided by
// the patch code outside this chunk.
#[derive(Clone, Copy, Debug)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
411
412#[derive(Debug, Clone)]
413struct PatchOperation {
414 op: PatchOperationType,
415 path: Vec<String>,
416 value: Option<JsonValue>,
417}
418
419impl RedDBServer {
420 pub fn new(runtime: RedDBRuntime) -> Self {
421 Self::with_options(runtime, ServerOptions::default())
422 }
423
424 pub fn from_database_options(
425 db_options: RedDBOptions,
426 server_options: ServerOptions,
427 ) -> RedDBResult<Self> {
428 let runtime = RedDBRuntime::with_options(db_options)?;
429 Ok(Self::with_options(runtime, server_options))
430 }
431
432 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
433 Self {
434 runtime,
435 options,
436 auth_store: None,
437 replication: None,
438 http_limiter: HttpConnectionLimiter::with_default_cap(),
439 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
440 handler_clock: Arc::new(SystemMonotonicClock::new()),
441 slow_inject_ms: Arc::new(AtomicU64::new(0)),
442 http_metrics: HttpHandlerMetrics::new(),
443 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
444 principal_limiter: PrincipalConnectionLimiter::new(
445 http_limits::DEFAULT_MAX_INFLIGHT_PER_PRINCIPAL,
446 ),
447 stream_capacity: output_stream::StreamCapacityRegistry::new(),
448 lease_registry: output_stream::LeaseRegistry::new(),
449 cursor_registry: output_stream::CursorRegistry::new(),
450 }
451 }
452
453 #[doc(hidden)]
454 pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
455 &self.stream_capacity
456 }
457
458 #[doc(hidden)]
459 pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
460 &self.lease_registry
461 }
462
463 #[doc(hidden)]
464 pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
465 &self.cursor_registry
466 }
467
468 #[doc(hidden)]
469 pub fn http_metrics(&self) -> &HttpHandlerMetrics {
470 &self.http_metrics
471 }
472
473 #[doc(hidden)]
478 pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
479 self.http_limiter = HttpConnectionLimiter::new(cap);
480 self
481 }
482
483 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
488 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
489 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
490 self.retry_after_secs = limits.retry_after_secs;
491 self.principal_limiter = PrincipalConnectionLimiter::new(limits.max_inflight_per_principal);
492 self
493 }
494
495 #[doc(hidden)]
500 pub fn with_principal_inflight_cap(mut self, cap: usize) -> Self {
501 self.principal_limiter = PrincipalConnectionLimiter::new(cap);
502 self
503 }
504
505 #[doc(hidden)]
506 pub fn principal_limiter(&self) -> &PrincipalConnectionLimiter {
507 &self.principal_limiter
508 }
509
510 #[doc(hidden)]
511 pub fn retry_after_secs(&self) -> u64 {
512 self.retry_after_secs
513 }
514
515 #[doc(hidden)]
516 pub fn http_limiter(&self) -> &HttpConnectionLimiter {
517 &self.http_limiter
518 }
519
520 #[doc(hidden)]
523 pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
524 self.handler_timeout = timeout;
525 self
526 }
527
528 #[doc(hidden)]
529 pub fn handler_timeout(&self) -> Duration {
530 self.handler_timeout
531 }
532
533 #[doc(hidden)]
538 pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
539 self.handler_clock = clock;
540 self
541 }
542
543 #[doc(hidden)]
549 pub fn set_test_slow_inject_ms(&self, ms: u64) {
550 self.slow_inject_ms.store(ms, Ordering::Relaxed);
551 }
552
553 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
557 self.runtime.set_auth_store(Arc::clone(&auth_store));
558 self.auth_store = Some(auth_store);
559 self
560 }
561
562 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
564 self.replication = Some(state);
565 self
566 }
567
568 pub fn with_websocket_allowed_origins(mut self, origins: Vec<String>) -> Self {
573 self.options.websocket_allowed_origins = origins;
574 self
575 }
576
577 pub(crate) fn websocket_allowed_origins(&self) -> &[String] {
579 &self.options.websocket_allowed_origins
580 }
581
582 pub fn with_ui_dir(mut self, dir: std::path::PathBuf) -> Self {
588 self.options.ui_dir = Some(dir);
589 self
590 }
591
592 pub(crate) fn ui_dir(&self) -> Option<&std::path::Path> {
595 self.options.ui_dir.as_deref()
596 }
597
598 pub fn with_browser_token_authority(
605 self,
606 authority: Arc<crate::auth::browser_token::BrowserTokenAuthority>,
607 ) -> Self {
608 self.runtime.set_browser_token_authority(Some(authority));
609 self
610 }
611
612 pub fn runtime(&self) -> &RedDBRuntime {
613 &self.runtime
614 }
615
616 pub fn options(&self) -> &ServerOptions {
617 &self.options
618 }
619
620 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
621 QueryUseCases::new(&self.runtime)
622 }
623
624 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
625 AdminUseCases::new(&self.runtime)
626 }
627
628 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
629 EntityUseCases::new(&self.runtime)
630 }
631
632 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
633 CatalogUseCases::new(&self.runtime)
634 }
635
636 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
637 GraphUseCases::new(&self.runtime)
638 }
639
640 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
641 NativeUseCases::new(&self.runtime)
642 }
643
644 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
645 TreeUseCases::new(&self.runtime)
646 }
647
648 fn transport_readiness_json(&self) -> JsonValue {
649 let active = self
650 .options
651 .transport_readiness
652 .active
653 .iter()
654 .map(|listener| {
655 let mut object = Map::new();
656 object.insert(
657 "transport".to_string(),
658 JsonValue::String(listener.transport.clone()),
659 );
660 object.insert(
661 "bind_addr".to_string(),
662 JsonValue::String(listener.bind_addr.clone()),
663 );
664 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
665 JsonValue::Object(object)
666 })
667 .collect();
668 let failed = self
669 .options
670 .transport_readiness
671 .failed
672 .iter()
673 .map(|listener| {
674 let mut object = Map::new();
675 object.insert(
676 "transport".to_string(),
677 JsonValue::String(listener.transport.clone()),
678 );
679 object.insert(
680 "bind_addr".to_string(),
681 JsonValue::String(listener.bind_addr.clone()),
682 );
683 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
684 object.insert(
685 "reason".to_string(),
686 JsonValue::String(listener.reason.clone()),
687 );
688 JsonValue::Object(object)
689 })
690 .collect();
691
692 let mut object = Map::new();
693 object.insert("active".to_string(), JsonValue::Array(active));
694 object.insert("failed".to_string(), JsonValue::Array(failed));
695 JsonValue::Object(object)
696 }
697
698 fn handle_grpc_discovery(&self) -> HttpResponse {
699 let mut methods = Map::new();
700 methods.insert(
701 "query".to_string(),
702 JsonValue::String("reddb.v1.RedDB/Query".to_string()),
703 );
704 methods.insert(
705 "batch_query".to_string(),
706 JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
707 );
708 methods.insert(
709 "health".to_string(),
710 JsonValue::String("reddb.v1.RedDB/Health".to_string()),
711 );
712 methods.insert(
713 "prepare".to_string(),
714 JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
715 );
716 methods.insert(
717 "execute_prepared".to_string(),
718 JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
719 );
720
721 let mut examples = Map::new();
722 examples.insert(
723 "query".to_string(),
724 JsonValue::String(
725 "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
726 .to_string(),
727 ),
728 );
729 examples.insert(
730 "query_with_params".to_string(),
731 JsonValue::String(
732 "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
733 .to_string(),
734 ),
735 );
736 examples.insert(
737 "health".to_string(),
738 JsonValue::String(
739 "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
740 ),
741 );
742
743 let mut object = Map::new();
744 object.insert("ok".to_string(), JsonValue::Bool(true));
745 object.insert(
746 "service".to_string(),
747 JsonValue::String("reddb.v1.RedDB".to_string()),
748 );
749 object.insert(
750 "package".to_string(),
751 JsonValue::String("reddb.v1".to_string()),
752 );
753 object.insert(
754 "proto".to_string(),
755 JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
756 );
757 object.insert("methods".to_string(), JsonValue::Object(methods));
758 object.insert("examples".to_string(), JsonValue::Object(examples));
759 object.insert(
760 "transport_listeners".to_string(),
761 self.transport_readiness_json(),
762 );
763 object.insert(
764 "hint".to_string(),
765 JsonValue::String(
766 "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
767 .to_string(),
768 ),
769 );
770 json_response(200, JsonValue::Object(object))
771 }
772
773 fn handle_query_contract(&self) -> HttpResponse {
774 let mut examples = Map::new();
775 examples.insert(
776 "raw_sql".to_string(),
777 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
778 );
779 examples.insert(
780 "json_query".to_string(),
781 JsonValue::String(
782 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
783 .to_string(),
784 ),
785 );
786 examples.insert(
787 "json_query_with_params".to_string(),
788 JsonValue::String(
789 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
790 .to_string(),
791 ),
792 );
793
794 let mut request_body = Map::new();
795 request_body.insert(
796 "query".to_string(),
797 JsonValue::String("required string".to_string()),
798 );
799 request_body.insert(
800 "params".to_string(),
801 JsonValue::String("optional array".to_string()),
802 );
803
804 let mut response_shape = Map::new();
805 response_shape.insert(
806 "columns".to_string(),
807 JsonValue::String("projected column names".to_string()),
808 );
809 response_shape.insert(
810 "records[].values".to_string(),
811 JsonValue::String("only projected values".to_string()),
812 );
813 response_shape.insert(
814 "records[].meta".to_string(),
815 JsonValue::String("internal metadata when present".to_string()),
816 );
817
818 let mut object = Map::new();
819 object.insert("ok".to_string(), JsonValue::Bool(false));
820 object.insert(
821 "code".to_string(),
822 JsonValue::String("method_not_allowed".to_string()),
823 );
824 object.insert(
825 "message".to_string(),
826 JsonValue::String("/query accepts POST requests".to_string()),
827 );
828 object.insert(
829 "hint".to_string(),
830 JsonValue::String(
831 "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
832 ),
833 );
834 object.insert("method".to_string(), JsonValue::String("POST".to_string()));
835 object.insert("path".to_string(), JsonValue::String("/query".to_string()));
836 object.insert("request_body".to_string(), JsonValue::Object(request_body));
837 object.insert(
838 "response_shape".to_string(),
839 JsonValue::Object(response_shape),
840 );
841 object.insert("examples".to_string(), JsonValue::Object(examples));
842 object.insert(
843 "docs".to_string(),
844 JsonValue::String("https://reddb.io/docs/query".to_string()),
845 );
846
847 json_response(405, JsonValue::Object(object))
848 .with_header("Allow", http::HeaderValue::from_static("POST"))
849 }
850
851 fn handle_root_discovery(&self) -> HttpResponse {
852 let mut endpoints = Map::new();
853 endpoints.insert(
854 "health".to_string(),
855 JsonValue::String("GET /health".to_string()),
856 );
857 endpoints.insert(
858 "ready".to_string(),
859 JsonValue::String("GET /ready".to_string()),
860 );
861 endpoints.insert(
862 "query".to_string(),
863 JsonValue::String("POST /query".to_string()),
864 );
865 endpoints.insert(
866 "query_readiness".to_string(),
867 JsonValue::String("GET /ready/query".to_string()),
868 );
869 endpoints.insert(
870 "catalog".to_string(),
871 JsonValue::String("GET /catalog".to_string()),
872 );
873 endpoints.insert(
874 "deployment_profiles".to_string(),
875 JsonValue::String("GET /deployment/profiles".to_string()),
876 );
877
878 let mut examples = Map::new();
879 examples.insert(
880 "http_raw_sql".to_string(),
881 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
882 );
883 examples.insert(
884 "http_json_query".to_string(),
885 JsonValue::String(
886 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
887 .to_string(),
888 ),
889 );
890 examples.insert(
891 "http_json_query_with_params".to_string(),
892 JsonValue::String(
893 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
894 .to_string(),
895 ),
896 );
897
898 let mut object = Map::new();
899 object.insert("ok".to_string(), JsonValue::Bool(true));
900 object.insert(
901 "service".to_string(),
902 JsonValue::String("reddb".to_string()),
903 );
904 object.insert(
905 "version".to_string(),
906 JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
907 );
908 object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
909 object.insert("examples".to_string(), JsonValue::Object(examples));
910 object.insert(
911 "docs".to_string(),
912 JsonValue::String("https://reddb.io/docs".to_string()),
913 );
914 object.insert(
915 "transport_listeners".to_string(),
916 self.transport_readiness_json(),
917 );
918 json_response(200, JsonValue::Object(object))
919 }
920
921 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
922 let mut value = crate::presentation::ops_json::health_json(report);
923 if let JsonValue::Object(ref mut object) = value {
924 object.insert(
925 "transport_listeners".to_string(),
926 self.transport_readiness_json(),
927 );
928 }
929 value
930 }
931
932 pub fn serve(&self) -> io::Result<()> {
933 let listener = TcpListener::bind(&self.options.bind_addr)?;
934 self.serve_on(listener)
935 }
936
937 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
944 let runtime = axum_edge::build_edge_runtime()?;
945 runtime.block_on(
946 self.clone()
947 .serve_edge_on_std(listener, HttpTransport::Http),
948 )
949 }
950
951 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
955 let runtime = axum_edge::build_background_edge_runtime()?;
956 let server = self.clone();
957 runtime.block_on(async move {
958 listener.set_nonblocking(true)?;
959 let listener = tokio::net::TcpListener::from_std(listener)?;
960 let (stream, _peer) = listener.accept().await?;
961 server.serve_edge_one(stream).await;
962 Ok(())
963 })
964 }
965
966 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
967 let server = self.clone();
968 thread::spawn(move || server.serve())
969 }
970
971 pub fn serve_in_background_on(
972 &self,
973 listener: TcpListener,
974 ) -> thread::JoinHandle<io::Result<()>> {
975 let server = self.clone();
976 thread::spawn(move || {
977 let runtime = axum_edge::build_background_edge_runtime()?;
978 runtime.block_on(server.serve_edge_on_std(listener, HttpTransport::Http))
979 })
980 }
981
982 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
986 let listener = TcpListener::bind(&self.options.bind_addr)?;
987 self.serve_tls_on(listener, tls_config)
988 }
989
990 pub fn serve_tls_on(
991 &self,
992 listener: TcpListener,
993 tls_config: std::sync::Arc<rustls::ServerConfig>,
994 ) -> io::Result<()> {
995 let runtime = axum_edge::build_edge_runtime()?;
996 let acceptor = axum_edge::tls_acceptor(tls_config);
997 runtime.block_on(self.clone().serve_edge_tls_on_std(
998 listener,
999 acceptor,
1000 HttpTransport::Https,
1001 ))
1002 }
1003
1004 pub fn serve_tls_in_background(
1005 &self,
1006 tls_config: std::sync::Arc<rustls::ServerConfig>,
1007 ) -> thread::JoinHandle<io::Result<()>> {
1008 let server = self.clone();
1009 thread::spawn(move || server.serve_tls(tls_config))
1010 }
1011
1012 pub fn serve_tls_in_background_on(
1013 &self,
1014 listener: TcpListener,
1015 tls_config: std::sync::Arc<rustls::ServerConfig>,
1016 ) -> thread::JoinHandle<io::Result<()>> {
1017 let server = self.clone();
1018 thread::spawn(move || {
1019 let runtime = axum_edge::build_background_edge_runtime()?;
1020 let acceptor = axum_edge::tls_acceptor(tls_config);
1021 runtime.block_on(server.serve_edge_tls_on_std(listener, acceptor, HttpTransport::Https))
1022 })
1023 }
1024
1025 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
1026 let started = Instant::now();
1027 let result = self.handle_connection_inner(stream);
1028 let elapsed = started.elapsed().as_secs_f64();
1029 self.http_metrics
1030 .record_duration(HttpTransport::Http, elapsed);
1031 result
1032 }
1033
1034 fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
1035 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
1036 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
1037
1038 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
1046
1047 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
1048
1049 if deadline.expired() {
1051 self.http_metrics
1052 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1053 Self::write_handler_timeout_503(&mut stream);
1054 return Ok(());
1055 }
1056
1057 if self.try_route_streaming(&request, &mut stream)? {
1058 return Ok(());
1059 }
1060 let response = self.route(request);
1061
1062 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
1066 if inject_ms > 0 {
1067 thread::sleep(Duration::from_millis(inject_ms));
1068 }
1069
1070 if deadline.expired() {
1072 self.http_metrics
1073 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1074 Self::write_handler_timeout_503(&mut stream);
1075 return Ok(());
1076 }
1077
1078 stream.write_all(&response.to_http_bytes())?;
1079 stream.flush()?;
1080 Ok(())
1081 }
1082
1083 fn write_handler_timeout_503<S: Write>(stream: &mut S) {
1089 const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
1090 Connection: close\r\n\
1091 Content-Length: 0\r\n\
1092 \r\n";
1093 let _ = stream.write_all(RESPONSE);
1094 let _ = stream.flush();
1095 }
1096}