1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49 crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The default HTTP body cap must stay at 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        assert_eq!(ServerOptions::default().max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must carry a `transport_listeners` object whose
    /// `active` / `failed` arrays mirror the configured transport readiness.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        // Struct-update syntax instead of mutating a defaulted value.
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![TransportListenerState {
                    transport: "grpc".to_string(),
                    bind_addr: "127.0.0.1:50051".to_string(),
                    explicit: true,
                }],
                failed: vec![TransportListenerFailure {
                    transport: "http".to_string(),
                    bind_addr: "127.0.0.1:5055".to_string(),
                    explicit: false,
                    reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
                }],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);

        // Check the entry contents too, not just the counts.
        let Some(JsonValue::Object(active_entry)) = active.first() else {
            panic!("active listener entry should be an object");
        };
        assert!(matches!(
            active_entry.get("transport"),
            Some(JsonValue::String(transport)) if transport == "grpc"
        ));
        assert!(matches!(
            active_entry.get("bind_addr"),
            Some(JsonValue::String(addr)) if addr == "127.0.0.1:50051"
        ));
        assert!(matches!(
            active_entry.get("explicit"),
            Some(JsonValue::Bool(true))
        ));

        let Some(JsonValue::Object(failed_entry)) = failed.first() else {
            panic!("failed listener entry should be an object");
        };
        assert!(matches!(
            failed_entry.get("transport"),
            Some(JsonValue::String(transport)) if transport == "http"
        ));
        assert!(matches!(
            failed_entry.get("explicit"),
            Some(JsonValue::Bool(false))
        ));
        assert!(matches!(
            failed_entry.get("reason"),
            Some(JsonValue::String(reason))
                if reason == "http listener bind 127.0.0.1:5055: address in use"
        ));
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105 crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108mod axum_edge;
109pub mod handlers_admin;
110mod handlers_admin_metrics;
111mod handlers_admin_status;
112mod handlers_ai;
113mod handlers_ai_model_cache;
114mod handlers_auth;
115mod handlers_backup;
116mod handlers_browser_auth;
117mod handlers_capabilities;
118mod handlers_collection_policy;
119mod handlers_ec;
120pub(crate) mod handlers_entity;
121mod handlers_failover;
122mod handlers_geo;
123mod handlers_graph;
124mod handlers_iam_policy;
125mod handlers_keyed;
126mod handlers_log;
127mod handlers_metrics;
128mod handlers_ops;
129mod handlers_ops_policy;
130mod ws_edge;
131pub mod ui_bridge;
135pub mod ui_bundle_resolver;
139pub mod ui_auth;
142mod ui_static;
146pub mod ui_deeplink;
151pub(crate) mod handlers_query;
156mod handlers_replication;
157mod handlers_topology;
158mod handlers_vcs;
159mod handlers_vector;
160pub mod header_escape_guard;
161pub mod http_connection_limiter;
162pub mod http_handler_metrics;
163pub mod http_limits;
164pub mod http_principal_limiter;
165pub mod ingest_pipeline;
166pub mod output_stream;
167mod patch_support;
168mod request_body;
169mod request_context;
170mod routing;
171mod serverless_support;
172pub mod tls;
173mod transport;
174
175use self::handlers_ai::*;
176use self::handlers_entity::*;
177use self::handlers_graph::*;
178use self::handlers_keyed::*;
179use self::handlers_metrics::*;
180use self::handlers_ops::*;
181use self::handlers_query::*;
182use self::http_connection_limiter::{
183 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
184};
185use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
186pub use self::http_limits::{
187 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
188};
189use self::http_principal_limiter::PrincipalConnectionLimiter;
190use self::patch_support::*;
191use self::request_body::*;
192use self::routing::*;
193use self::serverless_support::*;
194use self::transport::*;
195
/// Selects which surface a server instance presents.
///
/// NOTE(review): the routing code that interprets these variants is not in
/// this part of the file; the per-variant notes below are inferred from the
/// names and should be confirmed against it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// The surface selected by `ServerOptions::default()`.
    Public,
    /// Presumably an administrative-only surface — confirm against routing.
    AdminOnly,
    /// Presumably a metrics-only surface — confirm against routing.
    MetricsOnly,
}
214
215#[derive(Debug, Clone)]
216pub struct ServerOptions {
217 pub bind_addr: String,
218 pub max_body_bytes: usize,
219 pub read_timeout_ms: u64,
220 pub write_timeout_ms: u64,
221 pub max_scan_limit: usize,
222 pub surface: ServerSurface,
226 pub transport_readiness: crate::service_cli::TransportReadiness,
227 pub websocket_allowed_origins: Vec<String>,
235 pub ui_dir: Option<std::path::PathBuf>,
244}
245
/// Default cap on an HTTP request body: 32 MiB (32 * 2^20 bytes).
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 << 20;
247
248impl Default for ServerOptions {
249 fn default() -> Self {
250 Self {
251 bind_addr: "127.0.0.1:5055".to_string(),
252 max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
253 read_timeout_ms: 5_000,
254 write_timeout_ms: 5_000,
255 max_scan_limit: 1_000,
256 surface: ServerSurface::Public,
257 transport_readiness: crate::service_cli::TransportReadiness::default(),
258 websocket_allowed_origins: Vec::new(),
259 ui_dir: None,
260 }
261 }
262}
263
264pub struct ServerReplicationState {
266 pub config: crate::replication::ReplicationConfig,
267 pub primary: Option<crate::replication::primary::PrimaryReplication>,
268}
269
270#[derive(Clone)]
271pub struct RedDBServer {
272 runtime: RedDBRuntime,
273 options: ServerOptions,
274 auth_store: Option<Arc<AuthStore>>,
275 replication: Option<Arc<ServerReplicationState>>,
276 http_limiter: HttpConnectionLimiter,
281 handler_timeout: Duration,
287 handler_clock: Arc<dyn MonotonicClock>,
294 slow_inject_ms: Arc<AtomicU64>,
301 http_metrics: HttpHandlerMetrics,
307 retry_after_secs: u64,
311 principal_limiter: PrincipalConnectionLimiter,
319 pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
327 pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
332 pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
337}
338
/// Handler deadline budget used by `RedDBServer::with_options`: 30 seconds.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
341
/// Categories that a serverless warmup can target.
///
/// NOTE(review): the code consuming this enum is outside this section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    /// Index structures.
    Indexes,
    /// Graph projections.
    GraphProjections,
    /// Analytics jobs.
    AnalyticsJobs,
    /// Native artifacts.
    NativeArtifacts,
}
349
/// Deployment profiles distinguished by this module.
///
/// NOTE(review): the code consuming this enum is outside this section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    /// Embedded deployment.
    Embedded,
    /// Server deployment.
    Server,
    /// Serverless deployment.
    Serverless,
}
356
357fn percent_decode_path_segment(input: &str) -> Result<String, String> {
358 let bytes = input.as_bytes();
359 let mut out = Vec::with_capacity(bytes.len());
360 let mut index = 0;
361 while index < bytes.len() {
362 match bytes[index] {
363 b'%' => {
364 if index + 2 >= bytes.len() {
365 return Err("truncated percent escape".to_string());
366 }
367 let high = hex_value(bytes[index + 1])
368 .ok_or_else(|| "invalid percent escape".to_string())?;
369 let low = hex_value(bytes[index + 2])
370 .ok_or_else(|| "invalid percent escape".to_string())?;
371 out.push((high << 4) | low);
372 index += 3;
373 }
374 byte => {
375 out.push(byte);
376 index += 1;
377 }
378 }
379 }
380 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
381}
382
/// Returns the numeric value (0–15) of an ASCII hexadecimal digit, accepting
/// both upper and lower case, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` only accepts ASCII `0-9`, `a-f` and `A-F`; the result is
    // always below 16, so narrowing to `u8` is lossless.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
