1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
/// Renders a physical analytics job as JSON.
///
/// Thin module-local alias for
/// `crate::presentation::admin_json::analytics_job_json`; all formatting
/// decisions live there.
fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
    crate::presentation::admin_json::analytics_job_json(job)
}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The default HTTP body limit is pinned to 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        assert_eq!(ServerOptions::default().max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must embed a `transport_listeners` object whose
    /// `active` and `failed` arrays mirror the configured readiness state.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        // Struct-update syntax instead of "default, then reassign a field"
        // (clippy::field_reassign_with_default).
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![TransportListenerState {
                    transport: "grpc".to_string(),
                    bind_addr: "127.0.0.1:50051".to_string(),
                    explicit: true,
                }],
                failed: vec![TransportListenerFailure {
                    transport: "http".to_string(),
                    bind_addr: "127.0.0.1:5055".to_string(),
                    explicit: false,
                    reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
                }],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
/// Renders a physical graph projection as JSON.
///
/// Thin module-local alias for
/// `crate::presentation::admin_json::graph_projection_json`.
fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
    crate::presentation::admin_json::graph_projection_json(projection)
}
107
108mod axum_edge;
109pub mod handlers_admin;
110mod handlers_admin_metrics;
111mod handlers_admin_status;
112mod handlers_ai;
113mod handlers_ai_model_cache;
114mod handlers_auth;
115mod handlers_backup;
116mod handlers_browser_auth;
117mod handlers_capabilities;
118mod handlers_collection_policy;
119mod handlers_ec;
120pub(crate) mod handlers_entity;
121mod handlers_failover;
122mod handlers_geo;
123mod handlers_graph;
124mod handlers_iam_policy;
125mod handlers_keyed;
126mod handlers_log;
127mod handlers_metrics;
128mod handlers_ops;
129mod handlers_ops_policy;
130mod ws_edge;
131pub(crate) mod handlers_query;
136mod handlers_replication;
137mod handlers_topology;
138mod handlers_vcs;
139mod handlers_vector;
140pub mod header_escape_guard;
141pub mod http_connection_limiter;
142pub mod http_handler_metrics;
143pub mod http_limits;
144pub mod http_principal_limiter;
145pub mod ingest_pipeline;
146pub mod output_stream;
147mod patch_support;
148mod request_body;
149mod request_context;
150mod routing;
151mod serverless_support;
152pub mod tls;
153mod transport;
154
155use self::handlers_ai::*;
156use self::handlers_entity::*;
157use self::handlers_graph::*;
158use self::handlers_keyed::*;
159use self::handlers_metrics::*;
160use self::handlers_ops::*;
161use self::handlers_query::*;
162use self::http_connection_limiter::{
163 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
164};
165use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
166pub use self::http_limits::{
167 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
168};
169use self::http_principal_limiter::PrincipalConnectionLimiter;
170use self::patch_support::*;
171use self::request_body::*;
172use self::routing::*;
173use self::serverless_support::*;
174use self::transport::*;
175
/// Selects which surface a server instance exposes.
///
/// NOTE(review): the per-variant routing rules are not defined in this part of
/// the file; the variant notes below are inferred from the names — confirm
/// against the routing module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// The surface selected by `ServerOptions::default()`.
    Public,
    /// Presumably limits the listener to administrative endpoints — confirm.
    AdminOnly,
    /// Presumably limits the listener to metrics endpoints — confirm.
    MetricsOnly,
}
194
/// Configuration for a [`RedDBServer`] listener.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address handed to `TcpListener::bind` by `serve` and `serve_tls`.
    pub bind_addr: String,
    /// Body-size limit passed to `HttpRequest::read_from` for each request.
    pub max_body_bytes: usize,
    /// Socket read timeout in milliseconds, set in `handle_connection_inner`.
    pub read_timeout_ms: u64,
    /// Socket write timeout in milliseconds, set in `handle_connection_inner`.
    pub write_timeout_ms: u64,
    /// Presumably the upper bound on scan page sizes — not used in this part
    /// of the file; confirm against the handlers.
    pub max_scan_limit: usize,
    /// Which surface this listener exposes.
    pub surface: ServerSurface,
    /// Listener bind outcomes, serialized under `transport_listeners` by
    /// `transport_readiness_json`.
    pub transport_readiness: crate::service_cli::TransportReadiness,
    /// Origins returned by `websocket_allowed_origins()`.
    /// NOTE(review): enforcement happens elsewhere — confirm how an empty
    /// list is interpreted.
    pub websocket_allowed_origins: Vec<String>,
}
216
/// Default value of `ServerOptions::max_body_bytes`: 32 MiB.
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 * 1024 * 1024;
218
219impl Default for ServerOptions {
220 fn default() -> Self {
221 Self {
222 bind_addr: "127.0.0.1:5055".to_string(),
223 max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
224 read_timeout_ms: 5_000,
225 write_timeout_ms: 5_000,
226 max_scan_limit: 1_000,
227 surface: ServerSurface::Public,
228 transport_readiness: crate::service_cli::TransportReadiness::default(),
229 websocket_allowed_origins: Vec::new(),
230 }
231 }
232}
233
/// Replication state attached to a server via `RedDBServer::with_replication`.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, when one is present.
    /// NOTE(review): presumably `None` on non-primary nodes — confirm.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
