1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
48fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
49 crate::presentation::admin_json::analytics_job_json(job)
50}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The health payload must expose a `transport_listeners` object whose
    /// `active` and `failed` arrays each carry one entry per configured
    /// listener in the readiness snapshot.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");

        let grpc_listener = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:50051".to_string(),
            explicit: true,
        };
        let http_failure = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5055".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
        };
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![grpc_listener],
                failed: vec![http_failure],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        // Walk down the payload one level at a time so a failure names the
        // exact layer that had an unexpected shape.
        let root = match server.health_json_with_transport(&HealthReport::healthy()) {
            JsonValue::Object(root) => root,
            _ => panic!("health payload should be an object"),
        };
        let listeners = match root.get("transport_listeners") {
            Some(JsonValue::Object(listeners)) => listeners,
            _ => panic!("health payload should include transport_listeners"),
        };
        let active = match listeners.get("active") {
            Some(JsonValue::Array(entries)) => entries,
            _ => panic!("transport_listeners.active should be an array"),
        };
        let failed = match listeners.get("failed") {
            Some(JsonValue::Array(entries)) => entries,
            _ => panic!("transport_listeners.failed should be an array"),
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
98
99fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
100 crate::presentation::admin_json::graph_projection_json(projection)
101}
102
103pub mod handlers_admin;
104mod handlers_ai;
105pub mod http_connection_limiter;
106pub mod http_handler_metrics;
107pub mod http_limits;
108mod handlers_auth;
109mod handlers_backup;
110mod handlers_ec;
111pub(crate) mod handlers_entity;
112mod handlers_geo;
113mod handlers_graph;
114mod handlers_keyed;
115mod handlers_log;
116mod handlers_metrics;
117mod handlers_ops;
118mod handlers_query;
119mod handlers_replication;
120mod handlers_vcs;
121mod handlers_vector;
122pub mod header_escape_guard;
123pub mod ingest_pipeline;
124mod patch_support;
125mod request_body;
126mod request_context;
127mod routing;
128mod serverless_support;
129pub mod tls;
130mod transport;
131
132use self::handlers_ai::*;
133use self::http_connection_limiter::HttpConnectionLimiter;
134use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
135pub use self::http_limits::{
136 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
137};
138use self::handlers_entity::*;
139use self::handlers_graph::*;
140use self::handlers_keyed::*;
141use self::handlers_metrics::*;
142use self::handlers_ops::*;
143use self::handlers_query::*;
144use self::patch_support::*;
145use self::request_body::*;
146use self::routing::*;
147use self::serverless_support::*;
148use self::transport::*;
149
/// Selects which surface a server instance exposes.
///
/// Stored in `ServerOptions::surface`; `ServerOptions::default()` picks
/// `ServerSurface::Public`.
///
/// NOTE(review): the route sets behind each variant are enforced outside this
/// part of the file (presumably in `routing`). The per-variant notes below are
/// inferred from the names only — confirm before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Presumably the full public API surface — TODO confirm.
    Public,
    /// Presumably limited to administrative endpoints — TODO confirm.
    AdminOnly,
    /// Presumably limited to metrics endpoints — TODO confirm.
    MetricsOnly,
}
168
/// Listener-level configuration for a `RedDBServer`.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address that `RedDBServer::serve` and `RedDBServer::serve_tls` bind a
    /// `TcpListener` to.
    pub bind_addr: String,
    /// Body-size limit handed to `HttpRequest::read_from` for every request.
    pub max_body_bytes: usize,
    /// Socket read timeout, in milliseconds, applied to each accepted
    /// connection before the request is read.
    pub read_timeout_ms: u64,
    /// Socket write timeout, in milliseconds, applied to each accepted
    /// connection and to the cap-exhausted 503 rejection write.
    pub write_timeout_ms: u64,
    /// NOTE(review): not read in this part of the file; presumably an upper
    /// bound on scan page sizes — confirm against the handlers.
    pub max_scan_limit: usize,
    /// Which surface this server instance exposes.
    pub surface: ServerSurface,
    /// Snapshot of active and failed transport listeners, rendered under the
    /// `transport_listeners` key of the health payload.
    pub transport_readiness: crate::service_cli::TransportReadiness,
}
182
183impl Default for ServerOptions {
184 fn default() -> Self {
185 Self {
186 bind_addr: "127.0.0.1:5055".to_string(),
187 max_body_bytes: 1024 * 1024,
188 read_timeout_ms: 5_000,
189 write_timeout_ms: 5_000,
190 max_scan_limit: 1_000,
191 surface: ServerSurface::Public,
192 transport_readiness: crate::service_cli::TransportReadiness::default(),
193 }
194 }
195}
196
/// Replication state that can be attached to a server through
/// `RedDBServer::with_replication`.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, when there is one.
    /// NOTE(review): presumably `Some` only when this node acts as a primary —
    /// confirm against the replication handlers.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
