1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49 crate::presentation::admin_json::analytics_job_json(job)
50}
51
52#[cfg(test)]
53mod tests {
54 use super::*;
55 use crate::api::RedDBOptions;
56 use crate::health::HealthReport;
57 use crate::service_cli::{
58 TransportListenerFailure, TransportListenerState, TransportReadiness,
59 };
60
61 #[test]
62 fn health_json_reports_transport_listeners() {
63 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
64 let mut options = ServerOptions::default();
65 options.transport_readiness = TransportReadiness {
66 active: vec![TransportListenerState {
67 transport: "grpc".to_string(),
68 bind_addr: "127.0.0.1:50051".to_string(),
69 explicit: true,
70 }],
71 failed: vec![TransportListenerFailure {
72 transport: "http".to_string(),
73 bind_addr: "127.0.0.1:5055".to_string(),
74 explicit: false,
75 reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
76 }],
77 };
78 let server = RedDBServer::with_options(runtime, options);
79
80 let payload = server.health_json_with_transport(&HealthReport::healthy());
81 let JsonValue::Object(root) = payload else {
82 panic!("health payload should be an object");
83 };
84 let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
85 panic!("health payload should include transport_listeners");
86 };
87 let Some(JsonValue::Array(active)) = listeners.get("active") else {
88 panic!("transport_listeners.active should be an array");
89 };
90 let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
91 panic!("transport_listeners.failed should be an array");
92 };
93
94 assert_eq!(active.len(), 1);
95 assert_eq!(failed.len(), 1);
96 }
97}
98
99fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
100 crate::presentation::admin_json::graph_projection_json(projection)
101}
102
103pub mod handlers_admin;
104mod handlers_ai;
105mod handlers_auth;
106mod handlers_backup;
107mod handlers_ec;
108pub(crate) mod handlers_entity;
109mod handlers_geo;
110mod handlers_graph;
111mod handlers_keyed;
112mod handlers_log;
113mod handlers_metrics;
114mod handlers_ops;
115mod handlers_query;
116mod handlers_replication;
117mod handlers_vcs;
118mod handlers_vector;
119pub mod header_escape_guard;
120pub mod http_connection_limiter;
121pub mod http_handler_metrics;
122pub mod http_limits;
123pub mod ingest_pipeline;
124mod patch_support;
125mod request_body;
126mod request_context;
127mod routing;
128mod serverless_support;
129pub mod tls;
130mod transport;
131
132use self::handlers_ai::*;
133use self::handlers_entity::*;
134use self::handlers_graph::*;
135use self::handlers_keyed::*;
136use self::handlers_metrics::*;
137use self::handlers_ops::*;
138use self::handlers_query::*;
139use self::http_connection_limiter::{
140 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
141};
142use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
143pub use self::http_limits::{
144 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
145};
146use self::patch_support::*;
147use self::request_body::*;
148use self::routing::*;
149use self::serverless_support::*;
150use self::transport::*;
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
158pub enum ServerSurface {
159 Public,
161 AdminOnly,
165 MetricsOnly,
169}
170
171#[derive(Debug, Clone)]
172pub struct ServerOptions {
173 pub bind_addr: String,
174 pub max_body_bytes: usize,
175 pub read_timeout_ms: u64,
176 pub write_timeout_ms: u64,
177 pub max_scan_limit: usize,
178 pub surface: ServerSurface,
182 pub transport_readiness: crate::service_cli::TransportReadiness,
183}
184
185impl Default for ServerOptions {
186 fn default() -> Self {
187 Self {
188 bind_addr: "127.0.0.1:5055".to_string(),
189 max_body_bytes: 1024 * 1024,
190 read_timeout_ms: 5_000,
191 write_timeout_ms: 5_000,
192 max_scan_limit: 1_000,
193 surface: ServerSurface::Public,
194 transport_readiness: crate::service_cli::TransportReadiness::default(),
195 }
196 }
197}
198
199pub struct ServerReplicationState {
201 pub config: crate::replication::ReplicationConfig,
202 pub primary: Option<crate::replication::primary::PrimaryReplication>,
203}
204
205#[derive(Clone)]
206pub struct RedDBServer {
207 runtime: RedDBRuntime,
208 options: ServerOptions,
209 auth_store: Option<Arc<AuthStore>>,
210 replication: Option<Arc<ServerReplicationState>>,
211 http_limiter: HttpConnectionLimiter,
216 handler_timeout: Duration,
222 handler_clock: Arc<dyn MonotonicClock>,
229 slow_inject_ms: Arc<AtomicU64>,
236 http_metrics: HttpHandlerMetrics,
242 retry_after_secs: u64,
246 reject_503_bytes: Arc<Vec<u8>>,
250}
251
252const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_millis(30_000);
254
255#[derive(Debug, Clone, Copy, PartialEq, Eq)]
256enum ServerlessWarmupScope {
257 Indexes,
258 GraphProjections,
259 AnalyticsJobs,
260 NativeArtifacts,
261}
262
263#[derive(Debug, Clone, Copy, PartialEq, Eq)]
264enum DeploymentProfile {
265 Embedded,
266 Server,
267 Serverless,
268}
269
270fn percent_decode_path_segment(input: &str) -> Result<String, String> {
271 let bytes = input.as_bytes();
272 let mut out = Vec::with_capacity(bytes.len());
273 let mut index = 0;
274 while index < bytes.len() {
275 match bytes[index] {
276 b'%' => {
277 if index + 2 >= bytes.len() {
278 return Err("truncated percent escape".to_string());
279 }
280 let high = hex_value(bytes[index + 1])
281 .ok_or_else(|| "invalid percent escape".to_string())?;
282 let low = hex_value(bytes[index + 2])
283 .ok_or_else(|| "invalid percent escape".to_string())?;
284 out.push((high << 4) | low);
285 index += 3;
286 }
287 byte => {
288 out.push(byte);
289 index += 1;
290 }
291 }
292 }
293 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
294}
295
296fn hex_value(byte: u8) -> Option<u8> {
297 match byte {
298 b'0'..=b'9' => Some(byte - b'0'),
299 b'a'..=b'f' => Some(byte - b'a' + 10),
300 b'A'..=b'F' => Some(byte - b'A' + 10),
301 _ => None,
302 }
303}
304
305#[derive(Debug, Clone)]
306struct ParsedQueryRequest {
307 query: String,
308 entity_types: Option<Vec<String>>,
309 capabilities: Option<Vec<String>>,
310 params: Option<Vec<Value>>,
314}
315
316#[derive(Debug, Clone, Copy)]
317enum PatchOperationType {
318 Set,
319 Replace,
320 Unset,
321}
322
323#[derive(Debug, Clone)]
324struct PatchOperation {
325 op: PatchOperationType,
326 path: Vec<String>,
327 value: Option<JsonValue>,
328}
329
330impl RedDBServer {
331 pub fn new(runtime: RedDBRuntime) -> Self {
332 Self::with_options(runtime, ServerOptions::default())
333 }
334
335 pub fn from_database_options(
336 db_options: RedDBOptions,
337 server_options: ServerOptions,
338 ) -> RedDBResult<Self> {
339 let runtime = RedDBRuntime::with_options(db_options)?;
340 Ok(Self::with_options(runtime, server_options))
341 }
342
343 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
344 Self {
345 runtime,
346 options,
347 auth_store: None,
348 replication: None,
349 http_limiter: HttpConnectionLimiter::with_default_cap(),
350 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
351 handler_clock: Arc::new(SystemMonotonicClock::new()),
352 slow_inject_ms: Arc::new(AtomicU64::new(0)),
353 http_metrics: HttpHandlerMetrics::new(),
354 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
355 reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
356 }
357 }
358
359 #[doc(hidden)]
360 pub fn http_metrics(&self) -> &HttpHandlerMetrics {
361 &self.http_metrics
362 }
363
364 #[doc(hidden)]
369 pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
370 self.http_limiter = HttpConnectionLimiter::new(cap);
371 self
372 }
373
374 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
379 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
380 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
381 self.retry_after_secs = limits.retry_after_secs;
382 self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
383 self
384 }
385
386 #[doc(hidden)]
387 pub fn retry_after_secs(&self) -> u64 {
388 self.retry_after_secs
389 }
390
391 #[doc(hidden)]
392 pub fn http_limiter(&self) -> &HttpConnectionLimiter {
393 &self.http_limiter
394 }
395
396 #[doc(hidden)]
399 pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
400 self.handler_timeout = timeout;
401 self
402 }
403
404 #[doc(hidden)]
405 pub fn handler_timeout(&self) -> Duration {
406 self.handler_timeout
407 }
408
409 #[doc(hidden)]
414 pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
415 self.handler_clock = clock;
416 self
417 }
418
419 #[doc(hidden)]
425 pub fn set_test_slow_inject_ms(&self, ms: u64) {
426 self.slow_inject_ms.store(ms, Ordering::Relaxed);
427 }
428
429 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
433 self.runtime.set_auth_store(Arc::clone(&auth_store));
434 self.auth_store = Some(auth_store);
435 self
436 }
437
438 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
440 self.replication = Some(state);
441 self
442 }
443
444 pub fn runtime(&self) -> &RedDBRuntime {
445 &self.runtime
446 }
447
448 pub fn options(&self) -> &ServerOptions {
449 &self.options
450 }
451
452 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
453 QueryUseCases::new(&self.runtime)
454 }
455
456 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
457 AdminUseCases::new(&self.runtime)
458 }
459
460 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
461 EntityUseCases::new(&self.runtime)
462 }
463
464 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
465 CatalogUseCases::new(&self.runtime)
466 }
467
468 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
469 GraphUseCases::new(&self.runtime)
470 }
471
472 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
473 NativeUseCases::new(&self.runtime)
474 }
475
476 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
477 TreeUseCases::new(&self.runtime)
478 }
479
480 fn transport_readiness_json(&self) -> JsonValue {
481 let active = self
482 .options
483 .transport_readiness
484 .active
485 .iter()
486 .map(|listener| {
487 let mut object = Map::new();
488 object.insert(
489 "transport".to_string(),
490 JsonValue::String(listener.transport.clone()),
491 );
492 object.insert(
493 "bind_addr".to_string(),
494 JsonValue::String(listener.bind_addr.clone()),
495 );
496 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
497 JsonValue::Object(object)
498 })
499 .collect();
500 let failed = self
501 .options
502 .transport_readiness
503 .failed
504 .iter()
505 .map(|listener| {
506 let mut object = Map::new();
507 object.insert(
508 "transport".to_string(),
509 JsonValue::String(listener.transport.clone()),
510 );
511 object.insert(
512 "bind_addr".to_string(),
513 JsonValue::String(listener.bind_addr.clone()),
514 );
515 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
516 object.insert(
517 "reason".to_string(),
518 JsonValue::String(listener.reason.clone()),
519 );
520 JsonValue::Object(object)
521 })
522 .collect();
523
524 let mut object = Map::new();
525 object.insert("active".to_string(), JsonValue::Array(active));
526 object.insert("failed".to_string(), JsonValue::Array(failed));
527 JsonValue::Object(object)
528 }
529
530 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
531 let mut value = crate::presentation::ops_json::health_json(report);
532 if let JsonValue::Object(ref mut object) = value {
533 object.insert(
534 "transport_listeners".to_string(),
535 self.transport_readiness_json(),
536 );
537 }
538 value
539 }
540
541 pub fn serve(&self) -> io::Result<()> {
542 let listener = TcpListener::bind(&self.options.bind_addr)?;
543 self.serve_on(listener)
544 }
545
546 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
547 for stream in listener.incoming() {
548 match stream {
549 Ok(stream) => match self.http_limiter.try_acquire() {
550 Some(permit) => {
551 let server = self.clone();
553 thread::spawn(move || {
554 let _guard = permit; let _ = server.handle_connection(stream);
556 });
557 }
558 None => {
559 self.http_metrics
564 .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
565 self.reject_with_503(stream, self.options.write_timeout_ms);
566 }
567 },
568 Err(err) => return Err(err),
569 }
570 }
571 Ok(())
572 }
573
574 fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
579 let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
580 let _ = stream.write_all(&self.reject_503_bytes);
581 let _ = stream.flush();
582 let _ = stream.shutdown(std::net::Shutdown::Both);
583 }
584
585 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
586 let (stream, _) = listener.accept()?;
587 self.handle_connection(stream)
588 }
589
590 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
591 let server = self.clone();
592 thread::spawn(move || server.serve())
593 }
594
595 pub fn serve_in_background_on(
596 &self,
597 listener: TcpListener,
598 ) -> thread::JoinHandle<io::Result<()>> {
599 let server = self.clone();
600 thread::spawn(move || server.serve_on(listener))
601 }
602
603 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
607 let listener = TcpListener::bind(&self.options.bind_addr)?;
608 self.serve_tls_on(listener, tls_config)
609 }
610
611 pub fn serve_tls_on(
612 &self,
613 listener: TcpListener,
614 tls_config: std::sync::Arc<rustls::ServerConfig>,
615 ) -> io::Result<()> {
616 for stream in listener.incoming() {
617 match stream {
618 Ok(stream) => match self.http_limiter.try_acquire() {
619 Some(permit) => {
620 let server = self.clone();
621 let cfg = tls_config.clone();
622 thread::spawn(move || {
623 let _guard = permit; let _ = server.handle_tls_connection(stream, cfg);
625 });
626 }
627 None => {
628 self.http_metrics
634 .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
635 let _ = stream.shutdown(std::net::Shutdown::Both);
636 drop(stream);
637 }
638 },
639 Err(err) => return Err(err),
640 }
641 }
642 Ok(())
643 }
644
645 pub fn serve_tls_in_background(
646 &self,
647 tls_config: std::sync::Arc<rustls::ServerConfig>,
648 ) -> thread::JoinHandle<io::Result<()>> {
649 let server = self.clone();
650 thread::spawn(move || server.serve_tls(tls_config))
651 }
652
653 pub fn serve_tls_in_background_on(
654 &self,
655 listener: TcpListener,
656 tls_config: std::sync::Arc<rustls::ServerConfig>,
657 ) -> thread::JoinHandle<io::Result<()>> {
658 let server = self.clone();
659 thread::spawn(move || server.serve_tls_on(listener, tls_config))
660 }
661
662 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
663 let started = Instant::now();
664 let result = self.handle_connection_inner(stream);
665 let elapsed = started.elapsed().as_secs_f64();
666 self.http_metrics
667 .record_duration(HttpTransport::Http, elapsed);
668 result
669 }
670
671 fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
672 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
673 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
674
675 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
683
684 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
685
686 if deadline.expired() {
688 self.http_metrics
689 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
690 Self::write_handler_timeout_503(&mut stream);
691 return Ok(());
692 }
693
694 if self.try_route_streaming(&request, &mut stream)? {
695 return Ok(());
696 }
697 let response = self.route(request);
698
699 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
703 if inject_ms > 0 {
704 thread::sleep(Duration::from_millis(inject_ms));
705 }
706
707 if deadline.expired() {
709 self.http_metrics
710 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
711 Self::write_handler_timeout_503(&mut stream);
712 return Ok(());
713 }
714
715 stream.write_all(&response.to_http_bytes())?;
716 stream.flush()?;
717 Ok(())
718 }
719
720 fn write_handler_timeout_503<S: Write>(stream: &mut S) {
726 const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
727 Connection: close\r\n\
728 Content-Length: 0\r\n\
729 \r\n";
730 let _ = stream.write_all(RESPONSE);
731 let _ = stream.flush();
732 }
733
734 fn handle_tls_connection(
735 &self,
736 tcp: TcpStream,
737 tls_config: std::sync::Arc<rustls::ServerConfig>,
738 ) -> io::Result<()> {
739 let started = Instant::now();
740 let result = self.handle_tls_connection_inner(tcp, tls_config);
741 let elapsed = started.elapsed().as_secs_f64();
742 self.http_metrics
743 .record_duration(HttpTransport::Https, elapsed);
744 result
745 }
746
747 fn handle_tls_connection_inner(
748 &self,
749 tcp: TcpStream,
750 tls_config: std::sync::Arc<rustls::ServerConfig>,
751 ) -> io::Result<()> {
752 tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
753 tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
754
755 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
759
760 let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
761 Ok(s) => s,
762 Err(err) => {
763 tracing::warn!(
764 target: "reddb::http_tls",
765 err = %err,
766 "TLS handshake failed"
767 );
768 return Err(err);
769 }
770 };
771 let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
772 Ok(req) => req,
773 Err(err) => {
774 tracing::warn!(
775 target: "reddb::http_tls",
776 err = %err,
777 "TLS request parse failed"
778 );
779 return Err(err);
780 }
781 };
782
783 if deadline.expired() {
785 self.http_metrics
786 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
787 Self::write_handler_timeout_503(&mut tls_stream);
788 return Ok(());
789 }
790
791 if self.try_route_streaming(&request, &mut tls_stream)? {
792 return Ok(());
793 }
794 let response = self.route(request);
795
796 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
799 if inject_ms > 0 {
800 thread::sleep(Duration::from_millis(inject_ms));
801 }
802
803 if deadline.expired() {
805 self.http_metrics
806 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
807 Self::write_handler_timeout_503(&mut tls_stream);
808 return Ok(());
809 }
810
811 tls_stream.write_all(&response.to_http_bytes())?;
812 tls_stream.flush()?;
813 Ok(())
814 }
815}
816
817fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
822 format!(
823 "HTTP/1.1 503 Service Unavailable\r\n\
824 Connection: close\r\n\
825 Content-Length: 0\r\n\
826 Retry-After: {retry_after_secs}\r\n\
827 \r\n"
828 )
829 .into_bytes()
830}