sozu_lib/
http.rs

1use std::{
2    cell::RefCell,
3    collections::{BTreeMap, HashMap, hash_map::Entry},
4    io::ErrorKind,
5    net::{Shutdown, SocketAddr},
6    os::unix::io::AsRawFd,
7    rc::{Rc, Weak},
8    str::from_utf8_unchecked,
9    time::{Duration, Instant},
10};
11
12use mio::{
13    Interest, Registry, Token,
14    net::{TcpListener as MioTcpListener, TcpStream},
15    unix::SourceFd,
16};
17use rusty_ulid::Ulid;
18use sozu_command::{
19    logging::CachedTags,
20    proto::command::{
21        Cluster, HttpListenerConfig, ListenerType, RemoveListener, RequestHttpFrontend,
22        WorkerRequest, WorkerResponse, request::RequestType,
23    },
24    ready::Ready,
25    response::HttpFrontend,
26    state::ClusterId,
27};
28
29use crate::{
30    AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
31    ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
32    SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
33    backends::BackendMap,
34    pool::Pool,
35    protocol::{
36        Http, Pipe, SessionState,
37        http::{
38            ResponseStream,
39            answers::HttpAnswers,
40            parser::{Method, hostname_and_port},
41        },
42        proxy_protocol::expect::ExpectProxyProtocol,
43    },
44    router::{Route, Router},
45    server::{ListenToken, SessionManager},
46    socket::server_bind,
47    timer::TimeoutContainer,
48};
49
/// Coarse status of a session.
///
/// NOTE(review): this enum is not referenced in the visible part of this file;
/// the variant descriptions below are inferred from their names and should be
/// confirmed against the callers.
#[derive(PartialEq, Eq)]
pub enum SessionStatus {
    /// Presumably: the session is processing traffic normally.
    Normal,
    /// Presumably: the session is answering with a proxy-generated default answer.
    DefaultAnswer,
}
55
// Declares `HttpStateMachine`, the per-session protocol state.
//
// The rest of this file relies on items that are not spelled out here and are
// presumably generated by the macro: a `FailedUpgrade` variant, a `StateMarker`
// enum with one marker per variant below, and the `take()`, `marker()` and
// `failed()` methods — confirm against the `StateMachineBuilder!` definition.
StateMachineBuilder! {
    /// The various Stages of an HTTP connection:
    ///
    /// 1. optional (ExpectProxyProtocol)
    /// 2. HTTP
    /// 3. WebSocket (passthrough)
    enum HttpStateMachine impl SessionState {
        Expect(ExpectProxyProtocol<TcpStream>),
        Http(Http<TcpStream, HttpListener>),
        WebSocket(Pipe<TcpStream, HttpListener>),
    }
}
68
/// HTTP Session to insert in the SessionManager
///
/// 1 session <=> 1 HTTP connection (client to sozu)
pub struct HttpSession {
    /// Shared HTTP answers, handed to every `Http` state built for this session.
    answers: Rc<RefCell<HttpAnswers>>,
    /// Backend timeout, forwarded to `Http::new` when the HTTP state is (re)built.
    configured_backend_timeout: Duration,
    /// Connect timeout, forwarded to `Http::new` when the HTTP state is (re)built.
    configured_connect_timeout: Duration,
    /// Frontend timeout, forwarded to `Http::new` when the HTTP state is (re)built.
    configured_frontend_timeout: Duration,
    /// Token of the frontend socket; also used to remove the session from the proxy on close.
    frontend_token: Token,
    /// Instant of the last readiness update received by this session.
    last_event: Instant,
    /// Listener this session was accepted on.
    listener: Rc<RefCell<HttpListener>>,
    /// Timing metrics of the session.
    metrics: SessionMetrics,
    /// Weak handle to the shared `Pool`, forwarded to the HTTP state.
    pool: Weak<RefCell<Pool>>,
    /// Proxy owning this session; used to deregister sockets and remove the session.
    proxy: Rc<RefCell<HttpProxy>>,
    /// Current protocol state (expect proxy protocol, HTTP or WebSocket).
    state: HttpStateMachine,
    /// Sticky session name, forwarded to the HTTP state.
    sticky_name: String,
    /// Set once `close` ran to completion, so that a second call is a no-op.
    has_been_closed: bool,
}
87
88impl HttpSession {
89    #[allow(clippy::too_many_arguments)]
90    pub fn new(
91        answers: Rc<RefCell<HttpAnswers>>,
92        configured_backend_timeout: Duration,
93        configured_connect_timeout: Duration,
94        configured_frontend_timeout: Duration,
95        configured_request_timeout: Duration,
96        expect_proxy: bool,
97        listener: Rc<RefCell<HttpListener>>,
98        pool: Weak<RefCell<Pool>>,
99        proxy: Rc<RefCell<HttpProxy>>,
100        public_address: SocketAddr,
101        sock: TcpStream,
102        sticky_name: String,
103        token: Token,
104        wait_time: Duration,
105    ) -> Result<Self, AcceptError> {
106        let request_id = Ulid::generate();
107        let container_frontend_timeout = TimeoutContainer::new(configured_request_timeout, token);
108
109        let state = if expect_proxy {
110            trace!("starting in expect proxy state");
111            gauge_add!("protocol.proxy.expect", 1);
112
113            HttpStateMachine::Expect(ExpectProxyProtocol::new(
114                container_frontend_timeout,
115                sock,
116                token,
117                request_id,
118            ))
119        } else {
120            gauge_add!("protocol.http", 1);
121            let session_address = sock.peer_addr().ok();
122
123            HttpStateMachine::Http(Http::new(
124                answers.clone(),
125                configured_backend_timeout,
126                configured_connect_timeout,
127                configured_frontend_timeout,
128                container_frontend_timeout,
129                sock,
130                token,
131                listener.clone(),
132                pool.clone(),
133                Protocol::HTTP,
134                public_address,
135                request_id,
136                session_address,
137                sticky_name.clone(),
138            )?)
139        };
140
141        let metrics = SessionMetrics::new(Some(wait_time));
142        Ok(HttpSession {
143            answers,
144            configured_backend_timeout,
145            configured_connect_timeout,
146            configured_frontend_timeout,
147            frontend_token: token,
148            has_been_closed: false,
149            last_event: Instant::now(),
150            listener,
151            metrics,
152            pool,
153            proxy,
154            state,
155            sticky_name,
156        })
157    }
158
159    pub fn upgrade(&mut self) -> SessionIsToBeClosed {
160        debug!("HTTP::upgrade");
161        let new_state = match self.state.take() {
162            HttpStateMachine::Http(http) => self.upgrade_http(http),
163            HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
164            HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
165            HttpStateMachine::FailedUpgrade(_) => unreachable!(),
166        };
167
168        match new_state {
169            Some(state) => {
170                self.state = state;
171                false
172            }
173            // The state stays FailedUpgrade, but the Session should be closed right after
174            None => true,
175        }
176    }
177
178    fn upgrade_expect(
179        &mut self,
180        expect: ExpectProxyProtocol<TcpStream>,
181    ) -> Option<HttpStateMachine> {
182        debug!("switching to HTTP");
183        match expect
184            .addresses
185            .as_ref()
186            .map(|add| (add.destination(), add.source()))
187        {
188            Some((Some(public_address), Some(session_address))) => {
189                let mut http = Http::new(
190                    self.answers.clone(),
191                    self.configured_backend_timeout,
192                    self.configured_connect_timeout,
193                    self.configured_frontend_timeout,
194                    expect.container_frontend_timeout,
195                    expect.frontend,
196                    expect.frontend_token,
197                    self.listener.clone(),
198                    self.pool.clone(),
199                    Protocol::HTTP,
200                    public_address,
201                    expect.request_id,
202                    Some(session_address),
203                    self.sticky_name.clone(),
204                )
205                .ok()?;
206                http.frontend_readiness.event = expect.frontend_readiness.event;
207
208                gauge_add!("protocol.proxy.expect", -1);
209                gauge_add!("protocol.http", 1);
210                Some(HttpStateMachine::Http(http))
211            }
212            _ => None,
213        }
214    }
215
    /// Turns an HTTP state into a WebSocket (`Pipe`) state.
    ///
    /// Returns `None` when the HTTP state has no backend token (a warning is
    /// logged) or when its response stream is not a `BackendAnswer` (nothing is
    /// logged in that case). On success the buffers, sockets, timeouts and
    /// readiness events of the HTTP state are moved into the new `Pipe`, and the
    /// protocol/request gauges are switched from HTTP to WebSocket.
    fn upgrade_http(&mut self, http: Http<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
        debug!("http switching to ws");
        let front_token = self.frontend_token;
        let back_token = match http.backend_token {
            Some(back_token) => back_token,
            None => {
                warn!(
                    "Could not upgrade http request on cluster '{:?}' ({:?}) using backend '{:?}' into websocket for request '{}'",
                    http.context.cluster_id,
                    self.frontend_token,
                    http.context.backend_id,
                    http.context.id
                );
                return None;
            }
        };

        // Must be computed before fields are moved out of `http` below.
        let ws_context = http.websocket_context();
        // Both timeout containers are reset before being handed to the pipe.
        let mut container_frontend_timeout = http.container_frontend_timeout;
        let mut container_backend_timeout = http.container_backend_timeout;
        container_frontend_timeout.reset();
        container_backend_timeout.reset();

        // Reuse the backend response buffer; any other response stream kind
        // aborts the upgrade.
        let backend_buffer = if let ResponseStream::BackendAnswer(kawa) = http.response_stream {
            kawa.storage.buffer
        } else {
            return None;
        };

        let mut pipe = Pipe::new(
            backend_buffer,
            http.context.backend_id,
            http.backend_socket,
            http.backend,
            Some(container_backend_timeout),
            Some(container_frontend_timeout),
            http.context.cluster_id,
            http.request_stream.storage.buffer,
            front_token,
            http.frontend_socket,
            self.listener.clone(),
            Protocol::HTTP,
            http.context.id,
            http.context.session_address,
            ws_context,
        );

        // Carry over the readiness events already received on both sockets.
        pipe.frontend_readiness.event = http.frontend_readiness.event;
        pipe.backend_readiness.event = http.backend_readiness.event;
        pipe.set_back_token(back_token);

        gauge_add!("protocol.http", -1);
        gauge_add!("protocol.ws", 1);
        gauge_add!("http.active_requests", -1);
        gauge_add!("websocket.active_requests", 1);
        Some(HttpStateMachine::WebSocket(pipe))
    }
273
    /// Handles an upgrade request received while already in the WebSocket state.
    ///
    /// This is not expected to happen: an error is logged and the state is
    /// returned unchanged, so the session keeps running.
    fn upgrade_websocket(&self, ws: Pipe<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
        // what do we do here?
        error!("Upgrade called on WS, this should not happen");
        Some(HttpStateMachine::WebSocket(ws))
    }
279}
280
281impl ProxySession for HttpSession {
    /// Closes the session: restores gauges, closes the state, shuts down and
    /// deregisters the frontend socket, and removes the session from the proxy.
    ///
    /// Calling it again after a complete run is a no-op thanks to
    /// `has_been_closed`.
    ///
    /// NOTE(review): when the state is a failed upgrade, the function returns
    /// early without setting `has_been_closed` and without removing the session
    /// from the proxy; a second call would adjust the gauges and the failure
    /// counter again — confirm this is intended.
    fn close(&mut self) {
        if self.has_been_closed {
            return;
        }

        trace!("Closing HTTP session");
        self.metrics.service_stop();

        // Restore gauges
        match self.state.marker() {
            StateMarker::Expect => gauge_add!("protocol.proxy.expect", -1),
            StateMarker::Http => gauge_add!("protocol.http", -1),
            StateMarker::WebSocket => {
                gauge_add!("protocol.ws", -1);
                gauge_add!("websocket.active_requests", -1);
            }
        }

        // A failed upgrade only records which upgrade failed, then stops here:
        // none of the socket cleanup below is performed.
        if self.state.failed() {
            match self.state.marker() {
                StateMarker::Expect => incr!("http.upgrade.expect.failed"),
                StateMarker::Http => incr!("http.upgrade.http.failed"),
                StateMarker::WebSocket => incr!("http.upgrade.ws.failed"),
            }
            return;
        }

        self.state.cancel_timeouts();
        // defer backend closing to the state
        self.state.close(self.proxy.clone(), &mut self.metrics);

        let front_socket = self.state.front_socket();
        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
            // error 107 NotConnected can happen when was never fully connected, or was already disconnected due to error
            if e.kind() != ErrorKind::NotConnected {
                error!(
                    "error shutting down front socket({:?}): {:?}",
                    front_socket, e
                )
            }
        }

        // deregister the frontend and remove it
        let proxy = self.proxy.borrow();
        let fd = front_socket.as_raw_fd();
        if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
            error!(
                "error deregistering front socket({:?}) while closing HTTP session: {:?}",
                fd, e
            );
        }
        proxy.remove_session(self.frontend_token);

        self.has_been_closed = true;
    }
