1use std::{
2 cell::RefCell,
3 collections::{hash_map::Entry, BTreeMap, HashMap},
4 io::ErrorKind,
5 net::{Shutdown, SocketAddr},
6 os::unix::io::AsRawFd,
7 rc::{Rc, Weak},
8 str::from_utf8_unchecked,
9 time::{Duration, Instant},
10};
11
12use mio::{
13 net::{TcpListener as MioTcpListener, TcpStream},
14 unix::SourceFd,
15 Interest, Registry, Token,
16};
17use rusty_ulid::Ulid;
18
19use sozu_command::{
20 logging::CachedTags,
21 proto::command::{
22 request::RequestType, Cluster, HttpListenerConfig, ListenerType, RemoveListener,
23 RequestHttpFrontend, WorkerRequest, WorkerResponse,
24 },
25 ready::Ready,
26 response::HttpFrontend,
27 state::ClusterId,
28};
29
30use crate::{
31 backends::BackendMap,
32 pool::Pool,
33 protocol::{
34 http::{
35 answers::HttpAnswers,
36 parser::{hostname_and_port, Method},
37 ResponseStream,
38 },
39 proxy_protocol::expect::ExpectProxyProtocol,
40 Http, Pipe, SessionState,
41 },
42 router::{Route, Router},
43 server::{ListenToken, SessionManager},
44 socket::server_bind,
45 timer::TimeoutContainer,
46 AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
47 ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
48 SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
49};
50
/// Status of a session, either `Normal` or `DefaultAnswer`.
///
/// NOTE(review): nothing in the visible part of this file reads this enum;
/// presumably `DefaultAnswer` marks a session that is served a default answer
/// instead of being proxied — confirm against the code that uses it.
#[derive(PartialEq, Eq)]
pub enum SessionStatus {
    Normal,
    DefaultAnswer,
}
56
// Builds the `HttpStateMachine` enum that holds the protocol state of an HTTP session.
// Variants declared here:
// - `Expect`: wraps `ExpectProxyProtocol`, the state `HttpSession::new` starts in
//   when `expect_proxy` is set
// - `Http`: wraps `Http`, the state in which `HttpSession::new` starts otherwise
// - `WebSocket`: wraps `Pipe`, reached by upgrading the `Http` state
//
// The rest of this file also uses a `FailedUpgrade` variant, a `StateMarker` enum
// and the `take()`, `marker()` and `failed()` methods, none of which are declared
// in this file — presumably they are generated by the macro (TODO confirm).
StateMachineBuilder! {
    enum HttpStateMachine impl SessionState {
        Expect(ExpectProxyProtocol<TcpStream>),
        Http(Http<TcpStream, HttpListener>),
        WebSocket(Pipe<TcpStream, HttpListener>),
    }
}
69
/// One client connection handled by the HTTP proxy, together with everything
/// needed to (re)build its protocol state.
pub struct HttpSession {
    /// answers handed to the `Http` state when it is built
    answers: Rc<RefCell<HttpAnswers>>,
    /// backend timeout handed to the `Http` state when it is built
    configured_backend_timeout: Duration,
    /// connect timeout handed to the `Http` state when it is built
    configured_connect_timeout: Duration,
    /// frontend timeout handed to the `Http` state when it is built
    configured_frontend_timeout: Duration,
    /// token of the frontend socket; also the key used to remove the session
    /// from the session manager in `close`
    frontend_token: Token,
    /// instant of the last readiness event, refreshed by `update_readiness`
    last_event: Instant,
    /// listener this session was created for
    listener: Rc<RefCell<HttpListener>>,
    /// metrics of the session, passed to the state on every call
    metrics: SessionMetrics,
    /// buffer pool, held weakly and handed to the `Http` state
    pool: Weak<RefCell<Pool>>,
    /// proxy this session belongs to; gives access to the registry and session manager
    proxy: Rc<RefCell<HttpProxy>>,
    /// current protocol state (expect proxy protocol, HTTP or WebSocket)
    state: HttpStateMachine,
    /// sticky name handed to the `Http` state when it is built
    sticky_name: String,
    /// set at the end of `close`, so that a second call returns immediately
    has_been_closed: bool,
}
88
89impl HttpSession {
90 #[allow(clippy::too_many_arguments)]
91 pub fn new(
92 answers: Rc<RefCell<HttpAnswers>>,
93 configured_backend_timeout: Duration,
94 configured_connect_timeout: Duration,
95 configured_frontend_timeout: Duration,
96 configured_request_timeout: Duration,
97 expect_proxy: bool,
98 listener: Rc<RefCell<HttpListener>>,
99 pool: Weak<RefCell<Pool>>,
100 proxy: Rc<RefCell<HttpProxy>>,
101 public_address: SocketAddr,
102 sock: TcpStream,
103 sticky_name: String,
104 token: Token,
105 wait_time: Duration,
106 ) -> Result<Self, AcceptError> {
107 let request_id = Ulid::generate();
108 let container_frontend_timeout = TimeoutContainer::new(configured_request_timeout, token);
109
110 let state = if expect_proxy {
111 trace!("starting in expect proxy state");
112 gauge_add!("protocol.proxy.expect", 1);
113
114 HttpStateMachine::Expect(ExpectProxyProtocol::new(
115 container_frontend_timeout,
116 sock,
117 token,
118 request_id,
119 ))
120 } else {
121 gauge_add!("protocol.http", 1);
122 let session_address = sock.peer_addr().ok();
123
124 HttpStateMachine::Http(Http::new(
125 answers.clone(),
126 configured_backend_timeout,
127 configured_connect_timeout,
128 configured_frontend_timeout,
129 container_frontend_timeout,
130 sock,
131 token,
132 listener.clone(),
133 pool.clone(),
134 Protocol::HTTP,
135 public_address,
136 request_id,
137 session_address,
138 sticky_name.clone(),
139 )?)
140 };
141
142 let metrics = SessionMetrics::new(Some(wait_time));
143 Ok(HttpSession {
144 answers,
145 configured_backend_timeout,
146 configured_connect_timeout,
147 configured_frontend_timeout,
148 frontend_token: token,
149 has_been_closed: false,
150 last_event: Instant::now(),
151 listener,
152 metrics,
153 pool,
154 proxy,
155 state,
156 sticky_name,
157 })
158 }
159
160 pub fn upgrade(&mut self) -> SessionIsToBeClosed {
161 debug!("HTTP::upgrade");
162 let new_state = match self.state.take() {
163 HttpStateMachine::Http(http) => self.upgrade_http(http),
164 HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
165 HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
166 HttpStateMachine::FailedUpgrade(_) => unreachable!(),
167 };
168
169 match new_state {
170 Some(state) => {
171 self.state = state;
172 false
173 }
174 None => true,
176 }
177 }
178
179 fn upgrade_expect(
180 &mut self,
181 expect: ExpectProxyProtocol<TcpStream>,
182 ) -> Option<HttpStateMachine> {
183 debug!("switching to HTTP");
184 match expect
185 .addresses
186 .as_ref()
187 .map(|add| (add.destination(), add.source()))
188 {
189 Some((Some(public_address), Some(session_address))) => {
190 let mut http = Http::new(
191 self.answers.clone(),
192 self.configured_backend_timeout,
193 self.configured_connect_timeout,
194 self.configured_frontend_timeout,
195 expect.container_frontend_timeout,
196 expect.frontend,
197 expect.frontend_token,
198 self.listener.clone(),
199 self.pool.clone(),
200 Protocol::HTTP,
201 public_address,
202 expect.request_id,
203 Some(session_address),
204 self.sticky_name.clone(),
205 )
206 .ok()?;
207 http.frontend_readiness.event = expect.frontend_readiness.event;
208
209 gauge_add!("protocol.proxy.expect", -1);
210 gauge_add!("protocol.http", 1);
211 Some(HttpStateMachine::Http(http))
212 }
213 _ => None,
214 }
215 }
216
217 fn upgrade_http(&mut self, http: Http<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
218 debug!("http switching to ws");
219 let front_token = self.frontend_token;
220 let back_token = match http.backend_token {
221 Some(back_token) => back_token,
222 None => {
223 warn!(
224 "Could not upgrade http request on cluster '{:?}' ({:?}) using backend '{:?}' into websocket for request '{}'",
225 http.context.cluster_id, self.frontend_token, http.context.backend_id, http.context.id
226 );
227 return None;
228 }
229 };
230
231 let ws_context = http.websocket_context();
232 let mut container_frontend_timeout = http.container_frontend_timeout;
233 let mut container_backend_timeout = http.container_backend_timeout;
234 container_frontend_timeout.reset();
235 container_backend_timeout.reset();
236
237 let backend_buffer = if let ResponseStream::BackendAnswer(kawa) = http.response_stream {
238 kawa.storage.buffer
239 } else {
240 return None;
241 };
242
243 let mut pipe = Pipe::new(
244 backend_buffer,
245 http.context.backend_id,
246 http.backend_socket,
247 http.backend,
248 Some(container_backend_timeout),
249 Some(container_frontend_timeout),
250 http.context.cluster_id,
251 http.request_stream.storage.buffer,
252 front_token,
253 http.frontend_socket,
254 self.listener.clone(),
255 Protocol::HTTP,
256 http.context.id,
257 http.context.session_address,
258 ws_context,
259 );
260
261 pipe.frontend_readiness.event = http.frontend_readiness.event;
262 pipe.backend_readiness.event = http.backend_readiness.event;
263 pipe.set_back_token(back_token);
264
265 gauge_add!("protocol.http", -1);
266 gauge_add!("protocol.ws", 1);
267 gauge_add!("http.active_requests", -1);
268 gauge_add!("websocket.active_requests", 1);
269 Some(HttpStateMachine::WebSocket(pipe))
270 }
271
272 fn upgrade_websocket(&self, ws: Pipe<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
273 error!("Upgrade called on WS, this should not happen");
275 Some(HttpStateMachine::WebSocket(ws))
276 }
277}
278
279impl ProxySession for HttpSession {
/// Closes the session: updates metrics and gauges, closes the state, shuts
/// down and deregisters the frontend socket, and removes the session from the
/// session manager. A session that was already fully closed is left untouched.
fn close(&mut self) {
    if self.has_been_closed {
        return;
    }

    trace!("Closing HTTP session");
    self.metrics.service_stop();

    // decrement the gauges of the protocol the session is (or was last) in
    match self.state.marker() {
        StateMarker::Expect => gauge_add!("protocol.proxy.expect", -1),
        StateMarker::Http => gauge_add!("protocol.http", -1),
        StateMarker::WebSocket => {
            gauge_add!("protocol.ws", -1);
            gauge_add!("websocket.active_requests", -1);
        }
    }

    // After a failed upgrade only a failure counter is incremented: this
    // returns before the socket teardown below, and without setting
    // `has_been_closed`.
    if self.state.failed() {
        match self.state.marker() {
            StateMarker::Expect => incr!("http.upgrade.expect.failed"),
            StateMarker::Http => incr!("http.upgrade.http.failed"),
            StateMarker::WebSocket => incr!("http.upgrade.ws.failed"),
        }
        return;
    }

    self.state.cancel_timeouts();
    self.state.close(self.proxy.clone(), &mut self.metrics);

    let front_socket = self.state.front_socket();
    if let Err(e) = front_socket.shutdown(Shutdown::Both) {
        // a socket that is not connected anymore is not worth an error log
        if e.kind() != ErrorKind::NotConnected {
            error!(
                "error shutting down front socket({:?}): {:?}",
                front_socket, e
            )
        }
    }

    let proxy = self.proxy.borrow();
    // deregister through the raw file descriptor of the frontend socket
    let fd = front_socket.as_raw_fd();
    if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
        error!(
            "error deregistering front socket({:?}) while closing HTTP session: {:?}",
            fd, e
        );
    }
    proxy.remove_session(self.frontend_token);

    self.has_been_closed = true;
}
335
336 fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
337 let state_result = self.state.timeout(token, &mut self.metrics);
338 state_result == StateResult::CloseSession
339 }
340
/// Protocol of the session, always `Protocol::HTTP`.
fn protocol(&self) -> Protocol {
    Protocol::HTTP
}
344
/// Records a readiness event: refreshes `last_event`, notifies the metrics
/// through `wait_start` and forwards the event to the state.
fn update_readiness(&mut self, token: Token, events: Ready) {
    trace!(
        "token {:?} got event {}",
        token,
        super::ready_to_string(events)
    );
    self.last_event = Instant::now();
    self.metrics.wait_start();
    self.state.update_readiness(token, events);
}
355
356 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
357 self.metrics.service_start();
358
359 let session_result =
360 self.state
361 .ready(session.clone(), self.proxy.clone(), &mut self.metrics);
362
363 let to_be_closed = match session_result {
364 SessionResult::Close => true,
365 SessionResult::Continue => false,
366 SessionResult::Upgrade => match self.upgrade() {
367 false => self.ready(session),
368 true => true,
369 },
370 };
371
372 self.metrics.service_stop();
373 to_be_closed
374 }
375
/// Forwards the shutdown notification to the state, which decides whether
/// the session is to be closed.
fn shutting_down(&mut self) -> SessionIsToBeClosed {
    self.state.shutting_down()
}
379
/// Instant of the last readiness event received by `update_readiness`
/// (or of the creation of the session if none was received).
fn last_event(&self) -> Instant {
    self.last_event
}
383
/// Logs the state of the session (through `print_state`) and its metrics,
/// the latter at error level.
fn print_session(&self) {
    self.state.print_state("HTTP");
    error!("Metrics: {:?}", self.metrics);
}
388
/// Token of the frontend socket of this session.
fn frontend_token(&self) -> Token {
    self.frontend_token
}
392}
393
/// A hostname, stored as an owned `String`.
pub type Hostname = String;
395
/// An HTTP listener: a listening address together with its configuration,
/// routing table, answers and tags.
pub struct HttpListener {
    /// set by `activate` once the listening socket is registered
    active: bool,
    /// listening address, built from `config.address`
    address: SocketAddr,
    /// answers built from `config.http_answers`, shared with the sessions
    answers: Rc<RefCell<HttpAnswers>>,
    /// configuration the listener was created with
    config: HttpListenerConfig,
    /// router used by `frontend_from_request` to find the route of a request
    fronts: Router,
    /// listening socket; `None` until `activate`, and again once it was taken back
    listener: Option<MioTcpListener>,
    /// tags stored by `set_tags`; the proxy uses the frontend hostname as key
    tags: BTreeMap<String, CachedTags>,
    /// token under which the listening socket is registered
    token: Token,
}
406
407impl ListenerHandler for HttpListener {
408 fn get_addr(&self) -> &SocketAddr {
409 &self.address
410 }
411
412 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
413 self.tags.get(key)
414 }
415
416 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
417 match tags {
418 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
419 None => self.tags.remove(&key),
420 };
421 }
422}
423
424impl L7ListenerHandler for HttpListener {
425 fn get_sticky_name(&self) -> &str {
426 &self.config.sticky_name
427 }
428
429 fn get_connect_timeout(&self) -> u32 {
430 self.config.connect_timeout
431 }
432
433 fn frontend_from_request(
435 &self,
436 host: &str,
437 uri: &str,
438 method: &Method,
439 ) -> Result<Route, FrontendFromRequestError> {
440 let start = Instant::now();
441 let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
442 Ok(tuple) => tuple,
443 Err(parse_error) => {
444 return Err(FrontendFromRequestError::HostParse {
446 host: host.to_owned(),
447 error: parse_error.to_string(),
448 });
449 }
450 };
451 if remaining_input != &b""[..] {
452 return Err(FrontendFromRequestError::InvalidCharsAfterHost(
453 host.to_owned(),
454 ));
455 }
456
457 let host = unsafe { from_utf8_unchecked(hostname) };
467
468 let route = self.fronts.lookup(host, uri, method).map_err(|e| {
469 incr!("http.failed_backend_matching");
470 FrontendFromRequestError::NoClusterFound(e)
471 })?;
472
473 let now = Instant::now();
474
475 if let Route::ClusterId(cluster) = &route {
476 time!("frontend_matching_time", cluster, (now - start).as_millis());
477 }
478
479 Ok(route)
480 }
481}
482
/// The HTTP proxy: owns the HTTP listeners and the known clusters, and shares
/// the backends, the buffer pool and the session manager with the server.
pub struct HttpProxy {
    /// backends, returned as is by `backends()`
    backends: Rc<RefCell<BackendMap>>,
    /// clusters added through `add_cluster`, by cluster id
    clusters: HashMap<ClusterId, Cluster>,
    /// listeners added through `add_listener`, by token
    listeners: HashMap<Token, Rc<RefCell<HttpListener>>>,
    /// buffer pool, handed weakly to every new session
    pool: Rc<RefCell<Pool>>,
    /// registry on which listening and frontend sockets are (de)registered
    registry: Registry,
    /// session manager whose slab holds the sessions
    sessions: Rc<RefCell<SessionManager>>,
}
491
492impl HttpProxy {
493 pub fn new(
494 registry: Registry,
495 sessions: Rc<RefCell<SessionManager>>,
496 pool: Rc<RefCell<Pool>>,
497 backends: Rc<RefCell<BackendMap>>,
498 ) -> HttpProxy {
499 HttpProxy {
500 backends,
501 clusters: HashMap::new(),
502 listeners: HashMap::new(),
503 pool,
504 registry,
505 sessions,
506 }
507 }
508
509 pub fn add_listener(
510 &mut self,
511 config: HttpListenerConfig,
512 token: Token,
513 ) -> Result<Token, ProxyError> {
514 match self.listeners.entry(token) {
515 Entry::Vacant(entry) => {
516 let http_listener =
517 HttpListener::new(config, token).map_err(ProxyError::AddListener)?;
518 entry.insert(Rc::new(RefCell::new(http_listener)));
519 Ok(token)
520 }
521 _ => Err(ProxyError::ListenerAlreadyPresent),
522 }
523 }
524
525 pub fn get_listener(&self, token: &Token) -> Option<Rc<RefCell<HttpListener>>> {
526 self.listeners.get(token).cloned()
527 }
528
529 pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
530 let len = self.listeners.len();
531 let remove_address = remove.address.into();
532 self.listeners
533 .retain(|_, l| l.borrow().address != remove_address);
534
535 if !self.listeners.len() < len {
536 info!("no HTTP listener to remove at address {:?}", remove_address);
537 }
538 Ok(())
539 }
540
541 pub fn activate_listener(
542 &self,
543 addr: &SocketAddr,
544 tcp_listener: Option<MioTcpListener>,
545 ) -> Result<Token, ProxyError> {
546 let listener = self
547 .listeners
548 .values()
549 .find(|listener| listener.borrow().address == *addr)
550 .ok_or(ProxyError::NoListenerFound(addr.to_owned()))?;
551
552 listener
553 .borrow_mut()
554 .activate(&self.registry, tcp_listener)
555 .map_err(|listener_error| ProxyError::ListenerActivation {
556 address: *addr,
557 listener_error,
558 })
559 }
560
561 pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
562 self.listeners
563 .iter()
564 .filter_map(|(_, listener)| {
565 let mut owned = listener.borrow_mut();
566 if let Some(listener) = owned.listener.take() {
567 return Some((owned.address, listener));
568 }
569
570 None
571 })
572 .collect()
573 }
574
575 pub fn give_back_listener(
576 &mut self,
577 address: SocketAddr,
578 ) -> Result<(Token, MioTcpListener), ProxyError> {
579 let listener = self
580 .listeners
581 .values()
582 .find(|listener| listener.borrow().address == address)
583 .ok_or(ProxyError::NoListenerFound(address))?;
584
585 let mut owned = listener.borrow_mut();
586
587 let taken_listener = owned
588 .listener
589 .take()
590 .ok_or(ProxyError::UnactivatedListener)?;
591
592 Ok((owned.token, taken_listener))
593 }
594
595 pub fn add_cluster(&mut self, mut cluster: Cluster) -> Result<(), ProxyError> {
596 if let Some(answer_503) = cluster.answer_503.take() {
597 for listener in self.listeners.values() {
598 listener
599 .borrow()
600 .answers
601 .borrow_mut()
602 .add_custom_answer(&cluster.cluster_id, answer_503.clone())
603 .map_err(|(status, error)| {
604 ProxyError::AddCluster(ListenerError::TemplateParse(status, error))
605 })?;
606 }
607 }
608 self.clusters.insert(cluster.cluster_id.clone(), cluster);
609 Ok(())
610 }
611
612 pub fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), ProxyError> {
613 self.clusters.remove(cluster_id);
614
615 for listener in self.listeners.values() {
616 listener
617 .borrow()
618 .answers
619 .borrow_mut()
620 .remove_custom_answer(cluster_id);
621 }
622 Ok(())
623 }
624
625 pub fn add_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
626 let front = front.clone().to_frontend().map_err(|request_error| {
627 ProxyError::WrongInputFrontend {
628 front,
629 error: request_error.to_string(),
630 }
631 })?;
632
633 let mut listener = self
634 .listeners
635 .values()
636 .find(|l| l.borrow().address == front.address)
637 .ok_or(ProxyError::NoListenerFound(front.address))?
638 .borrow_mut();
639
640 let hostname = front.hostname.to_owned();
641 let tags = front.tags.to_owned();
642
643 listener
644 .add_http_front(front)
645 .map_err(ProxyError::AddFrontend)?;
646 listener.set_tags(hostname, tags);
647 Ok(())
648 }
649
650 pub fn remove_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
651 let front = front.clone().to_frontend().map_err(|request_error| {
652 ProxyError::WrongInputFrontend {
653 front,
654 error: request_error.to_string(),
655 }
656 })?;
657
658 let mut listener = self
659 .listeners
660 .values()
661 .find(|l| l.borrow().address == front.address)
662 .ok_or(ProxyError::NoListenerFound(front.address))?
663 .borrow_mut();
664
665 let hostname = front.hostname.to_owned();
666
667 listener
668 .remove_http_front(front)
669 .map_err(ProxyError::RemoveFrontend)?;
670
671 listener.set_tags(hostname, None);
672 Ok(())
673 }
674
675 pub fn soft_stop(&mut self) -> Result<(), ProxyError> {
676 let listeners: HashMap<_, _> = self.listeners.drain().collect();
677 let mut socket_errors = vec![];
678 for (_, l) in listeners.iter() {
679 if let Some(mut sock) = l.borrow_mut().listener.take() {
680 debug!("Deregistering socket {:?}", sock);
681 if let Err(e) = self.registry.deregister(&mut sock) {
682 let error = format!("socket {sock:?}: {e:?}");
683 socket_errors.push(error);
684 }
685 }
686 }
687
688 if !socket_errors.is_empty() {
689 return Err(ProxyError::SoftStop {
690 proxy_protocol: "HTTP".to_string(),
691 error: format!("Error deregistering listen sockets: {:?}", socket_errors),
692 });
693 }
694
695 Ok(())
696 }
697
698 pub fn hard_stop(&mut self) -> Result<(), ProxyError> {
699 let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
700 let mut socket_errors = vec![];
701 for (_, l) in listeners.drain() {
702 if let Some(mut sock) = l.borrow_mut().listener.take() {
703 debug!("Deregistering socket {:?}", sock);
704 if let Err(e) = self.registry.deregister(&mut sock) {
705 let error = format!("socket {sock:?}: {e:?}");
706 socket_errors.push(error);
707 }
708 }
709 }
710
711 if !socket_errors.is_empty() {
712 return Err(ProxyError::HardStop {
713 proxy_protocol: "HTTP".to_string(),
714 error: format!("Error deregistering listen sockets: {:?}", socket_errors),
715 });
716 }
717
718 Ok(())
719 }
720}
721
722impl HttpListener {
723 pub fn new(config: HttpListenerConfig, token: Token) -> Result<HttpListener, ListenerError> {
724 Ok(HttpListener {
725 active: false,
726 address: config.address.into(),
727 answers: Rc::new(RefCell::new(
728 HttpAnswers::new(&config.http_answers)
729 .map_err(|(status, error)| ListenerError::TemplateParse(status, error))?,
730 )),
731 config,
732 fronts: Router::new(),
733 listener: None,
734 tags: BTreeMap::new(),
735 token,
736 })
737 }
738
739 pub fn activate(
740 &mut self,
741 registry: &Registry,
742 tcp_listener: Option<MioTcpListener>,
743 ) -> Result<Token, ListenerError> {
744 if self.active {
745 return Ok(self.token);
746 }
747 let address: SocketAddr = self.config.address.into();
748
749 let mut listener = match tcp_listener {
750 Some(tcp_listener) => tcp_listener,
751 None => {
752 server_bind(address).map_err(|server_bind_error| ListenerError::Activation {
753 address,
754 error: server_bind_error.to_string(),
755 })?
756 }
757 };
758
759 registry
760 .register(&mut listener, self.token, Interest::READABLE)
761 .map_err(ListenerError::SocketRegistration)?;
762
763 self.listener = Some(listener);
764 self.active = true;
765 Ok(self.token)
766 }
767
768 pub fn add_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
769 self.fronts
770 .add_http_front(&http_front)
771 .map_err(ListenerError::AddFrontend)
772 }
773
774 pub fn remove_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
775 debug!("removing http_front {:?}", http_front);
776 self.fronts
777 .remove_http_front(&http_front)
778 .map_err(ListenerError::RemoveFrontend)
779 }
780
781 fn accept(&mut self) -> Result<TcpStream, AcceptError> {
782 if let Some(ref sock) = self.listener {
783 sock.accept()
784 .map_err(|e| match e.kind() {
785 ErrorKind::WouldBlock => AcceptError::WouldBlock,
786 _ => {
787 error!("accept() IO error: {:?}", e);
788 AcceptError::IoError
789 }
790 })
791 .map(|(sock, _)| sock)
792 } else {
793 error!("cannot accept connections, no listening socket available");
794 Err(AcceptError::IoError)
795 }
796 }
797}
798
799impl ProxyConfiguration for HttpProxy {
/// Applies a worker request to the proxy and builds the response.
///
/// Cluster, frontend and listener requests answer `ok` or `error` depending
/// on the result of the matching method. A successful soft or hard stop
/// answers `processing` instead. `Status` always succeeds, and any other
/// request is answered with `ProxyError::UnsupportedMessage`.
fn notify(&mut self, request: WorkerRequest) -> WorkerResponse {
    // cloned up front: the id is needed after `request.content` is moved out
    let request_id = request.id.clone();

    let result = match request.content.request_type {
        Some(RequestType::AddCluster(cluster)) => {
            debug!("{} add cluster {:?}", request.id, cluster);
            self.add_cluster(cluster)
        }
        Some(RequestType::RemoveCluster(cluster_id)) => {
            debug!("{} remove cluster {:?}", request_id, cluster_id);
            self.remove_cluster(&cluster_id)
        }
        Some(RequestType::AddHttpFrontend(front)) => {
            debug!("{} add front {:?}", request_id, front);
            self.add_http_frontend(front)
        }
        Some(RequestType::RemoveHttpFrontend(front)) => {
            debug!("{} remove front {:?}", request_id, front);
            self.remove_http_frontend(front)
        }
        Some(RequestType::RemoveListener(remove)) => {
            debug!("removing HTTP listener at address {:?}", remove.address);
            self.remove_listener(remove)
        }
        Some(RequestType::SoftStop(_)) => {
            debug!("{} processing soft shutdown", request_id);
            match self.soft_stop() {
                Ok(()) => {
                    info!("{} soft stop successful", request_id);
                    // answered right away with `processing`, not with `ok`
                    return WorkerResponse::processing(request.id);
                }
                Err(e) => Err(e),
            }
        }
        Some(RequestType::HardStop(_)) => {
            debug!("{} processing hard shutdown", request_id);
            match self.hard_stop() {
                Ok(()) => {
                    info!("{} hard stop successful", request_id);
                    // answered right away with `processing`, not with `ok`
                    return WorkerResponse::processing(request.id);
                }
                Err(e) => Err(e),
            }
        }
        Some(RequestType::Status(_)) => {
            debug!("{} status", request_id);
            Ok(())
        }
        other_command => {
            debug!(
                "{} unsupported message for HTTP proxy, ignoring: {:?}",
                request.id, other_command
            );
            Err(ProxyError::UnsupportedMessage)
        }
    };

    match result {
        Ok(()) => {
            debug!("{} successful", request_id);
            WorkerResponse::ok(request_id)
        }
        Err(proxy_error) => {
            debug!("{} unsuccessful: {}", request_id, proxy_error);
            WorkerResponse::error(request_id, proxy_error)
        }
    }
}
868
869 fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError> {
870 if let Some(listener) = self.listeners.get(&Token(token.0)) {
871 listener.borrow_mut().accept()
872 } else {
873 Err(AcceptError::IoError)
874 }
875 }
876
/// Creates the session of an accepted frontend socket and stores it in the
/// session manager.
///
/// The socket is registered for read and write events under the token of the
/// slab entry that will hold the session. Timeouts, sticky name and the
/// expect-proxy flag come from the configuration of the listener.
///
/// # Errors
///
/// - `IoError` when no listener is stored under `listener_token`
/// - `RegisterError` when the socket cannot be registered
/// - the error of `HttpSession::new` when the session cannot be built
fn create_session(
    &mut self,
    mut frontend_sock: TcpStream,
    listener_token: ListenToken,
    wait_time: Duration,
    proxy: Rc<RefCell<Self>>,
) -> Result<(), AcceptError> {
    let listener = self
        .listeners
        .get(&Token(listener_token.0))
        .cloned()
        .ok_or(AcceptError::IoError)?;

    // failing to set nodelay is logged but does not prevent the session creation
    if let Err(e) = frontend_sock.set_nodelay(true) {
        error!(
            "error setting nodelay on front socket({:?}): {:?}",
            frontend_sock, e
        );
    }
    let mut session_manager = self.sessions.borrow_mut();
    // the key of the vacant entry becomes the token of the session
    let session_entry = session_manager.slab.vacant_entry();
    let session_token = Token(session_entry.key());
    let owned = listener.borrow();

    if let Err(register_error) = self.registry.register(
        &mut frontend_sock,
        session_token,
        Interest::READABLE | Interest::WRITABLE,
    ) {
        error!(
            "error registering listen socket({:?}): {:?}",
            frontend_sock, register_error
        );
        return Err(AcceptError::RegisterError);
    }

    // the public address falls back to the listening address
    let public_address: SocketAddr = match owned.config.public_address {
        Some(pub_addr) => pub_addr.into(),
        None => owned.config.address.into(),
    };

    let session = HttpSession::new(
        owned.answers.clone(),
        Duration::from_secs(owned.config.back_timeout as u64),
        Duration::from_secs(owned.config.connect_timeout as u64),
        Duration::from_secs(owned.config.front_timeout as u64),
        Duration::from_secs(owned.config.request_timeout as u64),
        owned.config.expect_proxy,
        listener.clone(),
        Rc::downgrade(&self.pool),
        proxy,
        public_address,
        frontend_sock,
        owned.config.sticky_name.clone(),
        session_token,
        wait_time,
    )?;

    let session = Rc::new(RefCell::new(session));
    session_entry.insert(session);

    Ok(())
}
940}
941
942impl L7Proxy for HttpProxy {
943 fn kind(&self) -> ListenerType {
944 ListenerType::Http
945 }
946
947 fn register_socket(
948 &self,
949 source: &mut TcpStream,
950 token: Token,
951 interest: Interest,
952 ) -> Result<(), std::io::Error> {
953 self.registry.register(source, token, interest)
954 }
955
956 fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error> {
957 self.registry.deregister(tcp_stream)
958 }
959
960 fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token {
961 let mut session_manager = self.sessions.borrow_mut();
962 let entry = session_manager.slab.vacant_entry();
963 let token = Token(entry.key());
964 let _entry = entry.insert(session);
965 token
966 }
967
968 fn remove_session(&self, token: Token) -> bool {
969 self.sessions
970 .borrow_mut()
971 .slab
972 .try_remove(token.0)
973 .is_some()
974 }
975
976 fn backends(&self) -> Rc<RefCell<BackendMap>> {
977 self.backends.clone()
978 }
979
980 fn clusters(&self) -> &HashMap<ClusterId, Cluster> {
981 &self.clusters
982 }
983}
984
/// Helpers to run an HTTP worker, used by the tests of this file.
pub mod testing {
    use crate::testing::*;

