1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49 crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// Pins the default HTTP body cap at 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        let defaults = ServerOptions::default();
        assert_eq!(defaults.max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must expose one entry per active and per failed
    /// transport listener under `transport_listeners`.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");

        let grpc_listener = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:50051".to_string(),
            explicit: true,
        };
        let http_failure = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5055".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
        };
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![grpc_listener],
                failed: vec![http_failure],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let root = match payload {
            JsonValue::Object(root) => root,
            _ => panic!("health payload should be an object"),
        };
        let listeners = match root.get("transport_listeners") {
            Some(JsonValue::Object(listeners)) => listeners,
            _ => panic!("health payload should include transport_listeners"),
        };
        let active = match listeners.get("active") {
            Some(JsonValue::Array(active)) => active,
            _ => panic!("transport_listeners.active should be an array"),
        };
        let failed = match listeners.get("failed") {
            Some(JsonValue::Array(failed)) => failed,
            _ => panic!("transport_listeners.failed should be an array"),
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105 crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108pub mod handlers_admin;
109mod handlers_ai;
110mod handlers_ai_model_cache;
111mod handlers_auth;
112mod handlers_backup;
113mod handlers_collection_policy;
114mod handlers_ec;
115pub(crate) mod handlers_entity;
116mod handlers_geo;
117mod handlers_graph;
118mod handlers_keyed;
119mod handlers_log;
120mod handlers_metrics;
121mod handlers_ops;
122mod handlers_ops_policy;
123pub(crate) mod handlers_query;
128mod handlers_replication;
129mod handlers_topology;
130mod handlers_vcs;
131mod handlers_vector;
132pub mod header_escape_guard;
133pub mod http_connection_limiter;
134pub mod http_handler_metrics;
135pub mod http_limits;
136pub mod ingest_pipeline;
137pub mod output_stream;
138mod patch_support;
139mod request_body;
140mod request_context;
141mod routing;
142mod serverless_support;
143pub mod tls;
144mod transport;
145
146use self::handlers_ai::*;
147use self::handlers_entity::*;
148use self::handlers_graph::*;
149use self::handlers_keyed::*;
150use self::handlers_metrics::*;
151use self::handlers_ops::*;
152use self::handlers_query::*;
153use self::http_connection_limiter::{
154 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
155};
156use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
157pub use self::http_limits::{
158 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
159};
160use self::patch_support::*;
161use self::request_body::*;
162use self::routing::*;
163use self::serverless_support::*;
164use self::transport::*;
165
/// Selects which HTTP surface a server instance exposes.
///
/// NOTE(review): the variant names suggest "full public API", "admin
/// endpoints only" and "metrics endpoints only"; the code that interprets
/// this value is not in this part of the file — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    Public,
    AdminOnly,
    MetricsOnly,
}
184
/// Listener-level configuration for [`RedDBServer`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address that `serve` / `serve_tls` bind their `TcpListener` to.
    pub bind_addr: String,
    /// Body size limit handed to `HttpRequest::read_from`.
    pub max_body_bytes: usize,
    /// Socket read timeout, in milliseconds, applied to each accepted stream.
    pub read_timeout_ms: u64,
    /// Socket write timeout, in milliseconds, applied to each accepted stream.
    pub write_timeout_ms: u64,
    /// NOTE(review): presumably an upper bound on scan page sizes; it is not
    /// read in this part of the file — confirm against the handlers.
    pub max_scan_limit: usize,
    /// Which HTTP surface this server exposes.
    pub surface: ServerSurface,
    /// Active and failed transport listeners, echoed back under
    /// `transport_listeners` in discovery and health payloads.
    pub transport_readiness: crate::service_cli::TransportReadiness,
}
198
/// Default HTTP request body limit: 32 MiB.
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 * 1024 * 1024;
200
201impl Default for ServerOptions {
202 fn default() -> Self {
203 Self {
204 bind_addr: "127.0.0.1:5055".to_string(),
205 max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
206 read_timeout_ms: 5_000,
207 write_timeout_ms: 5_000,
208 max_scan_limit: 1_000,
209 surface: ServerSurface::Public,
210 transport_readiness: crate::service_cli::TransportReadiness::default(),
211 }
212 }
213}
214
/// Replication state attached to a server via `RedDBServer::with_replication`.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle.
    /// NOTE(review): presumably `Some` only when this node is a primary —
    /// confirm against the code that builds this struct.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