239
/// HTTP front-end around a [`RedDBRuntime`].
///
/// The type is `Clone`; the `serve*` methods clone it before handing it to a
/// runtime or background thread.
#[derive(Clone)]
pub struct RedDBServer {
    /// Runtime that the use-case facades borrow.
    runtime: RedDBRuntime,
    /// Listener configuration.
    options: ServerOptions,
    /// Auth store set by `with_auth`; `None` until then.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state set by `with_replication`; `None` until then.
    replication: Option<Arc<ServerReplicationState>>,
    /// Connection limiter; rebuilt by `with_http_limits` / `with_http_limiter_cap`.
    http_limiter: HttpConnectionLimiter,
    /// Duration passed to `HandlerDeadline::arm` for each connection.
    handler_timeout: Duration,
    /// Clock passed to `HandlerDeadline::arm`; replaceable via `with_handler_clock`.
    handler_clock: Arc<dyn MonotonicClock>,
    /// Milliseconds `handle_connection_inner` sleeps after routing; 0 disables it.
    slow_inject_ms: Arc<AtomicU64>,
    /// Receives per-connection durations and reject reasons.
    http_metrics: HttpHandlerMetrics,
    /// Value exposed by `retry_after_secs()`.
    /// NOTE(review): presumably emitted as a `Retry-After` header — confirm.
    retry_after_secs: u64,
    /// Per-principal limiter; rebuilt by `with_http_limits` /
    /// `with_principal_inflight_cap`.
    principal_limiter: PrincipalConnectionLimiter,
    /// Shared stream-capacity registry from `output_stream`.
    pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
    /// Shared lease registry from `output_stream`.
    pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
    /// Shared cursor registry from `output_stream`.
    pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
}
308
/// Handler deadline installed by `RedDBServer::with_options` (30 seconds)
/// until `with_http_limits` or `with_handler_timeout` overrides it.
/// NOTE(review): `http_limits::DEFAULT_HANDLER_TIMEOUT_MS` is re-exported from
/// this module too — confirm the two values stay in sync.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_millis(30_000);
311
/// Category of state a serverless warmup can target.
///
/// NOTE(review): the consumers of this enum are not in this part of the
/// file; variant meanings are taken from their names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
319
/// Deployment profile names.
///
/// NOTE(review): presumably what the deployment-profile endpoints report —
/// the consumers are not in this part of the file; confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
326
327fn percent_decode_path_segment(input: &str) -> Result<String, String> {
328 let bytes = input.as_bytes();
329 let mut out = Vec::with_capacity(bytes.len());
330 let mut index = 0;
331 while index < bytes.len() {
332 match bytes[index] {
333 b'%' => {
334 if index + 2 >= bytes.len() {
335 return Err("truncated percent escape".to_string());
336 }
337 let high = hex_value(bytes[index + 1])
338 .ok_or_else(|| "invalid percent escape".to_string())?;
339 let low = hex_value(bytes[index + 2])
340 .ok_or_else(|| "invalid percent escape".to_string())?;
341 out.push((high << 4) | low);
342 index += 3;
343 }
344 byte => {
345 out.push(byte);
346 index += 1;
347 }
348 }
349 }
350 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
351}
352
/// Returns the numeric value (0–15) of an ASCII hex digit, accepting both
/// lowercase and uppercase letters, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` only recognises ASCII 0-9 / a-f / A-F, and its result is
    // always below 16, so the narrowing cast cannot truncate.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
