haproxy_protocol/
lib.rs

1#![deny(warnings)]
2#![warn(unused_extern_crates)]
3#![deny(clippy::todo)]
4#![deny(clippy::unimplemented)]
5#![deny(clippy::unwrap_used)]
6#![deny(clippy::expect_used)]
7#![deny(clippy::panic)]
8#![deny(clippy::unreachable)]
9#![deny(clippy::await_holding_lock)]
10#![deny(clippy::needless_pass_by_value)]
11#![deny(clippy::trivially_copy_pass_by_ref)]
12
13use crate::parse::{parse_proxy_hdr_v1, parse_proxy_hdr_v2};
14use std::num::NonZeroUsize;
15
16#[cfg(feature = "tokio")]
17use crate::parse::{V1_MAX_LEN, V1_MIN_LEN};
18
/// The non-zero value `1`.
///
/// Reported as the amount of extra input needed when the v1 parser signals that it is
/// incomplete without saying by how much. `NonZeroUsize::MIN` is exactly `1`, so no fallible
/// construction (and no `expect`) is required.
const NZ_ONE: NonZeroUsize = NonZeroUsize::MIN;
20
21mod parse;
22
/// Address family / transport pairing carried by a proxy header.
///
/// Each variant's discriminant is the single-byte value listed next to it.
/// The Unix stream (`0x31`) and Unix datagram (`0x32`) values are not represented here.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Protocol {
    /// No protocol specified.
    Unspec = 0x00,
    /// TCP over IPv4.
    TcpV4 = 0x11,
    /// UDP over IPv4.
    UdpV4 = 0x12,
    /// TCP over IPv6.
    TcpV6 = 0x21,
    /// UDP over IPv6.
    UdpV6 = 0x22,
}
34
/// Command carried by a version 2 proxy header.
///
/// Each variant's discriminant is the single-byte value listed next to it.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Command {
    /// Converted to [`RemoteAddress::Local`] regardless of protocol or address.
    Local = 0x00,
    /// The header's protocol and address describe the proxied connection.
    Proxy = 0x01,
}
41
/// Source/destination address pair decoded from a proxy header.
///
/// Unix socket paths are not represented.
#[derive(Clone, Debug, PartialEq, Eq)]
enum Address {
    /// The header carried no address information.
    None,
    /// IPv4 source and destination socket addresses.
    V4 {
        src: std::net::SocketAddrV4,
        dst: std::net::SocketAddrV4,
    },
    /// IPv6 source and destination socket addresses.
    V6 {
        src: std::net::SocketAddrV6,
        dst: std::net::SocketAddrV6,
    },
}
58
/// Connection endpoints extracted from a proxy header, as returned by the
/// `to_remote_addr` methods of [`ProxyHdrV1`] and [`ProxyHdrV2`].
///
/// Implements `PartialEq`/`Eq` so callers can compare results directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteAddress {
    /// The v2 header carried the local command; no proxied addresses apply.
    Local,
    /// The header's protocol and address did not form a supported combination.
    Invalid,
    /// TCP over IPv4 source and destination.
    TcpV4 {
        src: std::net::SocketAddrV4,
        dst: std::net::SocketAddrV4,
    },
    /// UDP over IPv4 source and destination.
    UdpV4 {
        src: std::net::SocketAddrV4,
        dst: std::net::SocketAddrV4,
    },
    /// TCP over IPv6 source and destination.
    TcpV6 {
        src: std::net::SocketAddrV6,
        dst: std::net::SocketAddrV6,
    },
    /// UDP over IPv6 source and destination.
    UdpV6 {
        src: std::net::SocketAddrV6,
        dst: std::net::SocketAddrV6,
    },
}
80
/// Errors returned by the slice-based `parse` functions of [`ProxyHdrV1`] and [`ProxyHdrV2`].
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be propagated with
/// `?` into boxed or wrapped error types.
#[derive(Debug, PartialEq, Eq)]
pub enum Error {
    /// More input is required; `need` is the number of additional bytes to supply before
    /// parsing again.
    Incomplete { need: NonZeroUsize },
    /// The parser rejected the input.
    Invalid,
    /// The parser needs more input but could not say how much.
    UnableToComplete,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::Incomplete { need } => {
                write!(f, "proxy header is incomplete, {need} more byte(s) needed")
            }
            Error::Invalid => f.write_str("proxy header is invalid"),
            Error::UnableToComplete => f.write_str("proxy header could not be completed"),
        }
    }
}

