sozu_lib/protocol/proxy_protocol/
relay.rs

1use std::{cell::RefCell, io::Write, rc::Rc};
2
3use mio::{Token, net::TcpStream};
4use nom::{Err, Offset};
5use rusty_ulid::Ulid;
6
7use crate::{
8    Protocol, Readiness, SessionMetrics, SessionResult,
9    pool::Checkout,
10    protocol::{
11        pipe::{Pipe, WebSocketContext},
12        proxy_protocol::parser::parse_v2_header,
13    },
14    socket::{SocketHandler, SocketResult},
15    sozu_command::ready::Ready,
16    tcp::TcpListener,
17};
18
19pub struct RelayProxyProtocol<Front: SocketHandler> {
20    cursor_header: usize,
21    pub backend_readiness: Readiness,
22    pub backend_token: Option<Token>,
23    pub backend: Option<TcpStream>,
24    pub frontend_buffer: Checkout,
25    pub frontend_readiness: Readiness,
26    pub frontend_token: Token,
27    pub frontend: Front,
28    pub header_size: Option<usize>,
29    pub request_id: Ulid,
30}
31
32impl<Front: SocketHandler> RelayProxyProtocol<Front> {
33    /// Instantiate a new RelayProxyProtocol SessionState with:
34    /// - frontend_interest: READABLE | HUP | ERROR
35    /// - frontend_event: EMPTY
36    pub fn new(
37        frontend: Front,
38        frontend_token: Token,
39        request_id: Ulid,
40        backend: Option<TcpStream>,
41        front_buf: Checkout,
42    ) -> Self {
43        RelayProxyProtocol {
44            backend_readiness: Readiness {
45                interest: Ready::HUP | Ready::ERROR,
46                event: Ready::EMPTY,
47            },
48            backend_token: None,
49            backend,
50            cursor_header: 0,
51            frontend_buffer: front_buf,
52            frontend_readiness: Readiness {
53                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
54                event: Ready::EMPTY,
55            },
56            frontend_token,
57            frontend,
58            header_size: None,
59            request_id,
60        }
61    }
62
63    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
64        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
65        debug!(
66            "FRONT proxy protocol [{:?}]: read {} bytes and res={:?}",
67            self.frontend_token, sz, res
68        );
69
70        if sz > 0 {
71            self.frontend_buffer.fill(sz);
72
73            count!("bytes_in", sz as i64);
74            metrics.bin += sz;
75
76            if res == SocketResult::Error {
77                error!(
78                    "[{:?}] front socket error, closing the connection",
79                    self.frontend_token
80                );
81                incr!("proxy_protocol.errors");
82                self.frontend_readiness.reset();
83                self.backend_readiness.reset();
84                return SessionResult::Close;
85            }
86
87            if res == SocketResult::WouldBlock {
88                self.frontend_readiness.event.remove(Ready::READABLE);
89            }
90
91            let read_sz = match parse_v2_header(self.frontend_buffer.data()) {
92                Ok((rest, _)) => {
93                    self.frontend_readiness.interest.remove(Ready::READABLE);
94                    self.backend_readiness.interest.insert(Ready::WRITABLE);
95                    self.frontend_buffer.data().offset(rest)
96                }
97                Err(Err::Incomplete(_)) => return SessionResult::Continue,
98                Err(e) => {
99                    error!(
100                        "[{:?}] error parsing the proxy protocol header(error={:?}), closing the connection",
101                        self.frontend_token, e
102                    );
103                    return SessionResult::Close;
104                }
105            };
106
107            self.header_size = Some(read_sz);
108            self.frontend_buffer.consume(sz);
109            return SessionResult::Continue;
110        }
111
112        SessionResult::Continue
113    }
114
115    // The header is send immediately at once upon the connection is establish
116    // and prepended before any data.
117    pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
118        debug!("Writing proxy protocol header");
119
120        if let Some(ref mut socket) = self.backend {
121            if let Some(ref header_size) = self.header_size {
122                loop {
123                    match socket.write(self.frontend_buffer.data()) {
124                        Ok(sz) => {
125                            self.cursor_header += sz;
126
127                            metrics.backend_bout += sz;
128                            self.frontend_buffer.consume(sz);
129
130                            if self.cursor_header >= *header_size {
131                                info!("Proxy protocol sent, upgrading");
132                                return SessionResult::Upgrade;
133                            }
134                        }
135                        Err(e) => {
136                            incr!("proxy_protocol.errors");
137                            self.frontend_readiness.reset();
138                            self.backend_readiness.reset();
139                            debug!("PROXY PROTOCOL {}", e);
140                            break;
141                        }
142                    }
143                }
144            }
145        }
146        SessionResult::Continue
147    }
148
149    pub fn front_socket(&self) -> &TcpStream {
150        self.frontend.socket_ref()
151    }
152
153    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
154        self.frontend.socket_mut()
155    }
156
157    pub fn back_socket(&self) -> Option<&TcpStream> {
158        self.backend.as_ref()
159    }
160
161    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
162        self.backend.as_mut()
163    }
164
165    pub fn set_back_socket(&mut self, socket: TcpStream) {
166        self.backend = Some(socket);
167    }
168
169    pub fn back_token(&self) -> Option<Token> {
170        self.backend_token
171    }
172
173    pub fn set_back_token(&mut self, token: Token) {
174        self.backend_token = Some(token);
175    }
176
177    pub fn into_pipe(
178        mut self,
179        back_buf: Checkout,
180        listener: Rc<RefCell<TcpListener>>,
181    ) -> Pipe<Front, TcpListener> {
182        let backend_socket = self.backend.take().unwrap();
183        let addr = self.front_socket().peer_addr().ok();
184
185        let mut pipe = Pipe::new(
186            back_buf,
187            None,
188            Some(backend_socket),
189            None,
190            None,
191            None,
192            None,
193            self.frontend_buffer,
194            self.frontend_token,
195            self.frontend,
196            listener,
197            Protocol::TCP,
198            self.request_id,
199            addr,
200            WebSocketContext::Tcp,
201        );
202
203        pipe.frontend_readiness.event = self.frontend_readiness.event;
204        pipe.backend_readiness.event = self.backend_readiness.event;
205
206        if let Some(back_token) = self.backend_token {
207            pipe.set_back_token(back_token);
208        }
209
210        pipe
211    }
212}