361
/// A query request after parsing.
///
/// NOTE(review): the parser that fills this struct is not in this part of
/// the file; field notes beyond the types are assumptions to confirm.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text.
    query: String,
    /// Optional list of entity type names — presumably a filter; confirm.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names — presumably a filter; confirm.
    capabilities: Option<Vec<String>>,
    /// Optional parameter values, as storage-schema `Value`s.
    params: Option<Vec<Value>>,
}
372
/// Kind of a single [`PatchOperation`].
///
/// NOTE(review): how `Set` and `Replace` differ is decided by the code that
/// applies patches, which is not in this part of the file — confirm there.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
379
/// One patch step: an operation kind, a target path and an optional value.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// What to do at `path`.
    op: PatchOperationType,
    /// Path segments identifying the target.
    path: Vec<String>,
    /// JSON payload for the operation.
    /// NOTE(review): presumably absent for `Unset` — confirm.
    value: Option<JsonValue>,
}
386
387impl RedDBServer {
    /// Creates a server around `runtime` using `ServerOptions::default()`.
    pub fn new(runtime: RedDBRuntime) -> Self {
        Self::with_options(runtime, ServerOptions::default())
    }
391
    /// Builds a runtime from `db_options` and wraps it in a server configured
    /// with `server_options`.
    ///
    /// # Errors
    /// Propagates the error from `RedDBRuntime::with_options`.
    pub fn from_database_options(
        db_options: RedDBOptions,
        server_options: ServerOptions,
    ) -> RedDBResult<Self> {
        let runtime = RedDBRuntime::with_options(db_options)?;
        Ok(Self::with_options(runtime, server_options))
    }
399
400 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
401 Self {
402 runtime,
403 options,
404 auth_store: None,
405 replication: None,
406 http_limiter: HttpConnectionLimiter::with_default_cap(),
407 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
408 handler_clock: Arc::new(SystemMonotonicClock::new()),
409 slow_inject_ms: Arc::new(AtomicU64::new(0)),
410 http_metrics: HttpHandlerMetrics::new(),
411 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
412 principal_limiter: PrincipalConnectionLimiter::new(
413 http_limits::DEFAULT_MAX_INFLIGHT_PER_PRINCIPAL,
414 ),
415 stream_capacity: output_stream::StreamCapacityRegistry::new(),
416 lease_registry: output_stream::LeaseRegistry::new(),
417 cursor_registry: output_stream::CursorRegistry::new(),
418 }
419 }
420
    /// Returns the shared stream-capacity registry.
    #[doc(hidden)]
    pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
        &self.stream_capacity
    }
425
    /// Returns the shared lease registry.
    #[doc(hidden)]
    pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
        &self.lease_registry
    }
430
    /// Returns the shared cursor registry.
    #[doc(hidden)]
    pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
        &self.cursor_registry
    }
435
    /// Returns the HTTP handler metrics recorder.
    #[doc(hidden)]
    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
        &self.http_metrics
    }
440
    /// Replaces the connection limiter with a fresh one capped at `cap`.
    #[doc(hidden)]
    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(cap);
        self
    }
450
451 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
456 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
457 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
458 self.retry_after_secs = limits.retry_after_secs;
459 self.principal_limiter = PrincipalConnectionLimiter::new(limits.max_inflight_per_principal);
460 self
461 }
462
    /// Replaces the per-principal limiter with a fresh one capped at `cap`.
    #[doc(hidden)]
    pub fn with_principal_inflight_cap(mut self, cap: usize) -> Self {
        self.principal_limiter = PrincipalConnectionLimiter::new(cap);
        self
    }
