sozu_lib/
http.rs

1use std::{
2    cell::RefCell,
3    collections::{hash_map::Entry, BTreeMap, HashMap},
4    io::ErrorKind,
5    net::{Shutdown, SocketAddr},
6    os::unix::io::AsRawFd,
7    rc::{Rc, Weak},
8    str::from_utf8_unchecked,
9    time::{Duration, Instant},
10};
11
12use mio::{
13    net::{TcpListener as MioTcpListener, TcpStream},
14    unix::SourceFd,
15    Interest, Registry, Token,
16};
17use rusty_ulid::Ulid;
18
19use sozu_command::{
20    logging::CachedTags,
21    proto::command::{
22        request::RequestType, Cluster, HttpListenerConfig, ListenerType, RemoveListener,
23        RequestHttpFrontend, WorkerRequest, WorkerResponse,
24    },
25    ready::Ready,
26    response::HttpFrontend,
27    state::ClusterId,
28};
29
30use crate::{
31    backends::BackendMap,
32    pool::Pool,
33    protocol::{
34        http::{
35            answers::HttpAnswers,
36            parser::{hostname_and_port, Method},
37            ResponseStream,
38        },
39        proxy_protocol::expect::ExpectProxyProtocol,
40        Http, Pipe, SessionState,
41    },
42    router::{Route, Router},
43    server::{ListenToken, SessionManager},
44    socket::server_bind,
45    timer::TimeoutContainer,
46    AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
47    ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
48    SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
49};
50
51#[derive(PartialEq, Eq)]
52pub enum SessionStatus {
53    Normal,
54    DefaultAnswer,
55}
56
57StateMachineBuilder! {
58    /// The various Stages of an HTTP connection:
59    ///
60    /// 1. optional (ExpectProxyProtocol)
61    /// 2. HTTP
62    /// 3. WebSocket (passthrough)
63    enum HttpStateMachine impl SessionState {
64        Expect(ExpectProxyProtocol<TcpStream>),
65        Http(Http<TcpStream, HttpListener>),
66        WebSocket(Pipe<TcpStream, HttpListener>),
67    }
68}
69
70/// HTTP Session to insert in the SessionManager
71///
72/// 1 session <=> 1 HTTP connection (client to sozu)
73pub struct HttpSession {
74    answers: Rc<RefCell<HttpAnswers>>,
75    configured_backend_timeout: Duration,
76    configured_connect_timeout: Duration,
77    configured_frontend_timeout: Duration,
78    frontend_token: Token,
79    last_event: Instant,
80    listener: Rc<RefCell<HttpListener>>,
81    metrics: SessionMetrics,
82    pool: Weak<RefCell<Pool>>,
83    proxy: Rc<RefCell<HttpProxy>>,
84    state: HttpStateMachine,
85    sticky_name: String,
86    has_been_closed: bool,
87}
88
89impl HttpSession {
90    #[allow(clippy::too_many_arguments)]
91    pub fn new(
92        answers: Rc<RefCell<HttpAnswers>>,
93        configured_backend_timeout: Duration,
94        configured_connect_timeout: Duration,
95        configured_frontend_timeout: Duration,
96        configured_request_timeout: Duration,
97        expect_proxy: bool,
98        listener: Rc<RefCell<HttpListener>>,
99        pool: Weak<RefCell<Pool>>,
100        proxy: Rc<RefCell<HttpProxy>>,
101        public_address: SocketAddr,
102        sock: TcpStream,
103        sticky_name: String,
104        token: Token,
105        wait_time: Duration,
106    ) -> Result<Self, AcceptError> {
107        let request_id = Ulid::generate();
108        let container_frontend_timeout = TimeoutContainer::new(configured_request_timeout, token);
109
110        let state = if expect_proxy {
111            trace!("starting in expect proxy state");
112            gauge_add!("protocol.proxy.expect", 1);
113
114            HttpStateMachine::Expect(ExpectProxyProtocol::new(
115                container_frontend_timeout,
116                sock,
117                token,
118                request_id,
119            ))
120        } else {
121            gauge_add!("protocol.http", 1);
122            let session_address = sock.peer_addr().ok();
123
124            HttpStateMachine::Http(Http::new(
125                answers.clone(),
126                configured_backend_timeout,
127                configured_connect_timeout,
128                configured_frontend_timeout,
129                container_frontend_timeout,
130                sock,
131                token,
132                listener.clone(),
133                pool.clone(),
134                Protocol::HTTP,
135                public_address,
136                request_id,
137                session_address,
138                sticky_name.clone(),
139            )?)
140        };
141
142        let metrics = SessionMetrics::new(Some(wait_time));
143        Ok(HttpSession {
144            answers,
145            configured_backend_timeout,
146            configured_connect_timeout,
147            configured_frontend_timeout,
148            frontend_token: token,
149            has_been_closed: false,
150            last_event: Instant::now(),
151            listener,
152            metrics,
153            pool,
154            proxy,
155            state,
156            sticky_name,
157        })
158    }
159
160    pub fn upgrade(&mut self) -> SessionIsToBeClosed {
161        debug!("HTTP::upgrade");
162        let new_state = match self.state.take() {
163            HttpStateMachine::Http(http) => self.upgrade_http(http),
164            HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
165            HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
166            HttpStateMachine::FailedUpgrade(_) => unreachable!(),
167        };
168
169        match new_state {
170            Some(state) => {
171                self.state = state;
172                false
173            }
174            // The state stays FailedUpgrade, but the Session should be closed right after
175            None => true,
176        }
177    }
178
179    fn upgrade_expect(
180        &mut self,
181        expect: ExpectProxyProtocol<TcpStream>,
182    ) -> Option<HttpStateMachine> {
183        debug!("switching to HTTP");
184        match expect
185            .addresses
186            .as_ref()
187            .map(|add| (add.destination(), add.source()))
188        {
189            Some((Some(public_address), Some(session_address))) => {
190                let mut http = Http::new(
191                    self.answers.clone(),
192                    self.configured_backend_timeout,
193                    self.configured_connect_timeout,
194                    self.configured_frontend_timeout,
195                    expect.container_frontend_timeout,
196                    expect.frontend,
197                    expect.frontend_token,
198                    self.listener.clone(),
199                    self.pool.clone(),
200                    Protocol::HTTP,
201                    public_address,
202                    expect.request_id,
203                    Some(session_address),
204                    self.sticky_name.clone(),
205                )
206                .ok()?;
207                http.frontend_readiness.event = expect.frontend_readiness.event;
208
209                gauge_add!("protocol.proxy.expect", -1);
210                gauge_add!("protocol.http", 1);
211                Some(HttpStateMachine::Http(http))
212            }
213            _ => None,
214        }
215    }
216
217    fn upgrade_http(&mut self, http: Http<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
218        debug!("http switching to ws");
219        let front_token = self.frontend_token;
220        let back_token = match http.backend_token {
221            Some(back_token) => back_token,
222            None => {
223                warn!(
224                    "Could not upgrade http request on cluster '{:?}' ({:?}) using backend '{:?}' into websocket for request '{}'",
225                    http.context.cluster_id, self.frontend_token, http.context.backend_id, http.context.id
226                );
227                return None;
228            }
229        };
230
231        let ws_context = http.websocket_context();
232        let mut container_frontend_timeout = http.container_frontend_timeout;
233        let mut container_backend_timeout = http.container_backend_timeout;
234        container_frontend_timeout.reset();
235        container_backend_timeout.reset();
236
237        let backend_buffer = if let ResponseStream::BackendAnswer(kawa) = http.response_stream {
238            kawa.storage.buffer
239        } else {
240            return None;
241        };
242
243        let mut pipe = Pipe::new(
244            backend_buffer,
245            http.context.backend_id,
246            http.backend_socket,
247            http.backend,
248            Some(container_backend_timeout),
249            Some(container_frontend_timeout),
250            http.context.cluster_id,
251            http.request_stream.storage.buffer,
252            front_token,
253            http.frontend_socket,
254            self.listener.clone(),
255            Protocol::HTTP,
256            http.context.id,
257            http.context.session_address,
258            ws_context,
259        );
260
261        pipe.frontend_readiness.event = http.frontend_readiness.event;
262        pipe.backend_readiness.event = http.backend_readiness.event;
263        pipe.set_back_token(back_token);
264
265        gauge_add!("protocol.http", -1);
266        gauge_add!("protocol.ws", 1);
267        gauge_add!("http.active_requests", -1);
268        gauge_add!("websocket.active_requests", 1);
269        Some(HttpStateMachine::WebSocket(pipe))
270    }
271
272    fn upgrade_websocket(&self, ws: Pipe<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
273        // what do we do here?
274        error!("Upgrade called on WS, this should not happen");
275        Some(HttpStateMachine::WebSocket(ws))
276    }
277}
278
279impl ProxySession for HttpSession {
280    fn close(&mut self) {
281        if self.has_been_closed {
282            return;
283        }
284
285        trace!("Closing HTTP session");
286        self.metrics.service_stop();
287
288        // Restore gauges
289        match self.state.marker() {
290            StateMarker::Expect => gauge_add!("protocol.proxy.expect", -1),
291            StateMarker::Http => gauge_add!("protocol.http", -1),
292            StateMarker::WebSocket => {
293                gauge_add!("protocol.ws", -1);
294                gauge_add!("websocket.active_requests", -1);
295            }
296        }
297
298        if self.state.failed() {
299            match self.state.marker() {
300                StateMarker::Expect => incr!("http.upgrade.expect.failed"),
301                StateMarker::Http => incr!("http.upgrade.http.failed"),
302                StateMarker::WebSocket => incr!("http.upgrade.ws.failed"),
303            }
304            return;
305        }
306
307        self.state.cancel_timeouts();
308        // defer backend closing to the state
309        self.state.close(self.proxy.clone(), &mut self.metrics);
310
311        let front_socket = self.state.front_socket();
312        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
313            // error 107 NotConnected can happen when was never fully connected, or was already disconnected due to error
314            if e.kind() != ErrorKind::NotConnected {
315                error!(
316                    "error shutting down front socket({:?}): {:?}",
317                    front_socket, e
318                )
319            }
320        }
321
322        // deregister the frontend and remove it
323        let proxy = self.proxy.borrow();
324        let fd = front_socket.as_raw_fd();
325        if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
326            error!(
327                "error deregistering front socket({:?}) while closing HTTP session: {:?}",
328                fd, e
329            );
330        }
331        proxy.remove_session(self.frontend_token);
332
333        self.has_been_closed = true;
334    }
335
336    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
337        let state_result = self.state.timeout(token, &mut self.metrics);
338        state_result == StateResult::CloseSession
339    }
340
341    fn protocol(&self) -> Protocol {
342        Protocol::HTTP
343    }
344
345    fn update_readiness(&mut self, token: Token, events: Ready) {
346        trace!(
347            "token {:?} got event {}",
348            token,
349            super::ready_to_string(events)
350        );
351        self.last_event = Instant::now();
352        self.metrics.wait_start();
353        self.state.update_readiness(token, events);
354    }
355
356    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
357        self.metrics.service_start();
358
359        let session_result =
360            self.state
361                .ready(session.clone(), self.proxy.clone(), &mut self.metrics);
362
363        let to_be_closed = match session_result {
364            SessionResult::Close => true,
365            SessionResult::Continue => false,
366            SessionResult::Upgrade => match self.upgrade() {
367                false => self.ready(session),
368                true => true,
369            },
370        };
371
372        self.metrics.service_stop();
373        to_be_closed
374    }
375
376    fn shutting_down(&mut self) -> SessionIsToBeClosed {
377        self.state.shutting_down()
378    }
379
380    fn last_event(&self) -> Instant {
381        self.last_event
382    }
383
384    fn print_session(&self) {
385        self.state.print_state("HTTP");
386        error!("Metrics: {:?}", self.metrics);
387    }
388
389    fn frontend_token(&self) -> Token {
390        self.frontend_token
391    }
392}
393
394pub type Hostname = String;
395
396pub struct HttpListener {
397    active: bool,
398    address: SocketAddr,
399    answers: Rc<RefCell<HttpAnswers>>,
400    config: HttpListenerConfig,
401    fronts: Router,
402    listener: Option<MioTcpListener>,
403    tags: BTreeMap<String, CachedTags>,
404    token: Token,
405}
406
407impl ListenerHandler for HttpListener {
408    fn get_addr(&self) -> &SocketAddr {
409        &self.address
410    }
411
412    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
413        self.tags.get(key)
414    }
415
416    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
417        match tags {
418            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
419            None => self.tags.remove(&key),
420        };
421    }
422}
423
424impl L7ListenerHandler for HttpListener {
425    fn get_sticky_name(&self) -> &str {
426        &self.config.sticky_name
427    }
428
429    fn get_connect_timeout(&self) -> u32 {
430        self.config.connect_timeout
431    }
432
433    // redundant, already called once in extract_route
434    fn frontend_from_request(
435        &self,
436        host: &str,
437        uri: &str,
438        method: &Method,
439    ) -> Result<Route, FrontendFromRequestError> {
440        let start = Instant::now();
441        let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
442            Ok(tuple) => tuple,
443            Err(parse_error) => {
444                // parse_error contains a slice of given_host, which should NOT escape this scope
445                return Err(FrontendFromRequestError::HostParse {
446                    host: host.to_owned(),
447                    error: parse_error.to_string(),
448                });
449            }
450        };
451        if remaining_input != &b""[..] {
452            return Err(FrontendFromRequestError::InvalidCharsAfterHost(
453                host.to_owned(),
454            ));
455        }
456
457        /*if port == Some(&b"80"[..]) {
458        // it is alright to call from_utf8_unchecked,
459        // we already verified that there are only ascii
460        // chars in there
461          unsafe { from_utf8_unchecked(hostname) }
462        } else {
463          host
464        }
465        */
466        let host = unsafe { from_utf8_unchecked(hostname) };
467
468        let route = self.fronts.lookup(host, uri, method).map_err(|e| {
469            incr!("http.failed_backend_matching");
470            FrontendFromRequestError::NoClusterFound(e)
471        })?;
472
473        let now = Instant::now();
474
475        if let Route::ClusterId(cluster) = &route {
476            time!("frontend_matching_time", cluster, (now - start).as_millis());
477        }
478
479        Ok(route)
480    }
481}
482
483pub struct HttpProxy {
484    backends: Rc<RefCell<BackendMap>>,
485    clusters: HashMap<ClusterId, Cluster>,
486    listeners: HashMap<Token, Rc<RefCell<HttpListener>>>,
487    pool: Rc<RefCell<Pool>>,
488    registry: Registry,
489    sessions: Rc<RefCell<SessionManager>>,
490}
491
492impl HttpProxy {
493    pub fn new(
494        registry: Registry,
495        sessions: Rc<RefCell<SessionManager>>,
496        pool: Rc<RefCell<Pool>>,
497        backends: Rc<RefCell<BackendMap>>,
498    ) -> HttpProxy {
499        HttpProxy {
500            backends,
501            clusters: HashMap::new(),
502            listeners: HashMap::new(),
503            pool,
504            registry,
505            sessions,
506        }
507    }
508
509    pub fn add_listener(
510        &mut self,
511        config: HttpListenerConfig,
512        token: Token,
513    ) -> Result<Token, ProxyError> {
514        match self.listeners.entry(token) {
515            Entry::Vacant(entry) => {
516                let http_listener =
517                    HttpListener::new(config, token).map_err(ProxyError::AddListener)?;
518                entry.insert(Rc::new(RefCell::new(http_listener)));
519                Ok(token)
520            }
521            _ => Err(ProxyError::ListenerAlreadyPresent),
522        }
523    }
524
525    pub fn get_listener(&self, token: &Token) -> Option<Rc<RefCell<HttpListener>>> {
526        self.listeners.get(token).cloned()
527    }
528
529    pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
530        let len = self.listeners.len();
531        let remove_address = remove.address.into();
532        self.listeners
533            .retain(|_, l| l.borrow().address != remove_address);
534
535        if !self.listeners.len() < len {
536            info!("no HTTP listener to remove at address {:?}", remove_address);
537        }
538        Ok(())
539    }
540
541    pub fn activate_listener(
542        &self,
543        addr: &SocketAddr,
544        tcp_listener: Option<MioTcpListener>,
545    ) -> Result<Token, ProxyError> {
546        let listener = self
547            .listeners
548            .values()
549            .find(|listener| listener.borrow().address == *addr)
550            .ok_or(ProxyError::NoListenerFound(addr.to_owned()))?;
551
552        listener
553            .borrow_mut()
554            .activate(&self.registry, tcp_listener)
555            .map_err(|listener_error| ProxyError::ListenerActivation {
556                address: *addr,
557                listener_error,
558            })
559    }
560
561    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
562        self.listeners
563            .iter()
564            .filter_map(|(_, listener)| {
565                let mut owned = listener.borrow_mut();
566                if let Some(listener) = owned.listener.take() {
567                    return Some((owned.address, listener));
568                }
569
570                None
571            })
572            .collect()
573    }
574
575    pub fn give_back_listener(
576        &mut self,
577        address: SocketAddr,
578    ) -> Result<(Token, MioTcpListener), ProxyError> {
579        let listener = self
580            .listeners
581            .values()
582            .find(|listener| listener.borrow().address == address)
583            .ok_or(ProxyError::NoListenerFound(address))?;
584
585        let mut owned = listener.borrow_mut();
586
587        let taken_listener = owned
588            .listener
589            .take()
590            .ok_or(ProxyError::UnactivatedListener)?;
591
592        Ok((owned.token, taken_listener))
593    }
594
595    pub fn add_cluster(&mut self, mut cluster: Cluster) -> Result<(), ProxyError> {
596        if let Some(answer_503) = cluster.answer_503.take() {
597            for listener in self.listeners.values() {
598                listener
599                    .borrow()
600                    .answers
601                    .borrow_mut()
602                    .add_custom_answer(&cluster.cluster_id, answer_503.clone())
603                    .map_err(|(status, error)| {
604                        ProxyError::AddCluster(ListenerError::TemplateParse(status, error))
605                    })?;
606            }
607        }
608        self.clusters.insert(cluster.cluster_id.clone(), cluster);
609        Ok(())
610    }
611
612    pub fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), ProxyError> {
613        self.clusters.remove(cluster_id);
614
615        for listener in self.listeners.values() {
616            listener
617                .borrow()
618                .answers
619                .borrow_mut()
620                .remove_custom_answer(cluster_id);
621        }
622        Ok(())
623    }
624
625    pub fn add_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
626        let front = front.clone().to_frontend().map_err(|request_error| {
627            ProxyError::WrongInputFrontend {
628                front,
629                error: request_error.to_string(),
630            }
631        })?;
632
633        let mut listener = self
634            .listeners
635            .values()
636            .find(|l| l.borrow().address == front.address)
637            .ok_or(ProxyError::NoListenerFound(front.address))?
638            .borrow_mut();
639
640        let hostname = front.hostname.to_owned();
641        let tags = front.tags.to_owned();
642
643        listener
644            .add_http_front(front)
645            .map_err(ProxyError::AddFrontend)?;
646        listener.set_tags(hostname, tags);
647        Ok(())
648    }
649
650    pub fn remove_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
651        let front = front.clone().to_frontend().map_err(|request_error| {
652            ProxyError::WrongInputFrontend {
653                front,
654                error: request_error.to_string(),
655            }
656        })?;
657
658        let mut listener = self
659            .listeners
660            .values()
661            .find(|l| l.borrow().address == front.address)
662            .ok_or(ProxyError::NoListenerFound(front.address))?
663            .borrow_mut();
664
665        let hostname = front.hostname.to_owned();
666
667        listener
668            .remove_http_front(front)
669            .map_err(ProxyError::RemoveFrontend)?;
670
671        listener.set_tags(hostname, None);
672        Ok(())
673    }
674
675    pub fn soft_stop(&mut self) -> Result<(), ProxyError> {
676        let listeners: HashMap<_, _> = self.listeners.drain().collect();
677        let mut socket_errors = vec![];
678        for (_, l) in listeners.iter() {
679            if let Some(mut sock) = l.borrow_mut().listener.take() {
680                debug!("Deregistering socket {:?}", sock);
681                if let Err(e) = self.registry.deregister(&mut sock) {
682                    let error = format!("socket {sock:?}: {e:?}");
683                    socket_errors.push(error);
684                }
685            }
686        }
687
688        if !socket_errors.is_empty() {
689            return Err(ProxyError::SoftStop {
690                proxy_protocol: "HTTP".to_string(),
691                error: format!("Error deregistering listen sockets: {:?}", socket_errors),
692            });
693        }
694
695        Ok(())
696    }
697
698    pub fn hard_stop(&mut self) -> Result<(), ProxyError> {
699        let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
700        let mut socket_errors = vec![];
701        for (_, l) in listeners.drain() {
702            if let Some(mut sock) = l.borrow_mut().listener.take() {
703                debug!("Deregistering socket {:?}", sock);
704                if let Err(e) = self.registry.deregister(&mut sock) {
705                    let error = format!("socket {sock:?}: {e:?}");
706                    socket_errors.push(error);
707                }
708            }
709        }
710
711        if !socket_errors.is_empty() {
712            return Err(ProxyError::HardStop {
713                proxy_protocol: "HTTP".to_string(),
714                error: format!("Error deregistering listen sockets: {:?}", socket_errors),
715            });
716        }
717
718        Ok(())
719    }
720}
721
722impl HttpListener {
723    pub fn new(config: HttpListenerConfig, token: Token) -> Result<HttpListener, ListenerError> {
724        Ok(HttpListener {
725            active: false,
726            address: config.address.into(),
727            answers: Rc::new(RefCell::new(
728                HttpAnswers::new(&config.http_answers)
729                    .map_err(|(status, error)| ListenerError::TemplateParse(status, error))?,
730            )),
731            config,
732            fronts: Router::new(),
733            listener: None,
734            tags: BTreeMap::new(),
735            token,
736        })
737    }
738
739    pub fn activate(
740        &mut self,
741        registry: &Registry,
742        tcp_listener: Option<MioTcpListener>,
743    ) -> Result<Token, ListenerError> {
744        if self.active {
745            return Ok(self.token);
746        }
747        let address: SocketAddr = self.config.address.into();
748
749        let mut listener = match tcp_listener {
750            Some(tcp_listener) => tcp_listener,
751            None => {
752                server_bind(address).map_err(|server_bind_error| ListenerError::Activation {
753                    address,
754                    error: server_bind_error.to_string(),
755                })?
756            }
757        };
758
759        registry
760            .register(&mut listener, self.token, Interest::READABLE)
761            .map_err(ListenerError::SocketRegistration)?;
762
763        self.listener = Some(listener);
764        self.active = true;
765        Ok(self.token)
766    }
767
768    pub fn add_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
769        self.fronts
770            .add_http_front(&http_front)
771            .map_err(ListenerError::AddFrontend)
772    }
773
774    pub fn remove_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
775        debug!("removing http_front {:?}", http_front);
776        self.fronts
777            .remove_http_front(&http_front)
778            .map_err(ListenerError::RemoveFrontend)
779    }
780
781    fn accept(&mut self) -> Result<TcpStream, AcceptError> {
782        if let Some(ref sock) = self.listener {
783            sock.accept()
784                .map_err(|e| match e.kind() {
785                    ErrorKind::WouldBlock => AcceptError::WouldBlock,
786                    _ => {
787                        error!("accept() IO error: {:?}", e);
788                        AcceptError::IoError
789                    }
790                })
791                .map(|(sock, _)| sock)
792        } else {
793            error!("cannot accept connections, no listening socket available");
794            Err(AcceptError::IoError)
795        }
796    }
797}
798
799impl ProxyConfiguration for HttpProxy {
800    fn notify(&mut self, request: WorkerRequest) -> WorkerResponse {
801        let request_id = request.id.clone();
802
803        let result = match request.content.request_type {
804            Some(RequestType::AddCluster(cluster)) => {
805                debug!("{} add cluster {:?}", request.id, cluster);
806                self.add_cluster(cluster)
807            }
808            Some(RequestType::RemoveCluster(cluster_id)) => {
809                debug!("{} remove cluster {:?}", request_id, cluster_id);
810                self.remove_cluster(&cluster_id)
811            }
812            Some(RequestType::AddHttpFrontend(front)) => {
813                debug!("{} add front {:?}", request_id, front);
814                self.add_http_frontend(front)
815            }
816            Some(RequestType::RemoveHttpFrontend(front)) => {
817                debug!("{} remove front {:?}", request_id, front);
818                self.remove_http_frontend(front)
819            }
820            Some(RequestType::RemoveListener(remove)) => {
821                debug!("removing HTTP listener at address {:?}", remove.address);
822                self.remove_listener(remove)
823            }
824            Some(RequestType::SoftStop(_)) => {
825                debug!("{} processing soft shutdown", request_id);
826                match self.soft_stop() {
827                    Ok(()) => {
828                        info!("{} soft stop successful", request_id);
829                        return WorkerResponse::processing(request.id);
830                    }
831                    Err(e) => Err(e),
832                }
833            }
834            Some(RequestType::HardStop(_)) => {
835                debug!("{} processing hard shutdown", request_id);
836                match self.hard_stop() {
837                    Ok(()) => {
838                        info!("{} hard stop successful", request_id);
839                        return WorkerResponse::processing(request.id);
840                    }
841                    Err(e) => Err(e),
842                }
843            }
844            Some(RequestType::Status(_)) => {
845                debug!("{} status", request_id);
846                Ok(())
847            }
848            other_command => {
849                debug!(
850                    "{} unsupported message for HTTP proxy, ignoring: {:?}",
851                    request.id, other_command
852                );
853                Err(ProxyError::UnsupportedMessage)
854            }
855        };
856
857        match result {
858            Ok(()) => {
859                debug!("{} successful", request_id);
860                WorkerResponse::ok(request_id)
861            }
862            Err(proxy_error) => {
863                debug!("{} unsuccessful: {}", request_id, proxy_error);
864                WorkerResponse::error(request_id, proxy_error)
865            }
866        }
867    }
868
869    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError> {
870        if let Some(listener) = self.listeners.get(&Token(token.0)) {
871            listener.borrow_mut().accept()
872        } else {
873            Err(AcceptError::IoError)
874        }
875    }
876
877    fn create_session(
878        &mut self,
879        mut frontend_sock: TcpStream,
880        listener_token: ListenToken,
881        wait_time: Duration,
882        proxy: Rc<RefCell<Self>>,
883    ) -> Result<(), AcceptError> {
884        let listener = self
885            .listeners
886            .get(&Token(listener_token.0))
887            .cloned()
888            .ok_or(AcceptError::IoError)?;
889
890        if let Err(e) = frontend_sock.set_nodelay(true) {
891            error!(
892                "error setting nodelay on front socket({:?}): {:?}",
893                frontend_sock, e
894            );
895        }
896        let mut session_manager = self.sessions.borrow_mut();
897        let session_entry = session_manager.slab.vacant_entry();
898        let session_token = Token(session_entry.key());
899        let owned = listener.borrow();
900
901        if let Err(register_error) = self.registry.register(
902            &mut frontend_sock,
903            session_token,
904            Interest::READABLE | Interest::WRITABLE,
905        ) {
906            error!(
907                "error registering listen socket({:?}): {:?}",
908                frontend_sock, register_error
909            );
910            return Err(AcceptError::RegisterError);
911        }
912
913        let public_address: SocketAddr = match owned.config.public_address {
914            Some(pub_addr) => pub_addr.into(),
915            None => owned.config.address.into(),
916        };
917
918        let session = HttpSession::new(
919            owned.answers.clone(),
920            Duration::from_secs(owned.config.back_timeout as u64),
921            Duration::from_secs(owned.config.connect_timeout as u64),
922            Duration::from_secs(owned.config.front_timeout as u64),
923            Duration::from_secs(owned.config.request_timeout as u64),
924            owned.config.expect_proxy,
925            listener.clone(),
926            Rc::downgrade(&self.pool),
927            proxy,
928            public_address,
929            frontend_sock,
930            owned.config.sticky_name.clone(),
931            session_token,
932            wait_time,
933        )?;
934
935        let session = Rc::new(RefCell::new(session));
936        session_entry.insert(session);
937
938        Ok(())
939    }
940}
941
942impl L7Proxy for HttpProxy {
943    fn kind(&self) -> ListenerType {
944        ListenerType::Http
945    }
946
947    fn register_socket(
948        &self,
949        source: &mut TcpStream,
950        token: Token,
951        interest: Interest,
952    ) -> Result<(), std::io::Error> {
953        self.registry.register(source, token, interest)
954    }
955
956    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error> {
957        self.registry.deregister(tcp_stream)
958    }
959
960    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token {
961        let mut session_manager = self.sessions.borrow_mut();
962        let entry = session_manager.slab.vacant_entry();
963        let token = Token(entry.key());
964        let _entry = entry.insert(session);
965        token
966    }
967
968    fn remove_session(&self, token: Token) -> bool {
969        self.sessions
970            .borrow_mut()
971            .slab
972            .try_remove(token.0)
973            .is_some()
974    }
975
976    fn backends(&self) -> Rc<RefCell<BackendMap>> {
977        self.backends.clone()
978    }
979
980    fn clusters(&self) -> &HashMap<ClusterId, Cluster> {
981        &self.clusters
982    }
983}
984
985pub mod testing {
986    use crate::testing::*;
987
988    /// this function is not used, but is available for example and testing purposes
989    pub fn start_http_worker(
990        config: HttpListenerConfig,
991        channel: ProxyChannel,
992        max_buffers: usize,
993        buffer_size: usize,
994    ) -> anyhow::Result<()> {
995        let address = config.address.into();
996
997        let ServerParts {
998            event_loop,
999            registry,
1000            sessions,
1001            pool,
1002            backends,
1003            client_scm_socket: _,
1004            server_scm_socket,
1005            server_config,
1006        } = prebuild_server(max_buffers, buffer_size, true)?;
1007
1008        let token = {
1009            let mut sessions = sessions.borrow_mut();
1010            let entry = sessions.slab.vacant_entry();
1011            let key = entry.key();
1012            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1013                protocol: Protocol::HTTPListen,
1014            })));
1015            Token(key)
1016        };
1017
1018        let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1019        proxy
1020            .add_listener(config, token)
1021            .with_context(|| "Failed at creating adding the listener")?;
1022        proxy
1023            .activate_listener(&address, None)
1024            .with_context(|| "Failed at creating activating the listener")?;
1025
1026        let mut server = Server::new(
1027            event_loop,
1028            channel,
1029            server_scm_socket,
1030            sessions,
1031            pool,
1032            backends,
1033            Some(proxy),
1034            None,
1035            None,
1036            server_config,
1037            None,
1038            false,
1039        )
1040        .with_context(|| "Failed at creating server")?;
1041
1042        debug!("starting event loop");
1043        server.run();
1044        debug!("ending event loop");
1045        Ok(())
1046    }
1047}
1048
1049#[cfg(test)]
1050mod tests {
1051    extern crate tiny_http;
1052
1053    use super::testing::start_http_worker;
1054    use super::*;
1055    use sozu_command::proto::command::{CustomHttpAnswers, SocketAddress};
1056
1057    use crate::sozu_command::{
1058        channel::Channel,
1059        config::ListenerBuilder,
1060        proto::command::{LoadBalancingParams, PathRule, RulePosition, WorkerRequest},
1061        response::{Backend, HttpFrontend},
1062    };
1063
1064    use std::{
1065        io::{Read, Write},
1066        net::TcpStream,
1067        str,
1068        sync::{Arc, Barrier},
1069        thread,
1070        time::Duration,
1071    };
1072
1073    /*
1074    #[test]
1075    #[cfg(target_pointer_width = "64")]
1076    fn size_test() {
1077      assert_size!(ExpectProxyProtocol<mio::net::TcpStream>, 520);
1078      assert_size!(Http<mio::net::TcpStream>, 1232);
1079      assert_size!(Pipe<mio::net::TcpStream>, 272);
1080      assert_size!(State, 1240);
1081      // fails depending on the platform?
1082      assert_size!(Session, 1592);
1083    }
1084    */
1085
1086    #[test]
1087    fn round_trip() {
1088        setup_test_logger!();
1089        let barrier = Arc::new(Barrier::new(2));
1090        start_server(1025, barrier.clone());
1091        barrier.wait();
1092
1093        let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 1024))
1094            .to_http(None)
1095            .expect("could not create listener config");
1096
1097        let (mut command, channel) =
1098            Channel::generate(1000, 10000).expect("should create a channel");
1099        let _jg = thread::spawn(move || {
1100            setup_test_logger!();
1101            start_http_worker(config, channel, 10, 16384).expect("could not start the http server");
1102        });
1103
1104        let front = RequestHttpFrontend {
1105            cluster_id: Some(String::from("cluster_1")),
1106            address: SocketAddress::new_v4(127, 0, 0, 1, 1024),
1107            hostname: String::from("localhost"),
1108            path: PathRule::prefix(String::from("/")),
1109            ..Default::default()
1110        };
1111        command
1112            .write_message(&WorkerRequest {
1113                id: String::from("ID_ABCD"),
1114                content: RequestType::AddHttpFrontend(front).into(),
1115            })
1116            .unwrap();
1117        let backend = Backend {
1118            cluster_id: String::from("cluster_1"),
1119            backend_id: String::from("cluster_1-0"),
1120            address: SocketAddress::new_v4(127, 0, 0, 1, 1025).into(),
1121            load_balancing_parameters: Some(LoadBalancingParams::default()),
1122            sticky_id: None,
1123            backup: None,
1124        };
1125        command
1126            .write_message(&WorkerRequest {
1127                id: String::from("ID_EFGH"),
1128                content: RequestType::AddBackend(backend.to_add_backend()).into(),
1129            })
1130            .unwrap();
1131
1132        println!("test received: {:?}", command.read_message());
1133        println!("test received: {:?}", command.read_message());
1134
1135        let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not connect");
1136
1137        // 5 seconds of timeout
1138        client.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
1139        let w = client
1140            .write(&b"GET / HTTP/1.1\r\nHost: localhost:1024\r\nConnection: Close\r\n\r\n"[..]);
1141        println!("http client write: {w:?}");
1142
1143        barrier.wait();
1144        let mut buffer = [0; 4096];
1145        let mut index = 0;
1146
1147        loop {
1148            assert!(index <= 191);
1149            if index == 191 {
1150                break;
1151            }
1152
1153            let r = client.read(&mut buffer[index..]);
1154            println!("http client read: {r:?}");
1155            match r {
1156                Err(e) => assert!(false, "client request should not fail. Error: {e:?}"),
1157                Ok(sz) => {
1158                    index += sz;
1159                }
1160            }
1161        }
1162        println!(
1163            "Response: {}",
1164            str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1165        );
1166    }
1167
1168    #[test]
1169    fn keep_alive() {
1170        setup_test_logger!();
1171        let barrier = Arc::new(Barrier::new(2));
1172        start_server(1028, barrier.clone());
1173        barrier.wait();
1174
1175        let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 1031))
1176            .to_http(None)
1177            .expect("could not create listener config");
1178
1179        let (mut command, channel) =
1180            Channel::generate(1000, 10000).expect("should create a channel");
1181
1182        let _jg = thread::spawn(move || {
1183            setup_test_logger!();
1184            start_http_worker(config, channel, 10, 16384).expect("could not start the http server");
1185        });
1186
1187        let front = RequestHttpFrontend {
1188            address: SocketAddress::new_v4(127, 0, 0, 1, 1031),
1189            hostname: String::from("localhost"),
1190            path: PathRule::prefix(String::from("/")),
1191            cluster_id: Some(String::from("cluster_1")),
1192            ..Default::default()
1193        };
1194        command
1195            .write_message(&WorkerRequest {
1196                id: String::from("ID_ABCD"),
1197                content: RequestType::AddHttpFrontend(front).into(),
1198            })
1199            .unwrap();
1200        let backend = Backend {
1201            address: SocketAddress::new_v4(127, 0, 0, 1, 1028).into(),
1202            backend_id: String::from("cluster_1-0"),
1203            backup: None,
1204            cluster_id: String::from("cluster_1"),
1205            load_balancing_parameters: Some(LoadBalancingParams::default()),
1206            sticky_id: None,
1207        };
1208        command
1209            .write_message(&WorkerRequest {
1210                id: String::from("ID_EFGH"),
1211                content: RequestType::AddBackend(backend.to_add_backend()).into(),
1212            })
1213            .unwrap();
1214
1215        println!("test received: {:?}", command.read_message());
1216        println!("test received: {:?}", command.read_message());
1217
1218        let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not connect");
1219        // 5 seconds of timeout
1220        client.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
1221
1222        let w = client
1223            .write(&b"GET / HTTP/1.1\r\nHost: localhost:1031\r\n\r\n"[..])
1224            .unwrap();
1225        println!("http client write: {w:?}");
1226        barrier.wait();
1227
1228        let mut buffer = [0; 4096];
1229        let mut index = 0;
1230
1231        loop {
1232            assert!(index <= 191);
1233            if index == 191 {
1234                break;
1235            }
1236
1237            let r = client.read(&mut buffer[index..]);
1238            println!("http client read: {r:?}");
1239            match r {
1240                Err(e) => assert!(false, "client request should not fail. Error: {e:?}"),
1241                Ok(sz) => {
1242                    index += sz;
1243                }
1244            }
1245        }
1246
1247        println!(
1248            "Response: {}",
1249            str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1250        );
1251
1252        println!("first request ended, will send second one");
1253        let w2 = client.write(&b"GET / HTTP/1.1\r\nHost: localhost:1031\r\n\r\n"[..]);
1254        println!("http client write: {w2:?}");
1255        barrier.wait();
1256
1257        let mut buffer2 = [0; 4096];
1258        let mut index = 0;
1259
1260        loop {
1261            assert!(index <= 191);
1262            if index == 191 {
1263                break;
1264            }
1265
1266            let r2 = client.read(&mut buffer2[index..]);
1267            println!("http client read: {r2:?}");
1268            match r2 {
1269                Err(e) => assert!(false, "client request should not fail. Error: {e:?}"),
1270                Ok(sz) => {
1271                    index += sz;
1272                }
1273            }
1274        }
1275        println!(
1276            "Response: {}",
1277            str::from_utf8(&buffer2[..index]).expect("could not make string from buffer")
1278        );
1279    }
1280
1281    use self::tiny_http::{Response, Server};
1282
1283    fn start_server(port: u16, barrier: Arc<Barrier>) {
1284        thread::spawn(move || {
1285            setup_test_logger!();
1286            let server =
1287                Server::http(&format!("127.0.0.1:{port}")).expect("could not create server");
1288            info!("starting web server in port {}", port);
1289            barrier.wait();
1290
1291            for request in server.incoming_requests() {
1292                info!(
1293                    "backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
1294                    request.method(),
1295                    request.url(),
1296                    request.headers()
1297                );
1298
1299                let response = Response::from_string("hello world");
1300                request.respond(response).unwrap();
1301                info!("backend web server sent response");
1302                barrier.wait();
1303                info!("server session stopped");
1304            }
1305
1306            println!("server on port {port} closed");
1307        });
1308    }
1309
1310    #[test]
1311    fn frontend_from_request_test() {
1312        let cluster_id1 = "cluster_1".to_owned();
1313        let cluster_id2 = "cluster_2".to_owned();
1314        let cluster_id3 = "cluster_3".to_owned();
1315        let uri1 = "/".to_owned();
1316        let uri2 = "/yolo".to_owned();
1317        let uri3 = "/yolo/swag".to_owned();
1318
1319        let mut fronts = Router::new();
1320        fronts
1321            .add_http_front(&HttpFrontend {
1322                address: "0.0.0.0:80".parse().unwrap(),
1323                hostname: "lolcatho.st".to_owned(),
1324                method: None,
1325                path: PathRule::prefix(uri1),
1326                position: RulePosition::Tree,
1327                cluster_id: Some(cluster_id1),
1328                tags: None,
1329            })
1330            .expect("Could not add http frontend");
1331        fronts
1332            .add_http_front(&HttpFrontend {
1333                address: "0.0.0.0:80".parse().unwrap(),
1334                hostname: "lolcatho.st".to_owned(),
1335                method: None,
1336                path: PathRule::prefix(uri2),
1337                position: RulePosition::Tree,
1338                cluster_id: Some(cluster_id2),
1339                tags: None,
1340            })
1341            .expect("Could not add http frontend");
1342        fronts
1343            .add_http_front(&HttpFrontend {
1344                address: "0.0.0.0:80".parse().unwrap(),
1345                hostname: "lolcatho.st".to_owned(),
1346                method: None,
1347                path: PathRule::prefix(uri3),
1348                position: RulePosition::Tree,
1349                cluster_id: Some(cluster_id3),
1350                tags: None,
1351            })
1352            .expect("Could not add http frontend");
1353        fronts
1354            .add_http_front(&HttpFrontend {
1355                address: "0.0.0.0:80".parse().unwrap(),
1356                hostname: "other.domain".to_owned(),
1357                method: None,
1358                path: PathRule::prefix("/test".to_owned()),
1359                position: RulePosition::Tree,
1360                cluster_id: Some("cluster_1".to_owned()),
1361                tags: None,
1362            })
1363            .expect("Could not add http frontend");
1364
1365        let address = SocketAddress::new_v4(127, 0, 0, 1, 1030);
1366
1367        let default_config = ListenerBuilder::new_http(address.clone())
1368            .to_http(None)
1369            .expect("Could not create default HTTP listener config");
1370
1371        let listener = HttpListener {
1372            listener: None,
1373            address: address.into(),
1374            fronts,
1375            answers: Rc::new(RefCell::new(
1376                HttpAnswers::new(&Some(CustomHttpAnswers::default())).unwrap(),
1377            )),
1378            config: default_config,
1379            token: Token(0),
1380            active: true,
1381            tags: BTreeMap::new(),
1382        };
1383
1384        let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get);
1385        let frontend2 = listener.frontend_from_request("lolcatho.st", "/test", &Method::Get);
1386        let frontend3 = listener.frontend_from_request("lolcatho.st", "/yolo/test", &Method::Get);
1387        let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get);
1388        let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get);
1389        assert_eq!(
1390            frontend1.expect("should find frontend"),
1391            Route::ClusterId("cluster_1".to_string())
1392        );
1393        assert_eq!(
1394            frontend2.expect("should find frontend"),
1395            Route::ClusterId("cluster_1".to_string())
1396        );
1397        assert_eq!(
1398            frontend3.expect("should find frontend"),
1399            Route::ClusterId("cluster_2".to_string())
1400        );
1401        assert_eq!(
1402            frontend4.expect("should find frontend"),
1403            Route::ClusterId("cluster_3".to_string())
1404        );
1405        assert!(frontend5.is_err());
1406    }
1407}