hickory_server/server/
h2_handler.rs1use std::{io, net::SocketAddr, sync::Arc};
9
10use bytes::Bytes;
11use futures_util::lock::Mutex;
12use h2::server;
13use hickory_proto::{http::Version, rr::Record};
14use tokio::io::{AsyncRead, AsyncWrite};
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, error, warn};
17
18use crate::{
19 access::AccessControl,
20 authority::MessageResponse,
21 proto::{h2::h2_server, xfer::Protocol},
22 server::{
23 ResponseInfo,
24 request_handler::RequestHandler,
25 response_handler::{ResponseHandler, encode_fallback_servfail_response},
26 },
27};
28
29pub(crate) async fn h2_handler<T, I>(
30 access: Arc<AccessControl>,
31 handler: Arc<T>,
32 io: I,
33 src_addr: SocketAddr,
34 dns_hostname: Option<Arc<str>>,
35 http_endpoint: Arc<str>,
36 shutdown: CancellationToken,
37) where
38 T: RequestHandler,
39 I: AsyncRead + AsyncWrite + Unpin,
40{
41 let dns_hostname = dns_hostname.clone();
42 let http_endpoint = http_endpoint.clone();
43
44 let mut h2 = match server::handshake(io).await {
46 Ok(h2) => h2,
47 Err(err) => {
48 warn!("handshake error from {}: {}", src_addr, err);
49 return;
50 }
51 };
52
53 loop {
56 let (request, respond) = tokio::select! {
57 result = h2.accept() => match result {
58 Some(Ok(next_request)) => next_request,
59 Some(Err(err)) => {
60 warn!("error accepting request {}: {}", src_addr, err);
61 return;
62 }
63 None => {
64 return;
65 }
66 },
67 _ = shutdown.cancelled() => {
68 return
70 },
71 };
72
73 debug!("Received request: {:#?}", request);
74 let dns_hostname = dns_hostname.clone();
75 let http_endpoint = http_endpoint.clone();
76 let handler = handler.clone();
77 let access = access.clone();
78 let responder = HttpsResponseHandle(Arc::new(Mutex::new(respond)));
79
80 tokio::spawn(async move {
81 let body = match h2_server::message_from(dns_hostname, http_endpoint, request).await {
82 Ok(bytes) => bytes,
83 Err(err) => {
84 warn!("error while handling request from {}: {}", src_addr, err);
85 return;
86 }
87 };
88
89 super::handle_request(&body, src_addr, Protocol::Https, access, handler, responder)
90 .await
91 });
92
93 }
95}
96
97#[derive(Clone)]
98struct HttpsResponseHandle(Arc<Mutex<server::SendResponse<Bytes>>>);
99
100#[async_trait::async_trait]
101impl ResponseHandler for HttpsResponseHandle {
102 async fn send_response<'a>(
103 &mut self,
104 response: MessageResponse<
105 '_,
106 'a,
107 impl Iterator<Item = &'a Record> + Send + 'a,
108 impl Iterator<Item = &'a Record> + Send + 'a,
109 impl Iterator<Item = &'a Record> + Send + 'a,
110 impl Iterator<Item = &'a Record> + Send + 'a,
111 >,
112 ) -> io::Result<ResponseInfo> {
113 use crate::proto::h2::HttpsError;
114 use crate::proto::http::response;
115 use crate::proto::serialize::binary::BinEncoder;
116
117 let id = response.header().id();
118 let mut bytes = Vec::with_capacity(512);
119 let info = {
121 let mut encoder = BinEncoder::new(&mut bytes);
122 response.destructive_emit(&mut encoder).or_else(|error| {
123 error!(%error, "error encoding message");
124 encode_fallback_servfail_response(id, &mut bytes)
125 })?
126 };
127 let bytes = Bytes::from(bytes);
128 let response = response::new(Version::Http2, bytes.len())?;
129
130 debug!("sending response: {:#?}", response);
131 let mut stream = self
132 .0
133 .lock()
134 .await
135 .send_response(response, false)
136 .map_err(HttpsError::from)?;
137 stream.send_data(bytes, true).map_err(HttpsError::from)?;
138
139 Ok(info)
140 }
141}