472
    /// Returns the per-principal connection limiter.
    #[doc(hidden)]
    pub fn principal_limiter(&self) -> &PrincipalConnectionLimiter {
        &self.principal_limiter
    }
477
    /// Returns the configured retry-after value, in seconds.
    #[doc(hidden)]
    pub fn retry_after_secs(&self) -> u64 {
        self.retry_after_secs
    }
482
    /// Returns the HTTP connection limiter.
    #[doc(hidden)]
    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
        &self.http_limiter
    }
487
    /// Overrides the handler timeout used when arming each connection's deadline.
    #[doc(hidden)]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = timeout;
        self
    }
495
    /// Returns the current handler timeout.
    #[doc(hidden)]
    pub fn handler_timeout(&self) -> Duration {
        self.handler_timeout
    }
500
    /// Replaces the monotonic clock used when arming handler deadlines.
    #[doc(hidden)]
    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
        self.handler_clock = clock;
        self
    }
510
    /// Sets the artificial delay (milliseconds) that `handle_connection_inner`
    /// sleeps after routing a request; `0` disables it.
    ///
    /// The counter sits behind an `Arc`, so clones of this server observe the
    /// same value.
    #[doc(hidden)]
    pub fn set_test_slow_inject_ms(&self, ms: u64) {
        self.slow_inject_ms.store(ms, Ordering::Relaxed);
    }
520
    /// Attaches an auth store: registers a clone of the `Arc` with the runtime
    /// and keeps the original handle on the server.
    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
        self.runtime.set_auth_store(Arc::clone(&auth_store));
        self.auth_store = Some(auth_store);
        self
    }
529
    /// Attaches replication state to the server.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }
535
    /// Replaces `options.websocket_allowed_origins` with `origins`.
    pub fn with_websocket_allowed_origins(mut self, origins: Vec<String>) -> Self {
        self.options.websocket_allowed_origins = origins;
        self
    }
544
    /// Returns the configured WebSocket origins as a slice.
    pub(crate) fn websocket_allowed_origins(&self) -> &[String] {
        &self.options.websocket_allowed_origins
    }
549
    /// Installs `authority` on the runtime via `set_browser_token_authority`.
    ///
    /// The server keeps no handle of its own; the authority is only passed to
    /// the runtime.
    pub fn with_browser_token_authority(
        self,
        authority: Arc<crate::auth::browser_token::BrowserTokenAuthority>,
    ) -> Self {
        self.runtime.set_browser_token_authority(Some(authority));
        self
    }
563
    /// Returns the runtime this server wraps.
    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }
567
    /// Returns the server's listener configuration.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }
571
    /// Builds a `QueryUseCases` facade borrowing this server's runtime.
    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }
575
    /// Builds an `AdminUseCases` facade borrowing this server's runtime.
    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }
579
    /// Builds an `EntityUseCases` facade borrowing this server's runtime.
    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }
583
    /// Builds a `CatalogUseCases` facade borrowing this server's runtime.
    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }
587
    /// Builds a `GraphUseCases` facade borrowing this server's runtime.
    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }
591
    /// Builds a `NativeUseCases` facade borrowing this server's runtime.
    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }
595
    /// Builds a `TreeUseCases` facade borrowing this server's runtime.
    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }
