1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49 crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// Pins the default HTTP body cap at 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        let defaults = ServerOptions::default();
        assert_eq!(defaults.max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must carry a `transport_listeners` object whose
    /// `active` and `failed` arrays mirror the configured readiness state.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");

        let grpc_listener = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:50051".to_string(),
            explicit: true,
        };
        let http_failure = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5055".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
        };
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![grpc_listener],
                failed: vec![http_failure],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105 crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108mod axum_edge;
109mod ws_edge;
110pub mod handlers_admin;
111mod handlers_ai;
112mod handlers_ai_model_cache;
113mod handlers_auth;
114mod handlers_backup;
115mod handlers_collection_policy;
116mod handlers_ec;
117pub(crate) mod handlers_entity;
118mod handlers_geo;
119mod handlers_graph;
120mod handlers_keyed;
121mod handlers_log;
122mod handlers_metrics;
123mod handlers_ops;
124mod handlers_ops_policy;
125pub(crate) mod handlers_query;
130mod handlers_replication;
131mod handlers_topology;
132mod handlers_vcs;
133mod handlers_vector;
134pub mod header_escape_guard;
135pub mod http_connection_limiter;
136pub mod http_handler_metrics;
137pub mod http_limits;
138pub mod http_principal_limiter;
139pub mod ingest_pipeline;
140pub mod output_stream;
141mod patch_support;
142mod request_body;
143mod request_context;
144mod routing;
145mod serverless_support;
146pub mod tls;
147mod transport;
148
149use self::handlers_ai::*;
150use self::handlers_entity::*;
151use self::handlers_graph::*;
152use self::handlers_keyed::*;
153use self::handlers_metrics::*;
154use self::handlers_ops::*;
155use self::handlers_query::*;
156use self::http_connection_limiter::{
157 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
158};
159use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
160pub use self::http_limits::{
161 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
162};
163use self::http_principal_limiter::PrincipalConnectionLimiter;
164use self::patch_support::*;
165use self::request_body::*;
166use self::routing::*;
167use self::serverless_support::*;
168use self::transport::*;
169
/// Surface selector stored in [`ServerOptions::surface`].
///
/// NOTE(review): the routing code that interprets these variants is not part
/// of this section; the per-variant notes are inferred from the names and
/// should be confirmed against the router.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// The surface selected by `ServerOptions::default()`.
    Public,
    /// Presumably limits the listener to administrative routes — confirm.
    AdminOnly,
    /// Presumably limits the listener to metrics routes — confirm.
    MetricsOnly,
}
188
/// Configuration for a [`RedDBServer`] instance.
///
/// See the `Default` impl for the stock values.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address that `RedDBServer::serve` / `serve_tls` bind their listener to.
    pub bind_addr: String,
    /// Body size limit handed to `HttpRequest::read_from` when reading a request.
    pub max_body_bytes: usize,
    /// Socket read timeout, in milliseconds, applied to each accepted stream.
    pub read_timeout_ms: u64,
    /// Socket write timeout, in milliseconds, applied to each accepted stream.
    pub write_timeout_ms: u64,
    /// NOTE(review): presumably the upper bound on scan page sizes; its
    /// consumer is not visible in this section — confirm.
    pub max_scan_limit: usize,
    /// Which surface this server exposes (see [`ServerSurface`]).
    pub surface: ServerSurface,
    /// Listener state that is serialized into the `transport_listeners`
    /// object of the discovery and health payloads.
    pub transport_readiness: crate::service_cli::TransportReadiness,
    /// Origins exposed through `RedDBServer::websocket_allowed_origins`.
    /// NOTE(review): the meaning of an empty list is decided by the
    /// WebSocket edge, which is not visible here — confirm.
    pub websocket_allowed_origins: Vec<String>,
}
210
/// Default value of [`ServerOptions::max_body_bytes`]: 32 MiB.
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 * 1024 * 1024;
212
213impl Default for ServerOptions {
214 fn default() -> Self {
215 Self {
216 bind_addr: "127.0.0.1:5055".to_string(),
217 max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
218 read_timeout_ms: 5_000,
219 write_timeout_ms: 5_000,
220 max_scan_limit: 1_000,
221 surface: ServerSurface::Public,
222 transport_readiness: crate::service_cli::TransportReadiness::default(),
223 websocket_allowed_origins: Vec::new(),
224 }
225 }
226}
227
/// Replication state attached to a server through `RedDBServer::with_replication`.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, when one exists.
    /// NOTE(review): presumably `None` on replicas — confirm against the caller.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