220
/// HTTP front end for a [`RedDBRuntime`].
///
/// The type is `Clone`; the accept loops clone it once per connection
/// thread.
#[derive(Clone)]
pub struct RedDBServer {
    /// Runtime that the use-case facades borrow.
    runtime: RedDBRuntime,
    /// Listener-level options (bind address, timeouts, body limit, ...).
    options: ServerOptions,
    /// Auth store installed by `with_auth`, if any.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state installed by `with_replication`, if any.
    replication: Option<Arc<ServerReplicationState>>,
    /// Hands out one permit per accepted connection; when none is available
    /// the connection is rejected.
    http_limiter: HttpConnectionLimiter,
    /// Budget used to arm the per-connection `HandlerDeadline`.
    handler_timeout: Duration,
    /// Clock handed to `HandlerDeadline::arm`.
    handler_clock: Arc<dyn MonotonicClock>,
    /// Artificial delay in milliseconds that connection handlers sleep after
    /// routing when non-zero; set through `set_test_slow_inject_ms`.
    slow_inject_ms: Arc<AtomicU64>,
    /// Records rejects and per-connection durations.
    http_metrics: HttpHandlerMetrics,
    /// Value placed in the `Retry-After` header of the cap-exhausted 503.
    retry_after_secs: u64,
    /// Pre-built 503 response written when the limiter has no permit left.
    reject_503_bytes: Arc<Vec<u8>>,
    /// Shared `output_stream` capacity registry (its role is defined in that
    /// module, not visible here).
    pub(crate) stream_capacity: Arc<output_stream::StreamCapacityRegistry>,
    /// Shared `output_stream` lease registry (its role is defined in that
    /// module, not visible here).
    pub(crate) lease_registry: Arc<output_stream::LeaseRegistry>,
    /// Shared `output_stream` cursor registry (its role is defined in that
    /// module, not visible here).
    pub(crate) cursor_registry: Arc<output_stream::CursorRegistry>,
}
285
/// Handler budget used until `with_http_limits` / `with_handler_timeout`
/// overrides it: 30 seconds.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
288
/// Categories a serverless warm-up request can target.
///
/// NOTE(review): the meaning of each variant is implemented outside this
/// part of the file (presumably in `serverless_support`) — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
296
/// Deployment shapes the server can describe.
///
/// NOTE(review): this enum is not used in this part of the file; presumably
/// it backs the `GET /deployment/profiles` endpoint — confirm in the handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
303
304fn percent_decode_path_segment(input: &str) -> Result<String, String> {
305 let bytes = input.as_bytes();
306 let mut out = Vec::with_capacity(bytes.len());
307 let mut index = 0;
308 while index < bytes.len() {
309 match bytes[index] {
310 b'%' => {
311 if index + 2 >= bytes.len() {
312 return Err("truncated percent escape".to_string());
313 }
314 let high = hex_value(bytes[index + 1])
315 .ok_or_else(|| "invalid percent escape".to_string())?;
316 let low = hex_value(bytes[index + 2])
317 .ok_or_else(|| "invalid percent escape".to_string())?;
318 out.push((high << 4) | low);
319 index += 3;
320 }
321 byte => {
322 out.push(byte);
323 index += 1;
324 }
325 }
326 }
327 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
328}
329
/// Returns the numeric value (0-15) of an ASCII hexadecimal digit, accepting
/// both lower- and upper-case letters, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` yields a value below 16, so the narrowing cast is lossless.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
338
/// Parsed form of a query request.
///
/// NOTE(review): the parser that fills this struct is outside this part of
/// the file; the field notes below are inferred from names and types only.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text.
    query: String,
    /// Optional list of entity type names supplied with the request.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names supplied with the request.
    capabilities: Option<Vec<String>>,
    /// Optional parameter values supplied with the request.
    params: Option<Vec<Value>>,
}
349
/// Kind of change a single [`PatchOperation`] applies.
///
/// NOTE(review): the exact semantics of each variant live in the patch
/// handling code (presumably `patch_support`) — confirm there.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
356
/// One parsed patch instruction.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Kind of change to apply.
    op: PatchOperationType,
    /// Target location expressed as a list of path segments.
    path: Vec<String>,
    /// JSON value carried by the operation, when there is one.
    /// NOTE(review): presumably `None` for `Unset` — confirm in the parser.
    value: Option<JsonValue>,
}
363
364impl RedDBServer {
    /// Creates a server around `runtime` with `ServerOptions::default()`.
    pub fn new(runtime: RedDBRuntime) -> Self {
        Self::with_options(runtime, ServerOptions::default())
    }
368
    /// Builds a runtime from `db_options` and wraps it in a server configured
    /// with `server_options`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `RedDBRuntime::with_options`.
    pub fn from_database_options(
        db_options: RedDBOptions,
        server_options: ServerOptions,
    ) -> RedDBResult<Self> {
        let runtime = RedDBRuntime::with_options(db_options)?;
        Ok(Self::with_options(runtime, server_options))
    }
376
377 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
378 Self {
379 runtime,
380 options,
381 auth_store: None,
382 replication: None,
383 http_limiter: HttpConnectionLimiter::with_default_cap(),
384 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
385 handler_clock: Arc::new(SystemMonotonicClock::new()),
386 slow_inject_ms: Arc::new(AtomicU64::new(0)),
387 http_metrics: HttpHandlerMetrics::new(),
388 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
389 reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
390 stream_capacity: output_stream::StreamCapacityRegistry::new(),
391 lease_registry: output_stream::LeaseRegistry::new(),
392 cursor_registry: output_stream::CursorRegistry::new(),
393 }
394 }
395
    /// Returns the shared stream capacity registry.
    #[doc(hidden)]
    pub fn stream_capacity(&self) -> &Arc<output_stream::StreamCapacityRegistry> {
        &self.stream_capacity
    }