391
392#[derive(Debug, Clone)]
393struct ParsedQueryRequest {
394 query: String,
395 entity_types: Option<Vec<String>>,
396 capabilities: Option<Vec<String>>,
397 params: Option<Vec<Value>>,
401}
402
/// Kind of change a [`PatchOperation`] applies.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    /// Set the value at the path.
    Set,
    /// Replace the value at the path.
    Replace,
    /// Remove the value at the path.
    Unset,
}
409
410#[derive(Debug, Clone)]
411struct PatchOperation {
412 op: PatchOperationType,
413 path: Vec<String>,
414 value: Option<JsonValue>,
415}
416
417impl RedDBServer {
418 pub fn new(runtime: RedDBRuntime) -> Self {
419 Self::with_options(runtime, ServerOptions::default())
420 }
421
422 pub fn from_database_options(
423 db_options: RedDBOptions,
424 server_options: ServerOptions,
425 ) -> RedDBResult<Self> {
426 let runtime = RedDBRuntime::with_options(db_options)?;
427 Ok(Self::with_options(runtime, server_options))
428 }
429
430 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
431 Self {
432 runtime,
433 options,
434 auth_store: None,
435 replication: None,
436 http_limiter: HttpConnectionLimiter::with_default_cap(),
437 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
438 handler_clock: Arc::new(SystemMonotonicClock::new()),
439 slow_inject_ms: Arc::new(AtomicU64::new(0)),
440 http_metrics: HttpHandlerMetrics::new(),
441 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
442 principal_limiter: PrincipalConnectionLimiter::new(
443 http_limits::DEFAULT_MAX_INFLIGHT_PER_PRINCIPAL,
444 ),
445 stream_capacity: output_stream::StreamCapacityRegistry::new(),
446 lease_registry: output_stream::LeaseRegistry::new(),
447 cursor_registry: output_stream::CursorRegistry::new(),
448 }
449 }
450
451 #[doc(hidden)]
452 pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
453 &self.stream_capacity
454 }
455
456 #[doc(hidden)]
457 pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
458 &self.lease_registry
459 }
460
461 #[doc(hidden)]
462 pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
463 &self.cursor_registry
464 }
465
466 #[doc(hidden)]
467 pub fn http_metrics(&self) -> &HttpHandlerMetrics {
468 &self.http_metrics
469 }
470
471 #[doc(hidden)]
476 pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
477 self.http_limiter = HttpConnectionLimiter::new(cap);
478 self
479 }
480
481 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
486 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
487 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
488 self.retry_after_secs = limits.retry_after_secs;
489 self.principal_limiter = PrincipalConnectionLimiter::new(limits.max_inflight_per_principal);
490 self
491 }
492
493 #[doc(hidden)]
498 pub fn with_principal_inflight_cap(mut self, cap: usize) -> Self {
499 self.principal_limiter = PrincipalConnectionLimiter::new(cap);
500 self
501 }
502
503 #[doc(hidden)]
504 pub fn principal_limiter(&self) -> &PrincipalConnectionLimiter {
505 &self.principal_limiter
506 }
507
508 #[doc(hidden)]
509 pub fn retry_after_secs(&self) -> u64 {
510 self.retry_after_secs
511 }
512
513 #[doc(hidden)]
514 pub fn http_limiter(&self) -> &HttpConnectionLimiter {
515 &self.http_limiter
516 }
517
518 #[doc(hidden)]
521 pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
522 self.handler_timeout = timeout;
523 self
524 }
525
526 #[doc(hidden)]
527 pub fn handler_timeout(&self) -> Duration {
528 self.handler_timeout
529 }
530
531 #[doc(hidden)]
536 pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
537 self.handler_clock = clock;
538 self
539 }
540
541 #[doc(hidden)]
547 pub fn set_test_slow_inject_ms(&self, ms: u64) {
548 self.slow_inject_ms.store(ms, Ordering::Relaxed);
549 }
550
551 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
555 self.runtime.set_auth_store(Arc::clone(&auth_store));
556 self.auth_store = Some(auth_store);
557 self
558 }
559
560 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
562 self.replication = Some(state);
563 self
564 }
565
566 pub fn with_websocket_allowed_origins(mut self, origins: Vec<String>) -> Self {
571 self.options.websocket_allowed_origins = origins;
572 self
573 }
574
575 pub(crate) fn websocket_allowed_origins(&self) -> &[String] {
577 &self.options.websocket_allowed_origins
578 }
579
580 pub fn with_ui_dir(mut self, dir: std::path::PathBuf) -> Self {
586 self.options.ui_dir = Some(dir);
587 self
588 }
589
590 pub(crate) fn ui_dir(&self) -> Option<&std::path::Path> {
593 self.options.ui_dir.as_deref()
594 }
595
596 pub fn with_browser_token_authority(
603 self,
604 authority: Arc<crate::auth::browser_token::BrowserTokenAuthority>,
605 ) -> Self {
606 self.runtime.set_browser_token_authority(Some(authority));
607 self
608 }
609
610 pub fn runtime(&self) -> &RedDBRuntime {
611 &self.runtime
612 }
613
614 pub fn options(&self) -> &ServerOptions {
615 &self.options
616 }
617
618 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
619 QueryUseCases::new(&self.runtime)
620 }
621
622 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
623 AdminUseCases::new(&self.runtime)
624 }
625
626 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
627 EntityUseCases::new(&self.runtime)
628 }
629
630 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
631 CatalogUseCases::new(&self.runtime)
632 }
633
634 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
635 GraphUseCases::new(&self.runtime)
636 }
637
638 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
639 NativeUseCases::new(&self.runtime)
640 }
641
642 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
643 TreeUseCases::new(&self.runtime)
644 }
645
646 fn transport_readiness_json(&self) -> JsonValue {
647 let active = self
648 .options
649 .transport_readiness
650 .active
651 .iter()
652 .map(|listener| {
653 let mut object = Map::new();
654 object.insert(
655 "transport".to_string(),
656 JsonValue::String(listener.transport.clone()),
657 );
658 object.insert(
659 "bind_addr".to_string(),
660 JsonValue::String(listener.bind_addr.clone()),
661 );
662 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
663 JsonValue::Object(object)
664 })
665 .collect();
666 let failed = self
667 .options
668 .transport_readiness
669 .failed
670 .iter()
671 .map(|listener| {
672 let mut object = Map::new();
673 object.insert(
674 "transport".to_string(),
675 JsonValue::String(listener.transport.clone()),
676 );
677 object.insert(
678 "bind_addr".to_string(),
679 JsonValue::String(listener.bind_addr.clone()),
680 );
681 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
682 object.insert(
683 "reason".to_string(),
684 JsonValue::String(listener.reason.clone()),
685 );
686 JsonValue::Object(object)
687 })
688 .collect();
689
690 let mut object = Map::new();
691 object.insert("active".to_string(), JsonValue::Array(active));
692 object.insert("failed".to_string(), JsonValue::Array(failed));
693 JsonValue::Object(object)
694 }
695
696 fn handle_grpc_discovery(&self) -> HttpResponse {
697 let mut methods = Map::new();
698 methods.insert(
699 "query".to_string(),
700 JsonValue::String("reddb.v1.RedDB/Query".to_string()),
701 );
702 methods.insert(
703 "batch_query".to_string(),
704 JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
705 );
706 methods.insert(
707 "health".to_string(),
708 JsonValue::String("reddb.v1.RedDB/Health".to_string()),
709 );
710 methods.insert(
711 "prepare".to_string(),
712 JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
713 );
714 methods.insert(
715 "execute_prepared".to_string(),
716 JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
717 );
718
719 let mut examples = Map::new();
720 examples.insert(
721 "query".to_string(),
722 JsonValue::String(
723 "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
724 .to_string(),
725 ),
726 );
727 examples.insert(
728 "query_with_params".to_string(),
729 JsonValue::String(
730 "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
731 .to_string(),
732 ),
733 );
734 examples.insert(
735 "health".to_string(),
736 JsonValue::String(
737 "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
738 ),
739 );
740
741 let mut object = Map::new();
742 object.insert("ok".to_string(), JsonValue::Bool(true));
743 object.insert(
744 "service".to_string(),
745 JsonValue::String("reddb.v1.RedDB".to_string()),
746 );
747 object.insert(
748 "package".to_string(),
749 JsonValue::String("reddb.v1".to_string()),
750 );
751 object.insert(
752 "proto".to_string(),
753 JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
754 );
755 object.insert("methods".to_string(), JsonValue::Object(methods));
756 object.insert("examples".to_string(), JsonValue::Object(examples));
757 object.insert(
758 "transport_listeners".to_string(),
759 self.transport_readiness_json(),
760 );
761 object.insert(
762 "hint".to_string(),
763 JsonValue::String(
764 "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
765 .to_string(),
766 ),
767 );
768 json_response(200, JsonValue::Object(object))
769 }
770
771 fn handle_query_contract(&self) -> HttpResponse {
772 let mut examples = Map::new();
773 examples.insert(
774 "raw_sql".to_string(),
775 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
776 );
777 examples.insert(
778 "json_query".to_string(),
779 JsonValue::String(
780 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
781 .to_string(),
782 ),
783 );
784 examples.insert(
785 "json_query_with_params".to_string(),
786 JsonValue::String(
787 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
788 .to_string(),
789 ),
790 );
791
792 let mut request_body = Map::new();
793 request_body.insert(
794 "query".to_string(),
795 JsonValue::String("required string".to_string()),
796 );
797 request_body.insert(
798 "params".to_string(),
799 JsonValue::String("optional array".to_string()),
800 );
801
802 let mut response_shape = Map::new();
803 response_shape.insert(
804 "columns".to_string(),
805 JsonValue::String("projected column names".to_string()),
806 );
807 response_shape.insert(
808 "records[].values".to_string(),
809 JsonValue::String("only projected values".to_string()),
810 );
811 response_shape.insert(
812 "records[].meta".to_string(),
813 JsonValue::String("internal metadata when present".to_string()),
814 );
815
816 let mut object = Map::new();
817 object.insert("ok".to_string(), JsonValue::Bool(false));
818 object.insert(
819 "code".to_string(),
820 JsonValue::String("method_not_allowed".to_string()),
821 );
822 object.insert(
823 "message".to_string(),
824 JsonValue::String("/query accepts POST requests".to_string()),
825 );
826 object.insert(
827 "hint".to_string(),
828 JsonValue::String(
829 "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
830 ),
831 );
832 object.insert("method".to_string(), JsonValue::String("POST".to_string()));
833 object.insert("path".to_string(), JsonValue::String("/query".to_string()));
834 object.insert("request_body".to_string(), JsonValue::Object(request_body));
835 object.insert(
836 "response_shape".to_string(),
837 JsonValue::Object(response_shape),
838 );
839 object.insert("examples".to_string(), JsonValue::Object(examples));
840 object.insert(
841 "docs".to_string(),
842 JsonValue::String("https://reddb.io/docs/query".to_string()),
843 );
844
845 json_response(405, JsonValue::Object(object))
846 .with_header("Allow", http::HeaderValue::from_static("POST"))
847 }
848
849 fn handle_root_discovery(&self) -> HttpResponse {
850 let mut endpoints = Map::new();
851 endpoints.insert(
852 "health".to_string(),
853 JsonValue::String("GET /health".to_string()),
854 );
855 endpoints.insert(
856 "ready".to_string(),
857 JsonValue::String("GET /ready".to_string()),
858 );
859 endpoints.insert(
860 "query".to_string(),
861 JsonValue::String("POST /query".to_string()),
862 );
863 endpoints.insert(
864 "query_readiness".to_string(),
865 JsonValue::String("GET /ready/query".to_string()),
866 );
867 endpoints.insert(
868 "catalog".to_string(),
869 JsonValue::String("GET /catalog".to_string()),
870 );
871 endpoints.insert(
872 "deployment_profiles".to_string(),
873 JsonValue::String("GET /deployment/profiles".to_string()),
874 );
875
876 let mut examples = Map::new();
877 examples.insert(
878 "http_raw_sql".to_string(),
879 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
880 );
881 examples.insert(
882 "http_json_query".to_string(),
883 JsonValue::String(
884 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
885 .to_string(),
886 ),
887 );
888 examples.insert(
889 "http_json_query_with_params".to_string(),
890 JsonValue::String(
891 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
892 .to_string(),
893 ),
894 );
895
896 let mut object = Map::new();
897 object.insert("ok".to_string(), JsonValue::Bool(true));
898 object.insert(
899 "service".to_string(),
900 JsonValue::String("reddb".to_string()),
901 );
902 object.insert(
903 "version".to_string(),
904 JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
905 );
906 object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
907 object.insert("examples".to_string(), JsonValue::Object(examples));
908 object.insert(
909 "docs".to_string(),
910 JsonValue::String("https://reddb.io/docs".to_string()),
911 );
912 object.insert(
913 "transport_listeners".to_string(),
914 self.transport_readiness_json(),
915 );
916 json_response(200, JsonValue::Object(object))
917 }
918
919 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
920 let mut value = crate::presentation::ops_json::health_json(report);
921 if let JsonValue::Object(ref mut object) = value {
922 object.insert(
923 "transport_listeners".to_string(),
924 self.transport_readiness_json(),
925 );
926 }
927 value
928 }
929
930 pub fn serve(&self) -> io::Result<()> {
931 let listener = TcpListener::bind(&self.options.bind_addr)?;
932 self.serve_on(listener)
933 }
934
935 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
942 let runtime = axum_edge::build_edge_runtime()?;
943 runtime.block_on(
944 self.clone()
945 .serve_edge_on_std(listener, HttpTransport::Http),
946 )
947 }
948
949 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
953 let runtime = axum_edge::build_background_edge_runtime()?;
954 let server = self.clone();
955 runtime.block_on(async move {
956 listener.set_nonblocking(true)?;
957 let listener = tokio::net::TcpListener::from_std(listener)?;
958 let (stream, _peer) = listener.accept().await?;
959 server.serve_edge_one(stream).await;
960 Ok(())
961 })
962 }
963
964 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
965 let server = self.clone();
966 thread::spawn(move || server.serve())
967 }
968
969 pub fn serve_in_background_on(
970 &self,
971 listener: TcpListener,
972 ) -> thread::JoinHandle<io::Result<()>> {
973 let server = self.clone();
974 thread::spawn(move || {
975 let runtime = axum_edge::build_background_edge_runtime()?;
976 runtime.block_on(server.serve_edge_on_std(listener, HttpTransport::Http))
977 })
978 }
979
980 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
984 let listener = TcpListener::bind(&self.options.bind_addr)?;
985 self.serve_tls_on(listener, tls_config)
986 }
987
988 pub fn serve_tls_on(
989 &self,
990 listener: TcpListener,
991 tls_config: std::sync::Arc<rustls::ServerConfig>,
992 ) -> io::Result<()> {
993 let runtime = axum_edge::build_edge_runtime()?;
994 let acceptor = axum_edge::tls_acceptor(tls_config);
995 runtime.block_on(self.clone().serve_edge_tls_on_std(
996 listener,
997 acceptor,
998 HttpTransport::Https,
999 ))
1000 }
1001
1002 pub fn serve_tls_in_background(
1003 &self,
1004 tls_config: std::sync::Arc<rustls::ServerConfig>,
1005 ) -> thread::JoinHandle<io::Result<()>> {
1006 let server = self.clone();
1007 thread::spawn(move || server.serve_tls(tls_config))
1008 }
1009
1010 pub fn serve_tls_in_background_on(
1011 &self,
1012 listener: TcpListener,
1013 tls_config: std::sync::Arc<rustls::ServerConfig>,
1014 ) -> thread::JoinHandle<io::Result<()>> {
1015 let server = self.clone();
1016 thread::spawn(move || {
1017 let runtime = axum_edge::build_background_edge_runtime()?;
1018 let acceptor = axum_edge::tls_acceptor(tls_config);
1019 runtime.block_on(server.serve_edge_tls_on_std(listener, acceptor, HttpTransport::Https))
1020 })
1021 }
1022
1023 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
1024 let started = Instant::now();
1025 let result = self.handle_connection_inner(stream);
1026 let elapsed = started.elapsed().as_secs_f64();
1027 self.http_metrics
1028 .record_duration(HttpTransport::Http, elapsed);
1029 result
1030 }
1031
1032 fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
1033 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
1034 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
1035
1036 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
1044
1045 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
1046
1047 if deadline.expired() {
1049 self.http_metrics
1050 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1051 Self::write_handler_timeout_503(&mut stream);
1052 return Ok(());
1053 }
1054
1055 if self.try_route_streaming(&request, &mut stream)? {
1056 return Ok(());
1057 }
1058 let response = self.route(request);
1059
1060 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
1064 if inject_ms > 0 {
1065 thread::sleep(Duration::from_millis(inject_ms));
1066 }
1067
1068 if deadline.expired() {
1070 self.http_metrics
1071 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1072 Self::write_handler_timeout_503(&mut stream);
1073 return Ok(());
1074 }
1075
1076 stream.write_all(&response.to_http_bytes())?;
1077 stream.flush()?;
1078 Ok(())
1079 }
1080
/// Writes a minimal `503 Service Unavailable` response with
/// `Connection: close` and an empty body, then flushes.
///
/// Best effort: write and flush errors are deliberately ignored.
fn write_handler_timeout_503<S: Write>(stream: &mut S) {
    const RESPONSE: &[u8] = concat!(
        "HTTP/1.1 503 Service Unavailable\r\n",
        "Connection: close\r\n",
        "Content-Length: 0\r\n",
        "\r\n",
    )
    .as_bytes();

    let _ = stream.write_all(RESPONSE);
    let _ = stream.flush();
}
1094}