sozu_lib/protocol/proxy_protocol/
relay.rs

1use std::{cell::RefCell, io::Write, rc::Rc};
2
3use mio::{net::TcpStream, Token};
4use nom::{Err, Offset};
5use rusty_ulid::Ulid;
6
7use crate::{
8    pool::Checkout,
9    protocol::{
10        pipe::{Pipe, WebSocketContext},
11        proxy_protocol::parser::parse_v2_header,
12    },
13    socket::{SocketHandler, SocketResult},
14    sozu_command::ready::Ready,
15    tcp::TcpListener,
16    Protocol, Readiness, SessionMetrics, SessionResult,
17};
18
19pub struct RelayProxyProtocol<Front: SocketHandler> {
20    cursor_header: usize,
21    pub backend_readiness: Readiness,
22    pub backend_token: Option<Token>,
23    pub backend: Option<TcpStream>,
24    pub frontend_buffer: Checkout,
25    pub frontend_readiness: Readiness,
26    pub frontend_token: Token,
27    pub frontend: Front,
28    pub header_size: Option<usize>,
29    pub request_id: Ulid,
30}
31
32impl<Front: SocketHandler> RelayProxyProtocol<Front> {
33    /// Instantiate a new RelayProxyProtocol SessionState with:
34    /// - frontend_interest: READABLE | HUP | ERROR
35    /// - frontend_event: EMPTY
36    pub fn new(
37        frontend: Front,
38        frontend_token: Token,
39        request_id: Ulid,
40        backend: Option<TcpStream>,
41        front_buf: Checkout,
42    ) -> Self {
43        RelayProxyProtocol {
44            backend_readiness: Readiness {
45                interest: Ready::HUP | Ready::ERROR,
46                event: Ready::EMPTY,
47            },
48            backend_token: None,
49            backend,
50            cursor_header: 0,
51            frontend_buffer: front_buf,
52            frontend_readiness: Readiness {
53                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
54                event: Ready::EMPTY,
55            },
56            frontend_token,
57            frontend,
58            header_size: None,
59            request_id,
60        }
61    }
62
63    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
64        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
65        debug!(
66            "FRONT proxy protocol [{:?}]: read {} bytes and res={:?}",
67            self.frontend_token, sz, res
68        );
69
70        if sz > 0 {
71            self.frontend_buffer.fill(sz);
72
73            count!("bytes_in", sz as i64);
74            metrics.bin += sz;
75
76            if res == SocketResult::Error {
77                error!(
78                    "[{:?}] front socket error, closing the connection",
79                    self.frontend_token
80                );
81                incr!("proxy_protocol.errors");
82                self.frontend_readiness.reset();
83                self.backend_readiness.reset();
84                return SessionResult::Close;
85            }
86
87            if res == SocketResult::WouldBlock {
88                self.frontend_readiness.event.remove(Ready::READABLE);
89            }
90
91            let read_sz = match parse_v2_header(self.frontend_buffer.data()) {
92                Ok((rest, _)) => {
93                    self.frontend_readiness.interest.remove(Ready::READABLE);
94                    self.backend_readiness.interest.insert(Ready::WRITABLE);
95                    self.frontend_buffer.data().offset(rest)
96                }
97                Err(Err::Incomplete(_)) => return SessionResult::Continue,
98                Err(e) => {
99                    error!("[{:?}] error parsing the proxy protocol header(error={:?}), closing the connection",
100            self.frontend_token, e);
101                    return SessionResult::Close;
102                }
103            };
104
105            self.header_size = Some(read_sz);
106            self.frontend_buffer.consume(sz);
107            return SessionResult::Continue;
108        }
109
110        SessionResult::Continue
111    }
112
113    // The header is send immediately at once upon the connection is establish
114    // and prepended before any data.
115    pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
116        debug!("Writing proxy protocol header");
117
118        if let Some(ref mut socket) = self.backend {
119            if let Some(ref header_size) = self.header_size {
120                loop {
121                    match socket.write(self.frontend_buffer.data()) {
122                        Ok(sz) => {
123                            self.cursor_header += sz;
124
125                            metrics.backend_bout += sz;
126                            self.frontend_buffer.consume(sz);
127
128                            if self.cursor_header >= *header_size {
129                                info!("Proxy protocol sent, upgrading");
130                                return SessionResult::Upgrade;
131                            }
132                        }
133                        Err(e) => {
134                            incr!("proxy_protocol.errors");
135                            self.frontend_readiness.reset();
136                            self.backend_readiness.reset();
137                            debug!("PROXY PROTOCOL {}", e);
138                            break;
139                        }
140                    }
141                }
142            }
143        }
144        SessionResult::Continue
145    }
146
147    pub fn front_socket(&self) -> &TcpStream {
148        self.frontend.socket_ref()
149    }
150
151    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
152        self.frontend.socket_mut()
153    }
154
155    pub fn back_socket(&self) -> Option<&TcpStream> {
156        self.backend.as_ref()
157    }
158
159    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
160        self.backend.as_mut()
161    }
162
163    pub fn set_back_socket(&mut self, socket: TcpStream) {
164        self.backend = Some(socket);
165    }
166
167    pub fn back_token(&self) -> Option<Token> {
168        self.backend_token
169    }
170
171    pub fn set_back_token(&mut self, token: Token) {
172        self.backend_token = Some(token);
173    }
174
175    pub fn into_pipe(
176        mut self,
177        back_buf: Checkout,
178        listener: Rc<RefCell<TcpListener>>,
179    ) -> Pipe<Front, TcpListener> {
180        let backend_socket = self.backend.take().unwrap();
181        let addr = self.front_socket().peer_addr().ok();
182
183        let mut pipe = Pipe::new(
184            back_buf,
185            None,
186            Some(backend_socket),
187            None,
188            None,
189            None,
190            None,
191            self.frontend_buffer,
192            self.frontend_token,
193            self.frontend,
194            listener,
195            Protocol::TCP,
196            self.request_id,
197            addr,
198            WebSocketContext::Tcp,
199        );
200
201        pipe.frontend_readiness.event = self.frontend_readiness.event;
202        pipe.backend_readiness.event = self.backend_readiness.event;
203
204        if let Some(back_token) = self.backend_token {
205            pipe.set_back_token(back_token);
206        }
207
208        pipe
209    }
210}