    /// Builds a server holding a single HTTP proxy with one activated
    /// listener, then runs its event loop; returns once `server.run()` returns.
    ///
    /// `max_buffers` and `buffer_size` are forwarded to `prebuild_server`, and
    /// `channel` is handed to the server.
    ///
    /// # Errors
    ///
    /// Fails when the server cannot be prebuilt or created, or when the
    /// listener cannot be added or activated.
    pub fn start_http_worker(
        config: HttpListenerConfig,
        channel: ProxyChannel,
        max_buffers: usize,
        buffer_size: usize,
    ) -> anyhow::Result<()> {
        let address = config.address.into();

        let ServerParts {
            event_loop,
            registry,
            sessions,
            pool,
            backends,
            client_scm_socket: _,
            server_scm_socket,
            server_config,
        } = prebuild_server(max_buffers, buffer_size, true)?;

        // reserve a slab entry for the listener: its key is the listener token
        let token = {
            let mut sessions = sessions.borrow_mut();
            let entry = sessions.slab.vacant_entry();
            let key = entry.key();
            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
                protocol: Protocol::HTTPListen,
            })));
            Token(key)
        };

        let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
        proxy
            .add_listener(config, token)
            .with_context(|| "Failed at creating adding the listener")?;
        // no socket is provided: the listener binds its own
        proxy
            .activate_listener(&address, None)
            .with_context(|| "Failed at creating activating the listener")?;

        let mut server = Server::new(
            event_loop,
            channel,
            server_scm_socket,
            sessions,
            pool,
            backends,
            Some(proxy),
            None,
            None,
            server_config,
            None,
            false,
        )
        .with_context(|| "Failed at creating server")?;

        debug!("starting event loop");
        server.run();
        debug!("ending event loop");
        Ok(())
    }
}
1048
1049#[cfg(test)]
1050mod tests {
1051 extern crate tiny_http;
1052
1053 use super::testing::start_http_worker;
1054 use super::*;
1055 use sozu_command::proto::command::{CustomHttpAnswers, SocketAddress};
1056
1057 use crate::sozu_command::{
1058 channel::Channel,
1059 config::ListenerBuilder,
1060 proto::command::{LoadBalancingParams, PathRule, RulePosition, WorkerRequest},
1061 response::{Backend, HttpFrontend},
1062 };
1063
1064 use std::{
1065 io::{Read, Write},
1066 net::TcpStream,
1067 str,
1068 sync::{Arc, Barrier},
1069 thread,
1070 time::Duration,
1071 };
1072
/// Sends one request with `Connection: Close` through an HTTP worker
/// listening on port 1024 to a backend on port 1025, and expects a response
/// of exactly 191 bytes.
#[test]
fn round_trip() {
    setup_test_logger!();
    let barrier = Arc::new(Barrier::new(2));
    start_server(1025, barrier.clone());
    barrier.wait();

    let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 1024))
        .to_http(None)
        .expect("could not create listener config");

    let (mut command, channel) =
        Channel::generate(1000, 10000).expect("should create a channel");
    let _jg = thread::spawn(move || {
        setup_test_logger!();
        start_http_worker(config, channel, 10, 16384).expect("could not start the http server");
    });

    let front = RequestHttpFrontend {
        cluster_id: Some(String::from("cluster_1")),
        address: SocketAddress::new_v4(127, 0, 0, 1, 1024),
        hostname: String::from("localhost"),
        path: PathRule::prefix(String::from("/")),
        ..Default::default()
    };
    command
        .write_message(&WorkerRequest {
            id: String::from("ID_ABCD"),
            content: RequestType::AddHttpFrontend(front).into(),
        })
        .unwrap();
    let backend = Backend {
        cluster_id: String::from("cluster_1"),
        backend_id: String::from("cluster_1-0"),
        address: SocketAddress::new_v4(127, 0, 0, 1, 1025).into(),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        sticky_id: None,
        backup: None,
    };
    command
        .write_message(&WorkerRequest {
            id: String::from("ID_EFGH"),
            content: RequestType::AddBackend(backend.to_add_backend()).into(),
        })
        .unwrap();

    // one response is read for each of the two requests sent above
    println!("test received: {:?}", command.read_message());
    println!("test received: {:?}", command.read_message());

    let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not connect");

    client.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
    let w = client
        .write(&b"GET / HTTP/1.1\r\nHost: localhost:1024\r\nConnection: Close\r\n\r\n"[..]);
    println!("http client write: {w:?}");

    barrier.wait();
    let mut buffer = [0; 4096];
    let mut index = 0;

    loop {
        assert!(index <= 191);
        if index == 191 {
            break;
        }

        let r = client.read(&mut buffer[index..]);
        println!("http client read: {r:?}");
        match r {
            Err(e) => panic!("client request should not fail. Error: {e:?}"),
            // the buffer is never empty here, so reading 0 bytes means the
            // connection was closed: fail instead of looping forever
            Ok(0) => panic!("connection closed after {index} bytes, before the full response"),
            Ok(sz) => {
                index += sz;
            }
        }
    }
    println!(
        "Response: {}",
        str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
    );
}
1167
/// Sends two requests on the same connection through an HTTP worker
/// listening on port 1031 to a backend on port 1028, and expects a response
/// of exactly 191 bytes for each of them.
#[test]
fn keep_alive() {
    setup_test_logger!();
    let barrier = Arc::new(Barrier::new(2));
    start_server(1028, barrier.clone());
    barrier.wait();

    let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 1031))
        .to_http(None)
        .expect("could not create listener config");

    let (mut command, channel) =
        Channel::generate(1000, 10000).expect("should create a channel");

    let _jg = thread::spawn(move || {
        setup_test_logger!();
        start_http_worker(config, channel, 10, 16384).expect("could not start the http server");
    });

    let front = RequestHttpFrontend {
        address: SocketAddress::new_v4(127, 0, 0, 1, 1031),
        hostname: String::from("localhost"),
        path: PathRule::prefix(String::from("/")),
        cluster_id: Some(String::from("cluster_1")),
        ..Default::default()
    };
    command
        .write_message(&WorkerRequest {
            id: String::from("ID_ABCD"),
            content: RequestType::AddHttpFrontend(front).into(),
        })
        .unwrap();
    let backend = Backend {
        address: SocketAddress::new_v4(127, 0, 0, 1, 1028).into(),
        backend_id: String::from("cluster_1-0"),
        backup: None,
        cluster_id: String::from("cluster_1"),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        sticky_id: None,
    };
    command
        .write_message(&WorkerRequest {
            id: String::from("ID_EFGH"),
            content: RequestType::AddBackend(backend.to_add_backend()).into(),
        })
        .unwrap();

    // one response is read for each of the two requests sent above
    println!("test received: {:?}", command.read_message());
    println!("test received: {:?}", command.read_message());

    let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not connect");
    client.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

    let w = client
        .write(&b"GET / HTTP/1.1\r\nHost: localhost:1031\r\n\r\n"[..])
        .unwrap();
    println!("http client write: {w:?}");
    barrier.wait();

    let mut buffer = [0; 4096];
    let mut index = 0;

    loop {
        assert!(index <= 191);
        if index == 191 {
            break;
        }

        let r = client.read(&mut buffer[index..]);
        println!("http client read: {r:?}");
        match r {
            Err(e) => panic!("client request should not fail. Error: {e:?}"),
            // the buffer is never empty here, so reading 0 bytes means the
            // connection was closed: fail instead of looping forever
            Ok(0) => panic!("connection closed after {index} bytes, before the full response"),
            Ok(sz) => {
                index += sz;
            }
        }
    }

    println!(
        "Response: {}",
        str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
    );

    println!("first request ended, will send second one");
    let w2 = client.write(&b"GET / HTTP/1.1\r\nHost: localhost:1031\r\n\r\n"[..]);
    println!("http client write: {w2:?}");
    barrier.wait();

    let mut buffer2 = [0; 4096];
    let mut index = 0;

    loop {
        assert!(index <= 191);
        if index == 191 {
            break;
        }

        let r2 = client.read(&mut buffer2[index..]);
        println!("http client read: {r2:?}");
        match r2 {
            Err(e) => panic!("client request should not fail. Error: {e:?}"),
            // same guard as for the first response: 0 bytes read means EOF
            Ok(0) => panic!("connection closed after {index} bytes, before the full response"),
            Ok(sz) => {
                index += sz;
            }
        }
    }
    println!(
        "Response: {}",
        str::from_utf8(&buffer2[..index]).expect("could not make string from buffer")
    );
}
1280
1281 use self::tiny_http::{Response, Server};
1282
1283 fn start_server(port: u16, barrier: Arc<Barrier>) {
1284 thread::spawn(move || {
1285 setup_test_logger!();
1286 let server =
1287 Server::http(&format!("127.0.0.1:{port}")).expect("could not create server");
1288 info!("starting web server in port {}", port);
1289 barrier.wait();
1290
1291 for request in server.incoming_requests() {
1292 info!(
1293 "backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
1294 request.method(),
1295 request.url(),
1296 request.headers()
1297 );
1298
1299 let response = Response::from_string("hello world");
1300 request.respond(response).unwrap();
1301 info!("backend web server sent response");
1302 barrier.wait();
1303 info!("server session stopped");
1304 }
1305
1306 println!("server on port {port} closed");
1307 });
1308 }
1309
/// Checks which route `HttpListener::frontend_from_request` returns for a set
/// of hostname/path pairs.
///
/// Three prefix frontends are registered for `lolcatho.st` (`/`, `/yolo`,
/// `/yolo/swag`) and one for `other.domain` (`/test`). Requests on
/// `lolcatho.st` must resolve to the expected cluster, and a request on the
/// unregistered host `domain` must be an error.
#[test]
fn frontend_from_request_test() {
    // (hostname, path prefix, cluster id) of every frontend put in the router
    let registered_frontends = [
        ("lolcatho.st", "/", "cluster_1"),
        ("lolcatho.st", "/yolo", "cluster_2"),
        ("lolcatho.st", "/yolo/swag", "cluster_3"),
        ("other.domain", "/test", "cluster_1"),
    ];

    let mut fronts = Router::new();
    for (hostname, prefix, cluster_id) in registered_frontends {
        let frontend = HttpFrontend {
            address: "0.0.0.0:80".parse().unwrap(),
            hostname: hostname.to_owned(),
            method: None,
            path: PathRule::prefix(prefix.to_owned()),
            position: RulePosition::Tree,
            cluster_id: Some(cluster_id.to_owned()),
            tags: None,
        };
        fronts
            .add_http_front(&frontend)
            .expect("Could not add http frontend");
    }

    let address = SocketAddress::new_v4(127, 0, 0, 1, 1030);

    let default_config = ListenerBuilder::new_http(address.clone())
        .to_http(None)
        .expect("Could not create default HTTP listener config");

    let answers = HttpAnswers::new(&Some(CustomHttpAnswers::default())).unwrap();

    // the listener is built by hand, without any socket attached to it
    let listener = HttpListener {
        listener: None,
        address: address.into(),
        fronts,
        answers: Rc::new(RefCell::new(answers)),
        config: default_config,
        token: Token(0),
        active: true,
        tags: BTreeMap::new(),
    };

    // (requested path, expected cluster id) for requests on "lolcatho.st"
    let expected_routes = [
        ("/", "cluster_1"),
        ("/test", "cluster_1"),
        ("/yolo/test", "cluster_2"),
        ("/yolo/swag", "cluster_3"),
    ];
    for (path, cluster_id) in expected_routes {
        let route = listener.frontend_from_request("lolcatho.st", path, &Method::Get);
        assert_eq!(
            route.expect("should find frontend"),
            Route::ClusterId(cluster_id.to_string())
        );
    }

    // no frontend was registered for this hostname
    let unknown_host = listener.frontend_from_request("domain", "/", &Method::Get);
    assert!(unknown_host.is_err());
}
1407}