1use std::{
2 cell::RefCell,
3 collections::{BTreeMap, HashMap, hash_map::Entry},
4 io::ErrorKind,
5 net::{Shutdown, SocketAddr},
6 os::unix::io::AsRawFd,
7 rc::{Rc, Weak},
8 str::from_utf8_unchecked,
9 time::{Duration, Instant},
10};
11
12use mio::{
13 Interest, Registry, Token,
14 net::{TcpListener as MioTcpListener, TcpStream},
15 unix::SourceFd,
16};
17use rusty_ulid::Ulid;
18use sozu_command::{
19 logging::CachedTags,
20 proto::command::{
21 Cluster, HttpListenerConfig, ListenerType, RemoveListener, RequestHttpFrontend,
22 WorkerRequest, WorkerResponse, request::RequestType,
23 },
24 ready::Ready,
25 response::HttpFrontend,
26 state::ClusterId,
27};
28
29use crate::{
30 AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
31 ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
32 SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
33 backends::BackendMap,
34 pool::Pool,
35 protocol::{
36 Http, Pipe, SessionState,
37 http::{
38 ResponseStream,
39 answers::HttpAnswers,
40 parser::{Method, hostname_and_port},
41 },
42 proxy_protocol::expect::ExpectProxyProtocol,
43 },
44 router::{Route, Router},
45 server::{ListenToken, SessionManager},
46 socket::server_bind,
47 timer::TimeoutContainer,
48};
49
/// Status flag of a session.
///
/// NOTE(review): nothing in the visible part of this file reads this enum; the
/// variant meanings below are inferred from their names — confirm with callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    /// Presumably: the session is being served normally.
    Normal,
    /// Presumably: the session is answering with a default answer.
    DefaultAnswer,
}
55
// Declares `HttpStateMachine`, the set of states an HTTP session can be in,
// through the project's `StateMachineBuilder!` macro:
//
// - `Expect`: waiting for a proxy protocol header on the frontend socket,
// - `Http`: regular HTTP traffic,
// - `WebSocket`: a `Pipe` between frontend and backend after an upgrade.
//
// NOTE(review): the rest of this file also relies on items that are not
// written here (`FailedUpgrade` variant, `StateMarker`, `take()`, `marker()`,
// `failed()`); they are presumably generated by the macro — confirm there.
StateMachineBuilder! {
    enum HttpStateMachine impl SessionState {
        Expect(ExpectProxyProtocol<TcpStream>),
        Http(Http<TcpStream, HttpListener>),
        WebSocket(Pipe<TcpStream, HttpListener>),
    }
}
68
/// An HTTP session: one frontend connection and the state machine driving it.
pub struct HttpSession {
    /// Shared HTTP answers, cloned into every `Http` state built by the session.
    answers: Rc<RefCell<HttpAnswers>>,
    /// Backend timeout handed to `Http::new`.
    configured_backend_timeout: Duration,
    /// Connect timeout handed to `Http::new`.
    configured_connect_timeout: Duration,
    /// Frontend timeout handed to `Http::new`.
    configured_frontend_timeout: Duration,
    /// Token of the frontend socket; also the key used to remove the session.
    frontend_token: Token,
    /// Instant of the last readiness update received by the session.
    last_event: Instant,
    /// Listener that accepted the connection.
    listener: Rc<RefCell<HttpListener>>,
    /// Metrics collected over the life of the session.
    metrics: SessionMetrics,
    /// Weak handle on the shared `Pool`, cloned into the `Http` state.
    pool: Weak<RefCell<Pool>>,
    /// Owning proxy, used on close to deregister the socket and drop the session.
    proxy: Rc<RefCell<HttpProxy>>,
    /// Current protocol state.
    state: HttpStateMachine,
    /// Sticky session name forwarded to `Http::new`.
    sticky_name: String,
    /// Set once `close()` ran to completion, making later calls no-ops.
    has_been_closed: bool,
}
87
88impl HttpSession {
89 #[allow(clippy::too_many_arguments)]
90 pub fn new(
91 answers: Rc<RefCell<HttpAnswers>>,
92 configured_backend_timeout: Duration,
93 configured_connect_timeout: Duration,
94 configured_frontend_timeout: Duration,
95 configured_request_timeout: Duration,
96 expect_proxy: bool,
97 listener: Rc<RefCell<HttpListener>>,
98 pool: Weak<RefCell<Pool>>,
99 proxy: Rc<RefCell<HttpProxy>>,
100 public_address: SocketAddr,
101 sock: TcpStream,
102 sticky_name: String,
103 token: Token,
104 wait_time: Duration,
105 ) -> Result<Self, AcceptError> {
106 let request_id = Ulid::generate();
107 let container_frontend_timeout = TimeoutContainer::new(configured_request_timeout, token);
108
109 let state = if expect_proxy {
110 trace!("starting in expect proxy state");
111 gauge_add!("protocol.proxy.expect", 1);
112
113 HttpStateMachine::Expect(ExpectProxyProtocol::new(
114 container_frontend_timeout,
115 sock,
116 token,
117 request_id,
118 ))
119 } else {
120 gauge_add!("protocol.http", 1);
121 let session_address = sock.peer_addr().ok();
122
123 HttpStateMachine::Http(Http::new(
124 answers.clone(),
125 configured_backend_timeout,
126 configured_connect_timeout,
127 configured_frontend_timeout,
128 container_frontend_timeout,
129 sock,
130 token,
131 listener.clone(),
132 pool.clone(),
133 Protocol::HTTP,
134 public_address,
135 request_id,
136 session_address,
137 sticky_name.clone(),
138 )?)
139 };
140
141 let metrics = SessionMetrics::new(Some(wait_time));
142 Ok(HttpSession {
143 answers,
144 configured_backend_timeout,
145 configured_connect_timeout,
146 configured_frontend_timeout,
147 frontend_token: token,
148 has_been_closed: false,
149 last_event: Instant::now(),
150 listener,
151 metrics,
152 pool,
153 proxy,
154 state,
155 sticky_name,
156 })
157 }
158
159 pub fn upgrade(&mut self) -> SessionIsToBeClosed {
160 debug!("HTTP::upgrade");
161 let new_state = match self.state.take() {
162 HttpStateMachine::Http(http) => self.upgrade_http(http),
163 HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
164 HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
165 HttpStateMachine::FailedUpgrade(_) => unreachable!(),
166 };
167
168 match new_state {
169 Some(state) => {
170 self.state = state;
171 false
172 }
173 None => true,
175 }
176 }
177
178 fn upgrade_expect(
179 &mut self,
180 expect: ExpectProxyProtocol<TcpStream>,
181 ) -> Option<HttpStateMachine> {
182 debug!("switching to HTTP");
183 match expect
184 .addresses
185 .as_ref()
186 .map(|add| (add.destination(), add.source()))
187 {
188 Some((Some(public_address), Some(session_address))) => {
189 let mut http = Http::new(
190 self.answers.clone(),
191 self.configured_backend_timeout,
192 self.configured_connect_timeout,
193 self.configured_frontend_timeout,
194 expect.container_frontend_timeout,
195 expect.frontend,
196 expect.frontend_token,
197 self.listener.clone(),
198 self.pool.clone(),
199 Protocol::HTTP,
200 public_address,
201 expect.request_id,
202 Some(session_address),
203 self.sticky_name.clone(),
204 )
205 .ok()?;
206 http.frontend_readiness.event = expect.frontend_readiness.event;
207
208 gauge_add!("protocol.proxy.expect", -1);
209 gauge_add!("protocol.http", 1);
210 Some(HttpStateMachine::Http(http))
211 }
212 _ => None,
213 }
214 }
215
216 fn upgrade_http(&mut self, http: Http<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
217 debug!("http switching to ws");
218 let front_token = self.frontend_token;
219 let back_token = match http.backend_token {
220 Some(back_token) => back_token,
221 None => {
222 warn!(
223 "Could not upgrade http request on cluster '{:?}' ({:?}) using backend '{:?}' into websocket for request '{}'",
224 http.context.cluster_id,
225 self.frontend_token,
226 http.context.backend_id,
227 http.context.id
228 );
229 return None;
230 }
231 };
232
233 let ws_context = http.websocket_context();
234 let mut container_frontend_timeout = http.container_frontend_timeout;
235 let mut container_backend_timeout = http.container_backend_timeout;
236 container_frontend_timeout.reset();
237 container_backend_timeout.reset();
238
239 let backend_buffer = if let ResponseStream::BackendAnswer(kawa) = http.response_stream {
240 kawa.storage.buffer
241 } else {
242 return None;
243 };
244
245 let mut pipe = Pipe::new(
246 backend_buffer,
247 http.context.backend_id,
248 http.backend_socket,
249 http.backend,
250 Some(container_backend_timeout),
251 Some(container_frontend_timeout),
252 http.context.cluster_id,
253 http.request_stream.storage.buffer,
254 front_token,
255 http.frontend_socket,
256 self.listener.clone(),
257 Protocol::HTTP,
258 http.context.id,
259 http.context.session_address,
260 ws_context,
261 );
262
263 pipe.frontend_readiness.event = http.frontend_readiness.event;
264 pipe.backend_readiness.event = http.backend_readiness.event;
265 pipe.set_back_token(back_token);
266
267 gauge_add!("protocol.http", -1);
268 gauge_add!("protocol.ws", 1);
269 gauge_add!("http.active_requests", -1);
270 gauge_add!("websocket.active_requests", 1);
271 Some(HttpStateMachine::WebSocket(pipe))
272 }
273
274 fn upgrade_websocket(&self, ws: Pipe<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
275 error!("Upgrade called on WS, this should not happen");
277 Some(HttpStateMachine::WebSocket(ws))
278 }
279}
280
281impl ProxySession for HttpSession {
282 fn close(&mut self) {
283 if self.has_been_closed {
284 return;
285 }
286
287 trace!("Closing HTTP session");
288 self.metrics.service_stop();
289
290 match self.state.marker() {
292 StateMarker::Expect => gauge_add!("protocol.proxy.expect", -1),
293 StateMarker::Http => gauge_add!("protocol.http", -1),
294 StateMarker::WebSocket => {
295 gauge_add!("protocol.ws", -1);
296 gauge_add!("websocket.active_requests", -1);
297 }
298 }
299
300 if self.state.failed() {
301 match self.state.marker() {
302 StateMarker::Expect => incr!("http.upgrade.expect.failed"),
303 StateMarker::Http => incr!("http.upgrade.http.failed"),
304 StateMarker::WebSocket => incr!("http.upgrade.ws.failed"),
305 }
306 return;
307 }
308
309 self.state.cancel_timeouts();
310 self.state.close(self.proxy.clone(), &mut self.metrics);
312
313 let front_socket = self.state.front_socket();
314 if let Err(e) = front_socket.shutdown(Shutdown::Both) {
315 if e.kind() != ErrorKind::NotConnected {
317 error!(
318 "error shutting down front socket({:?}): {:?}",
319 front_socket, e
320 )
321 }
322 }
323
324 let proxy = self.proxy.borrow();
326 let fd = front_socket.as_raw_fd();
327 if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
328 error!(
329 "error deregistering front socket({:?}) while closing HTTP session: {:?}",
330 fd, e
331 );
332 }
333 proxy.remove_session(self.frontend_token);
334
335 self.has_been_closed = true;
336 }
337
338 fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
339 let state_result = self.state.timeout(token, &mut self.metrics);
340 state_result == StateResult::CloseSession
341 }
342
343 fn protocol(&self) -> Protocol {
344 Protocol::HTTP
345 }
346
347 fn update_readiness(&mut self, token: Token, events: Ready) {
348 trace!(
349 "token {:?} got event {}",
350 token,
351 super::ready_to_string(events)
352 );
353 self.last_event = Instant::now();
354 self.metrics.wait_start();
355 self.state.update_readiness(token, events);
356 }
357
358 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
359 self.metrics.service_start();
360
361 let session_result =
362 self.state
363 .ready(session.clone(), self.proxy.clone(), &mut self.metrics);
364
365 let to_be_closed = match session_result {
366 SessionResult::Close => true,
367 SessionResult::Continue => false,
368 SessionResult::Upgrade => match self.upgrade() {
369 false => self.ready(session),
370 true => true,
371 },
372 };
373
374 self.metrics.service_stop();
375 to_be_closed
376 }
377
378 fn shutting_down(&mut self) -> SessionIsToBeClosed {
379 self.state.shutting_down()
380 }
381
382 fn last_event(&self) -> Instant {
383 self.last_event
384 }
385
386 fn print_session(&self) {
387 self.state.print_state("HTTP");
388 error!("Metrics: {:?}", self.metrics);
389 }
390
391 fn frontend_token(&self) -> Token {
392 self.frontend_token
393 }
394}
395
/// A hostname, stored as an owned `String`.
pub type Hostname = String;
397
/// An HTTP listener: its configuration, routing table and, once activated,
/// its listening socket.
pub struct HttpListener {
    /// `true` once `activate()` succeeded.
    active: bool,
    /// Listening address, taken from `config.address`.
    address: SocketAddr,
    /// HTTP answers built from `config.http_answers`, shared with the sessions.
    answers: Rc<RefCell<HttpAnswers>>,
    /// Configuration the listener was created with.
    config: HttpListenerConfig,
    /// Router used to look up the route of a request.
    fronts: Router,
    /// Listening socket; `None` until activation or after it was given back.
    listener: Option<MioTcpListener>,
    /// Cached tags; `HttpProxy` stores them under the frontend hostname.
    tags: BTreeMap<String, CachedTags>,
    /// Token the listening socket is registered with.
    token: Token,
}
408
409impl ListenerHandler for HttpListener {
410 fn get_addr(&self) -> &SocketAddr {
411 &self.address
412 }
413
414 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
415 self.tags.get(key)
416 }
417
418 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
419 match tags {
420 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
421 None => self.tags.remove(&key),
422 };
423 }
424}
425
426impl L7ListenerHandler for HttpListener {
427 fn get_sticky_name(&self) -> &str {
428 &self.config.sticky_name
429 }
430
431 fn get_connect_timeout(&self) -> u32 {
432 self.config.connect_timeout
433 }
434
435 fn frontend_from_request(
437 &self,
438 host: &str,
439 uri: &str,
440 method: &Method,
441 ) -> Result<Route, FrontendFromRequestError> {
442 let start = Instant::now();
443 let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
444 Ok(tuple) => tuple,
445 Err(parse_error) => {
446 return Err(FrontendFromRequestError::HostParse {
448 host: host.to_owned(),
449 error: parse_error.to_string(),
450 });
451 }
452 };
453 if remaining_input != &b""[..] {
454 return Err(FrontendFromRequestError::InvalidCharsAfterHost(
455 host.to_owned(),
456 ));
457 }
458
459 let host = unsafe { from_utf8_unchecked(hostname) };
469
470 let route = self.fronts.lookup(host, uri, method).map_err(|e| {
471 incr!("http.failed_backend_matching");
472 FrontendFromRequestError::NoClusterFound(e)
473 })?;
474
475 let now = Instant::now();
476
477 if let Route::ClusterId(cluster) = &route {
478 time!("frontend_matching_time", cluster, (now - start).as_millis());
479 }
480
481 Ok(route)
482 }
483}
484
/// The HTTP proxy: owns the HTTP listeners and the resources shared by their
/// sessions.
pub struct HttpProxy {
    /// Shared backend map, exposed through `L7Proxy::backends`.
    backends: Rc<RefCell<BackendMap>>,
    /// Known clusters, indexed by cluster id.
    clusters: HashMap<ClusterId, Cluster>,
    /// Listeners, indexed by their token.
    listeners: HashMap<Token, Rc<RefCell<HttpListener>>>,
    /// Shared `Pool`; sessions receive a weak handle on it.
    pool: Rc<RefCell<Pool>>,
    /// mio registry used to (de)register listening and session sockets.
    registry: Registry,
    /// Session manager whose slab holds every session.
    sessions: Rc<RefCell<SessionManager>>,
}
493
494impl HttpProxy {
495 pub fn new(
496 registry: Registry,
497 sessions: Rc<RefCell<SessionManager>>,
498 pool: Rc<RefCell<Pool>>,
499 backends: Rc<RefCell<BackendMap>>,
500 ) -> HttpProxy {
501 HttpProxy {
502 backends,
503 clusters: HashMap::new(),
504 listeners: HashMap::new(),
505 pool,
506 registry,
507 sessions,
508 }
509 }
510
511 pub fn add_listener(
512 &mut self,
513 config: HttpListenerConfig,
514 token: Token,
515 ) -> Result<Token, ProxyError> {
516 match self.listeners.entry(token) {
517 Entry::Vacant(entry) => {
518 let http_listener =
519 HttpListener::new(config, token).map_err(ProxyError::AddListener)?;
520 entry.insert(Rc::new(RefCell::new(http_listener)));
521 Ok(token)
522 }
523 _ => Err(ProxyError::ListenerAlreadyPresent),
524 }
525 }
526
527 pub fn get_listener(&self, token: &Token) -> Option<Rc<RefCell<HttpListener>>> {
528 self.listeners.get(token).cloned()
529 }
530
531 pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
532 let len = self.listeners.len();
533 let remove_address = remove.address.into();
534 self.listeners
535 .retain(|_, l| l.borrow().address != remove_address);
536
537 if !self.listeners.len() < len {
538 info!("no HTTP listener to remove at address {:?}", remove_address);
539 }
540 Ok(())
541 }
542
543 pub fn activate_listener(
544 &self,
545 addr: &SocketAddr,
546 tcp_listener: Option<MioTcpListener>,
547 ) -> Result<Token, ProxyError> {
548 let listener = self
549 .listeners
550 .values()
551 .find(|listener| listener.borrow().address == *addr)
552 .ok_or(ProxyError::NoListenerFound(addr.to_owned()))?;
553
554 listener
555 .borrow_mut()
556 .activate(&self.registry, tcp_listener)
557 .map_err(|listener_error| ProxyError::ListenerActivation {
558 address: *addr,
559 listener_error,
560 })
561 }
562
563 pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
564 self.listeners
565 .iter()
566 .filter_map(|(_, listener)| {
567 let mut owned = listener.borrow_mut();
568 if let Some(listener) = owned.listener.take() {
569 return Some((owned.address, listener));
570 }
571
572 None
573 })
574 .collect()
575 }
576
577 pub fn give_back_listener(
578 &mut self,
579 address: SocketAddr,
580 ) -> Result<(Token, MioTcpListener), ProxyError> {
581 let listener = self
582 .listeners
583 .values()
584 .find(|listener| listener.borrow().address == address)
585 .ok_or(ProxyError::NoListenerFound(address))?;
586
587 let mut owned = listener.borrow_mut();
588
589 let taken_listener = owned
590 .listener
591 .take()
592 .ok_or(ProxyError::UnactivatedListener)?;
593
594 Ok((owned.token, taken_listener))
595 }
596
597 pub fn add_cluster(&mut self, mut cluster: Cluster) -> Result<(), ProxyError> {
598 if let Some(answer_503) = cluster.answer_503.take() {
599 for listener in self.listeners.values() {
600 listener
601 .borrow()
602 .answers
603 .borrow_mut()
604 .add_custom_answer(&cluster.cluster_id, answer_503.clone())
605 .map_err(|(status, error)| {
606 ProxyError::AddCluster(ListenerError::TemplateParse(status, error))
607 })?;
608 }
609 }
610 self.clusters.insert(cluster.cluster_id.clone(), cluster);
611 Ok(())
612 }
613
614 pub fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), ProxyError> {
615 self.clusters.remove(cluster_id);
616
617 for listener in self.listeners.values() {
618 listener
619 .borrow()
620 .answers
621 .borrow_mut()
622 .remove_custom_answer(cluster_id);
623 }
624 Ok(())
625 }
626
627 pub fn add_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
628 let front = front.clone().to_frontend().map_err(|request_error| {
629 ProxyError::WrongInputFrontend {
630 front,
631 error: request_error.to_string(),
632 }
633 })?;
634
635 let mut listener = self
636 .listeners
637 .values()
638 .find(|l| l.borrow().address == front.address)
639 .ok_or(ProxyError::NoListenerFound(front.address))?
640 .borrow_mut();
641
642 let hostname = front.hostname.to_owned();
643 let tags = front.tags.to_owned();
644
645 listener
646 .add_http_front(front)
647 .map_err(ProxyError::AddFrontend)?;
648 listener.set_tags(hostname, tags);
649 Ok(())
650 }
651
652 pub fn remove_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
653 let front = front.clone().to_frontend().map_err(|request_error| {
654 ProxyError::WrongInputFrontend {
655 front,
656 error: request_error.to_string(),
657 }
658 })?;
659
660 let mut listener = self
661 .listeners
662 .values()
663 .find(|l| l.borrow().address == front.address)
664 .ok_or(ProxyError::NoListenerFound(front.address))?
665 .borrow_mut();
666
667 let hostname = front.hostname.to_owned();
668
669 listener
670 .remove_http_front(front)
671 .map_err(ProxyError::RemoveFrontend)?;
672
673 listener.set_tags(hostname, None);
674 Ok(())
675 }
676
677 pub fn soft_stop(&mut self) -> Result<(), ProxyError> {
678 let listeners: HashMap<_, _> = self.listeners.drain().collect();
679 let mut socket_errors = vec![];
680 for (_, l) in listeners.iter() {
681 if let Some(mut sock) = l.borrow_mut().listener.take() {
682 debug!("Deregistering socket {:?}", sock);
683 if let Err(e) = self.registry.deregister(&mut sock) {
684 let error = format!("socket {sock:?}: {e:?}");
685 socket_errors.push(error);
686 }
687 }
688 }
689
690 if !socket_errors.is_empty() {
691 return Err(ProxyError::SoftStop {
692 proxy_protocol: "HTTP".to_string(),
693 error: format!("Error deregistering listen sockets: {:?}", socket_errors),
694 });
695 }
696
697 Ok(())
698 }
699
700 pub fn hard_stop(&mut self) -> Result<(), ProxyError> {
701 let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
702 let mut socket_errors = vec![];
703 for (_, l) in listeners.drain() {
704 if let Some(mut sock) = l.borrow_mut().listener.take() {
705 debug!("Deregistering socket {:?}", sock);
706 if let Err(e) = self.registry.deregister(&mut sock) {
707 let error = format!("socket {sock:?}: {e:?}");
708 socket_errors.push(error);
709 }
710 }
711 }
712
713 if !socket_errors.is_empty() {
714 return Err(ProxyError::HardStop {
715 proxy_protocol: "HTTP".to_string(),
716 error: format!("Error deregistering listen sockets: {:?}", socket_errors),
717 });
718 }
719
720 Ok(())
721 }
722}
723
724impl HttpListener {
725 pub fn new(config: HttpListenerConfig, token: Token) -> Result<HttpListener, ListenerError> {
726 Ok(HttpListener {
727 active: false,
728 address: config.address.into(),
729 answers: Rc::new(RefCell::new(
730 HttpAnswers::new(&config.http_answers)
731 .map_err(|(status, error)| ListenerError::TemplateParse(status, error))?,
732 )),
733 config,
734 fronts: Router::new(),
735 listener: None,
736 tags: BTreeMap::new(),
737 token,
738 })
739 }
740
741 pub fn activate(
742 &mut self,
743 registry: &Registry,
744 tcp_listener: Option<MioTcpListener>,
745 ) -> Result<Token, ListenerError> {
746 if self.active {
747 return Ok(self.token);
748 }
749 let address: SocketAddr = self.config.address.into();
750
751 let mut listener = match tcp_listener {
752 Some(tcp_listener) => tcp_listener,
753 None => {
754 server_bind(address).map_err(|server_bind_error| ListenerError::Activation {
755 address,
756 error: server_bind_error.to_string(),
757 })?
758 }
759 };
760
761 registry
762 .register(&mut listener, self.token, Interest::READABLE)
763 .map_err(ListenerError::SocketRegistration)?;
764
765 self.listener = Some(listener);
766 self.active = true;
767 Ok(self.token)
768 }
769
770 pub fn add_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
771 self.fronts
772 .add_http_front(&http_front)
773 .map_err(ListenerError::AddFrontend)
774 }
775
776 pub fn remove_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
777 debug!("removing http_front {:?}", http_front);
778 self.fronts
779 .remove_http_front(&http_front)
780 .map_err(ListenerError::RemoveFrontend)
781 }
782
783 fn accept(&mut self) -> Result<TcpStream, AcceptError> {
784 if let Some(ref sock) = self.listener {
785 sock.accept()
786 .map_err(|e| match e.kind() {
787 ErrorKind::WouldBlock => AcceptError::WouldBlock,
788 _ => {
789 error!("accept() IO error: {:?}", e);
790 AcceptError::IoError
791 }
792 })
793 .map(|(sock, _)| sock)
794 } else {
795 error!("cannot accept connections, no listening socket available");
796 Err(AcceptError::IoError)
797 }
798 }
799}
800
801impl ProxyConfiguration for HttpProxy {
802 fn notify(&mut self, request: WorkerRequest) -> WorkerResponse {
803 let request_id = request.id.clone();
804
805 let result = match request.content.request_type {
806 Some(RequestType::AddCluster(cluster)) => {
807 debug!("{} add cluster {:?}", request.id, cluster);
808 self.add_cluster(cluster)
809 }
810 Some(RequestType::RemoveCluster(cluster_id)) => {
811 debug!("{} remove cluster {:?}", request_id, cluster_id);
812 self.remove_cluster(&cluster_id)
813 }
814 Some(RequestType::AddHttpFrontend(front)) => {
815 debug!("{} add front {:?}", request_id, front);
816 self.add_http_frontend(front)
817 }
818 Some(RequestType::RemoveHttpFrontend(front)) => {
819 debug!("{} remove front {:?}", request_id, front);
820 self.remove_http_frontend(front)
821 }
822 Some(RequestType::RemoveListener(remove)) => {
823 debug!("removing HTTP listener at address {:?}", remove.address);
824 self.remove_listener(remove)
825 }
826 Some(RequestType::SoftStop(_)) => {
827 debug!("{} processing soft shutdown", request_id);
828 match self.soft_stop() {
829 Ok(()) => {
830 info!("{} soft stop successful", request_id);
831 return WorkerResponse::processing(request.id);
832 }
833 Err(e) => Err(e),
834 }
835 }
836 Some(RequestType::HardStop(_)) => {
837 debug!("{} processing hard shutdown", request_id);
838 match self.hard_stop() {
839 Ok(()) => {
840 info!("{} hard stop successful", request_id);
841 return WorkerResponse::processing(request.id);
842 }
843 Err(e) => Err(e),
844 }
845 }
846 Some(RequestType::Status(_)) => {
847 debug!("{} status", request_id);
848 Ok(())
849 }
850 other_command => {
851 debug!(
852 "{} unsupported message for HTTP proxy, ignoring: {:?}",
853 request.id, other_command
854 );
855 Err(ProxyError::UnsupportedMessage)
856 }
857 };
858
859 match result {
860 Ok(()) => {
861 debug!("{} successful", request_id);
862 WorkerResponse::ok(request_id)
863 }
864 Err(proxy_error) => {
865 debug!("{} unsuccessful: {}", request_id, proxy_error);
866 WorkerResponse::error(request_id, proxy_error)
867 }
868 }
869 }
870
871 fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError> {
872 if let Some(listener) = self.listeners.get(&Token(token.0)) {
873 listener.borrow_mut().accept()
874 } else {
875 Err(AcceptError::IoError)
876 }
877 }
878
879 fn create_session(
880 &mut self,
881 mut frontend_sock: TcpStream,
882 listener_token: ListenToken,
883 wait_time: Duration,
884 proxy: Rc<RefCell<Self>>,
885 ) -> Result<(), AcceptError> {
886 let listener = self
887 .listeners
888 .get(&Token(listener_token.0))
889 .cloned()
890 .ok_or(AcceptError::IoError)?;
891
892 if let Err(e) = frontend_sock.set_nodelay(true) {
893 error!(
894 "error setting nodelay on front socket({:?}): {:?}",
895 frontend_sock, e
896 );
897 }
898 let mut session_manager = self.sessions.borrow_mut();
899 let session_entry = session_manager.slab.vacant_entry();
900 let session_token = Token(session_entry.key());
901 let owned = listener.borrow();
902
903 if let Err(register_error) = self.registry.register(
904 &mut frontend_sock,
905 session_token,
906 Interest::READABLE | Interest::WRITABLE,
907 ) {
908 error!(
909 "error registering listen socket({:?}): {:?}",
910 frontend_sock, register_error
911 );
912 return Err(AcceptError::RegisterError);
913 }
914
915 let public_address: SocketAddr = match owned.config.public_address {
916 Some(pub_addr) => pub_addr.into(),
917 None => owned.config.address.into(),
918 };
919
920 let session = HttpSession::new(
921 owned.answers.clone(),
922 Duration::from_secs(owned.config.back_timeout as u64),
923 Duration::from_secs(owned.config.connect_timeout as u64),
924 Duration::from_secs(owned.config.front_timeout as u64),
925 Duration::from_secs(owned.config.request_timeout as u64),
926 owned.config.expect_proxy,
927 listener.clone(),
928 Rc::downgrade(&self.pool),
929 proxy,
930 public_address,
931 frontend_sock,
932 owned.config.sticky_name.clone(),
933 session_token,
934 wait_time,
935 )?;
936
937 let session = Rc::new(RefCell::new(session));
938 session_entry.insert(session);
939
940 Ok(())
941 }
942}
943
944impl L7Proxy for HttpProxy {
945 fn kind(&self) -> ListenerType {
946 ListenerType::Http
947 }
948
949 fn register_socket(
950 &self,
951 source: &mut TcpStream,
952 token: Token,
953 interest: Interest,
954 ) -> Result<(), std::io::Error> {
955 self.registry.register(source, token, interest)
956 }
957
958 fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error> {
959 self.registry.deregister(tcp_stream)
960 }
961
962 fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token {
963 let mut session_manager = self.sessions.borrow_mut();
964 let entry = session_manager.slab.vacant_entry();
965 let token = Token(entry.key());
966 let _entry = entry.insert(session);
967 token
968 }
969
970 fn remove_session(&self, token: Token) -> bool {
971 self.sessions
972 .borrow_mut()
973 .slab
974 .try_remove(token.0)
975 .is_some()
976 }
977
978 fn backends(&self) -> Rc<RefCell<BackendMap>> {
979 self.backends.clone()
980 }
981
982 fn clusters(&self) -> &HashMap<ClusterId, Cluster> {
983 &self.clusters
984 }
985}
986
987pub mod testing {
988 use crate::testing::*;
989
990 pub fn start_http_worker(
992 config: HttpListenerConfig,
993 channel: ProxyChannel,
994 max_buffers: usize,
995 buffer_size: usize,
996 ) -> anyhow::Result<()> {
997 let address = config.address.into();
998
999 let ServerParts {
1000 event_loop,
1001 registry,
1002 sessions,
1003 pool,
1004 backends,
1005 client_scm_socket: _,
1006 server_scm_socket,
1007 server_config,
1008 } = prebuild_server(max_buffers, buffer_size, true)?;
1009
1010 let token = {
1011 let mut sessions = sessions.borrow_mut();
1012 let entry = sessions.slab.vacant_entry();
1013 let key = entry.key();
1014 let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1015 protocol: Protocol::HTTPListen,
1016 })));
1017 Token(key)
1018 };
1019
1020 let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1021 proxy
1022 .add_listener(config, token)
1023 .with_context(|| "Failed at creating adding the listener")?;
1024 proxy
1025 .activate_listener(&address, None)
1026 .with_context(|| "Failed at creating activating the listener")?;
1027
1028 let mut server = Server::new(
1029 event_loop,
1030 channel,
1031 server_scm_socket,
1032 sessions,
1033 pool,
1034 backends,
1035 Some(proxy),
1036 None,
1037 None,
1038 server_config,
1039 None,
1040 false,
1041 )
1042 .with_context(|| "Failed at creating server")?;
1043
1044 debug!("starting event loop");
1045 server.run();
1046 debug!("ending event loop");
1047 Ok(())
1048 }
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053 extern crate tiny_http;
1054
1055 use std::{
1056 io::{Read, Write},
1057 net::TcpStream,
1058 str,
1059 sync::{Arc, Barrier},
1060 thread,
1061 time::Duration,
1062 };
1063
1064 use sozu_command::proto::command::{CustomHttpAnswers, SocketAddress};
1065
1066 use super::{testing::start_http_worker, *};
1067 use crate::sozu_command::{
1068 channel::Channel,
1069 config::ListenerBuilder,
1070 proto::command::{LoadBalancingParams, PathRule, RulePosition, WorkerRequest},
1071 response::{Backend, HttpFrontend},
1072 };
1073
/// Sends one `Connection: Close` request through the proxy and expects a
/// 191-byte response from the backend started on port 1025.
#[test]
fn round_trip() {
    /// Size in bytes of the response the test backend is expected to send.
    const EXPECTED_RESPONSE_LEN: usize = 191;

    setup_test_logger!();
    let barrier = Arc::new(Barrier::new(2));
    start_server(1025, barrier.clone());
    barrier.wait();

    let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 1024))
        .to_http(None)
        .expect("could not create listener config");

    let (mut command, channel) =
        Channel::generate(1000, 10000).expect("should create a channel");
    let _jg = thread::spawn(move || {
        setup_test_logger!();
        start_http_worker(config, channel, 10, 16384).expect("could not start the http server");
    });

    let front = RequestHttpFrontend {
        cluster_id: Some(String::from("cluster_1")),
        address: SocketAddress::new_v4(127, 0, 0, 1, 1024),
        hostname: String::from("localhost"),
        path: PathRule::prefix(String::from("/")),
        ..Default::default()
    };
    command
        .write_message(&WorkerRequest {
            id: String::from("ID_ABCD"),
            content: RequestType::AddHttpFrontend(front).into(),
        })
        .unwrap();
    let backend = Backend {
        cluster_id: String::from("cluster_1"),
        backend_id: String::from("cluster_1-0"),
        address: SocketAddress::new_v4(127, 0, 0, 1, 1025).into(),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        sticky_id: None,
        backup: None,
    };
    command
        .write_message(&WorkerRequest {
            id: String::from("ID_EFGH"),
            content: RequestType::AddBackend(backend.to_add_backend()).into(),
        })
        .unwrap();

    println!("test received: {:?}", command.read_message());
    println!("test received: {:?}", command.read_message());

    let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not connect");

    client.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
    let w = client
        .write(&b"GET / HTTP/1.1\r\nHost: localhost:1024\r\nConnection: Close\r\n\r\n"[..]);
    println!("http client write: {w:?}");

    barrier.wait();
    let mut buffer = [0; 4096];
    let mut index = 0;

    while index < EXPECTED_RESPONSE_LEN {
        let r = client.read(&mut buffer[index..]);
        println!("http client read: {r:?}");
        match r {
            Err(e) => panic!("client request should not fail. Error: {e:?}"),
            // `Ok(0)` is end of stream: fail instead of reading forever.
            Ok(0) => panic!(
                "connection closed after {index} bytes, expected {EXPECTED_RESPONSE_LEN}"
            ),
            Ok(sz) => index += sz,
        }
    }
    // Reading more than the expected response is a failure too.
    assert_eq!(index, EXPECTED_RESPONSE_LEN);

    println!(
        "Response: {}",
        str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
    );
}
1168
/// Sends two requests on the same connection through the proxy and expects a
/// 191-byte response to each, from the backend started on port 1028.
#[test]
fn keep_alive() {
    /// Reads one response of the expected size from `client`.
    ///
    /// Panics on a read error, when the connection is closed early, or when
    /// more bytes than expected were read.
    fn read_response(client: &mut TcpStream) -> Vec<u8> {
        const EXPECTED_RESPONSE_LEN: usize = 191;

        let mut buffer = [0; 4096];
        let mut index = 0;

        while index < EXPECTED_RESPONSE_LEN {
            let r = client.read(&mut buffer[index..]);
            println!("http client read: {r:?}");
            match r {
                Err(e) => panic!("client request should not fail. Error: {e:?}"),
                // `Ok(0)` is end of stream: fail instead of reading forever.
                Ok(0) => panic!(
                    "connection closed after {index} bytes, expected {EXPECTED_RESPONSE_LEN}"
                ),
                Ok(sz) => index += sz,
            }
        }
        assert_eq!(index, EXPECTED_RESPONSE_LEN);

        buffer[..index].to_vec()
    }

    setup_test_logger!();
    let barrier = Arc::new(Barrier::new(2));
    start_server(1028, barrier.clone());
    barrier.wait();

    let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 1031))
        .to_http(None)
        .expect("could not create listener config");

    let (mut command, channel) =
        Channel::generate(1000, 10000).expect("should create a channel");

    let _jg = thread::spawn(move || {
        setup_test_logger!();
        start_http_worker(config, channel, 10, 16384).expect("could not start the http server");
    });

    let front = RequestHttpFrontend {
        address: SocketAddress::new_v4(127, 0, 0, 1, 1031),
        hostname: String::from("localhost"),
        path: PathRule::prefix(String::from("/")),
        cluster_id: Some(String::from("cluster_1")),
        ..Default::default()
    };
    command
        .write_message(&WorkerRequest {
            id: String::from("ID_ABCD"),
            content: RequestType::AddHttpFrontend(front).into(),
        })
        .unwrap();
    let backend = Backend {
        address: SocketAddress::new_v4(127, 0, 0, 1, 1028).into(),
        backend_id: String::from("cluster_1-0"),
        backup: None,
        cluster_id: String::from("cluster_1"),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        sticky_id: None,
    };
    command
        .write_message(&WorkerRequest {
            id: String::from("ID_EFGH"),
            content: RequestType::AddBackend(backend.to_add_backend()).into(),
        })
        .unwrap();

    println!("test received: {:?}", command.read_message());
    println!("test received: {:?}", command.read_message());

    let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not connect");
    client.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

    let w = client
        .write(&b"GET / HTTP/1.1\r\nHost: localhost:1031\r\n\r\n"[..])
        .unwrap();
    println!("http client write: {w:?}");
    barrier.wait();

    let first_response = read_response(&mut client);
    println!(
        "Response: {}",
        str::from_utf8(&first_response).expect("could not make string from buffer")
    );

    println!("first request ended, will send second one");
    let w2 = client.write(&b"GET / HTTP/1.1\r\nHost: localhost:1031\r\n\r\n"[..]);
    println!("http client write: {w2:?}");
    barrier.wait();

    let second_response = read_response(&mut client);
    println!(
        "Response: {}",
        str::from_utf8(&second_response).expect("could not make string from buffer")
    );
}
1281
1282 use self::tiny_http::{Response, Server};
1283
1284 fn start_server(port: u16, barrier: Arc<Barrier>) {
1285 thread::spawn(move || {
1286 setup_test_logger!();
1287 let server =
1288 Server::http(&format!("127.0.0.1:{port}")).expect("could not create server");
1289 info!("starting web server in port {}", port);
1290 barrier.wait();
1291
1292 for request in server.incoming_requests() {
1293 info!(
1294 "backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
1295 request.method(),
1296 request.url(),
1297 request.headers()
1298 );
1299
1300 let response = Response::from_string("hello world");
1301 request.respond(response).unwrap();
1302 info!("backend web server sent response");
1303 barrier.wait();
1304 info!("server session stopped");
1305 }
1306
1307 println!("server on port {port} closed");
1308 });
1309 }
1310
/// Checks the routes returned by `HttpListener::frontend_from_request`.
///
/// Three prefix frontends (`/`, `/yolo`, `/yolo/swag`) are registered for `lolcatho.st` and one
/// (`/test`) for `other.domain`. Lookups on `lolcatho.st` must resolve to the expected cluster,
/// and a lookup on the unregistered host `domain` must fail.
#[test]
fn frontend_from_request_test() {
    // (hostname, path prefix, cluster id), registered in this exact order.
    let registered_fronts = [
        ("lolcatho.st", "/", "cluster_1"),
        ("lolcatho.st", "/yolo", "cluster_2"),
        ("lolcatho.st", "/yolo/swag", "cluster_3"),
        ("other.domain", "/test", "cluster_1"),
    ];

    let mut fronts = Router::new();
    for (hostname, prefix, cluster_id) in registered_fronts {
        let frontend = HttpFrontend {
            address: "0.0.0.0:80".parse().unwrap(),
            hostname: hostname.to_owned(),
            method: None,
            path: PathRule::prefix(prefix.to_owned()),
            position: RulePosition::Tree,
            cluster_id: Some(cluster_id.to_owned()),
            tags: None,
        };
        fronts
            .add_http_front(&frontend)
            .expect("Could not add http frontend");
    }

    let address = SocketAddress::new_v4(127, 0, 0, 1, 1030);

    let default_config = ListenerBuilder::new_http(address.clone())
        .to_http(None)
        .expect("Could not create default HTTP listener config");

    let answers = HttpAnswers::new(&Some(CustomHttpAnswers::default())).unwrap();

    let listener = HttpListener {
        listener: None,
        address: address.into(),
        fronts,
        answers: Rc::new(RefCell::new(answers)),
        config: default_config,
        token: Token(0),
        active: true,
        tags: BTreeMap::new(),
    };

    // (request path, expected cluster id) for requests addressed to `lolcatho.st`.
    let expected_routes = [
        ("/", "cluster_1"),
        ("/test", "cluster_1"),
        ("/yolo/test", "cluster_2"),
        ("/yolo/swag", "cluster_3"),
    ];
    for (path, cluster_id) in expected_routes {
        let route = listener.frontend_from_request("lolcatho.st", path, &Method::Get);
        assert_eq!(
            route.expect("should find frontend"),
            Route::ClusterId(cluster_id.to_string())
        );
    }

    // No frontend was registered for the bare host `domain`.
    let unknown_host = listener.frontend_from_request("domain", "/", &Method::Get);
    assert!(unknown_host.is_err());
}
1408}