400
    /// Returns the shared lease registry.
    #[doc(hidden)]
    pub fn lease_registry(&self) -> &Arc<output_stream::LeaseRegistry> {
        &self.lease_registry
    }
405
    /// Returns the shared cursor registry.
    #[doc(hidden)]
    pub fn cursor_registry(&self) -> &Arc<output_stream::CursorRegistry> {
        &self.cursor_registry
    }
410
    /// Returns the HTTP handler metrics recorder.
    #[doc(hidden)]
    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
        &self.http_metrics
    }
415
    /// Replaces the connection limiter with a fresh one built from `cap`.
    #[doc(hidden)]
    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(cap);
        self
    }
425
426 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
431 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
432 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
433 self.retry_after_secs = limits.retry_after_secs;
434 self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
435 self
436 }
437
    /// Returns the configured `Retry-After` value in seconds.
    #[doc(hidden)]
    pub fn retry_after_secs(&self) -> u64 {
        self.retry_after_secs
    }
442
    /// Returns the connection limiter.
    #[doc(hidden)]
    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
        &self.http_limiter
    }
447
    /// Overrides the per-connection handler timeout.
    #[doc(hidden)]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = timeout;
        self
    }
455
    /// Returns the per-connection handler timeout.
    #[doc(hidden)]
    pub fn handler_timeout(&self) -> Duration {
        self.handler_timeout
    }
460
    /// Replaces the clock used to arm handler deadlines.
    #[doc(hidden)]
    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
        self.handler_clock = clock;
        self
    }
470
    /// Sets the artificial delay, in milliseconds, that connection handlers
    /// sleep after routing a request; `0` disables the delay.
    ///
    /// The value lives behind an `Arc`, so it is shared with clones of this
    /// server.
    #[doc(hidden)]
    pub fn set_test_slow_inject_ms(&self, ms: u64) {
        self.slow_inject_ms.store(ms, Ordering::Relaxed);
    }