impl std::error::Error for Error {}
87
88#[derive(Debug, Clone)]
89pub struct ProxyHdrV2 {
90    command: Command,
91    protocol: Protocol,
92    // address_family: AddressFamily,
93    // length: u16,
94    address: Address,
95}
96
97impl ProxyHdrV2 {
98    pub fn parse(input_data: &[u8]) -> Result<(usize, Self), Error> {
99        match parse_proxy_hdr_v2(input_data) {
100            Ok((remainder, hdr)) => {
101                let took = input_data.len() - remainder.len();
102                Ok((took, hdr))
103            }
104            Err(nom::Err::Incomplete(nom::Needed::Size(need))) => Err(Error::Incomplete { need }),
105            // We always know exactly how much is needed for hdr v2
106            Err(nom::Err::Incomplete(nom::Needed::Unknown)) => Err(Error::UnableToComplete),
107
108            Err(nom::Err::Error(err)) => {
109                tracing::error!(?err);
110                Err(Error::Invalid)
111            }
112            Err(nom::Err::Failure(err)) => {
113                tracing::error!(?err);
114                Err(Error::Invalid)
115            }
116        }
117    }
118
119    pub fn to_remote_addr(self) -> RemoteAddress {
120        match (self.command, self.protocol, self.address) {
121            (Command::Local, _, _) => RemoteAddress::Local,
122            (Command::Proxy, Protocol::TcpV4, Address::V4 { src, dst }) => {
123                RemoteAddress::TcpV4 { src, dst }
124            }
125            (Command::Proxy, Protocol::UdpV4, Address::V4 { src, dst }) => {
126                RemoteAddress::UdpV4 { src, dst }
127            }
128            (Command::Proxy, Protocol::TcpV6, Address::V6 { src, dst }) => {
129                RemoteAddress::TcpV6 { src, dst }
130            }
131            (Command::Proxy, Protocol::UdpV6, Address::V6 { src, dst }) => {
132                RemoteAddress::UdpV6 { src, dst }
133            }
134            _ => RemoteAddress::Invalid,
135        }
136    }
137}
138
139#[derive(Debug, Clone)]
140pub struct ProxyHdrV1 {
141    protocol: Protocol,
142    address: Address,
143}
144
145impl ProxyHdrV1 {
146    pub fn parse(input_data: &[u8]) -> Result<(usize, Self), Error> {
147        match parse_proxy_hdr_v1(input_data) {
148            Ok((remainder, hdr)) => {
149                let took = input_data.len() - remainder.len();
150                Ok((took, hdr))
151            }
152            Err(nom::Err::Incomplete(nom::Needed::Size(need))) => Err(Error::Incomplete { need }),
153            // We aren't sure how much we need but we need *something*.
154            Err(nom::Err::Incomplete(nom::Needed::Unknown)) => {
155                Err(Error::Incomplete { need: NZ_ONE })
156            }
157
158            Err(nom::Err::Error(err)) => {
159                tracing::error!(?err);
160                Err(Error::Invalid)
161            }
162            Err(nom::Err::Failure(err)) => {
163                tracing::error!(?err);
164                Err(Error::Invalid)
165            }
166        }
167    }
168
169    pub fn to_remote_addr(self) -> RemoteAddress {
170        match (self.protocol, self.address) {
171            (Protocol::TcpV4, Address::V4 { src, dst }) => RemoteAddress::TcpV4 { src, dst },
172            (Protocol::UdpV4, Address::V4 { src, dst }) => RemoteAddress::UdpV4 { src, dst },
173            (Protocol::TcpV6, Address::V6 { src, dst }) => RemoteAddress::TcpV6 { src, dst },
174            (Protocol::UdpV6, Address::V6 { src, dst }) => RemoteAddress::UdpV6 { src, dst },
175            _ => RemoteAddress::Invalid,
176        }
177    }
178}
179
/// Errors returned by the stream-based `parse_from_read` functions.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`]; for [`AsyncReadError::Io`]
/// the wrapped I/O error is exposed through `source()`.
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub enum AsyncReadError {
    /// Reading from the stream failed.
    Io(std::io::Error),
    /// The bytes read did not form a valid proxy header.
    Invalid,
    /// The header could not be completed with the bytes that were read.
    UnableToComplete,
    /// The header would need more buffer space than the reader permits.
    RequestTooLarge,
    /// The number of bytes the parser consumed differs from the number read from the stream.
    InconsistentRead,
}