233
/// HTTP front end for a [`RedDBRuntime`].
///
/// The server is `Clone`; the `serve_*` entry points clone it before handing
/// it to a runtime or background thread.
#[derive(Clone)]
pub struct RedDBServer {
    /// Database runtime that the use-case facades borrow.
    runtime: RedDBRuntime,
    /// Listener and request-handling configuration.
    options: ServerOptions,
    /// Auth store installed by `with_auth`; `None` until then.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state installed by `with_replication`; `None` until then.
    replication: Option<Arc<ServerReplicationState>>,
    /// Connection limiter; replaced by `with_http_limiter_cap` / `with_http_limits`.
    http_limiter: HttpConnectionLimiter,
    /// Budget handed to `HandlerDeadline::arm` for each connection.
    handler_timeout: Duration,
    /// Clock handed to `HandlerDeadline::arm`; swappable via `with_handler_clock`.
    handler_clock: Arc<dyn MonotonicClock>,
    /// When non-zero, `handle_connection_inner` sleeps this many milliseconds
    /// after routing. Set through `set_test_slow_inject_ms`.
    slow_inject_ms: Arc<AtomicU64>,
    /// Sink for handler durations and reject counts.
    http_metrics: HttpHandlerMetrics,
    /// Value exposed by `retry_after_secs()`.
    /// NOTE(review): presumably emitted as a `Retry-After` hint on rejects — confirm.
    retry_after_secs: u64,
    /// Per-principal limiter; replaced by `with_principal_inflight_cap` / `with_http_limits`.
    principal_limiter: PrincipalConnectionLimiter,
    /// Stream capacity registry from `output_stream` (usage not visible in this section).
    pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
    /// Lease registry from `output_stream` (usage not visible in this section).
    pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
    /// Cursor registry from `output_stream` (usage not visible in this section).
    pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
}
302
/// Handler timeout installed by `RedDBServer::with_options`: 30 seconds.
///
/// NOTE(review): `http_limits` also exports `DEFAULT_HANDLER_TIMEOUT_MS`;
/// confirm the two values are meant to stay in sync.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_millis(30_000);
305
/// Categories of state that a serverless warmup can target.
///
/// NOTE(review): the consumer is not visible in this section (presumably
/// `serverless_support`); variant meanings are taken from their names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
313
/// Deployment profile identifiers.
///
/// NOTE(review): presumably backs the `GET /deployment/profiles` endpoint
/// advertised by the root discovery payload — confirm against the handler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
320
321fn percent_decode_path_segment(input: &str) -> Result<String, String> {
322 let bytes = input.as_bytes();
323 let mut out = Vec::with_capacity(bytes.len());
324 let mut index = 0;
325 while index < bytes.len() {
326 match bytes[index] {
327 b'%' => {
328 if index + 2 >= bytes.len() {
329 return Err("truncated percent escape".to_string());
330 }
331 let high = hex_value(bytes[index + 1])
332 .ok_or_else(|| "invalid percent escape".to_string())?;
333 let low = hex_value(bytes[index + 2])
334 .ok_or_else(|| "invalid percent escape".to_string())?;
335 out.push((high << 4) | low);
336 index += 3;
337 }
338 byte => {
339 out.push(byte);
340 index += 1;
341 }
342 }
343 }
344 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
345}
346
/// Returns the numeric value (0–15) of an ASCII hexadecimal digit, accepting
/// both lower- and upper-case letters, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // Each arm yields the first byte of its range and the value that byte maps to.
    let (range_start, range_value) = match byte {
        b'0'..=b'9' => (b'0', 0),
        b'a'..=b'f' => (b'a', 10),
        b'A'..=b'F' => (b'A', 10),
        _ => return None,
    };
    Some(byte - range_start + range_value)
}
355
/// Parsed form of a query request.
///
/// NOTE(review): the parser that fills this struct is not visible in this
/// section; field notes are inferred from names and types — confirm.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text.
    query: String,
    /// Optional list of entity type names supplied with the request.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names supplied with the request.
    capabilities: Option<Vec<String>>,
    /// Optional parameter values accompanying the query.
    params: Option<Vec<Value>>,
}
366
/// Kind of change carried by a [`PatchOperation`].
///
/// NOTE(review): how `Set` and `Replace` differ is decided by the patch
/// handling code, which is not visible in this section.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
373
/// A single patch step: an operation kind, a target path and an optional value.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// What to do at `path`.
    op: PatchOperationType,
    /// Target location, as a list of path segments.
    path: Vec<String>,
    /// Payload for the operation.
    /// NOTE(review): presumably absent for `Unset` — confirm against the parser.
    value: Option<JsonValue>,
}
380
381impl RedDBServer {
382 pub fn new(runtime: RedDBRuntime) -> Self {
383 Self::with_options(runtime, ServerOptions::default())
384 }
385
386 pub fn from_database_options(
387 db_options: RedDBOptions,
388 server_options: ServerOptions,
389 ) -> RedDBResult<Self> {
390 let runtime = RedDBRuntime::with_options(db_options)?;
391 Ok(Self::with_options(runtime, server_options))
392 }
393
394 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
395 Self {
396 runtime,
397 options,
398 auth_store: None,
399 replication: None,
400 http_limiter: HttpConnectionLimiter::with_default_cap(),
401 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
402 handler_clock: Arc::new(SystemMonotonicClock::new()),
403 slow_inject_ms: Arc::new(AtomicU64::new(0)),
404 http_metrics: HttpHandlerMetrics::new(),
405 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
406 principal_limiter: PrincipalConnectionLimiter::new(
407 http_limits::DEFAULT_MAX_INFLIGHT_PER_PRINCIPAL,
408 ),
409 stream_capacity: output_stream::StreamCapacityRegistry::new(),
410 lease_registry: output_stream::LeaseRegistry::new(),
411 cursor_registry: output_stream::CursorRegistry::new(),
412 }
413 }
414
415 #[doc(hidden)]
416 pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
417 &self.stream_capacity
418 }
419
420 #[doc(hidden)]
421 pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
422 &self.lease_registry
423 }
424
425 #[doc(hidden)]
426 pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
427 &self.cursor_registry
428 }
429
430 #[doc(hidden)]
431 pub fn http_metrics(&self) -> &HttpHandlerMetrics {
432 &self.http_metrics
433 }
434
435 #[doc(hidden)]
440 pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
441 self.http_limiter = HttpConnectionLimiter::new(cap);
442 self
443 }
444
445 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
450 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
451 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
452 self.retry_after_secs = limits.retry_after_secs;
453 self.principal_limiter = PrincipalConnectionLimiter::new(limits.max_inflight_per_principal);
454 self
455 }
456
457 #[doc(hidden)]
462 pub fn with_principal_inflight_cap(mut self, cap: usize) -> Self {
463 self.principal_limiter = PrincipalConnectionLimiter::new(cap);
464 self
465 }
466
467 #[doc(hidden)]
468 pub fn principal_limiter(&self) -> &PrincipalConnectionLimiter {
469 &self.principal_limiter
470 }
471
472 #[doc(hidden)]
473 pub fn retry_after_secs(&self) -> u64 {
474 self.retry_after_secs
475 }
476
477 #[doc(hidden)]
478 pub fn http_limiter(&self) -> &HttpConnectionLimiter {
479 &self.http_limiter
480 }
481
482 #[doc(hidden)]
485 pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
486 self.handler_timeout = timeout;
487 self
488 }
489
490 #[doc(hidden)]
491 pub fn handler_timeout(&self) -> Duration {
492 self.handler_timeout
493 }
494
495 #[doc(hidden)]
500 pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
501 self.handler_clock = clock;
502 self
503 }
504
505 #[doc(hidden)]
511 pub fn set_test_slow_inject_ms(&self, ms: u64) {
512 self.slow_inject_ms.store(ms, Ordering::Relaxed);
513 }
514
515 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
519 self.runtime.set_auth_store(Arc::clone(&auth_store));
520 self.auth_store = Some(auth_store);
521 self
522 }
523
524 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
526 self.replication = Some(state);
527 self
528 }
529
530 pub fn with_websocket_allowed_origins(mut self, origins: Vec<String>) -> Self {
535 self.options.websocket_allowed_origins = origins;
536 self
537 }
538
539 pub(crate) fn websocket_allowed_origins(&self) -> &[String] {
541 &self.options.websocket_allowed_origins
542 }
543
544 pub fn runtime(&self) -> &RedDBRuntime {
545 &self.runtime
546 }
547
548 pub fn options(&self) -> &ServerOptions {
549 &self.options
550 }
551
552 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
553 QueryUseCases::new(&self.runtime)
554 }
555
556 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
557 AdminUseCases::new(&self.runtime)
558 }
559
560 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
561 EntityUseCases::new(&self.runtime)
562 }
563
564 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
565 CatalogUseCases::new(&self.runtime)
566 }
567
568 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
569 GraphUseCases::new(&self.runtime)
570 }
571
572 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
573 NativeUseCases::new(&self.runtime)
574 }
575
576 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
577 TreeUseCases::new(&self.runtime)
578 }
579
580 fn transport_readiness_json(&self) -> JsonValue {
581 let active = self
582 .options
583 .transport_readiness
584 .active
585 .iter()
586 .map(|listener| {
587 let mut object = Map::new();
588 object.insert(
589 "transport".to_string(),
590 JsonValue::String(listener.transport.clone()),
591 );
592 object.insert(
593 "bind_addr".to_string(),
594 JsonValue::String(listener.bind_addr.clone()),
595 );
596 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
597 JsonValue::Object(object)
598 })
599 .collect();
600 let failed = self
601 .options
602 .transport_readiness
603 .failed
604 .iter()
605 .map(|listener| {
606 let mut object = Map::new();
607 object.insert(
608 "transport".to_string(),
609 JsonValue::String(listener.transport.clone()),
610 );
611 object.insert(
612 "bind_addr".to_string(),
613 JsonValue::String(listener.bind_addr.clone()),
614 );
615 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
616 object.insert(
617 "reason".to_string(),
618 JsonValue::String(listener.reason.clone()),
619 );
620 JsonValue::Object(object)
621 })
622 .collect();
623
624 let mut object = Map::new();
625 object.insert("active".to_string(), JsonValue::Array(active));
626 object.insert("failed".to_string(), JsonValue::Array(failed));
627 JsonValue::Object(object)
628 }
629
630 fn handle_grpc_discovery(&self) -> HttpResponse {
631 let mut methods = Map::new();
632 methods.insert(
633 "query".to_string(),
634 JsonValue::String("reddb.v1.RedDB/Query".to_string()),
635 );
636 methods.insert(
637 "batch_query".to_string(),
638 JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
639 );
640 methods.insert(
641 "health".to_string(),
642 JsonValue::String("reddb.v1.RedDB/Health".to_string()),
643 );
644 methods.insert(
645 "prepare".to_string(),
646 JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
647 );
648 methods.insert(
649 "execute_prepared".to_string(),
650 JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
651 );
652
653 let mut examples = Map::new();
654 examples.insert(
655 "query".to_string(),
656 JsonValue::String(
657 "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
658 .to_string(),
659 ),
660 );
661 examples.insert(
662 "query_with_params".to_string(),
663 JsonValue::String(
664 "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
665 .to_string(),
666 ),
667 );
668 examples.insert(
669 "health".to_string(),
670 JsonValue::String(
671 "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
672 ),
673 );
674
675 let mut object = Map::new();
676 object.insert("ok".to_string(), JsonValue::Bool(true));
677 object.insert(
678 "service".to_string(),
679 JsonValue::String("reddb.v1.RedDB".to_string()),
680 );
681 object.insert(
682 "package".to_string(),
683 JsonValue::String("reddb.v1".to_string()),
684 );
685 object.insert(
686 "proto".to_string(),
687 JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
688 );
689 object.insert("methods".to_string(), JsonValue::Object(methods));
690 object.insert("examples".to_string(), JsonValue::Object(examples));
691 object.insert(
692 "transport_listeners".to_string(),
693 self.transport_readiness_json(),
694 );
695 object.insert(
696 "hint".to_string(),
697 JsonValue::String(
698 "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
699 .to_string(),
700 ),
701 );
702 json_response(200, JsonValue::Object(object))
703 }
704
705 fn handle_query_contract(&self) -> HttpResponse {
706 let mut examples = Map::new();
707 examples.insert(
708 "raw_sql".to_string(),
709 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
710 );
711 examples.insert(
712 "json_query".to_string(),
713 JsonValue::String(
714 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
715 .to_string(),
716 ),
717 );
718 examples.insert(
719 "json_query_with_params".to_string(),
720 JsonValue::String(
721 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
722 .to_string(),
723 ),
724 );
725
726 let mut request_body = Map::new();
727 request_body.insert(
728 "query".to_string(),
729 JsonValue::String("required string".to_string()),
730 );
731 request_body.insert(
732 "params".to_string(),
733 JsonValue::String("optional array".to_string()),
734 );
735
736 let mut response_shape = Map::new();
737 response_shape.insert(
738 "columns".to_string(),
739 JsonValue::String("projected column names".to_string()),
740 );
741 response_shape.insert(
742 "records[].values".to_string(),
743 JsonValue::String("only projected values".to_string()),
744 );
745 response_shape.insert(
746 "records[].meta".to_string(),
747 JsonValue::String("internal metadata when present".to_string()),
748 );
749
750 let mut object = Map::new();
751 object.insert("ok".to_string(), JsonValue::Bool(false));
752 object.insert(
753 "code".to_string(),
754 JsonValue::String("method_not_allowed".to_string()),
755 );
756 object.insert(
757 "message".to_string(),
758 JsonValue::String("/query accepts POST requests".to_string()),
759 );
760 object.insert(
761 "hint".to_string(),
762 JsonValue::String(
763 "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
764 ),
765 );
766 object.insert("method".to_string(), JsonValue::String("POST".to_string()));
767 object.insert("path".to_string(), JsonValue::String("/query".to_string()));
768 object.insert("request_body".to_string(), JsonValue::Object(request_body));
769 object.insert(
770 "response_shape".to_string(),
771 JsonValue::Object(response_shape),
772 );
773 object.insert("examples".to_string(), JsonValue::Object(examples));
774 object.insert(
775 "docs".to_string(),
776 JsonValue::String("https://reddb.io/docs/query".to_string()),
777 );
778
779 json_response(405, JsonValue::Object(object))
780 .with_header("Allow", http::HeaderValue::from_static("POST"))
781 }
782
783 fn handle_root_discovery(&self) -> HttpResponse {
784 let mut endpoints = Map::new();
785 endpoints.insert(
786 "health".to_string(),
787 JsonValue::String("GET /health".to_string()),
788 );
789 endpoints.insert(
790 "ready".to_string(),
791 JsonValue::String("GET /ready".to_string()),
792 );
793 endpoints.insert(
794 "query".to_string(),
795 JsonValue::String("POST /query".to_string()),
796 );
797 endpoints.insert(
798 "query_readiness".to_string(),
799 JsonValue::String("GET /ready/query".to_string()),
800 );
801 endpoints.insert(
802 "catalog".to_string(),
803 JsonValue::String("GET /catalog".to_string()),
804 );
805 endpoints.insert(
806 "deployment_profiles".to_string(),
807 JsonValue::String("GET /deployment/profiles".to_string()),
808 );
809
810 let mut examples = Map::new();
811 examples.insert(
812 "http_raw_sql".to_string(),
813 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
814 );
815 examples.insert(
816 "http_json_query".to_string(),
817 JsonValue::String(
818 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
819 .to_string(),
820 ),
821 );
822 examples.insert(
823 "http_json_query_with_params".to_string(),
824 JsonValue::String(
825 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
826 .to_string(),
827 ),
828 );
829
830 let mut object = Map::new();
831 object.insert("ok".to_string(), JsonValue::Bool(true));
832 object.insert(
833 "service".to_string(),
834 JsonValue::String("reddb".to_string()),
835 );
836 object.insert(
837 "version".to_string(),
838 JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
839 );
840 object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
841 object.insert("examples".to_string(), JsonValue::Object(examples));
842 object.insert(
843 "docs".to_string(),
844 JsonValue::String("https://reddb.io/docs".to_string()),
845 );
846 object.insert(
847 "transport_listeners".to_string(),
848 self.transport_readiness_json(),
849 );
850 json_response(200, JsonValue::Object(object))
851 }
852
853 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
854 let mut value = crate::presentation::ops_json::health_json(report);
855 if let JsonValue::Object(ref mut object) = value {
856 object.insert(
857 "transport_listeners".to_string(),
858 self.transport_readiness_json(),
859 );
860 }
861 value
862 }
863
864 pub fn serve(&self) -> io::Result<()> {
865 let listener = TcpListener::bind(&self.options.bind_addr)?;
866 self.serve_on(listener)
867 }
868
869 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
876 let runtime = axum_edge::build_edge_runtime()?;
877 runtime.block_on(
878 self.clone()
879 .serve_edge_on_std(listener, HttpTransport::Http),
880 )
881 }
882
883 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
887 let runtime = axum_edge::build_background_edge_runtime()?;
888 let server = self.clone();
889 runtime.block_on(async move {
890 listener.set_nonblocking(true)?;
891 let listener = tokio::net::TcpListener::from_std(listener)?;
892 let (stream, _peer) = listener.accept().await?;
893 server.serve_edge_one(stream).await;
894 Ok(())
895 })
896 }
897
898 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
899 let server = self.clone();
900 thread::spawn(move || server.serve())
901 }
902
903 pub fn serve_in_background_on(
904 &self,
905 listener: TcpListener,
906 ) -> thread::JoinHandle<io::Result<()>> {
907 let server = self.clone();
908 thread::spawn(move || {
909 let runtime = axum_edge::build_background_edge_runtime()?;
910 runtime.block_on(server.serve_edge_on_std(listener, HttpTransport::Http))
911 })
912 }
913
914 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
918 let listener = TcpListener::bind(&self.options.bind_addr)?;
919 self.serve_tls_on(listener, tls_config)
920 }
921
922 pub fn serve_tls_on(
923 &self,
924 listener: TcpListener,
925 tls_config: std::sync::Arc<rustls::ServerConfig>,
926 ) -> io::Result<()> {
927 let runtime = axum_edge::build_edge_runtime()?;
928 let acceptor = axum_edge::tls_acceptor(tls_config);
929 runtime.block_on(self.clone().serve_edge_tls_on_std(
930 listener,
931 acceptor,
932 HttpTransport::Https,
933 ))
934 }
935
936 pub fn serve_tls_in_background(
937 &self,
938 tls_config: std::sync::Arc<rustls::ServerConfig>,
939 ) -> thread::JoinHandle<io::Result<()>> {
940 let server = self.clone();
941 thread::spawn(move || server.serve_tls(tls_config))
942 }
943
944 pub fn serve_tls_in_background_on(
945 &self,
946 listener: TcpListener,
947 tls_config: std::sync::Arc<rustls::ServerConfig>,
948 ) -> thread::JoinHandle<io::Result<()>> {
949 let server = self.clone();
950 thread::spawn(move || {
951 let runtime = axum_edge::build_background_edge_runtime()?;
952 let acceptor = axum_edge::tls_acceptor(tls_config);
953 runtime.block_on(server.serve_edge_tls_on_std(listener, acceptor, HttpTransport::Https))
954 })
955 }
956
957 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
958 let started = Instant::now();
959 let result = self.handle_connection_inner(stream);
960 let elapsed = started.elapsed().as_secs_f64();
961 self.http_metrics
962 .record_duration(HttpTransport::Http, elapsed);
963 result
964 }
965
966 fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
967 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
968 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
969
970 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
978
979 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
980
981 if deadline.expired() {
983 self.http_metrics
984 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
985 Self::write_handler_timeout_503(&mut stream);
986 return Ok(());
987 }
988
989 if self.try_route_streaming(&request, &mut stream)? {
990 return Ok(());
991 }
992 let response = self.route(request);
993
994 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
998 if inject_ms > 0 {
999 thread::sleep(Duration::from_millis(inject_ms));
1000 }
1001
1002 if deadline.expired() {
1004 self.http_metrics
1005 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
1006 Self::write_handler_timeout_503(&mut stream);
1007 return Ok(());
1008 }
1009
1010 stream.write_all(&response.to_http_bytes())?;
1011 stream.flush()?;
1012 Ok(())
1013 }
1014
/// Writes a fixed, body-less `503 Service Unavailable` response with
/// `Connection: close` to `stream`, then flushes it.
///
/// Best effort: write and flush errors are deliberately ignored.
fn write_handler_timeout_503<S>(stream: &mut S)
where
    S: Write,
{
    let response: &[u8] =
        b"HTTP/1.1 503 Service Unavailable\r\nConnection: close\r\nContent-Length: 0\r\n\r\n";
    let _ = stream.write_all(response);
    let _ = stream.flush();
}
1028}