202
/// HTTP/HTTPS front end over a `RedDBRuntime`.
///
/// The server is `Clone`; the accept loops clone it once per accepted
/// connection so each handler thread owns its own handle.
#[derive(Clone)]
pub struct RedDBServer {
    /// Runtime that backs every use-case facade; exposed through `runtime()`.
    runtime: RedDBRuntime,
    /// Listener-level configuration; exposed through `options()`.
    options: ServerOptions,
    /// Auth store installed by `with_auth`; `None` until then.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state installed by `with_replication`; `None` until then.
    replication: Option<Arc<ServerReplicationState>>,
    /// Admission limiter consulted once per accepted connection by the accept
    /// loops.
    http_limiter: HttpConnectionLimiter,
    /// Time budget for one connection, measured from the start of handling.
    /// When it has elapsed the handler answers with a bare 503 instead of the
    /// routed response.
    handler_timeout: Duration,
    /// Artificial delay in milliseconds slept after routing; `0` disables it.
    /// Written by `set_test_slow_inject_ms` and shared between clones.
    slow_inject_ms: Arc<AtomicU64>,
    /// Metrics sink for handler durations and rejections.
    http_metrics: HttpHandlerMetrics,
    /// Seconds advertised in the `Retry-After` header of the cap-exhausted
    /// 503 response.
    retry_after_secs: u64,
    /// Pre-rendered cap-exhausted 503 response, built once by
    /// `build_reject_503_bytes` and written verbatim by `reject_with_503`.
    reject_503_bytes: Arc<Vec<u8>>,
}
242
/// Handler timeout (30 000 ms) installed by `RedDBServer::with_options` until
/// `with_http_limits` or `with_handler_timeout` overrides it.
// NOTE(review): `http_limits` also exports `DEFAULT_HANDLER_TIMEOUT_MS`; confirm
// that the two defaults agree.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_millis(30_000);
245
/// Category of state that a serverless warm-up can target.
///
/// NOTE(review): consumed outside this part of the file (presumably by
/// `serverless_support`); what warming each scope does is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    /// Index state.
    Indexes,
    /// Graph projections.
    GraphProjections,
    /// Analytics jobs.
    AnalyticsJobs,
    /// Native artifacts.
    NativeArtifacts,
}
253
/// Deployment mode label.
///
/// NOTE(review): consumed outside this part of the file; how each profile
/// changes behaviour is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    /// Embedded deployment.
    Embedded,
    /// Server deployment.
    Server,
    /// Serverless deployment.
    Serverless,
}
260
261fn percent_decode_path_segment(input: &str) -> Result<String, String> {
262 let bytes = input.as_bytes();
263 let mut out = Vec::with_capacity(bytes.len());
264 let mut index = 0;
265 while index < bytes.len() {
266 match bytes[index] {
267 b'%' => {
268 if index + 2 >= bytes.len() {
269 return Err("truncated percent escape".to_string());
270 }
271 let high = hex_value(bytes[index + 1])
272 .ok_or_else(|| "invalid percent escape".to_string())?;
273 let low = hex_value(bytes[index + 2])
274 .ok_or_else(|| "invalid percent escape".to_string())?;
275 out.push((high << 4) | low);
276 index += 3;
277 }
278 byte => {
279 out.push(byte);
280 index += 1;
281 }
282 }
283 }
284 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
285}
286
/// Returns the numeric value (0–15) of an ASCII hexadecimal digit, accepting
/// both lower- and upper-case letters, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly `0-9`, `a-f` and `A-F`; the digit is
    // always below 16, so narrowing it back to `u8` is lossless.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