337
338    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
339        let state_result = self.state.timeout(token, &mut self.metrics);
340        state_result == StateResult::CloseSession
341    }
342
    /// Protocol handled by this session: always `Protocol::HTTP`.
    fn protocol(&self) -> Protocol {
        Protocol::HTTP
    }
346
    /// Records readiness `events` received for `token`.
    ///
    /// Refreshes `last_event`, starts the wait-time measurement of the metrics
    /// and forwards the events to the current state.
    fn update_readiness(&mut self, token: Token, events: Ready) {
        trace!(
            "token {:?} got event {}",
            token,
            super::ready_to_string(events)
        );
        self.last_event = Instant::now();
        self.metrics.wait_start();
        self.state.update_readiness(token, events);
    }
357
358    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
359        self.metrics.service_start();
360
361        let session_result =
362            self.state
363                .ready(session.clone(), self.proxy.clone(), &mut self.metrics);
364
365        let to_be_closed = match session_result {
366            SessionResult::Close => true,
367            SessionResult::Continue => false,
368            SessionResult::Upgrade => match self.upgrade() {
369                false => self.ready(session),
370                true => true,
371            },
372        };
373
374        self.metrics.service_stop();
375        to_be_closed
376    }
377
    /// Notifies the current state of a shutdown and returns its verdict on
    /// whether the session is to be closed.
    fn shutting_down(&mut self) -> SessionIsToBeClosed {
        self.state.shutting_down()
    }