#[cfg(feature = "tokio")]
impl std::fmt::Display for AsyncReadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AsyncReadError::Io(err) => write!(f, "i/o error while reading proxy header: {err}"),
            AsyncReadError::Invalid => f.write_str("proxy header is invalid"),
            AsyncReadError::UnableToComplete => {
                f.write_str("proxy header could not be completed")
            }
            AsyncReadError::RequestTooLarge => f.write_str("proxy header request is too large"),
            AsyncReadError::InconsistentRead => {
                f.write_str("proxy header read an inconsistent amount from the stream")
            }
        }
    }
}

#[cfg(feature = "tokio")]
impl std::error::Error for AsyncReadError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            AsyncReadError::Io(err) => Some(err),
            AsyncReadError::Invalid
            | AsyncReadError::UnableToComplete
            | AsyncReadError::RequestTooLarge
            | AsyncReadError::InconsistentRead => None,
        }
    }
}
189
#[cfg(feature = "tokio")]
impl ProxyHdrV2 {
    /// Read and parse a version 2 proxy header from `stream`.
    ///
    /// The first 16 bytes are read and parsed; the parser then reports how many further
    /// bytes the header needs, and exactly that many are read before parsing again. On
    /// success the stream is handed back along with the decoded header.
    ///
    /// # Errors
    ///
    /// * [`AsyncReadError::Io`] if reading from the stream fails.
    /// * [`AsyncReadError::RequestTooLarge`] if the complete header would exceed 512 bytes
    ///   (or the requested size cannot be represented).
    /// * [`AsyncReadError::Invalid`] if the parser rejects the bytes.
    /// * [`AsyncReadError::UnableToComplete`] if the header is still incomplete after the
    ///   requested bytes were read.
    /// * [`AsyncReadError::InconsistentRead`] if the parser consumed a different number of
    ///   bytes than were read from the stream.
    pub async fn parse_from_read<S>(mut stream: S) -> Result<(S, Self), AsyncReadError>
    where
        S: tokio::io::AsyncReadExt + std::marker::Unpin,
    {
        use tracing::{debug, error};

        // Upper bound on the total header size we are willing to buffer.
        const HDR_SIZE_LIMIT: usize = 512;
        // Number of bytes read up front, which gets us up to the *length* field.
        const INITIAL_READ_LEN: usize = 16;

        let mut buf = vec![0u8; INITIAL_READ_LEN];

        // Read exactly the leading portion so the parser can tell us how much more to read.
        let mut took = stream
            .read_exact(&mut buf)
            .await
            .map_err(AsyncReadError::Io)?;

        match ProxyHdrV2::parse(&buf) {
            // Already a complete header - this can occur with proxy for local conditions.
            Ok((_, hdr)) => return Ok((stream, hdr)),
            // We need more bytes, this is the precise amount we need.
            Err(Error::Incomplete { need }) => {
                // Checked addition so a huge `need` cannot wrap around and slip under the
                // limit; overflow is treated the same as an oversized request.
                let resize_to = match buf.len().checked_add(need.get()) {
                    Some(total) if total <= HDR_SIZE_LIMIT => total,
                    _ => {
                        error!(
                            "proxy header request was larger than {} bytes, refusing to proceed.",
                            HDR_SIZE_LIMIT
                        );
                        return Err(AsyncReadError::RequestTooLarge);
                    }
                };
                buf.resize(resize_to, 0);
            }
            Err(Error::Invalid) => {
                debug!(proxy_binary_dump = %hex::encode(&buf));
                error!("proxy header was invalid");
                return Err(AsyncReadError::Invalid);
            }
            Err(Error::UnableToComplete) => {
                debug!(proxy_binary_dump = %hex::encode(&buf));
                error!("proxy header was incomplete");
                return Err(AsyncReadError::UnableToComplete);
            }
        };

        // Now read the remaining bytes into the freshly grown tail of the buffer.
        took += stream
            .read_exact(&mut buf[INITIAL_READ_LEN..])
            .await
            .map_err(AsyncReadError::Io)?;

        match ProxyHdrV2::parse(&buf) {
            Ok((hdr_took, _)) if hdr_took != took => {
                // The parser consumed a different amount than we read from the stream.
                error!("proxy header read an inconsistent amount from stream.");
                Err(AsyncReadError::InconsistentRead)
            }
            Ok((_, hdr)) => Ok((stream, hdr)),
            Err(Error::Incomplete { need: _ }) => {
                error!("proxy header could not be read to the end.");
                Err(AsyncReadError::UnableToComplete)
            }
            Err(Error::Invalid) => {
                debug!(proxy_binary_dump = %hex::encode(&buf));
                error!("proxy header was invalid");
                Err(AsyncReadError::Invalid)
            }
            Err(Error::UnableToComplete) => {
                debug!(proxy_binary_dump = %hex::encode(&buf));
                error!("proxy header was incomplete");
                Err(AsyncReadError::UnableToComplete)
            }
        }
    }
}
273
#[cfg(feature = "tokio")]
impl ProxyHdrV1 {
    /// Read and parse a version 1 proxy header from `stream`.
    ///
    /// `V1_MIN_LEN` bytes are read up front; after that, each time the parser reports it
    /// needs more input, that many bytes are read and parsing is retried. On success the
    /// stream is handed back along with the decoded header.
    ///
    /// # Errors
    ///
    /// * [`AsyncReadError::Io`] if reading from the stream fails.
    /// * [`AsyncReadError::RequestTooLarge`] if the parser asks for more bytes than fit in
    ///   the `V1_MAX_LEN + 1` byte buffer.
    /// * [`AsyncReadError::Invalid`] if the parser rejects the bytes.
    /// * [`AsyncReadError::UnableToComplete`] if the parser reports it cannot complete.
    /// * [`AsyncReadError::InconsistentRead`] if the parser consumed a different number of
    ///   bytes than were read from the stream.
    pub async fn parse_from_read<S>(mut stream: S) -> Result<(S, Self), AsyncReadError>
    where
        S: tokio::io::AsyncReadExt + std::marker::Unpin,
    {
        use tracing::{debug, error};

        // This is the maximum size of the buffer we could possibly need.
        const BUF_LEN: usize = V1_MAX_LEN + 1;

        let mut buf = [0u8; BUF_LEN];

        // Read the shortest possible header first; the parser then tells us whether (and
        // how much) more input is required.
        let mut took = stream
            .read_exact(&mut buf[..V1_MIN_LEN])
            .await
            .map_err(AsyncReadError::Io)?;

        loop {
            match ProxyHdrV1::parse(&buf) {
                Ok((hdr_took, _)) if hdr_took != took => {
                    // The parser consumed a different amount than we read from the stream.
                    error!("proxy header read an inconsistent amount from stream.");
                    return Err(AsyncReadError::InconsistentRead);
                }
                Ok((_, hdr)) => return Ok((stream, hdr)),
                Err(Error::Incomplete { need }) => {
                    // The requested range is driven by peer-supplied bytes, so it must be
                    // checked against the fixed buffer instead of indexing (which would
                    // panic once the range runs past the end).
                    let Some(dest) = took
                        .checked_add(need.get())
                        .and_then(|end| buf.get_mut(took..end))
                    else {
                        error!(
                            "proxy header request was larger than {} bytes, refusing to proceed.",
                            BUF_LEN
                        );
                        return Err(AsyncReadError::RequestTooLarge);
                    };

                    // Read the extra bytes, then go around the loop and parse again.
                    took += stream
                        .read_exact(dest)
                        .await
                        .map_err(AsyncReadError::Io)?;
                }
                Err(Error::Invalid) => {
                    debug!(proxy_binary_dump = %hex::encode(buf));
                    error!("proxy header was invalid");
                    return Err(AsyncReadError::Invalid);
                }
                Err(Error::UnableToComplete) => {
                    debug!(proxy_binary_dump = %hex::encode(buf));
                    error!("proxy header was incomplete");
                    return Err(AsyncReadError::UnableToComplete);
                }
            }
        } // end loop
    }
}
328}