295
/// Fields of a query request after its payload has been parsed.
///
/// NOTE(review): built and consumed outside this part of the file; the notes
/// on optional fields are inferred from names and types — confirm in the
/// query handlers.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text.
    query: String,
    /// Optional list of entity types; presumably a filter — TODO confirm.
    entity_types: Option<Vec<String>>,
    /// Optional list of capabilities; presumably a filter — TODO confirm.
    capabilities: Option<Vec<String>>,
    /// Optional list of parameter values; presumably bound to the query —
    /// TODO confirm.
    params: Option<Vec<Value>>,
}
306
/// Kind of a single patch operation (see `PatchOperation::op`).
///
/// NOTE(review): the operations are applied outside this part of the file
/// (presumably in `patch_support`); exact semantics are not visible here.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    /// Set operation.
    Set,
    /// Replace operation.
    Replace,
    /// Unset operation.
    Unset,
}
313
/// One parsed patch operation.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Kind of operation to perform.
    op: PatchOperationType,
    /// Path to the target, one segment per element.
    /// NOTE(review): presumably ordered from the root outward — confirm in
    /// `patch_support`.
    path: Vec<String>,
    /// JSON value carried by the operation, if any.
    /// NOTE(review): presumably absent for `Unset` — confirm in
    /// `patch_support`.
    value: Option<JsonValue>,
}
320
321impl RedDBServer {
322 pub fn new(runtime: RedDBRuntime) -> Self {
323 Self::with_options(runtime, ServerOptions::default())
324 }
325
326 pub fn from_database_options(
327 db_options: RedDBOptions,
328 server_options: ServerOptions,
329 ) -> RedDBResult<Self> {
330 let runtime = RedDBRuntime::with_options(db_options)?;
331 Ok(Self::with_options(runtime, server_options))
332 }
333
334 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
335 Self {
336 runtime,
337 options,
338 auth_store: None,
339 replication: None,
340 http_limiter: HttpConnectionLimiter::with_default_cap(),
341 handler_timeout: DEFAULT_HANDLER_TIMEOUT,
342 slow_inject_ms: Arc::new(AtomicU64::new(0)),
343 http_metrics: HttpHandlerMetrics::new(),
344 retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
345 reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
346 }
347 }
348
349 #[doc(hidden)]
350 pub fn http_metrics(&self) -> &HttpHandlerMetrics {
351 &self.http_metrics
352 }
353
354 #[doc(hidden)]
359 pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
360 self.http_limiter = HttpConnectionLimiter::new(cap);
361 self
362 }
363
364 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
369 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
370 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
371 self.retry_after_secs = limits.retry_after_secs;
372 self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
373 self
374 }
375
376 #[doc(hidden)]
377 pub fn retry_after_secs(&self) -> u64 {
378 self.retry_after_secs
379 }
380
381 #[doc(hidden)]
382 pub fn http_limiter(&self) -> &HttpConnectionLimiter {
383 &self.http_limiter
384 }
385
386 #[doc(hidden)]
389 pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
390 self.handler_timeout = timeout;
391 self
392 }
393
394 #[doc(hidden)]
395 pub fn handler_timeout(&self) -> Duration {
396 self.handler_timeout
397 }
398
399 #[doc(hidden)]
405 pub fn set_test_slow_inject_ms(&self, ms: u64) {
406 self.slow_inject_ms.store(ms, Ordering::Relaxed);
407 }
408
409 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
413 self.runtime.set_auth_store(Arc::clone(&auth_store));
414 self.auth_store = Some(auth_store);
415 self
416 }
417
418 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
420 self.replication = Some(state);
421 self
422 }
423
424 pub fn runtime(&self) -> &RedDBRuntime {
425 &self.runtime
426 }
427
428 pub fn options(&self) -> &ServerOptions {
429 &self.options
430 }
431
432 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
433 QueryUseCases::new(&self.runtime)
434 }
435
436 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
437 AdminUseCases::new(&self.runtime)
438 }
439
440 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
441 EntityUseCases::new(&self.runtime)
442 }
443
444 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
445 CatalogUseCases::new(&self.runtime)
446 }
447
448 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
449 GraphUseCases::new(&self.runtime)
450 }
451
452 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
453 NativeUseCases::new(&self.runtime)
454 }
455
456 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
457 TreeUseCases::new(&self.runtime)
458 }
459
460 fn transport_readiness_json(&self) -> JsonValue {
461 let active = self
462 .options
463 .transport_readiness
464 .active
465 .iter()
466 .map(|listener| {
467 let mut object = Map::new();
468 object.insert(
469 "transport".to_string(),
470 JsonValue::String(listener.transport.clone()),
471 );
472 object.insert(
473 "bind_addr".to_string(),
474 JsonValue::String(listener.bind_addr.clone()),
475 );
476 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
477 JsonValue::Object(object)
478 })
479 .collect();
480 let failed = self
481 .options
482 .transport_readiness
483 .failed
484 .iter()
485 .map(|listener| {
486 let mut object = Map::new();
487 object.insert(
488 "transport".to_string(),
489 JsonValue::String(listener.transport.clone()),
490 );
491 object.insert(
492 "bind_addr".to_string(),
493 JsonValue::String(listener.bind_addr.clone()),
494 );
495 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
496 object.insert(
497 "reason".to_string(),
498 JsonValue::String(listener.reason.clone()),
499 );
500 JsonValue::Object(object)
501 })
502 .collect();
503
504 let mut object = Map::new();
505 object.insert("active".to_string(), JsonValue::Array(active));
506 object.insert("failed".to_string(), JsonValue::Array(failed));
507 JsonValue::Object(object)
508 }
509
510 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
511 let mut value = crate::presentation::ops_json::health_json(report);
512 if let JsonValue::Object(ref mut object) = value {
513 object.insert(
514 "transport_listeners".to_string(),
515 self.transport_readiness_json(),
516 );
517 }
518 value
519 }
520
521 pub fn serve(&self) -> io::Result<()> {
522 let listener = TcpListener::bind(&self.options.bind_addr)?;
523 self.serve_on(listener)
524 }
525
526 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
527 for stream in listener.incoming() {
528 match stream {
529 Ok(stream) => match self.http_limiter.try_acquire() {
530 Some(permit) => {
531 let server = self.clone();
533 thread::spawn(move || {
534 let _guard = permit; let _ = server.handle_connection(stream);
536 });
537 }
538 None => {
539 self.http_metrics
544 .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
545 self.reject_with_503(stream, self.options.write_timeout_ms);
546 }
547 },
548 Err(err) => return Err(err),
549 }
550 }
551 Ok(())
552 }
553
554 fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
559 let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
560 let _ = stream.write_all(&self.reject_503_bytes);
561 let _ = stream.flush();
562 let _ = stream.shutdown(std::net::Shutdown::Both);
563 }
564
565 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
566 let (stream, _) = listener.accept()?;
567 self.handle_connection(stream)
568 }
569
570 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
571 let server = self.clone();
572 thread::spawn(move || server.serve())
573 }
574
575 pub fn serve_in_background_on(
576 &self,
577 listener: TcpListener,
578 ) -> thread::JoinHandle<io::Result<()>> {
579 let server = self.clone();
580 thread::spawn(move || server.serve_on(listener))
581 }
582
583 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
587 let listener = TcpListener::bind(&self.options.bind_addr)?;
588 self.serve_tls_on(listener, tls_config)
589 }
590
591 pub fn serve_tls_on(
592 &self,
593 listener: TcpListener,
594 tls_config: std::sync::Arc<rustls::ServerConfig>,
595 ) -> io::Result<()> {
596 for stream in listener.incoming() {
597 match stream {
598 Ok(stream) => match self.http_limiter.try_acquire() {
599 Some(permit) => {
600 let server = self.clone();
601 let cfg = tls_config.clone();
602 thread::spawn(move || {
603 let _guard = permit; let _ = server.handle_tls_connection(stream, cfg);
605 });
606 }
607 None => {
608 self.http_metrics
614 .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
615 let _ = stream.shutdown(std::net::Shutdown::Both);
616 drop(stream);
617 }
618 },
619 Err(err) => return Err(err),
620 }
621 }
622 Ok(())
623 }
624
625 pub fn serve_tls_in_background(
626 &self,
627 tls_config: std::sync::Arc<rustls::ServerConfig>,
628 ) -> thread::JoinHandle<io::Result<()>> {
629 let server = self.clone();
630 thread::spawn(move || server.serve_tls(tls_config))
631 }
632
633 pub fn serve_tls_in_background_on(
634 &self,
635 listener: TcpListener,
636 tls_config: std::sync::Arc<rustls::ServerConfig>,
637 ) -> thread::JoinHandle<io::Result<()>> {
638 let server = self.clone();
639 thread::spawn(move || server.serve_tls_on(listener, tls_config))
640 }
641
642 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
643 let started = Instant::now();
644 let result = self.handle_connection_inner(stream, started);
645 let elapsed = started.elapsed().as_secs_f64();
646 self.http_metrics
647 .record_duration(HttpTransport::Http, elapsed);
648 result
649 }
650
651 fn handle_connection_inner(
652 &self,
653 mut stream: TcpStream,
654 started: Instant,
655 ) -> io::Result<()> {
656 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
657 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
658
659 let deadline = started + self.handler_timeout;
664
665 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
666
667 if Instant::now() >= deadline {
669 self.http_metrics
670 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
671 Self::write_handler_timeout_503(&mut stream);
672 return Ok(());
673 }
674
675 if self.try_route_streaming(&request, &mut stream)? {
676 return Ok(());
677 }
678 let response = self.route(request);
679
680 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
684 if inject_ms > 0 {
685 thread::sleep(Duration::from_millis(inject_ms));
686 }
687
688 if Instant::now() >= deadline {
690 self.http_metrics
691 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
692 Self::write_handler_timeout_503(&mut stream);
693 return Ok(());
694 }
695
696 stream.write_all(&response.to_http_bytes())?;
697 stream.flush()?;
698 Ok(())
699 }
700
701 fn write_handler_timeout_503<S: Write>(stream: &mut S) {
707 const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
708 Connection: close\r\n\
709 Content-Length: 0\r\n\
710 \r\n";
711 let _ = stream.write_all(RESPONSE);
712 let _ = stream.flush();
713 }
714
715 fn handle_tls_connection(
716 &self,
717 tcp: TcpStream,
718 tls_config: std::sync::Arc<rustls::ServerConfig>,
719 ) -> io::Result<()> {
720 let started = Instant::now();
721 let result = self.handle_tls_connection_inner(tcp, tls_config, started);
722 let elapsed = started.elapsed().as_secs_f64();
723 self.http_metrics
724 .record_duration(HttpTransport::Https, elapsed);
725 result
726 }
727
728 fn handle_tls_connection_inner(
729 &self,
730 tcp: TcpStream,
731 tls_config: std::sync::Arc<rustls::ServerConfig>,
732 started: Instant,
733 ) -> io::Result<()> {
734 tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
735 tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
736
737 let deadline = started + self.handler_timeout;
740
741 let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
742 Ok(s) => s,
743 Err(err) => {
744 tracing::warn!(
745 target: "reddb::http_tls",
746 err = %err,
747 "TLS handshake failed"
748 );
749 return Err(err);
750 }
751 };
752 let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
753 Ok(req) => req,
754 Err(err) => {
755 tracing::warn!(
756 target: "reddb::http_tls",
757 err = %err,
758 "TLS request parse failed"
759 );
760 return Err(err);
761 }
762 };
763
764 if Instant::now() >= deadline {
766 self.http_metrics
767 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
768 Self::write_handler_timeout_503(&mut tls_stream);
769 return Ok(());
770 }
771
772 if self.try_route_streaming(&request, &mut tls_stream)? {
773 return Ok(());
774 }
775 let response = self.route(request);
776
777 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
780 if inject_ms > 0 {
781 thread::sleep(Duration::from_millis(inject_ms));
782 }
783
784 if Instant::now() >= deadline {
786 self.http_metrics
787 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
788 Self::write_handler_timeout_503(&mut tls_stream);
789 return Ok(());
790 }
791
792 tls_stream.write_all(&response.to_http_bytes())?;
793 tls_stream.flush()?;
794 Ok(())
795 }
796}
797
/// Renders the complete `503 Service Unavailable` response sent when a
/// connection is refused: status line, `Connection: close`, an empty body
/// (`Content-Length: 0`) and a `Retry-After` header carrying
/// `retry_after_secs`, terminated by the blank line that ends the headers.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    let mut response = String::from("HTTP/1.1 503 Service Unavailable\r\n");
    response.push_str("Connection: close\r\n");
    response.push_str("Content-Length: 0\r\n");
    response.push_str("Retry-After: ");
    response.push_str(&retry_after_secs.to_string());
    // One CRLF ends the header line, the second ends the header section.
    response.push_str("\r\n\r\n");
    response.into_bytes()
}