381
    /// Instant of the last readiness update received by this session.
    fn last_event(&self) -> Instant {
        self.last_event
    }
385
    /// Logs the current state (prefixed with "HTTP") and the session metrics.
    ///
    /// The metrics are emitted at error level.
    fn print_session(&self) {
        self.state.print_state("HTTP");
        error!("Metrics: {:?}", self.metrics);
    }
390
    /// Token of the frontend socket of this session.
    fn frontend_token(&self) -> Token {
        self.frontend_token
    }
394}
395
/// A hostname, stored as an owned `String`.
pub type Hostname = String;
397
/// An HTTP listener: its configuration, routing table and (once activated)
/// its listening socket.
pub struct HttpListener {
    /// Whether `activate` already registered a listening socket.
    active: bool,
    /// Address of the listener, taken from the configuration.
    address: SocketAddr,
    /// HTTP answers built from the configuration, shared with the sessions.
    answers: Rc<RefCell<HttpAnswers>>,
    /// Configuration the listener was created with.
    config: HttpListenerConfig,
    /// Router used to match a request to a route.
    fronts: Router,
    /// Listening socket, present once activated and until it is given back.
    listener: Option<MioTcpListener>,
    /// Cached tags, indexed by key (the frontend hostname when set by the proxy).
    tags: BTreeMap<String, CachedTags>,
    /// Token the listening socket is registered with.
    token: Token,
}
408
409impl ListenerHandler for HttpListener {
410    fn get_addr(&self) -> &SocketAddr {
411        &self.address
412    }
413
414    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
415        self.tags.get(key)
416    }
417
418    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
419        match tags {
420            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
421            None => self.tags.remove(&key),
422        };
423    }
424}
425
426impl L7ListenerHandler for HttpListener {
427    fn get_sticky_name(&self) -> &str {
428        &self.config.sticky_name
429    }
430
431    fn get_connect_timeout(&self) -> u32 {
432        self.config.connect_timeout
433    }
434
435    // redundant, already called once in extract_route
436    fn frontend_from_request(
437        &self,
438        host: &str,
439        uri: &str,
440        method: &Method,
441    ) -> Result<Route, FrontendFromRequestError> {
442        let start = Instant::now();
443        let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
444            Ok(tuple) => tuple,
445            Err(parse_error) => {
446                // parse_error contains a slice of given_host, which should NOT escape this scope
447                return Err(FrontendFromRequestError::HostParse {
448                    host: host.to_owned(),
449                    error: parse_error.to_string(),
450                });
451            }
452        };
453        if remaining_input != &b""[..] {
454            return Err(FrontendFromRequestError::InvalidCharsAfterHost(
455                host.to_owned(),
456            ));
457        }
458
459        /*if port == Some(&b"80"[..]) {
460        // it is alright to call from_utf8_unchecked,
461        // we already verified that there are only ascii
462        // chars in there
463          unsafe { from_utf8_unchecked(hostname) }
464        } else {
465          host
466        }
467        */
468        let host = unsafe { from_utf8_unchecked(hostname) };
469
470        let route = self.fronts.lookup(host, uri, method).map_err(|e| {
471            incr!("http.failed_backend_matching");
472            FrontendFromRequestError::NoClusterFound(e)
473        })?;
474
475        let now = Instant::now();
476
477        if let Route::ClusterId(cluster) = &route {
478            time!("frontend_matching_time", cluster, (now - start).as_millis());
479        }
480
481        Ok(route)
482    }
483}
484
/// The HTTP proxy: owns the HTTP listeners and the known clusters, and holds
/// the shared handles needed to create and manage sessions.
pub struct HttpProxy {
    /// Shared map of backends.
    backends: Rc<RefCell<BackendMap>>,
    /// Known clusters, indexed by cluster id.
    clusters: HashMap<ClusterId, Cluster>,
    /// Listeners, indexed by their token.
    listeners: HashMap<Token, Rc<RefCell<HttpListener>>>,
    /// Shared `Pool`; sessions receive a weak handle to it.
    pool: Rc<RefCell<Pool>>,
    /// mio registry used to (de)register listening and session sockets.
    registry: Registry,
    /// Shared session manager, in which new sessions are inserted.
    sessions: Rc<RefCell<SessionManager>>,
}
493
494impl HttpProxy {
495    pub fn new(
496        registry: Registry,
497        sessions: Rc<RefCell<SessionManager>>,
498        pool: Rc<RefCell<Pool>>,
499        backends: Rc<RefCell<BackendMap>>,
500    ) -> HttpProxy {
501        HttpProxy {
502            backends,
503            clusters: HashMap::new(),
504            listeners: HashMap::new(),
505            pool,
506            registry,
507            sessions,
508        }
509    }
510
511    pub fn add_listener(
512        &mut self,
513        config: HttpListenerConfig,
514        token: Token,
515    ) -> Result<Token, ProxyError> {
516        match self.listeners.entry(token) {
517            Entry::Vacant(entry) => {
518                let http_listener =
519                    HttpListener::new(config, token).map_err(ProxyError::AddListener)?;
520                entry.insert(Rc::new(RefCell::new(http_listener)));
521                Ok(token)
522            }
523            _ => Err(ProxyError::ListenerAlreadyPresent),
524        }
525    }
526
527    pub fn get_listener(&self, token: &Token) -> Option<Rc<RefCell<HttpListener>>> {
528        self.listeners.get(token).cloned()
529    }
530
531    pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
532        let len = self.listeners.len();
533        let remove_address = remove.address.into();
534        self.listeners
535            .retain(|_, l| l.borrow().address != remove_address);
536
537        if !self.listeners.len() < len {
538            info!("no HTTP listener to remove at address {:?}", remove_address);
539        }
540        Ok(())
541    }
542
543    pub fn activate_listener(
544        &self,
545        addr: &SocketAddr,
546        tcp_listener: Option<MioTcpListener>,
547    ) -> Result<Token, ProxyError> {
548        let listener = self
549            .listeners
550            .values()
551            .find(|listener| listener.borrow().address == *addr)
552            .ok_or(ProxyError::NoListenerFound(addr.to_owned()))?;
553
554        listener
555            .borrow_mut()
556            .activate(&self.registry, tcp_listener)
557            .map_err(|listener_error| ProxyError::ListenerActivation {
558                address: *addr,
559                listener_error,
560            })
561    }
562
563    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
564        self.listeners
565            .iter()
566            .filter_map(|(_, listener)| {
567                let mut owned = listener.borrow_mut();
568                if let Some(listener) = owned.listener.take() {
569                    return Some((owned.address, listener));
570                }
571
572                None
573            })
574            .collect()
575    }
576
577    pub fn give_back_listener(
578        &mut self,
579        address: SocketAddr,
580    ) -> Result<(Token, MioTcpListener), ProxyError> {
581        let listener = self
582            .listeners
583            .values()
584            .find(|listener| listener.borrow().address == address)
585            .ok_or(ProxyError::NoListenerFound(address))?;
586
587        let mut owned = listener.borrow_mut();
588
589        let taken_listener = owned
590            .listener
591            .take()
592            .ok_or(ProxyError::UnactivatedListener)?;
593
594        Ok((owned.token, taken_listener))
595    }
596
597    pub fn add_cluster(&mut self, mut cluster: Cluster) -> Result<(), ProxyError> {
598        if let Some(answer_503) = cluster.answer_503.take() {
599            for listener in self.listeners.values() {
600                listener
601                    .borrow()
602                    .answers
603                    .borrow_mut()
604                    .add_custom_answer(&cluster.cluster_id, answer_503.clone())
605                    .map_err(|(status, error)| {
606                        ProxyError::AddCluster(ListenerError::TemplateParse(status, error))
607                    })?;
608            }
609        }
610        self.clusters.insert(cluster.cluster_id.clone(), cluster);
611        Ok(())
612    }
613
614    pub fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), ProxyError> {
615        self.clusters.remove(cluster_id);
616
617        for listener in self.listeners.values() {
618            listener
619                .borrow()
620                .answers
621                .borrow_mut()
622                .remove_custom_answer(cluster_id);
623        }
624        Ok(())
625    }
626
627    pub fn add_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
628        let front = front.clone().to_frontend().map_err(|request_error| {
629            ProxyError::WrongInputFrontend {
630                front,
631                error: request_error.to_string(),
632            }
633        })?;
634
635        let mut listener = self
636            .listeners
637            .values()
638            .find(|l| l.borrow().address == front.address)
639            .ok_or(ProxyError::NoListenerFound(front.address))?
640            .borrow_mut();
641
642        let hostname = front.hostname.to_owned();
643        let tags = front.tags.to_owned();
644
645        listener
646            .add_http_front(front)
647            .map_err(ProxyError::AddFrontend)?;
648        listener.set_tags(hostname, tags);
649        Ok(())
650    }
651
652    pub fn remove_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
653        let front = front.clone().to_frontend().map_err(|request_error| {
654            ProxyError::WrongInputFrontend {
655                front,
656                error: request_error.to_string(),
657            }
658        })?;
659
660        let mut listener = self
661            .listeners
662            .values()
663            .find(|l| l.borrow().address == front.address)
664            .ok_or(ProxyError::NoListenerFound(front.address))?
665            .borrow_mut();
666
667        let hostname = front.hostname.to_owned();
668
669        listener
670            .remove_http_front(front)
671            .map_err(ProxyError::RemoveFrontend)?;
672
673        listener.set_tags(hostname, None);
674        Ok(())
675    }
676
677    pub fn soft_stop(&mut self) -> Result<(), ProxyError> {
678        let listeners: HashMap<_, _> = self.listeners.drain().collect();
679        let mut socket_errors = vec![];
680        for (_, l) in listeners.iter() {
681            if let Some(mut sock) = l.borrow_mut().listener.take() {
682                debug!("Deregistering socket {:?}", sock);
683                if let Err(e) = self.registry.deregister(&mut sock) {
684                    let error = format!("socket {sock:?}: {e:?}");
685                    socket_errors.push(error);
686                }
687            }
688        }
689
690        if !socket_errors.is_empty() {
691            return Err(ProxyError::SoftStop {
692                proxy_protocol: "HTTP".to_string(),
693                error: format!("Error deregistering listen sockets: {:?}", socket_errors),
694            });
695        }
696
697        Ok(())
698    }
699
700    pub fn hard_stop(&mut self) -> Result<(), ProxyError> {
701        let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
702        let mut socket_errors = vec![];
703        for (_, l) in listeners.drain() {
704            if let Some(mut sock) = l.borrow_mut().listener.take() {
705                debug!("Deregistering socket {:?}", sock);
706                if let Err(e) = self.registry.deregister(&mut sock) {
707                    let error = format!("socket {sock:?}: {e:?}");
708                    socket_errors.push(error);
709                }
710            }
711        }
712
713        if !socket_errors.is_empty() {
714            return Err(ProxyError::HardStop {
715                proxy_protocol: "HTTP".to_string(),
716                error: format!("Error deregistering listen sockets: {:?}", socket_errors),
717            });
718        }
719
720        Ok(())
721    }
722}
723
724impl HttpListener {
725    pub fn new(config: HttpListenerConfig, token: Token) -> Result<HttpListener, ListenerError> {
726        Ok(HttpListener {
727            active: false,
728            address: config.address.into(),
729            answers: Rc::new(RefCell::new(
730                HttpAnswers::new(&config.http_answers)
731                    .map_err(|(status, error)| ListenerError::TemplateParse(status, error))?,
732            )),
733            config,
734            fronts: Router::new(),
735            listener: None,
736            tags: BTreeMap::new(),
737            token,
738        })
739    }
740
741    pub fn activate(
742        &mut self,
743        registry: &Registry,
744        tcp_listener: Option<MioTcpListener>,
745    ) -> Result<Token, ListenerError> {
746        if self.active {
747            return Ok(self.token);
748        }
749        let address: SocketAddr = self.config.address.into();
750
751        let mut listener = match tcp_listener {
752            Some(tcp_listener) => tcp_listener,
753            None => {
754                server_bind(address).map_err(|server_bind_error| ListenerError::Activation {
755                    address,
756                    error: server_bind_error.to_string(),
757                })?
758            }
759        };
760
761        registry
762            .register(&mut listener, self.token, Interest::READABLE)
763            .map_err(ListenerError::SocketRegistration)?;
764
765        self.listener = Some(listener);
766        self.active = true;
767        Ok(self.token)
768    }
769
    /// Adds `http_front` to the router of this listener.
    ///
    /// # Errors
    ///
    /// Returns `AddFrontend` wrapping the router error.
    pub fn add_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
        self.fronts
            .add_http_front(&http_front)
            .map_err(ListenerError::AddFrontend)
    }