599
600 fn transport_readiness_json(&self) -> JsonValue {
601 let active = self
602 .options
603 .transport_readiness
604 .active
605 .iter()
606 .map(|listener| {
607 let mut object = Map::new();
608 object.insert(
609 "transport".to_string(),
610 JsonValue::String(listener.transport.clone()),
611 );
612 object.insert(
613 "bind_addr".to_string(),
614 JsonValue::String(listener.bind_addr.clone()),
615 );
616 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
617 JsonValue::Object(object)
618 })
619 .collect();
620 let failed = self
621 .options
622 .transport_readiness
623 .failed
624 .iter()
625 .map(|listener| {
626 let mut object = Map::new();
627 object.insert(
628 "transport".to_string(),
629 JsonValue::String(listener.transport.clone()),
630 );
631 object.insert(
632 "bind_addr".to_string(),
633 JsonValue::String(listener.bind_addr.clone()),
634 );
635 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
636 object.insert(
637 "reason".to_string(),
638 JsonValue::String(listener.reason.clone()),
639 );
640 JsonValue::Object(object)
641 })
642 .collect();
643
644 let mut object = Map::new();
645 object.insert("active".to_string(), JsonValue::Array(active));
646 object.insert("failed".to_string(), JsonValue::Array(failed));
647 JsonValue::Object(object)
648 }
649
650 fn handle_grpc_discovery(&self) -> HttpResponse {
651 let mut methods = Map::new();
652 methods.insert(
653 "query".to_string(),
654 JsonValue::String("reddb.v1.RedDB/Query".to_string()),
655 );
656 methods.insert(
657 "batch_query".to_string(),
658 JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
659 );
660 methods.insert(
661 "health".to_string(),
662 JsonValue::String("reddb.v1.RedDB/Health".to_string()),
663 );
664 methods.insert(
665 "prepare".to_string(),
666 JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
667 );
668 methods.insert(
669 "execute_prepared".to_string(),
670 JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
671 );
672
673 let mut examples = Map::new();
674 examples.insert(
675 "query".to_string(),
676 JsonValue::String(
677 "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
678 .to_string(),
679 ),
680 );
681 examples.insert(
682 "query_with_params".to_string(),
683 JsonValue::String(
684 "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
685 .to_string(),
686 ),
687 );
688 examples.insert(
689 "health".to_string(),
690 JsonValue::String(
691 "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
692 ),
693 );
694
695 let mut object = Map::new();
696 object.insert("ok".to_string(), JsonValue::Bool(true));
697 object.insert(
698 "service".to_string(),
699 JsonValue::String("reddb.v1.RedDB".to_string()),
700 );
701 object.insert(
702 "package".to_string(),
703 JsonValue::String("reddb.v1".to_string()),
704 );
705 object.insert(
706 "proto".to_string(),
707 JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
708 );
709 object.insert("methods".to_string(), JsonValue::Object(methods));
710 object.insert("examples".to_string(), JsonValue::Object(examples));
711 object.insert(
712 "transport_listeners".to_string(),
713 self.transport_readiness_json(),
714 );
715 object.insert(
716 "hint".to_string(),
717 JsonValue::String(
718 "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
719 .to_string(),
720 ),
721 );
722 json_response(200, JsonValue::Object(object))
723 }
724
725 fn handle_query_contract(&self) -> HttpResponse {
726 let mut examples = Map::new();
727 examples.insert(
728 "raw_sql".to_string(),
729 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
730 );
731 examples.insert(
732 "json_query".to_string(),
733 JsonValue::String(
734 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
735 .to_string(),
736 ),
737 );
738 examples.insert(
739 "json_query_with_params".to_string(),
740 JsonValue::String(
741 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
742 .to_string(),
743 ),
744 );
745
746 let mut request_body = Map::new();
747 request_body.insert(
748 "query".to_string(),
749 JsonValue::String("required string".to_string()),
750 );
751 request_body.insert(
752 "params".to_string(),
753 JsonValue::String("optional array".to_string()),
754 );
755
756 let mut response_shape = Map::new();
757 response_shape.insert(
758 "columns".to_string(),
759 JsonValue::String("projected column names".to_string()),
760 );
761 response_shape.insert(
762 "records[].values".to_string(),
763 JsonValue::String("only projected values".to_string()),
764 );
765 response_shape.insert(
766 "records[].meta".to_string(),
767 JsonValue::String("internal metadata when present".to_string()),
768 );
769
770 let mut object = Map::new();
771 object.insert("ok".to_string(), JsonValue::Bool(false));
772 object.insert(
773 "code".to_string(),
774 JsonValue::String("method_not_allowed".to_string()),
775 );
776 object.insert(
777 "message".to_string(),
778 JsonValue::String("/query accepts POST requests".to_string()),
779 );
780 object.insert(
781 "hint".to_string(),
782 JsonValue::String(
783 "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
784 ),
785 );
786 object.insert("method".to_string(), JsonValue::String("POST".to_string()));
787 object.insert("path".to_string(), JsonValue::String("/query".to_string()));
788 object.insert("request_body".to_string(), JsonValue::Object(request_body));
789 object.insert(
790 "response_shape".to_string(),
791 JsonValue::Object(response_shape),
792 );
793 object.insert("examples".to_string(), JsonValue::Object(examples));
794 object.insert(
795 "docs".to_string(),
796 JsonValue::String("https://reddb.io/docs/query".to_string()),
797 );
798
799 json_response(405, JsonValue::Object(object))
800 .with_header("Allow", http::HeaderValue::from_static("POST"))
801 }
802
803 fn handle_root_discovery(&self) -> HttpResponse {
804 let mut endpoints = Map::new();
805 endpoints.insert(
806 "health".to_string(),
807 JsonValue::String("GET /health".to_string()),
808 );
809 endpoints.insert(
810 "ready".to_string(),
811 JsonValue::String("GET /ready".to_string()),
812 );
813 endpoints.insert(
814 "query".to_string(),
815 JsonValue::String("POST /query".to_string()),
816 );
817 endpoints.insert(
818 "query_readiness".to_string(),
819 JsonValue::String("GET /ready/query".to_string()),
820 );
821 endpoints.insert(
822 "catalog".to_string(),
823 JsonValue::String("GET /catalog".to_string()),
824 );
825 endpoints.insert(
826 "deployment_profiles".to_string(),
827 JsonValue::String("GET /deployment/profiles".to_string()),
828 );
829
830 let mut examples = Map::new();
831 examples.insert(
832 "http_raw_sql".to_string(),
833 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
834 );
835 examples.insert(
836 "http_json_query".to_string(),
837 JsonValue::String(
838 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
839 .to_string(),
840 ),
841 );
842 examples.insert(
843 "http_json_query_with_params".to_string(),
844 JsonValue::String(
845 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
846 .to_string(),
847 ),
848 );
849
850 let mut object = Map::new();
851 object.insert("ok".to_string(), JsonValue::Bool(true));
852 object.insert(
853 "service".to_string(),
854 JsonValue::String("reddb".to_string()),
855 );
856 object.insert(
857 "version".to_string(),
858 JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
859 );
860 object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
861 object.insert("examples".to_string(), JsonValue::Object(examples));
862 object.insert(
863 "docs".to_string(),
864 JsonValue::String("https://reddb.io/docs".to_string()),
865 );
866 object.insert(
867 "transport_listeners".to_string(),
868 self.transport_readiness_json(),
869 );
870 json_response(200, JsonValue::Object(object))
871 }
872
873 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
874 let mut value = crate::presentation::ops_json::health_json(report);
875 if let JsonValue::Object(ref mut object) = value {
876 object.insert(
877 "transport_listeners".to_string(),
878 self.transport_readiness_json(),
879 );
880 }
881 value
882 }
883
884 pub fn serve(&self) -> io::Result<()> {
885 let listener = TcpListener::bind(&self.options.bind_addr)?;
886 self.serve_on(listener)
887 }
888
889 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
896 let runtime = axum_edge::build_edge_runtime()?;
897 runtime.block_on(
898 self.clone()
899 .serve_edge_on_std(listener, HttpTransport::Http),
900 )
901 }
902
    /// Accepts exactly one connection from `listener` and hands it to
    /// `serve_edge_one`, blocking the calling thread until that completes.
    ///
    /// # Errors
    /// Returns an error if the runtime cannot be built, the listener cannot be
    /// switched to non-blocking mode or registered with tokio, or the accept
    /// fails.
    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
        let runtime = axum_edge::build_background_edge_runtime()?;
        let server = self.clone();
        runtime.block_on(async move {
            // tokio's `from_std` expects the std listener to already be in
            // non-blocking mode.
            listener.set_nonblocking(true)?;
            let listener = tokio::net::TcpListener::from_std(listener)?;
            let (stream, _peer) = listener.accept().await?;
            server.serve_edge_one(stream).await;
            Ok(())
        })
    }
