hickory_server/server/
h2_handler.rs

1// Copyright 2015-2021 Benjamin Fry <benjaminfry@me.com>
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// https://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// https://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use std::{io, net::SocketAddr, sync::Arc};
9
10use bytes::Bytes;
11use futures_util::lock::Mutex;
12use h2::server;
13use hickory_proto::{http::Version, rr::Record};
14use tokio::io::{AsyncRead, AsyncWrite};
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, error, warn};
17
18use crate::{
19    access::AccessControl,
20    authority::MessageResponse,
21    proto::{h2::h2_server, xfer::Protocol},
22    server::{
23        ResponseInfo,
24        request_handler::RequestHandler,
25        response_handler::{ResponseHandler, encode_fallback_servfail_response},
26    },
27};
28
29pub(crate) async fn h2_handler<T, I>(
30    access: Arc<AccessControl>,
31    handler: Arc<T>,
32    io: I,
33    src_addr: SocketAddr,
34    dns_hostname: Option<Arc<str>>,
35    http_endpoint: Arc<str>,
36    shutdown: CancellationToken,
37) where
38    T: RequestHandler,
39    I: AsyncRead + AsyncWrite + Unpin,
40{
41    let dns_hostname = dns_hostname.clone();
42    let http_endpoint = http_endpoint.clone();
43
44    // Start the HTTP/2.0 connection handshake
45    let mut h2 = match server::handshake(io).await {
46        Ok(h2) => h2,
47        Err(err) => {
48            warn!("handshake error from {}: {}", src_addr, err);
49            return;
50        }
51    };
52
53    // Accept all inbound HTTP/2.0 streams sent over the
54    // connection.
55    loop {
56        let (request, respond) = tokio::select! {
57            result = h2.accept() => match result {
58                Some(Ok(next_request)) => next_request,
59                Some(Err(err)) => {
60                    warn!("error accepting request {}: {}", src_addr, err);
61                        return;
62                }
63                None => {
64                    return;
65                }
66            },
67            _ = shutdown.cancelled() => {
68                // A graceful shutdown was initiated.
69                return
70            },
71        };
72
73        debug!("Received request: {:#?}", request);
74        let dns_hostname = dns_hostname.clone();
75        let http_endpoint = http_endpoint.clone();
76        let handler = handler.clone();
77        let access = access.clone();
78        let responder = HttpsResponseHandle(Arc::new(Mutex::new(respond)));
79
80        tokio::spawn(async move {
81            let body = match h2_server::message_from(dns_hostname, http_endpoint, request).await {
82                Ok(bytes) => bytes,
83                Err(err) => {
84                    warn!("error while handling request from {}: {}", src_addr, err);
85                    return;
86                }
87            };
88
89            super::handle_request(&body, src_addr, Protocol::Https, access, handler, responder)
90                .await
91        });
92
93        // we'll continue handling requests from here.
94    }
95}
96
97#[derive(Clone)]
98struct HttpsResponseHandle(Arc<Mutex<server::SendResponse<Bytes>>>);
99
100#[async_trait::async_trait]
101impl ResponseHandler for HttpsResponseHandle {
102    async fn send_response<'a>(
103        &mut self,
104        response: MessageResponse<
105            '_,
106            'a,
107            impl Iterator<Item = &'a Record> + Send + 'a,
108            impl Iterator<Item = &'a Record> + Send + 'a,
109            impl Iterator<Item = &'a Record> + Send + 'a,
110            impl Iterator<Item = &'a Record> + Send + 'a,
111        >,
112    ) -> io::Result<ResponseInfo> {
113        use crate::proto::h2::HttpsError;
114        use crate::proto::http::response;
115        use crate::proto::serialize::binary::BinEncoder;
116
117        let id = response.header().id();
118        let mut bytes = Vec::with_capacity(512);
119        // mut block
120        let info = {
121            let mut encoder = BinEncoder::new(&mut bytes);
122            response.destructive_emit(&mut encoder).or_else(|error| {
123                error!(%error, "error encoding message");
124                encode_fallback_servfail_response(id, &mut bytes)
125            })?
126        };
127        let bytes = Bytes::from(bytes);
128        let response = response::new(Version::Http2, bytes.len())?;
129
130        debug!("sending response: {:#?}", response);
131        let mut stream = self
132            .0
133            .lock()
134            .await
135            .send_response(response, false)
136            .map_err(HttpsError::from)?;
137        stream.send_data(bytes, true).map_err(HttpsError::from)?;
138
139        Ok(info)
140    }
141}