480
    /// Installs `auth_store` on both the runtime and the server.
    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
        // The runtime gets its own handle to the same store.
        self.runtime.set_auth_store(Arc::clone(&auth_store));
        self.auth_store = Some(auth_store);
        self
    }
489
    /// Attaches replication state to the server.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }
495
    /// Returns the runtime this server fronts.
    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }
499
    /// Returns the listener-level options.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }
503
    /// Builds a query use-case facade borrowing this server's runtime.
    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }
507
    /// Builds an admin use-case facade borrowing this server's runtime.
    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }
511
    /// Builds an entity use-case facade borrowing this server's runtime.
    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }
515
    /// Builds a catalog use-case facade borrowing this server's runtime.
    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }
519
    /// Builds a graph use-case facade borrowing this server's runtime.
    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }
523
    /// Builds a native use-case facade borrowing this server's runtime.
    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }
527
    /// Builds a tree use-case facade borrowing this server's runtime.
    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }
531
532 fn transport_readiness_json(&self) -> JsonValue {
533 let active = self
534 .options
535 .transport_readiness
536 .active
537 .iter()
538 .map(|listener| {
539 let mut object = Map::new();
540 object.insert(
541 "transport".to_string(),
542 JsonValue::String(listener.transport.clone()),
543 );
544 object.insert(
545 "bind_addr".to_string(),
546 JsonValue::String(listener.bind_addr.clone()),
547 );
548 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
549 JsonValue::Object(object)
550 })
551 .collect();
552 let failed = self
553 .options
554 .transport_readiness
555 .failed
556 .iter()
557 .map(|listener| {
558 let mut object = Map::new();
559 object.insert(
560 "transport".to_string(),
561 JsonValue::String(listener.transport.clone()),
562 );
563 object.insert(
564 "bind_addr".to_string(),
565 JsonValue::String(listener.bind_addr.clone()),
566 );
567 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
568 object.insert(
569 "reason".to_string(),
570 JsonValue::String(listener.reason.clone()),
571 );
572 JsonValue::Object(object)
573 })
574 .collect();
575
576 let mut object = Map::new();
577 object.insert("active".to_string(), JsonValue::Array(active));
578 object.insert("failed".to_string(), JsonValue::Array(failed));
579 JsonValue::Object(object)
580 }
581
582 fn handle_grpc_discovery(&self) -> HttpResponse {
583 let mut methods = Map::new();
584 methods.insert(
585 "query".to_string(),
586 JsonValue::String("reddb.v1.RedDB/Query".to_string()),
587 );
588 methods.insert(
589 "batch_query".to_string(),
590 JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
591 );
592 methods.insert(
593 "health".to_string(),
594 JsonValue::String("reddb.v1.RedDB/Health".to_string()),
595 );
596 methods.insert(
597 "prepare".to_string(),
598 JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
599 );
600 methods.insert(
601 "execute_prepared".to_string(),
602 JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
603 );
604
605 let mut examples = Map::new();
606 examples.insert(
607 "query".to_string(),
608 JsonValue::String(
609 "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
610 .to_string(),
611 ),
612 );
613 examples.insert(
614 "query_with_params".to_string(),
615 JsonValue::String(
616 "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
617 .to_string(),
618 ),
619 );
620 examples.insert(
621 "health".to_string(),
622 JsonValue::String(
623 "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
624 ),
625 );
626
627 let mut object = Map::new();
628 object.insert("ok".to_string(), JsonValue::Bool(true));
629 object.insert(
630 "service".to_string(),
631 JsonValue::String("reddb.v1.RedDB".to_string()),
632 );
633 object.insert(
634 "package".to_string(),
635 JsonValue::String("reddb.v1".to_string()),
636 );
637 object.insert(
638 "proto".to_string(),
639 JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
640 );
641 object.insert("methods".to_string(), JsonValue::Object(methods));
642 object.insert("examples".to_string(), JsonValue::Object(examples));
643 object.insert(
644 "transport_listeners".to_string(),
645 self.transport_readiness_json(),
646 );
647 object.insert(
648 "hint".to_string(),
649 JsonValue::String(
650 "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
651 .to_string(),
652 ),
653 );
654 json_response(200, JsonValue::Object(object))
655 }
656
657 fn handle_query_contract(&self) -> HttpResponse {
658 let mut examples = Map::new();
659 examples.insert(
660 "raw_sql".to_string(),
661 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
662 );
663 examples.insert(
664 "json_query".to_string(),
665 JsonValue::String(
666 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
667 .to_string(),
668 ),
669 );
670 examples.insert(
671 "json_query_with_params".to_string(),
672 JsonValue::String(
673 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
674 .to_string(),
675 ),
676 );
677
678 let mut request_body = Map::new();
679 request_body.insert(
680 "query".to_string(),
681 JsonValue::String("required string".to_string()),
682 );
683 request_body.insert(
684 "params".to_string(),
685 JsonValue::String("optional array".to_string()),
686 );
687
688 let mut response_shape = Map::new();
689 response_shape.insert(
690 "columns".to_string(),
691 JsonValue::String("projected column names".to_string()),
692 );
693 response_shape.insert(
694 "records[].values".to_string(),
695 JsonValue::String("only projected values".to_string()),
696 );
697 response_shape.insert(
698 "records[].meta".to_string(),
699 JsonValue::String("internal metadata when present".to_string()),
700 );
701
702 let mut object = Map::new();
703 object.insert("ok".to_string(), JsonValue::Bool(false));
704 object.insert(
705 "code".to_string(),
706 JsonValue::String("method_not_allowed".to_string()),
707 );
708 object.insert(
709 "message".to_string(),
710 JsonValue::String("/query accepts POST requests".to_string()),
711 );
712 object.insert(
713 "hint".to_string(),
714 JsonValue::String(
715 "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
716 ),
717 );
718 object.insert("method".to_string(), JsonValue::String("POST".to_string()));
719 object.insert("path".to_string(), JsonValue::String("/query".to_string()));
720 object.insert("request_body".to_string(), JsonValue::Object(request_body));
721 object.insert(
722 "response_shape".to_string(),
723 JsonValue::Object(response_shape),
724 );
725 object.insert("examples".to_string(), JsonValue::Object(examples));
726 object.insert(
727 "docs".to_string(),
728 JsonValue::String("https://reddb.io/docs/query".to_string()),
729 );
730
731 json_response(405, JsonValue::Object(object))
732 .with_header("Allow", http::HeaderValue::from_static("POST"))
733 }
734
735 fn handle_root_discovery(&self) -> HttpResponse {
736 let mut endpoints = Map::new();
737 endpoints.insert(
738 "health".to_string(),
739 JsonValue::String("GET /health".to_string()),
740 );
741 endpoints.insert(
742 "ready".to_string(),
743 JsonValue::String("GET /ready".to_string()),
744 );
745 endpoints.insert(
746 "query".to_string(),
747 JsonValue::String("POST /query".to_string()),
748 );
749 endpoints.insert(
750 "query_readiness".to_string(),
751 JsonValue::String("GET /ready/query".to_string()),
752 );
753 endpoints.insert(
754 "catalog".to_string(),
755 JsonValue::String("GET /catalog".to_string()),
756 );
757 endpoints.insert(
758 "deployment_profiles".to_string(),
759 JsonValue::String("GET /deployment/profiles".to_string()),
760 );
761
762 let mut examples = Map::new();
763 examples.insert(
764 "http_raw_sql".to_string(),
765 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
766 );
767 examples.insert(
768 "http_json_query".to_string(),
769 JsonValue::String(
770 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
771 .to_string(),
772 ),
773 );
774 examples.insert(
775 "http_json_query_with_params".to_string(),
776 JsonValue::String(
777 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
778 .to_string(),
779 ),
780 );
781
782 let mut object = Map::new();
783 object.insert("ok".to_string(), JsonValue::Bool(true));
784 object.insert(
785 "service".to_string(),
786 JsonValue::String("reddb".to_string()),
787 );
788 object.insert(
789 "version".to_string(),
790 JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
791 );
792 object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
793 object.insert("examples".to_string(), JsonValue::Object(examples));
794 object.insert(
795 "docs".to_string(),
796 JsonValue::String("https://reddb.io/docs".to_string()),
797 );
798 object.insert(
799 "transport_listeners".to_string(),
800 self.transport_readiness_json(),
801 );
802 json_response(200, JsonValue::Object(object))
803 }
804
805 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
806 let mut value = crate::presentation::ops_json::health_json(report);
807 if let JsonValue::Object(ref mut object) = value {
808 object.insert(
809 "transport_listeners".to_string(),
810 self.transport_readiness_json(),
811 );
812 }
813 value
814 }
815
816 pub fn serve(&self) -> io::Result<()> {
817 let listener = TcpListener::bind(&self.options.bind_addr)?;
818 self.serve_on(listener)
819 }
820
821 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
822 for stream in listener.incoming() {
823 match stream {
824 Ok(stream) => match self.http_limiter.try_acquire() {
825 Some(permit) => {
826 let server = self.clone();
828 thread::spawn(move || {
829 let _guard = permit; let _ = server.handle_connection(stream);
831 });
832 }
833 None => {
834 self.http_metrics
839 .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
840 self.reject_with_503(stream, self.options.write_timeout_ms);
841 }
842 },
843 Err(err) => return Err(err),
844 }
845 }
846 Ok(())
847 }
848
    /// Writes the pre-built 503 response to `stream` and closes it.
    ///
    /// Every step is best-effort: failures to set the write timeout, write,
    /// flush, or shut down are ignored.
    fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
        // Bound the write so it cannot block the caller indefinitely.
        let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
        let _ = stream.write_all(&self.reject_503_bytes);
        let _ = stream.flush();
        let _ = stream.shutdown(std::net::Shutdown::Both);
    }