917
    /// Spawns a thread that runs `serve()` on a clone of this server and
    /// returns its join handle.
    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve())
    }
922
    /// Spawns a thread that serves plain HTTP on an already-bound `listener`
    /// using a background edge runtime, and returns its join handle.
    ///
    /// The runtime is built inside the spawned thread, so a build failure
    /// surfaces through the join handle's result.
    pub fn serve_in_background_on(
        &self,
        listener: TcpListener,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || {
            let runtime = axum_edge::build_background_edge_runtime()?;
            runtime.block_on(server.serve_edge_on_std(listener, HttpTransport::Http))
        })
    }
933
934 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
938 let listener = TcpListener::bind(&self.options.bind_addr)?;
939 self.serve_tls_on(listener, tls_config)
940 }
941
942 pub fn serve_tls_on(
943 &self,
944 listener: TcpListener,
945 tls_config: std::sync::Arc<rustls::ServerConfig>,
946 ) -> io::Result<()> {
947 let runtime = axum_edge::build_edge_runtime()?;
948 let acceptor = axum_edge::tls_acceptor(tls_config);
949 runtime.block_on(self.clone().serve_edge_tls_on_std(
950 listener,
951 acceptor,
952 HttpTransport::Https,
953 ))
954 }
955
    /// Spawns a thread that runs `serve_tls(tls_config)` on a clone of this
    /// server and returns its join handle.
    pub fn serve_tls_in_background(
        &self,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls(tls_config))
    }