775
    /// Removes `http_front` from the router of this listener.
    ///
    /// # Errors
    ///
    /// Returns `RemoveFrontend` wrapping the router error.
    pub fn remove_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
        debug!("removing http_front {:?}", http_front);
        self.fronts
            .remove_http_front(&http_front)
            .map_err(ListenerError::RemoveFrontend)
    }
782
783    fn accept(&mut self) -> Result<TcpStream, AcceptError> {
784        if let Some(ref sock) = self.listener {
785            sock.accept()
786                .map_err(|e| match e.kind() {
787                    ErrorKind::WouldBlock => AcceptError::WouldBlock,
788                    _ => {
789                        error!("accept() IO error: {:?}", e);
790                        AcceptError::IoError
791                    }
792                })
793                .map(|(sock, _)| sock)
794        } else {
795            error!("cannot accept connections, no listening socket available");
796            Err(AcceptError::IoError)
797        }
798    }
799}
800
801impl ProxyConfiguration for HttpProxy {
802    fn notify(&mut self, request: WorkerRequest) -> WorkerResponse {
803        let request_id = request.id.clone();
804
805        let result = match request.content.request_type {
806            Some(RequestType::AddCluster(cluster)) => {
807                debug!("{} add cluster {:?}", request.id, cluster);
808                self.add_cluster(cluster)
809            }
810            Some(RequestType::RemoveCluster(cluster_id)) => {
811                debug!("{} remove cluster {:?}", request_id, cluster_id);
812                self.remove_cluster(&cluster_id)
813            }
814            Some(RequestType::AddHttpFrontend(front)) => {
815                debug!("{} add front {:?}", request_id, front);
816                self.add_http_frontend(front)
817            }
818            Some(RequestType::RemoveHttpFrontend(front)) => {
819                debug!("{} remove front {:?}", request_id, front);
820                self.remove_http_frontend(front)
821            }
822            Some(RequestType::RemoveListener(remove)) => {
823                debug!("removing HTTP listener at address {:?}", remove.address);
824                self.remove_listener(remove)
825            }
826            Some(RequestType::SoftStop(_)) => {
827                debug!("{} processing soft shutdown", request_id);
828                match self.soft_stop() {
829                    Ok(()) => {
830                        info!("{} soft stop successful", request_id);
831                        return WorkerResponse::processing(request.id);
832                    }
833                    Err(e) => Err(e),
834                }
835            }
836            Some(RequestType::HardStop(_)) => {
837                debug!("{} processing hard shutdown", request_id);
838                match self.hard_stop() {
839                    Ok(()) => {
840                        info!("{} hard stop successful", request_id);
841                        return WorkerResponse::processing(request.id);
842                    }
843                    Err(e) => Err(e),
844                }
845            }
846            Some(RequestType::Status(_)) => {
847                debug!("{} status", request_id);
848                Ok(())
849            }
850            other_command => {
851                debug!(
852                    "{} unsupported message for HTTP proxy, ignoring: {:?}",
853                    request.id, other_command
854                );
855                Err(ProxyError::UnsupportedMessage)
856            }
857        };
858
859        match result {
860            Ok(()) => {
861                debug!("{} successful", request_id);
862                WorkerResponse::ok(request_id)
863            }
864            Err(proxy_error) => {
865                debug!("{} unsuccessful: {}", request_id, proxy_error);
866                WorkerResponse::error(request_id, proxy_error)
867            }
868        }
869    }
870
871    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError> {
872        if let Some(listener) = self.listeners.get(&Token(token.0)) {
873            listener.borrow_mut().accept()
874        } else {
875            Err(AcceptError::IoError)
876        }
877    }
878
879    fn create_session(
880        &mut self,
881        mut frontend_sock: TcpStream,
882        listener_token: ListenToken,
883        wait_time: Duration,
884        proxy: Rc<RefCell<Self>>,
885    ) -> Result<(), AcceptError> {
886        let listener = self
887            .listeners
888            .get(&Token(listener_token.0))
889            .cloned()
890            .ok_or(AcceptError::IoError)?;
891
892        if let Err(e) = frontend_sock.set_nodelay(true) {
893            error!(
894                "error setting nodelay on front socket({:?}): {:?}",
895                frontend_sock, e
896            );
897        }
898        let mut session_manager = self.sessions.borrow_mut();
899        let session_entry = session_manager.slab.vacant_entry();
900        let session_token = Token(session_entry.key());
901        let owned = listener.borrow();
902
903        if let Err(register_error) = self.registry.register(
904            &mut frontend_sock,
905            session_token,
906            Interest::READABLE | Interest::WRITABLE,
907        ) {
908            error!(
909                "error registering listen socket({:?}): {:?}",
910                frontend_sock, register_error
911            );
912            return Err(AcceptError::RegisterError);
913        }
914
915        let public_address: SocketAddr = match owned.config.public_address {
916            Some(pub_addr) => pub_addr.into(),
917            None => owned.config.address.into(),
918        };
919
920        let session = HttpSession::new(
921            owned.answers.clone(),
922            Duration::from_secs(owned.config.back_timeout as u64),
923            Duration::from_secs(owned.config.connect_timeout as u64),
924            Duration::from_secs(owned.config.front_timeout as u64),
925            Duration::from_secs(owned.config.request_timeout as u64),
926            owned.config.expect_proxy,
927            listener.clone(),
928            Rc::downgrade(&self.pool),
929            proxy,
930            public_address,
931            frontend_sock,
932            owned.config.sticky_name.clone(),
933            session_token,
934            wait_time,
935        )?;
936
937        let session = Rc::new(RefCell::new(session));
938        session_entry.insert(session);
939
940        Ok(())
941    }
942}
943
944impl L7Proxy for HttpProxy {
945    fn kind(&self) -> ListenerType {
946        ListenerType::Http
947    }
948
949    fn register_socket(
950        &self,
951        source: &mut TcpStream,
952        token: Token,
953        interest: Interest,
954    ) -> Result<(), std::io::Error> {
955        self.registry.register(source, token, interest)
956    }
957
958    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error> {
959        self.registry.deregister(tcp_stream)
960    }
961
962    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token {
963        let mut session_manager = self.sessions.borrow_mut();
964        let entry = session_manager.slab.vacant_entry();
965        let token = Token(entry.key());
966        let _entry = entry.insert(session);
967        token
968    }
969
970    fn remove_session(&self, token: Token) -> bool {
971        self.sessions
972            .borrow_mut()
973            .slab
974            .try_remove(token.0)
975            .is_some()
976    }
977
978    fn backends(&self) -> Rc<RefCell<BackendMap>> {
979        self.backends.clone()
980    }
981
982    fn clusters(&self) -> &HashMap<ClusterId, Cluster> {
983        &self.clusters
984    }
985}
986
987pub mod testing {
988    use crate::testing::*;
989
990    /// this function is not used, but is available for example and testing purposes
991    pub fn start_http_worker(
992        config: HttpListenerConfig,
993        channel: ProxyChannel,
994        max_buffers: usize,
995        buffer_size: usize,
996    ) -> anyhow::Result<()> {
997        let address = config.address.into();
998
999        let ServerParts {
1000            event_loop,
1001            registry,
1002            sessions,
1003            pool,
1004            backends,
1005            client_scm_socket: _,
1006            server_scm_socket,
1007            server_config,
1008        } = prebuild_server(max_buffers, buffer_size, true)?;
1009
1010        let token = {
1011            let mut sessions = sessions.borrow_mut();
1012            let entry = sessions.slab.vacant_entry();
1013            let key = entry.key();
1014            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1015                protocol: Protocol::HTTPListen,
1016            })));
1017            Token(key)
1018        };
1019
1020        let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1021        proxy
1022            .add_listener(config, token)
1023            .with_context(|| "Failed at creating adding the listener")?;
1024        proxy
1025            .activate_listener(&address, None)
1026            .with_context(|| "Failed at creating activating the listener")?;
1027
1028        let mut server = Server::new(
1029            event_loop,
1030            channel,
1031            server_scm_socket,
1032            sessions,
1033            pool,
1034            backends,
1035            Some(proxy),
1036            None,
1037            None,
1038            server_config,
1039            None,
1040            false,
1041        )
1042        .with_context(|| "Failed at creating server")?;
1043
1044        debug!("starting event loop");
1045        server.run();
1046        debug!("ending event loop");
1047        Ok(())
1048    }
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053    extern crate tiny_http;
1054
1055    use std::{
1056        io::{Read, Write},
1057        net::TcpStream,
1058        str,
1059        sync::{Arc, Barrier},
1060        thread,
1061        time::Duration,
1062    };
1063
1064    use sozu_command::proto::command::{CustomHttpAnswers, SocketAddress};
1065
1066    use super::{testing::start_http_worker, *};
1067    use crate::sozu_command::{
1068        channel::Channel,
1069        config::ListenerBuilder,
1070        proto::command::{LoadBalancingParams, PathRule, RulePosition, WorkerRequest},
1071        response::{Backend, HttpFrontend},
1072    };
1073
1074    /*
1075    #[test]
1076    #[cfg(target_pointer_width = "64")]
1077    fn size_test() {
1078      assert_size!(ExpectProxyProtocol<mio::net::TcpStream>, 520);
1079      assert_size!(Http<mio::net::TcpStream>, 1232);
1080      assert_size!(Pipe<mio::net::TcpStream>, 272);
1081      assert_size!(State, 1240);
1082      // fails depending on the platform?
1083      assert_size!(Session, 1592);
1084    }
1085    */
1086
1087    #[test]
1088    fn round_trip() {
1089        setup_test_logger!();
1090        let barrier = Arc::new(Barrier::new(2));
1091        start_server(1025, barrier.clone());
1092        barrier.wait();
1093
1094        let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 1024))
1095            .to_http(None)
1096            .expect("could not create listener config");
1097
1098        let (mut command, channel) =
1099            Channel::generate(1000, 10000).expect("should create a channel");
1100        let _jg = thread::spawn(move || {
1101            setup_test_logger!();
1102            start_http_worker(config, channel, 10, 16384).expect("could not start the http server");
1103        });
1104
1105        let front = RequestHttpFrontend {
1106            cluster_id: Some(String::from("cluster_1")),
1107            address: SocketAddress::new_v4(127, 0, 0, 1, 1024),
1108            hostname: String::from("localhost"),
1109            path: PathRule::prefix(String::from("/")),
1110            ..Default::default()
1111        };
1112        command
1113            .write_message(&WorkerRequest {
1114                id: String::from("ID_ABCD"),
1115                content: RequestType::AddHttpFrontend(front).into(),
1116            })
1117            .unwrap();
1118        let backend = Backend {
1119            cluster_id: String::from("cluster_1"),
1120            backend_id: String::from("cluster_1-0"),
1121            address: SocketAddress::new_v4(127, 0, 0, 1, 1025).into(),
1122            load_balancing_parameters: Some(LoadBalancingParams::default()),
1123            sticky_id: None,
1124            backup: None,
1125        };
1126        command
1127            .write_message(&WorkerRequest {
1128                id: String::from("ID_EFGH"),
1129                content: RequestType::AddBackend(backend.to_add_backend()).into(),
1130            })
1131            .unwrap();
1132
1133        println!("test received: {:?}", command.read_message());
1134        println!("test received: {:?}", command.read_message());
1135
1136        let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not connect");
1137
1138        // 5 seconds of timeout
1139        client.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
1140        let w = client
1141            .write(&b"GET / HTTP/1.1\r\nHost: localhost:1024\r\nConnection: Close\r\n\r\n"[..]);
1142        println!("http client write: {w:?}");
1143
1144        barrier.wait();
1145        let mut buffer = [0; 4096];
1146        let mut index = 0;
1147
1148        loop {
1149            assert!(index <= 191);
1150            if index == 191 {
1151                break;
1152            }
1153
1154            let r = client.read(&mut buffer[index..]);
1155            println!("http client read: {r:?}");
1156            match r {
1157                Err(e) => assert!(false, "client request should not fail. Error: {e:?}"),
1158                Ok(sz) => {
1159                    index += sz;
1160                }
1161            }
1162        }
1163        println!(
1164            "Response: {}",
1165            str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1166        );
1167    }
1168
1169    #[test]
1170    fn keep_alive() {
1171        setup_test_logger!();
1172        let barrier = Arc::new(Barrier::new(2));
1173        start_server(1028, barrier.clone());
1174        barrier.wait();
1175
1176        let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 1031))
1177            .to_http(None)
1178            .expect("could not create listener config");
1179
1180        let (mut command, channel) =
1181            Channel::generate(1000, 10000).expect("should create a channel");
1182
1183        let _jg = thread::spawn(move || {
1184            setup_test_logger!();
1185            start_http_worker(config, channel, 10, 16384).expect("could not start the http server");
1186        });
1187
1188        let front = RequestHttpFrontend {
1189            address: SocketAddress::new_v4(127, 0, 0, 1, 1031),
1190            hostname: String::from("localhost"),
1191            path: PathRule::prefix(String::from("/")),
1192            cluster_id: Some(String::from("cluster_1")),
1193            ..Default::default()
1194        };
1195        command
1196            .write_message(&WorkerRequest {
1197                id: String::from("ID_ABCD"),
1198                content: RequestType::AddHttpFrontend(front).into(),
1199            })
1200            .unwrap();
1201        let backend = Backend {
1202            address: SocketAddress::new_v4(127, 0, 0, 1, 1028).into(),
1203            backend_id: String::from("cluster_1-0"),
1204            backup: None,
1205            cluster_id: String::from("cluster_1"),
1206            load_balancing_parameters: Some(LoadBalancingParams::default()),
1207            sticky_id: None,
1208        };
1209        command
1210            .write_message(&WorkerRequest {
1211                id: String::from("ID_EFGH"),
1212                content: RequestType::AddBackend(backend.to_add_backend()).into(),
1213            })
1214            .unwrap();
1215
1216        println!("test received: {:?}", command.read_message());
1217        println!("test received: {:?}", command.read_message());
1218
1219        let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not connect");
1220        // 5 seconds of timeout
1221        client.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
1222
1223        let w = client
1224            .write(&b"GET / HTTP/1.1\r\nHost: localhost:1031\r\n\r\n"[..])
1225            .unwrap();
1226        println!("http client write: {w:?}");
1227        barrier.wait();
1228
1229        let mut buffer = [0; 4096];
1230        let mut index = 0;
1231
1232        loop {
1233            assert!(index <= 191);
1234            if index == 191 {
1235                break;
1236            }
1237
1238            let r = client.read(&mut buffer[index..]);
1239            println!("http client read: {r:?}");
1240            match r {
1241                Err(e) => assert!(false, "client request should not fail. Error: {e:?}"),
1242                Ok(sz) => {
1243                    index += sz;
1244                }
1245            }
1246        }
1247
1248        println!(
1249            "Response: {}",
1250            str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1251        );
1252
1253        println!("first request ended, will send second one");
1254        let w2 = client.write(&b"GET / HTTP/1.1\r\nHost: localhost:1031\r\n\r\n"[..]);
1255        println!("http client write: {w2:?}");
1256        barrier.wait();
1257
1258        let mut buffer2 = [0; 4096];
1259        let mut index = 0;
1260
1261        loop {
1262            assert!(index <= 191);
1263            if index == 191 {
1264                break;
1265            }
1266
1267            let r2 = client.read(&mut buffer2[index..]);
1268            println!("http client read: {r2:?}");
1269            match r2 {
1270                Err(e) => assert!(false, "client request should not fail. Error: {e:?}"),
1271                Ok(sz) => {
1272                    index += sz;
1273                }
1274            }
1275        }
1276        println!(
1277            "Response: {}",
1278            str::from_utf8(&buffer2[..index]).expect("could not make string from buffer")
1279        );
1280    }
1281
1282    use self::tiny_http::{Response, Server};
1283
1284    fn start_server(port: u16, barrier: Arc<Barrier>) {
1285        thread::spawn(move || {
1286            setup_test_logger!();
1287            let server =
1288                Server::http(&format!("127.0.0.1:{port}")).expect("could not create server");
1289            info!("starting web server in port {}", port);
1290            barrier.wait();
1291
1292            for request in server.incoming_requests() {
1293                info!(
1294                    "backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
1295                    request.method(),
1296                    request.url(),
1297                    request.headers()
1298                );
1299
1300                let response = Response::from_string("hello world");
1301                request.respond(response).unwrap();
1302                info!("backend web server sent response");
1303                barrier.wait();
1304                info!("server session stopped");
1305            }
1306
1307            println!("server on port {port} closed");
1308        });
1309    }
1310
1311    #[test]
1312    fn frontend_from_request_test() {
1313        let cluster_id1 = "cluster_1".to_owned();
1314        let cluster_id2 = "cluster_2".to_owned();
1315        let cluster_id3 = "cluster_3".to_owned();
1316        let uri1 = "/".to_owned();
1317        let uri2 = "/yolo".to_owned();
1318        let uri3 = "/yolo/swag".to_owned();
1319
1320        let mut fronts = Router::new();
1321        fronts
1322            .add_http_front(&HttpFrontend {
1323                address: "0.0.0.0:80".parse().unwrap(),
1324                hostname: "lolcatho.st".to_owned(),
1325                method: None,
1326                path: PathRule::prefix(uri1),
1327                position: RulePosition::Tree,
1328                cluster_id: Some(cluster_id1),
1329                tags: None,
1330            })
1331            .expect("Could not add http frontend");
1332        fronts
1333            .add_http_front(&HttpFrontend {
1334                address: "0.0.0.0:80".parse().unwrap(),
1335                hostname: "lolcatho.st".to_owned(),
1336                method: None,
1337                path: PathRule::prefix(uri2),
1338                position: RulePosition::Tree,
1339                cluster_id: Some(cluster_id2),
1340                tags: None,
1341            })
1342            .expect("Could not add http frontend");
1343        fronts
1344            .add_http_front(&HttpFrontend {
1345                address: "0.0.0.0:80".parse().unwrap(),
1346                hostname: "lolcatho.st".to_owned(),
1347                method: None,
1348                path: PathRule::prefix(uri3),
1349                position: RulePosition::Tree,
1350                cluster_id: Some(cluster_id3),
1351                tags: None,
1352            })
1353            .expect("Could not add http frontend");
1354        fronts
1355            .add_http_front(&HttpFrontend {
1356                address: "0.0.0.0:80".parse().unwrap(),
1357                hostname: "other.domain".to_owned(),
1358                method: None,
1359                path: PathRule::prefix("/test".to_owned()),
1360                position: RulePosition::Tree,
1361                cluster_id: Some("cluster_1".to_owned()),
1362                tags: None,
1363            })
1364            .expect("Could not add http frontend");
1365
1366        let address = SocketAddress::new_v4(127, 0, 0, 1, 1030);
1367
1368        let default_config = ListenerBuilder::new_http(address.clone())
1369            .to_http(None)
1370            .expect("Could not create default HTTP listener config");
1371
1372        let listener = HttpListener {
1373            listener: None,
1374            address: address.into(),
1375            fronts,
1376            answers: Rc::new(RefCell::new(
1377                HttpAnswers::new(&Some(CustomHttpAnswers::default())).unwrap(),
1378            )),
1379            config: default_config,
1380            token: Token(0),
1381            active: true,
1382            tags: BTreeMap::new(),
1383        };
1384
1385        let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get);
1386        let frontend2 = listener.frontend_from_request("lolcatho.st", "/test", &Method::Get);
1387        let frontend3 = listener.frontend_from_request("lolcatho.st", "/yolo/test", &Method::Get);
1388        let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get);
1389        let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get);
1390        assert_eq!(
1391            frontend1.expect("should find frontend"),
1392            Route::ClusterId("cluster_1".to_string())
1393        );
1394        assert_eq!(
1395            frontend2.expect("should find frontend"),
1396            Route::ClusterId("cluster_1".to_string())
1397        );
1398        assert_eq!(
1399            frontend3.expect("should find frontend"),
1400            Route::ClusterId("cluster_2".to_string())
1401        );
1402        assert_eq!(
1403            frontend4.expect("should find frontend"),
1404            Route::ClusterId("cluster_3".to_string())
1405        );
1406        assert!(frontend5.is_err());
1407    }
1408}