1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49 crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// Returns the string stored under `key` in a JSON object, panicking with
    /// context when `entry` is not an object or the field is missing or not a
    /// string.
    fn string_field<'a>(entry: &'a JsonValue, key: &str) -> &'a str {
        let JsonValue::Object(fields) = entry else {
            panic!("listener entry should be an object");
        };
        let Some(JsonValue::String(text)) = fields.get(key) else {
            panic!("listener entry should have a string `{}` field", key);
        };
        text.as_str()
    }

    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        assert_eq!(ServerOptions::default().max_body_bytes, 32 * 1024 * 1024);
    }

    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        // Struct-update syntax instead of default-then-reassign.
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![TransportListenerState {
                    transport: "grpc".to_string(),
                    bind_addr: "127.0.0.1:50051".to_string(),
                    explicit: true,
                }],
                failed: vec![TransportListenerFailure {
                    transport: "http".to_string(),
                    bind_addr: "127.0.0.1:5055".to_string(),
                    explicit: false,
                    reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
                }],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let payload = server.health_json_with_transport(&HealthReport::healthy());
        let JsonValue::Object(root) = payload else {
            panic!("health payload should be an object");
        };
        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
            panic!("health payload should include transport_listeners");
        };
        let Some(JsonValue::Array(active)) = listeners.get("active") else {
            panic!("transport_listeners.active should be an array");
        };
        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
            panic!("transport_listeners.failed should be an array");
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);

        // Check the content as well as the counts, so a dropped or mislabelled
        // field is caught.
        let active_entry = &active[0];
        assert_eq!(string_field(active_entry, "transport"), "grpc");
        assert_eq!(string_field(active_entry, "bind_addr"), "127.0.0.1:50051");

        let failed_entry = &failed[0];
        assert_eq!(string_field(failed_entry, "transport"), "http");
        assert_eq!(string_field(failed_entry, "bind_addr"), "127.0.0.1:5055");
        assert_eq!(
            string_field(failed_entry, "reason"),
            "http listener bind 127.0.0.1:5055: address in use"
        );
    }
}
103
104fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
105 crate::presentation::admin_json::graph_projection_json(projection)
106}
107
108pub mod handlers_admin;
109mod handlers_ai;
110mod handlers_auth;
111mod handlers_backup;
112mod handlers_ec;
113pub(crate) mod handlers_entity;
114mod handlers_geo;
115mod handlers_graph;
116mod handlers_keyed;
117mod handlers_log;
118mod handlers_metrics;
119mod handlers_ops;
120mod handlers_query;
121mod handlers_replication;
122mod handlers_vcs;
123mod handlers_vector;
124pub mod header_escape_guard;
125pub mod http_connection_limiter;
126pub mod http_handler_metrics;
127pub mod http_limits;
128pub mod ingest_pipeline;
129mod patch_support;
130mod request_body;
131mod request_context;
132mod routing;
133mod serverless_support;
134pub mod tls;
135mod transport;
136
137use self::handlers_ai::*;
138use self::handlers_entity::*;
139use self::handlers_graph::*;
140use self::handlers_keyed::*;
141use self::handlers_metrics::*;
142use self::handlers_ops::*;
143use self::handlers_query::*;
144use self::http_connection_limiter::{
145 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
146};
147use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
148pub use self::http_limits::{
149 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
150};
151use self::patch_support::*;
152use self::request_body::*;
153use self::routing::*;
154use self::serverless_support::*;
155use self::transport::*;
156
/// Selects which surface a server exposes; stored in `ServerOptions::surface`.
///
/// The code that interprets each variant is not in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// The surface chosen by `ServerOptions::default`.
    Public,
    /// NOTE(review): presumably limits the listener to admin routes — confirm
    /// against the routing code.
    AdminOnly,
    /// NOTE(review): presumably limits the listener to metrics routes —
    /// confirm against the routing code.
    MetricsOnly,
}
175
176#[derive(Debug, Clone)]
177pub struct ServerOptions {
178 pub bind_addr: String,
179 pub max_body_bytes: usize,
180 pub read_timeout_ms: u64,
181 pub write_timeout_ms: u64,
182 pub max_scan_limit: usize,
183 pub surface: ServerSurface,
187 pub transport_readiness: crate::service_cli::TransportReadiness,
188}
189
/// Default HTTP request body limit: 32 MiB.
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 << 20;
191
192impl Default for ServerOptions {
193 fn default() -> Self {
194 Self {
195 bind_addr: "127.0.0.1:5055".to_string(),
196 max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
197 read_timeout_ms: 5_000,
198 write_timeout_ms: 5_000,
199 max_scan_limit: 1_000,
200 surface: ServerSurface::Public,
201 transport_readiness: crate::service_cli::TransportReadiness::default(),
202 }
203 }
204}
205
206pub struct ServerReplicationState {
208 pub config: crate::replication::ReplicationConfig,
209 pub primary: Option<crate::replication::primary::PrimaryReplication>,
210}
211
212#[derive(Clone)]
213pub struct RedDBServer {
214 runtime: RedDBRuntime,
215 options: ServerOptions,
216 auth_store: Option<Arc<AuthStore>>,
217 replication: Option<Arc<ServerReplicationState>>,
218 http_limiter: HttpConnectionLimiter,
223 handler_timeout: Duration,
229 handler_clock: Arc<dyn MonotonicClock>,
236 slow_inject_ms: Arc<AtomicU64>,
243 http_metrics: HttpHandlerMetrics,
249 retry_after_secs: u64,
253 reject_503_bytes: Arc<Vec<u8>>,
257}
258
/// Handler timeout installed by `RedDBServer::with_options` (30 seconds).
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
261
/// Categories a serverless warmup request can name.
///
/// NOTE(review): the variants are interpreted outside this part of the file
/// (presumably in `serverless_support`) — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    /// Index warmup.
    Indexes,
    /// Graph projection warmup.
    GraphProjections,
    /// Analytics job warmup.
    AnalyticsJobs,
    /// Native artifact warmup.
    NativeArtifacts,
}
269
/// Deployment shapes the server distinguishes.
///
/// NOTE(review): the code that selects and interprets a profile is not in
/// this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    /// Embedded deployment.
    Embedded,
    /// Server deployment.
    Server,
    /// Serverless deployment.
    Serverless,
}
276
277fn percent_decode_path_segment(input: &str) -> Result<String, String> {
278 let bytes = input.as_bytes();
279 let mut out = Vec::with_capacity(bytes.len());
280 let mut index = 0;
281 while index < bytes.len() {
282 match bytes[index] {
283 b'%' => {
284 if index + 2 >= bytes.len() {
285 return Err("truncated percent escape".to_string());
286 }
287 let high = hex_value(bytes[index + 1])
288 .ok_or_else(|| "invalid percent escape".to_string())?;
289 let low = hex_value(bytes[index + 2])
290 .ok_or_else(|| "invalid percent escape".to_string())?;
291 out.push((high << 4) | low);
292 index += 3;
293 }
294 byte => {
295 out.push(byte);
296 index += 1;
297 }
298 }
299 }
300 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
301}
302
/// Maps an ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value 0..=15, or
/// returns `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` yields at most 15, so the narrowing cast cannot truncate.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
311
312#[derive(Debug, Clone)]
313struct ParsedQueryRequest {
314 query: String,
315 entity_types: Option<Vec<String>>,
316 capabilities: Option<Vec<String>>,
317 params: Option<Vec<Value>>,
321}
322
/// Kind of change a `PatchOperation` applies.
///
/// NOTE(review): the precise difference between `Set` and `Replace` is
/// defined where patches are applied — confirm there.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    /// Set operation.
    Set,
    /// Replace operation.
    Replace,
    /// Unset operation.
    Unset,
}
329
330#[derive(Debug, Clone)]
331struct PatchOperation {
332 op: PatchOperationType,
333 path: Vec<String>,
334 value: Option<JsonValue>,
335}
336
337impl RedDBServer {
338 pub fn new(runtime: RedDBRuntime) -> Self {
339 Self::with_options(runtime, ServerOptions::default())
340 }
341
342 pub fn from_database_options(
343 db_options: RedDBOptions,
344 server_options: ServerOptions,
345 ) -> RedDBResult<Self> {
346 let runtime = RedDBRuntime::with_options(db_options)?;
347 Ok(Self::with_options(runtime, server_options))
348 }
349
350 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
351 Self {
352 runtime,
353 options,
354 auth_store: None,
355 replication: None,
356 http_limiter: HttpConnectionLimiter::with_default_cap(),
357 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
358 handler_clock: Arc::new(SystemMonotonicClock::new()),
359 slow_inject_ms: Arc::new(AtomicU64::new(0)),
360 http_metrics: HttpHandlerMetrics::new(),
361 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
362 reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
363 }
364 }
365
366 #[doc(hidden)]
367 pub fn http_metrics(&self) -> &HttpHandlerMetrics {
368 &self.http_metrics
369 }
370
371 #[doc(hidden)]
376 pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
377 self.http_limiter = HttpConnectionLimiter::new(cap);
378 self
379 }
380
381 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
386 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
387 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
388 self.retry_after_secs = limits.retry_after_secs;
389 self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
390 self
391 }
392
393 #[doc(hidden)]
394 pub fn retry_after_secs(&self) -> u64 {
395 self.retry_after_secs
396 }
397
398 #[doc(hidden)]
399 pub fn http_limiter(&self) -> &HttpConnectionLimiter {
400 &self.http_limiter
401 }
402
403 #[doc(hidden)]
406 pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
407 self.handler_timeout = timeout;
408 self
409 }
410
411 #[doc(hidden)]
412 pub fn handler_timeout(&self) -> Duration {
413 self.handler_timeout
414 }
415
416 #[doc(hidden)]
421 pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
422 self.handler_clock = clock;
423 self
424 }
425
426 #[doc(hidden)]
432 pub fn set_test_slow_inject_ms(&self, ms: u64) {
433 self.slow_inject_ms.store(ms, Ordering::Relaxed);
434 }
435
436 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
440 self.runtime.set_auth_store(Arc::clone(&auth_store));
441 self.auth_store = Some(auth_store);
442 self
443 }
444
445 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
447 self.replication = Some(state);
448 self
449 }
450
451 pub fn runtime(&self) -> &RedDBRuntime {
452 &self.runtime
453 }
454
455 pub fn options(&self) -> &ServerOptions {
456 &self.options
457 }
458
459 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
460 QueryUseCases::new(&self.runtime)
461 }
462
463 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
464 AdminUseCases::new(&self.runtime)
465 }
466
467 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
468 EntityUseCases::new(&self.runtime)
469 }
470
471 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
472 CatalogUseCases::new(&self.runtime)
473 }
474
475 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
476 GraphUseCases::new(&self.runtime)
477 }
478
479 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
480 NativeUseCases::new(&self.runtime)
481 }
482
483 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
484 TreeUseCases::new(&self.runtime)
485 }
486
487 fn transport_readiness_json(&self) -> JsonValue {
488 let active = self
489 .options
490 .transport_readiness
491 .active
492 .iter()
493 .map(|listener| {
494 let mut object = Map::new();
495 object.insert(
496 "transport".to_string(),
497 JsonValue::String(listener.transport.clone()),
498 );
499 object.insert(
500 "bind_addr".to_string(),
501 JsonValue::String(listener.bind_addr.clone()),
502 );
503 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
504 JsonValue::Object(object)
505 })
506 .collect();
507 let failed = self
508 .options
509 .transport_readiness
510 .failed
511 .iter()
512 .map(|listener| {
513 let mut object = Map::new();
514 object.insert(
515 "transport".to_string(),
516 JsonValue::String(listener.transport.clone()),
517 );
518 object.insert(
519 "bind_addr".to_string(),
520 JsonValue::String(listener.bind_addr.clone()),
521 );
522 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
523 object.insert(
524 "reason".to_string(),
525 JsonValue::String(listener.reason.clone()),
526 );
527 JsonValue::Object(object)
528 })
529 .collect();
530
531 let mut object = Map::new();
532 object.insert("active".to_string(), JsonValue::Array(active));
533 object.insert("failed".to_string(), JsonValue::Array(failed));
534 JsonValue::Object(object)
535 }
536
537 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
538 let mut value = crate::presentation::ops_json::health_json(report);
539 if let JsonValue::Object(ref mut object) = value {
540 object.insert(
541 "transport_listeners".to_string(),
542 self.transport_readiness_json(),
543 );
544 }
545 value
546 }
547
548 pub fn serve(&self) -> io::Result<()> {
549 let listener = TcpListener::bind(&self.options.bind_addr)?;
550 self.serve_on(listener)
551 }
552
553 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
554 for stream in listener.incoming() {
555 match stream {
556 Ok(stream) => match self.http_limiter.try_acquire() {
557 Some(permit) => {
558 let server = self.clone();
560 thread::spawn(move || {
561 let _guard = permit; let _ = server.handle_connection(stream);
563 });
564 }
565 None => {
566 self.http_metrics
571 .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
572 self.reject_with_503(stream, self.options.write_timeout_ms);
573 }
574 },
575 Err(err) => return Err(err),
576 }
577 }
578 Ok(())
579 }
580
581 fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
586 let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
587 let _ = stream.write_all(&self.reject_503_bytes);
588 let _ = stream.flush();
589 let _ = stream.shutdown(std::net::Shutdown::Both);
590 }
591
592 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
593 let (stream, _) = listener.accept()?;
594 self.handle_connection(stream)
595 }
596
597 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
598 let server = self.clone();
599 thread::spawn(move || server.serve())
600 }
601
602 pub fn serve_in_background_on(
603 &self,
604 listener: TcpListener,
605 ) -> thread::JoinHandle<io::Result<()>> {
606 let server = self.clone();
607 thread::spawn(move || server.serve_on(listener))
608 }
609
610 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
614 let listener = TcpListener::bind(&self.options.bind_addr)?;
615 self.serve_tls_on(listener, tls_config)
616 }
617
618 pub fn serve_tls_on(
619 &self,
620 listener: TcpListener,
621 tls_config: std::sync::Arc<rustls::ServerConfig>,
622 ) -> io::Result<()> {
623 for stream in listener.incoming() {
624 match stream {
625 Ok(stream) => match self.http_limiter.try_acquire() {
626 Some(permit) => {
627 let server = self.clone();
628 let cfg = tls_config.clone();
629 thread::spawn(move || {
630 let _guard = permit; let _ = server.handle_tls_connection(stream, cfg);
632 });
633 }
634 None => {
635 self.http_metrics
641 .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
642 let _ = stream.shutdown(std::net::Shutdown::Both);
643 drop(stream);
644 }
645 },
646 Err(err) => return Err(err),
647 }
648 }
649 Ok(())
650 }
651
652 pub fn serve_tls_in_background(
653 &self,
654 tls_config: std::sync::Arc<rustls::ServerConfig>,
655 ) -> thread::JoinHandle<io::Result<()>> {
656 let server = self.clone();
657 thread::spawn(move || server.serve_tls(tls_config))
658 }
659
660 pub fn serve_tls_in_background_on(
661 &self,
662 listener: TcpListener,
663 tls_config: std::sync::Arc<rustls::ServerConfig>,
664 ) -> thread::JoinHandle<io::Result<()>> {
665 let server = self.clone();
666 thread::spawn(move || server.serve_tls_on(listener, tls_config))
667 }
668
669 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
670 let started = Instant::now();
671 let result = self.handle_connection_inner(stream);
672 let elapsed = started.elapsed().as_secs_f64();
673 self.http_metrics
674 .record_duration(HttpTransport::Http, elapsed);
675 result
676 }
677
678 fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
679 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
680 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
681
682 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
690
691 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
692
693 if deadline.expired() {
695 self.http_metrics
696 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
697 Self::write_handler_timeout_503(&mut stream);
698 return Ok(());
699 }
700
701 if self.try_route_streaming(&request, &mut stream)? {
702 return Ok(());
703 }
704 let response = self.route(request);
705
706 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
710 if inject_ms > 0 {
711 thread::sleep(Duration::from_millis(inject_ms));
712 }
713
714 if deadline.expired() {
716 self.http_metrics
717 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
718 Self::write_handler_timeout_503(&mut stream);
719 return Ok(());
720 }
721
722 stream.write_all(&response.to_http_bytes())?;
723 stream.flush()?;
724 Ok(())
725 }
726
727 fn write_handler_timeout_503<S: Write>(stream: &mut S) {
733 const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
734 Connection: close\r\n\
735 Content-Length: 0\r\n\
736 \r\n";
737 let _ = stream.write_all(RESPONSE);
738 let _ = stream.flush();
739 }
740
741 fn handle_tls_connection(
742 &self,
743 tcp: TcpStream,
744 tls_config: std::sync::Arc<rustls::ServerConfig>,
745 ) -> io::Result<()> {
746 let started = Instant::now();
747 let result = self.handle_tls_connection_inner(tcp, tls_config);
748 let elapsed = started.elapsed().as_secs_f64();
749 self.http_metrics
750 .record_duration(HttpTransport::Https, elapsed);
751 result
752 }
753
754 fn handle_tls_connection_inner(
755 &self,
756 tcp: TcpStream,
757 tls_config: std::sync::Arc<rustls::ServerConfig>,
758 ) -> io::Result<()> {
759 tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
760 tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
761
762 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
766
767 let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
768 Ok(s) => s,
769 Err(err) => {
770 tracing::warn!(
771 target: "reddb::http_tls",
772 err = %err,
773 "TLS handshake failed"
774 );
775 return Err(err);
776 }
777 };
778 let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
779 Ok(req) => req,
780 Err(err) => {
781 tracing::warn!(
782 target: "reddb::http_tls",
783 err = %err,
784 "TLS request parse failed"
785 );
786 return Err(err);
787 }
788 };
789
790 if deadline.expired() {
792 self.http_metrics
793 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
794 Self::write_handler_timeout_503(&mut tls_stream);
795 return Ok(());
796 }
797
798 if self.try_route_streaming(&request, &mut tls_stream)? {
799 return Ok(());
800 }
801 let response = self.route(request);
802
803 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
806 if inject_ms > 0 {
807 thread::sleep(Duration::from_millis(inject_ms));
808 }
809
810 if deadline.expired() {
812 self.http_metrics
813 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
814 Self::write_handler_timeout_503(&mut tls_stream);
815 return Ok(());
816 }
817
818 tls_stream.write_all(&response.to_http_bytes())?;
819 tls_stream.flush()?;
820 Ok(())
821 }
822}
823
/// Renders the cap-exhausted response: a `503 Service Unavailable` with
/// `Connection: close`, an empty body, and `Retry-After: <retry_after_secs>`.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    let retry_after = format!("Retry-After: {retry_after_secs}\r\n");
    let lines = [
        "HTTP/1.1 503 Service Unavailable\r\n",
        "Connection: close\r\n",
        "Content-Length: 0\r\n",
        retry_after.as_str(),
        "\r\n",
    ];
    lines.concat().into_bytes()
}