963
    /// Spawns a thread that serves HTTPS on an already-bound `listener` using
    /// a background edge runtime, and returns its join handle.
    ///
    /// The runtime and TLS acceptor are built inside the spawned thread, so a
    /// runtime build failure surfaces through the join handle's result.
    pub fn serve_tls_in_background_on(
        &self,
        listener: TcpListener,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || {
            let runtime = axum_edge::build_background_edge_runtime()?;
            let acceptor = axum_edge::tls_acceptor(tls_config);
            runtime.block_on(server.serve_edge_tls_on_std(listener, acceptor, HttpTransport::Https))
        })
    }
976
977 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
978 let started = Instant::now();
979 let result = self.handle_connection_inner(stream);
980 let elapsed = started.elapsed().as_secs_f64();
981 self.http_metrics
982 .record_duration(HttpTransport::Http, elapsed);
983 result
984 }
985
    /// Reads one HTTP request from `stream`, routes it and writes the response.
    ///
    /// A handler deadline is armed before the request is read and checked
    /// twice: once after the read and once after routing. If it has expired at
    /// either point, the rejection is recorded in the HTTP metrics and a bare
    /// 503 is written instead of the routed response.
    ///
    /// # Errors
    /// Returns any I/O error from configuring the socket timeouts, reading the
    /// request, `try_route_streaming`, or writing/flushing the final response.
    fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;

        // Armed before the read, so the first expiry check below runs after
        // the request has been received.
        let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);

        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;

        if deadline.expired() {
            self.http_metrics
                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut stream);
            return Ok(());
        }

        // A `true` result means the streaming path already handled the
        // request, so there is nothing left to write here.
        if self.try_route_streaming(&request, &mut stream)? {
            return Ok(());
        }
        let response = self.route(request);

        // Artificial latency set through `set_test_slow_inject_ms`; zero (the
        // initial value) means no delay.
        let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
        if inject_ms > 0 {
            thread::sleep(Duration::from_millis(inject_ms));
        }

        // Second check: the routed response is discarded if the deadline
        // passed while routing (or sleeping above).
        if deadline.expired() {
            self.http_metrics
                .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
            Self::write_handler_timeout_503(&mut stream);
            return Ok(());
        }

        stream.write_all(&response.to_http_bytes())?;
        stream.flush()?;
        Ok(())
    }
1034
1035 fn write_handler_timeout_503<S: Write>(stream: &mut S) {
1041 const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
1042 Connection: close\r\n\
1043 Content-Length: 0\r\n\
1044 \r\n";
1045 let _ = stream.write_all(RESPONSE);
1046 let _ = stream.flush();
1047 }
1048}