859
860 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
861 let (stream, _) = listener.accept()?;
862 self.handle_connection(stream)
863 }
864
    /// Runs [`Self::serve`] on a new thread using a clone of this server and
    /// returns the thread's join handle.
    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve())
    }
869
    /// Runs [`Self::serve_on`] for `listener` on a new thread using a clone
    /// of this server and returns the thread's join handle.
    pub fn serve_in_background_on(
        &self,
        listener: TcpListener,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_on(listener))
    }
877
878 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
882 let listener = TcpListener::bind(&self.options.bind_addr)?;
883 self.serve_tls_on(listener, tls_config)
884 }
885
886 pub fn serve_tls_on(
887 &self,
888 listener: TcpListener,
889 tls_config: std::sync::Arc<rustls::ServerConfig>,
890 ) -> io::Result<()> {
891 for stream in listener.incoming() {
892 match stream {
893 Ok(stream) => match self.http_limiter.try_acquire() {
894 Some(permit) => {
895 let server = self.clone();
896 let cfg = tls_config.clone();
897 thread::spawn(move || {
898 let _guard = permit; let _ = server.handle_tls_connection(stream, cfg);
900 });
901 }
902 None => {
903 self.http_metrics
909 .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
910 let _ = stream.shutdown(std::net::Shutdown::Both);
911 drop(stream);
912 }
913 },
914 Err(err) => return Err(err),
915 }
916 }
917 Ok(())
918 }
919
    /// Runs [`Self::serve_tls`] on a new thread using a clone of this server
    /// and returns the thread's join handle.
    pub fn serve_tls_in_background(
        &self,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls(tls_config))
    }
927
    /// Runs [`Self::serve_tls_on`] for `listener` on a new thread using a
    /// clone of this server and returns the thread's join handle.
    pub fn serve_tls_in_background_on(
        &self,
        listener: TcpListener,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls_on(listener, tls_config))
    }
936
937 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
938 let started = Instant::now();
939 let result = self.handle_connection_inner(stream);
940 let elapsed = started.elapsed().as_secs_f64();
941 self.http_metrics
942 .record_duration(HttpTransport::Http, elapsed);
943 result
944 }
945
946 fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
947 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
948 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
949
950 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
958
959 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
960
961 if deadline.expired() {
963 self.http_metrics
964 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
965 Self::write_handler_timeout_503(&mut stream);
966 return Ok(());
967 }
968
969 if self.try_route_streaming(&request, &mut stream)? {
970 return Ok(());
971 }
972 let response = self.route(request);
973
974 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
978 if inject_ms > 0 {
979 thread::sleep(Duration::from_millis(inject_ms));
980 }
981
982 if deadline.expired() {
984 self.http_metrics
985 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
986 Self::write_handler_timeout_503(&mut stream);
987 return Ok(());
988 }
989
990 stream.write_all(&response.to_http_bytes())?;
991 stream.flush()?;
992 Ok(())
993 }
994
995 fn write_handler_timeout_503<S: Write>(stream: &mut S) {
1001 const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
1002 Connection: close\r\n\
1003 Content-Length: 0\r\n\
1004 \r\n";
1005 let _ = stream.write_all(RESPONSE);
1006 let _ = stream.flush();
1007 }
1008
1009 fn handle_tls_connection(
1010 &self,
1011 tcp: TcpStream,
1012 tls_config: std::sync::Arc<rustls::ServerConfig>,
1013 ) -> io::Result<()> {
1014 let started = Instant::now();
1015 let result = self.handle_tls_connection_inner(tcp, tls_config);
1016 let elapsed = started.elapsed().as_secs_f64();
1017 self.http_metrics
1018 .record_duration(HttpTransport::Https, elapsed);
1019 result
1020 }
1021
1022 fn handle_tls_connection_inner(
1023 &self,
1024 tcp: TcpStream,
1025 tls_config: std::sync::Arc<rustls::ServerConfig>,
1026 ) -> io::Result<()> {
1027 tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
1028 tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
1029
1030 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
1034
1035 let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
1036 Ok(s) => s,
1037 Err(err) => {
1038 tracing::warn!(
1039 target: "reddb::http_tls",
1040 err = %err,
1041 "TLS handshake failed"
1042 );
1043 return Err(err);
1044 }
1045 };
1046 let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
1047 Ok(req) => req,
1048 Err(err) => {
1049 tracing::warn!(
1050 target: "reddb::http_tls",
1051 err = %err,
1052 "TLS request parse failed"
1053 );
1054 return Err(err);
1055 }
1056 };
1057
1058 if deadline.expired() {
1060 self.http_metrics
1061 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
1062 Self::write_handler_timeout_503(&mut tls_stream);
1063 return Ok(());
1064 }
1065
1066 if self.try_route_streaming(&request, &mut tls_stream)? {
1067 return Ok(());
1068 }
1069 let response = self.route(request);
1070
1071 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
1074 if inject_ms > 0 {
1075 thread::sleep(Duration::from_millis(inject_ms));
1076 }
1077
1078 if deadline.expired() {
1080 self.http_metrics
1081 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
1082 Self::write_handler_timeout_503(&mut tls_stream);
1083 return Ok(());
1084 }
1085
1086 tls_stream.write_all(&response.to_http_bytes())?;
1087 tls_stream.flush()?;
1088 Ok(())
1089 }
1090}
1091
/// Builds the raw bytes of the body-less `503 Service Unavailable` response
/// sent when the connection cap is exhausted, with `Retry-After` set to
/// `retry_after_secs`.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    let mut response = String::from("HTTP/1.1 503 Service Unavailable\r\n");
    response.push_str("Connection: close\r\n");
    response.push_str("Content-Length: 0\r\n");
    response.push_str("Retry-After: ");
    response.push_str(&retry_after_secs.to_string());
    response.push_str("\r\n\r\n");
    response.into_bytes()
}