1use std::{
3 cell::RefCell,
4 collections::{HashSet, VecDeque},
5 io::Error as IoError,
6 os::unix::io::{AsRawFd, FromRawFd},
7 rc::Rc,
8 time::{Duration, Instant},
9};
10
11use mio::{
12 net::{TcpListener as MioTcpListener, TcpStream},
13 Events, Interest, Poll, Token,
14};
15use slab::Slab;
16
17use sozu_command::{
18 channel::Channel,
19 logging,
20 proto::command::{
21 request::RequestType, response_content::ContentType, ActivateListener, AddBackend,
22 CertificatesWithFingerprints, Cluster, ClusterHashes, ClusterInformations,
23 DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig, InitialState,
24 ListenerType, LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend,
25 Request, ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener,
26 WorkerRequest, WorkerResponse,
27 },
28 ready::Ready,
29 scm_socket::{Listeners, ScmSocket, ScmSocketError},
30 state::ConfigState,
31};
32
33use crate::{
34 backends::{Backend, BackendMap},
35 features::FEATURES,
36 http, https,
37 metrics::METRICS,
38 pool::Pool,
39 tcp,
40 timer::Timer,
41 AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
42};
43
/// Retry budget for connections.
///
/// NOTE(review): not referenced in this part of the file; presumably the number of
/// connection attempts made before giving up. Confirm against the callers.
pub const CONN_RETRIES: u8 = 3;

/// Channel on which the worker writes [`WorkerResponse`]s and reads [`WorkerRequest`]s.
pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;

thread_local! {
    /// Thread-local queue of responses waiting to be written on the [`ProxyChannel`].
    /// Filled by `push_queue` / `push_event`, drained by `Server::send_queue`.
    pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
}

thread_local! {
    /// Thread-local timer whose expired entries are mio [`Token`]s; the event loop polls
    /// it and forwards each expired token to `Server::timeout`.
    pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
}
56
57pub fn push_queue(message: WorkerResponse) {
58 QUEUE.with(|queue| {
59 (*queue.borrow_mut()).push_back(message);
60 });
61}
62
63pub fn push_event(event: Event) {
64 QUEUE.with(|queue| {
65 (*queue.borrow_mut()).push_back(WorkerResponse {
66 id: "EVENT".to_string(),
67 message: String::new(),
68 status: ResponseStatus::Processing.into(),
69 content: Some(ContentType::Event(event).into()),
70 });
71 });
72}
73
/// Index identifying a listener; interchangeable with a plain `usize`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ListenToken(pub usize);

/// Index identifying a session; interchangeable with a plain `usize`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SessionToken(pub usize);

impl From<usize> for ListenToken {
    /// Wraps a raw index.
    fn from(index: usize) -> Self {
        Self(index)
    }
}

impl From<ListenToken> for usize {
    /// Unwraps the raw index.
    fn from(ListenToken(index): ListenToken) -> Self {
        index
    }
}

impl From<usize> for SessionToken {
    /// Wraps a raw index.
    fn from(index: usize) -> Self {
        Self(index)
    }
}

impl From<SessionToken> for usize {
    /// Unwraps the raw index.
    fn from(SessionToken(index): SessionToken) -> Self {
        index
    }
}
102
/// Holds the slab of sessions together with the connection counters used to decide
/// whether new connections may be accepted.
pub struct SessionManager {
    /// Upper bound on `nb_connections`; `incr` asserts that it is never exceeded.
    pub max_connections: usize,
    /// Current number of connections, maintained by `incr` and `decr`.
    pub nb_connections: usize,
    /// Set to `false` by `check_limits` when a limit is hit, and back to `true` by `decr`
    /// once `nb_connections` falls under 90% of `max_connections`.
    pub can_accept: bool,
    /// Every registered session, keyed by its slab index.
    pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
}
109
110impl SessionManager {
111 pub fn new(
112 slab: Slab<Rc<RefCell<dyn ProxySession>>>,
113 max_connections: usize,
114 ) -> Rc<RefCell<Self>> {
115 Rc::new(RefCell::new(SessionManager {
116 max_connections,
117 nb_connections: 0,
118 can_accept: true,
119 slab,
120 }))
121 }
122
123 pub fn at_capacity(&self) -> bool {
125 self.slab.len() >= 10 + 2 * self.max_connections
126 }
127
128 pub fn check_limits(&mut self) -> bool {
131 if self.nb_connections >= self.max_connections {
132 error!("max number of session connection reached, flushing the accept queue");
133 gauge!("accept_queue.backpressure", 1);
134 self.can_accept = false;
135 return false;
136 }
137
138 if self.at_capacity() {
139 error!("not enough memory to accept another session, flushing the accept queue");
140 error!(
141 "nb_connections: {}, max_connections: {}",
142 self.nb_connections, self.max_connections
143 );
144 gauge!("accept_queue.backpressure", 1);
145 self.can_accept = false;
146
147 return false;
148 }
149
150 true
151 }
152
153 pub fn to_session(token: Token) -> SessionToken {
154 SessionToken(token.0)
155 }
156
157 pub fn incr(&mut self) {
158 self.nb_connections += 1;
159 assert!(self.nb_connections <= self.max_connections);
160 gauge!("client.connections", self.nb_connections);
161 }
162
163 pub fn decr(&mut self) {
166 assert!(self.nb_connections != 0);
167 self.nb_connections -= 1;
168 gauge!("client.connections", self.nb_connections);
169
170 if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
172 debug!(
173 "nb_connections = {}, max_connections = {}, starting to accept again",
174 self.nb_connections, self.max_connections
175 );
176 gauge!("accept_queue.backpressure", 0);
177 self.can_accept = true;
178 }
179 }
180}
181
/// Errors that can occur while building a [`Server`].
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
    /// `Poll::new()` failed.
    #[error("could not create event loop with MIO poll: {0}")]
    CreatePoll(IoError),
    /// Cloning the poll registry for one of the proxies failed.
    #[error("could not clone the MIO registry: {0}")]
    CloneRegistry(IoError),
    /// Registering the command channel on the poll registry failed.
    #[error("could not register the channel: {0}")]
    RegisterChannel(IoError),
    /// An operation on the SCM socket failed; `msg` says which one.
    #[error("{msg}:{scm_err}")]
    ScmSocket {
        msg: String,
        scm_err: ScmSocketError,
    },
}
196
/// A proxy worker: owns the mio poll, the command channel to the main process, the
/// HTTP / HTTPS / TCP proxies and the shared session manager, and drives them from the
/// event loop in `run`.
pub struct Server {
    /// Built from `ServerConfig::accept_queue_timeout`, interpreted as seconds.
    /// NOTE(review): presumably the longest a socket may wait in `accept_queue` — confirm.
    accept_queue_timeout: Duration,
    /// Sockets with their listener token, protocol and an `Instant`.
    /// NOTE(review): presumably accepted sockets waiting for a session, stamped with
    /// their accept time — confirm where the queue is filled.
    accept_queue: VecDeque<(TcpStream, ListenToken, Protocol, Instant)>,
    /// Set of listener tokens.
    /// NOTE(review): presumably the listeners that still have connections to accept — confirm.
    accept_ready: HashSet<ListenToken>,
    /// Backend map shared with the proxies; updated by `add_cluster`, `add_backend` and
    /// `remove_backend`.
    backends: Rc<RefCell<BackendMap>>,
    /// Slab length recorded at construction, incremented for each added listener and
    /// decremented on `RemoveListener`. A soft stop completes once the slab length is
    /// no greater than this value.
    base_sessions_count: usize,
    /// Command channel to the main process.
    channel: ProxyChannel,
    /// Configuration state; every request going through `notify_proxys` is dispatched to it.
    config_state: ConfigState,
    /// Number of consecutive failed `poll()` calls.
    current_poll_errors: i32,
    /// HTTP proxy.
    http: Rc<RefCell<http::HttpProxy>>,
    /// HTTPS proxy.
    https: Rc<RefCell<https::HttpsProxy>>,
    /// Slab length last recorded, used to log progress while shutting down.
    last_sessions_len: usize,
    /// Last time `shut_down_sessions` saw the slab shrink.
    last_shutting_down_message: Option<Instant>,
    /// Last time the zombie check ran.
    last_zombie_check: Instant,
    /// Reference instant for the `event_loop_time` and `epoll_time` metrics.
    loop_start: Instant,
    /// Number of consecutive poll failures at which `check_for_poll_errors` panics.
    max_poll_errors: i32,
    /// The mio poll driving the event loop.
    pub poll: Poll,
    /// Upper bound on the time spent blocked in `poll()`.
    poll_timeout: Option<Duration>,
    /// Listeners received over the SCM socket in `new`; looked up by address when a
    /// listener is activated.
    scm_listeners: Option<Listeners>,
    /// SCM socket to the main process.
    scm: ScmSocket,
    /// Session manager shared with the proxies.
    sessions: Rc<RefCell<SessionManager>>,
    /// Next timer deadline, refreshed from `TIMER` on every loop iteration.
    should_poll_at: Option<Instant>,
    /// Id of the `SoftStop` request in progress, if any.
    shutting_down: Option<String>,
    /// TCP proxy.
    tcp: Rc<RefCell<tcp::TcpProxy>>,
    /// Built from `ServerConfig::zombie_check_interval`, interpreted as seconds: both the
    /// period of the zombie check and the inactivity threshold it applies.
    zombie_check_interval: Duration,
}
239
240impl Server {
241 pub fn try_new_from_config(
242 worker_to_main_channel: ProxyChannel,
243 worker_to_main_scm: ScmSocket,
244 config: ServerConfig,
245 initial_state: InitialState,
246 expects_initial_status: bool,
247 ) -> Result<Self, ServerError> {
248 let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
249 let pool = Rc::new(RefCell::new(Pool::with_capacity(
250 config.min_buffers as usize,
251 config.max_buffers as usize,
252 config.buffer_size as usize,
253 )));
254 let backends = Rc::new(RefCell::new(BackendMap::new()));
255
256 let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
259 Slab::with_capacity(config.slab_capacity() as usize),
260 config.max_connections as usize,
261 );
262 {
263 let mut s = sessions.borrow_mut();
264 let entry = s.slab.vacant_entry();
265 trace!("taking token {:?} for channel", SessionToken(entry.key()));
266 entry.insert(Rc::new(RefCell::new(ListenSession {
267 protocol: Protocol::Channel,
268 })));
269 }
270 {
271 let mut s = sessions.borrow_mut();
272 let entry = s.slab.vacant_entry();
273 trace!("taking token {:?} for metrics", SessionToken(entry.key()));
274 entry.insert(Rc::new(RefCell::new(ListenSession {
275 protocol: Protocol::Timer,
276 })));
277 }
278 {
279 let mut s = sessions.borrow_mut();
280 let entry = s.slab.vacant_entry();
281 trace!("taking token {:?} for metrics", SessionToken(entry.key()));
282 entry.insert(Rc::new(RefCell::new(ListenSession {
283 protocol: Protocol::Metrics,
284 })));
285 }
286
287 Server::new(
288 event_loop,
289 worker_to_main_channel,
290 worker_to_main_scm,
291 sessions,
292 pool,
293 backends,
294 None,
295 None,
296 None,
297 config,
298 Some(initial_state),
299 expects_initial_status,
300 )
301 }
302
    /// Assembles a [`Server`] from already-built parts.
    ///
    /// Steps, in order:
    /// 1. registers the command channel on `Token(0)` and, if there is one, the metrics
    ///    socket on `Token(2)`;
    /// 2. records the current slab length as `base_sessions_count`;
    /// 3. builds any proxy that was not provided (`None`), each with its own clone of
    ///    the poll registry;
    /// 4. replays the requests of `initial_state` through `notify_proxys` and discards
    ///    the responses they queued;
    /// 5. if `expects_initial_status`, blocks until the main process sends a `Status`
    ///    request and answers it;
    /// 6. receives the listeners over the SCM socket.
    ///
    /// # Errors
    ///
    /// Returns [`ServerError::RegisterChannel`], [`ServerError::CloneRegistry`] or
    /// [`ServerError::ScmSocket`] depending on the step that failed.
    ///
    /// # Panics
    ///
    /// Panics if the metrics socket cannot be registered, or if `expects_initial_status`
    /// is set and the first message read is not a `Status` request.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        poll: Poll,
        mut channel: ProxyChannel,
        scm: ScmSocket,
        sessions: Rc<RefCell<SessionManager>>,
        pool: Rc<RefCell<Pool>>,
        backends: Rc<RefCell<BackendMap>>,
        http: Option<http::HttpProxy>,
        https: Option<https::HttpsProxy>,
        tcp: Option<tcp::TcpProxy>,
        server_config: ServerConfig,
        initial_state: Option<InitialState>,
        expects_initial_status: bool,
    ) -> Result<Self, ServerError> {
        // Touching the thread-local is enough to initialise it; the value is not used.
        FEATURES.with(|_features| {
        });

        poll.registry()
            .register(
                &mut channel,
                Token(0),
                Interest::READABLE | Interest::WRITABLE,
            )
            .map_err(ServerError::RegisterChannel)?;

        METRICS.with(|metrics| {
            if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
                poll.registry()
                    .register(sock, Token(2), Interest::WRITABLE)
                    .expect("should register the metrics socket");
            }
        });

        // Entries already in the slab at this point are not client sessions.
        let base_sessions_count = sessions.borrow().slab.len();

        let http = Rc::new(RefCell::new(match http {
            Some(http) => http,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let https = Rc::new(RefCell::new(match https {
            Some(https) => https,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let tcp = Rc::new(RefCell::new(match tcp {
            Some(tcp) => tcp,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let mut server = Server {
            accept_queue_timeout: Duration::from_secs(u64::from(
                server_config.accept_queue_timeout,
            )),
            accept_queue: VecDeque::new(),
            accept_ready: HashSet::new(),
            backends,
            base_sessions_count,
            channel,
            config_state: ConfigState::new(),
            current_poll_errors: 0,
            http,
            https,
            last_sessions_len: 0,
            last_shutting_down_message: None,
            last_zombie_check: Instant::now(),
            loop_start: Instant::now(),
            max_poll_errors: 10000,
            poll_timeout: Some(Duration::from_millis(1000)),
            poll,
            scm_listeners: None,
            scm,
            sessions,
            should_poll_at: None,
            shutting_down: None,
            tcp,
            zombie_check_interval: Duration::from_secs(u64::from(
                server_config.zombie_check_interval,
            )),
        };

        if let Some(state) = initial_state {
            for request in state.requests {
                trace!("generating initial config request: {:#?}", request);
                server.notify_proxys(request);
            }

            // Replaying the initial state queued one response per request; nobody is
            // waiting for them, so they are dropped.
            QUEUE.with(|queue| {
                (*queue.borrow_mut()).clear();
            });
        }

        if expects_initial_status {
            // Wait synchronously for the first message from the main process.
            server.block_channel();
            let msg = server.channel.read_message();
            debug!("got message: {:?}", msg);

            if let Ok(WorkerRequest {
                id,
                content:
                    Request {
                        request_type: Some(RequestType::Status(_)),
                    },
            }) = msg
            {
                if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
                    error!("Could not send an ok to the main process: {}", e);
                }
            } else {
                panic!("plz give me a status request first when I start, you sent me this instead: {:?}", msg);
            }
            server.unblock_channel();
        }

        // The SCM socket is switched to blocking mode only for the duration of the
        // listener hand-over.
        info!("will try to receive listeners");
        server
            .scm
            .set_blocking(true)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to blocking".to_string(),
                scm_err,
            })?;
        let listeners =
            server
                .scm
                .receive_listeners()
                .map_err(|scm_err| ServerError::ScmSocket {
                    msg: "could not receive listeners from the scm socket".to_string(),
                    scm_err,
                })?;
        server
            .scm
            .set_blocking(false)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to unblocking".to_string(),
                scm_err,
            })?;
        info!("received listeners: {:?}", listeners);
        server.scm_listeners = Some(listeners);

        Ok(server)
    }
472
    /// Runs the event loop until the command channel closes, a hard stop is received,
    /// or a soft stop completes.
    ///
    /// Each iteration polls mio, flushes queued responses, dispatches the events by
    /// token, fires overdue timers, lets the sessions make progress, runs the zombie
    /// check and pushes metrics.
    ///
    /// # Panics
    ///
    /// Panics (through `check_for_poll_errors`) when `poll()` has failed
    /// `max_poll_errors` times in a row.
    pub fn run(&mut self) {
        let mut events = Events::with_capacity(1024);
        self.last_sessions_len = self.sessions.borrow().slab.len();

        self.last_zombie_check = Instant::now();
        self.loop_start = Instant::now();

        loop {
            self.check_for_poll_errors();

            let timeout = self.reset_loop_time_and_get_timeout();

            match self.poll.poll(&mut events, timeout) {
                Ok(_) => self.current_poll_errors = 0,
                Err(error) => {
                    error!("Error while polling events: {:?}", error);
                    self.current_poll_errors += 1;
                    continue;
                }
            }

            let after_epoll = Instant::now();
            time!("epoll_time", (after_epoll - self.loop_start).as_millis());
            self.loop_start = after_epoll;

            self.send_queue();

            for event in events.iter() {
                match event.token() {
                    // Token 0: the command channel to the main process.
                    Token(0) => {
                        if event.is_error() {
                            error!("error reading from command channel");
                            continue;
                        }
                        if event.is_read_closed() || event.is_write_closed() {
                            error!("command channel was closed");
                            return;
                        }
                        let ready = Ready::from(event);
                        self.channel.handle_events(ready);

                        // Alternate between reading requests and writing responses
                        // until the channel has no readiness left.
                        loop {
                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            if self.channel.readiness() == Ready::EMPTY {
                                break;
                            }

                            // `true` means a hard stop was processed: leave the loop.
                            if self.read_channel_messages_and_notify() {
                                return;
                            }

                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            self.send_queue();
                        }
                    }
                    // Token 1: the timer; fire every expired timeout.
                    Token(1) => {
                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                            self.timeout(t);
                        }
                    }
                    // Token 2: the metrics socket became writable.
                    Token(2) => METRICS.with(|metrics| {
                        (*metrics.borrow_mut()).writable();
                    }),
                    // Any other token is handed to `ready` with the event's readiness.
                    token => self.ready(token, Ready::from(event)),
                }
            }

            // Fire timers whose deadline passed even if no timer event was delivered.
            if let Some(t) = self.should_poll_at.as_ref() {
                if *t <= Instant::now() {
                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                        self.timeout(t);
                    }
                }
            }
            self.handle_remaining_readiness();
            self.create_sessions();

            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());

            self.zombie_check();

            // Local metrics are cleared when an iteration lands on minute 0, second 0
            // (UTC) of an hour.
            let now = time::OffsetDateTime::now_utc();
            if now.minute() == 00 && now.second() == 0 {
                METRICS.with(|metrics| {
                    (*metrics.borrow_mut()).clear_local();
                });
            }

            gauge!("client.connections", self.sessions.borrow().nb_connections);
            gauge!("slab.entries", self.sessions.borrow().slab.len());
            METRICS.with(|metrics| {
                (*metrics.borrow_mut()).send_data();
            });

            // During a soft stop, leave once every session has been closed.
            if self.shutting_down.is_some() && self.shut_down_sessions() {
                return;
            }
        }
    }
595
596 fn check_for_poll_errors(&mut self) {
597 if self.current_poll_errors >= self.max_poll_errors {
598 error!(
599 "Something is going very wrong. Last {} poll() calls failed, crashing..",
600 self.current_poll_errors
601 );
602 panic!(
603 "poll() calls failed {} times in a row",
604 self.current_poll_errors
605 );
606 }
607 }
608
609 fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
610 let now = Instant::now();
611 time!("event_loop_time", (now - self.loop_start).as_millis());
612
613 let timeout = match self.should_poll_at.as_ref() {
614 None => self.poll_timeout,
615 Some(i) => {
616 if *i <= now {
617 self.poll_timeout
618 } else {
619 let dur = *i - now;
620 match self.poll_timeout {
621 None => Some(dur),
622 Some(t) => {
623 if t < dur {
624 Some(t)
625 } else {
626 Some(dur)
627 }
628 }
629 }
630 }
631 }
632 };
633
634 self.loop_start = now;
635 timeout
636 }
637
    /// Reads every request available on the command channel and processes it.
    ///
    /// `HardStop`, `SoftStop` and `ReturnListenSockets` get special treatment; every
    /// other request goes straight to `notify`.
    ///
    /// Returns `true` when a `HardStop` was processed and the event loop must stop,
    /// `false` otherwise (including when the channel is not readable).
    fn read_channel_messages_and_notify(&mut self) -> bool {
        if !self.channel.readiness().is_readable() {
            return false;
        }

        if let Err(e) = self.channel.readable() {
            error!("error reading from channel: {:?}", e);
        }

        loop {
            let request = self.channel.read_message();
            debug!("Received request {:?}", request);
            match request {
                Ok(request) => match request.content.request_type {
                    Some(RequestType::HardStop(_)) => {
                        let req_id = request.id.clone();
                        self.notify(request);
                        // Answer directly on the channel instead of going through the
                        // queue: the loop stops right after this.
                        if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
                            error!("Could not send ok response to the main process: {}", e);
                        }
                        if let Err(e) = self.channel.run() {
                            error!("Error while running the server channel: {}", e);
                        }
                        return true;
                    }
                    Some(RequestType::SoftStop(_)) => {
                        // Remember the request id: the OK is sent by
                        // `shut_down_sessions` once every session is closed.
                        self.shutting_down = Some(request.id.clone());
                        self.last_sessions_len = self.sessions.borrow().slab.len();
                        self.notify(request);
                    }
                    Some(RequestType::ReturnListenSockets(_)) => {
                        info!("received ReturnListenSockets order");
                        match self.return_listen_sockets() {
                            Ok(_) => push_queue(WorkerResponse::ok(request.id)),
                            Err(error) => push_queue(worker_response_error(
                                request.id,
                                format!("Could not send listeners on scm socket: {error:?}"),
                            )),
                        }
                    }
                    _ => self.notify(request),
                },
                Err(_) => {
                    // No complete message could be read: read more bytes if the channel
                    // is still readable and retry, otherwise stop.
                    if (self.channel.interest & self.channel.readiness).is_readable() {
                        if let Err(e) = self.channel.readable() {
                            error!("error reading from channel: {:?}", e);
                        }
                        continue;
                    }
                    break;
                }
            }
        }
        false
    }
696
697 fn zombie_check(&mut self) {
699 let now = Instant::now();
700 if now - self.last_zombie_check < self.zombie_check_interval {
701 return;
702 }
703 info!("zombie check");
704 self.last_zombie_check = now;
705
706 let mut zombie_tokens = HashSet::new();
707
708 for (_index, session) in self
710 .sessions
711 .borrow_mut()
712 .slab
713 .iter_mut()
714 .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
715 {
716 let session_token = session.borrow().frontend_token();
717 if !zombie_tokens.contains(&session_token) {
718 session.borrow().print_session();
719 zombie_tokens.insert(session_token);
720 }
721 }
722
723 let zombie_count = zombie_tokens.len() as i64;
724 count!("zombies", zombie_count);
725
726 let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
727 info!(
728 "removing {} zombies ({} remaining entries after close)",
729 zombie_count, remaining_count
730 );
731 }
732
733 fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
736 if tokens.is_empty() {
737 return 0;
738 }
739
740 for token in &tokens {
742 if self.sessions.borrow().slab.contains(token.0) {
743 let session = { self.sessions.borrow_mut().slab.remove(token.0) };
744 session.borrow_mut().close();
745 self.sessions.borrow_mut().decr();
746 }
747 }
748
749 let mut dangling_entries = HashSet::new();
751 for (entry_key, session) in &self.sessions.borrow().slab {
752 if tokens.contains(&session.borrow().frontend_token()) {
753 dangling_entries.insert(entry_key);
754 }
755 }
756
757 let mut dangling_entries_count = 0;
759 for entry_key in dangling_entries {
760 let mut sessions = self.sessions.borrow_mut();
761 if sessions.slab.contains(entry_key) {
762 sessions.slab.remove(entry_key);
763 dangling_entries_count += 1;
764 }
765 }
766 dangling_entries_count
767 }
768
769 fn shut_down_sessions(&mut self) -> bool {
771 let sessions_count = self.sessions.borrow().slab.len();
772 let mut sessions_to_shut_down = HashSet::new();
773
774 for (_key, session) in &self.sessions.borrow().slab {
775 if session.borrow_mut().shutting_down() {
776 sessions_to_shut_down.insert(Token(session.borrow().frontend_token().0));
777 }
778 }
779 let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
780
781 let new_sessions_count = self.sessions.borrow().slab.len();
782
783 if new_sessions_count < sessions_count {
784 let now = Instant::now();
785 if let Some(last) = self.last_shutting_down_message {
786 if (now - last) > Duration::from_secs(5) {
787 info!(
788 "closed {} sessions, {} sessions left, base_sessions_count = {}",
789 sessions_count - new_sessions_count,
790 new_sessions_count,
791 self.base_sessions_count
792 );
793 }
794 }
795 self.last_shutting_down_message = Some(now);
796 }
797
798 if new_sessions_count <= self.base_sessions_count {
799 info!("last session stopped, shutting down!");
800 if let Err(e) = self.channel.run() {
801 error!("Error while running the server channel: {}", e);
802 }
803 let id = self
805 .shutting_down
806 .take()
807 .expect("should have shut down correctly"); debug!("Responding OK to main process for request {}", id);
810
811 let proxy_response = WorkerResponse::ok(id);
812 if let Err(e) = self.channel.write_message(&proxy_response) {
813 error!("Could not write response to the main process: {}", e);
814 }
815 if let Err(e) = self.channel.run() {
816 error!("Error while running the server channel: {}", e);
817 }
818 return true;
819 }
820
821 if new_sessions_count < self.last_sessions_len {
822 info!(
823 "shutting down, {} slab elements remaining (base: {})",
824 new_sessions_count - self.base_sessions_count,
825 self.base_sessions_count
826 );
827 self.last_sessions_len = new_sessions_count;
828 }
829
830 false
831 }
832
833 fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
834 let token = session.borrow().frontend_token();
835 let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
836 }
837
838 fn send_queue(&mut self) {
839 if self.channel.readiness.is_writable() {
840 QUEUE.with(|q| {
841 let mut queue = q.borrow_mut();
842 loop {
843 if let Some(resp) = queue.pop_front() {
844 debug!("Sending response {:?}", resp);
845 if let Err(e) = self.channel.write_message(&resp) {
846 error!("Could not write message {} on the channel: {}", resp, e);
847 queue.push_front(resp);
848 }
849 }
850
851 if self.channel.back_buf.available_data() > 0 {
852 if let Err(e) = self.channel.writable() {
853 error!("error writing to channel: {:?}", e);
854 }
855 }
856
857 if !self.channel.readiness.is_writable() {
858 break;
859 }
860
861 if self.channel.back_buf.available_data() == 0 && queue.len() == 0 {
862 break;
863 }
864 }
865 });
866 }
867 }
868
869 fn notify(&mut self, message: WorkerRequest) {
870 match &message.content.request_type {
871 Some(RequestType::ConfigureMetrics(configuration)) => {
872 match MetricsConfiguration::try_from(*configuration) {
873 Ok(metrics_config) => {
874 METRICS.with(|metrics| {
875 (*metrics.borrow_mut()).configure(&metrics_config);
876 push_queue(WorkerResponse::ok(message.id));
877 });
878 }
879 Err(e) => {
880 error!("Error configuring metrics: {}", e);
881 push_queue(WorkerResponse::error(message.id, e));
882 }
883 }
884 return;
885 }
886 Some(RequestType::QueryMetrics(query_metrics_options)) => {
887 METRICS.with(|metrics| {
888 match (*metrics.borrow_mut()).query(query_metrics_options) {
889 Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
890 Err(e) => {
891 error!("Error querying metrics: {}", e);
892 push_queue(WorkerResponse::error(message.id, e))
893 }
894 }
895 });
896 return;
897 }
898 Some(RequestType::Logging(logging_filter)) => {
899 info!(
900 "{} changing logging filter to {}",
901 message.id, logging_filter
902 );
903 let (directives, _errors) = logging::parse_logging_spec(logging_filter);
905 logging::LOGGER.with(|logger| {
906 logger.borrow_mut().set_directives(directives);
907 });
908 push_queue(WorkerResponse::ok(message.id));
909 return;
910 }
911 Some(RequestType::QueryClustersHashes(_)) => {
912 push_queue(WorkerResponse::ok_with_content(
913 message.id.clone(),
914 ContentType::ClusterHashes(ClusterHashes {
915 map: self.config_state.hash_state(),
916 })
917 .into(),
918 ));
919 return;
920 }
921 Some(RequestType::QueryClusterById(cluster_id)) => {
922 push_queue(WorkerResponse::ok_with_content(
923 message.id.clone(),
924 ContentType::Clusters(ClusterInformations {
925 vec: self
926 .config_state
927 .cluster_state(cluster_id)
928 .map_or(vec![], |ci| vec![ci]),
929 })
930 .into(),
931 ));
932 }
933 Some(RequestType::QueryClustersByDomain(domain)) => {
934 let cluster_ids = self
935 .config_state
936 .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
937 let vec = cluster_ids
938 .iter()
939 .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
940 .collect();
941
942 push_queue(WorkerResponse::ok_with_content(
943 message.id.clone(),
944 ContentType::Clusters(ClusterInformations { vec }).into(),
945 ));
946 return;
947 }
948 Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
949 if filters.fingerprint.is_some() {
950 let certs = self.config_state.get_certificates(filters.clone());
951 let response = if !certs.is_empty() {
952 WorkerResponse::ok_with_content(
953 message.id.clone(),
954 ContentType::CertificatesWithFingerprints(
955 CertificatesWithFingerprints { certs },
956 )
957 .into(),
958 )
959 } else {
960 worker_response_error(
961 message.id.clone(),
962 "Could not find certificate for this fingerprint",
963 )
964 };
965 push_queue(response);
966 return;
967 }
968 }
971 _other_request => {}
972 }
973 self.notify_proxys(message);
974 }
975
    /// Applies a request to the configuration state and forwards it to the proxies
    /// concerned.
    ///
    /// The request is first dispatched to `config_state`; a failure there is only
    /// logged. Backend additions and removals are handled on the backend map and
    /// answered immediately. Other requests are sent to each proxy selected by
    /// `get_destinations()`, and a single response is queued: the first one received,
    /// replaced by a later one if that later one is a failure. Listener requests are
    /// then handled and queue a response of their own.
    pub fn notify_proxys(&mut self, request: WorkerRequest) {
        if let Err(e) = self.config_state.dispatch(&request.content) {
            error!("Could not execute order on config state: {}", e);
        }

        let req_id = request.id.clone();

        match request.content.request_type {
            Some(RequestType::AddCluster(ref cluster)) => {
                // No early return: the request still goes through the proxies below.
                self.add_cluster(cluster);
            }
            Some(RequestType::AddBackend(ref backend)) => {
                push_queue(self.add_backend(&req_id, backend));
                return;
            }
            Some(RequestType::RemoveBackend(ref remove_backend)) => {
                push_queue(self.remove_backend(&req_id, remove_backend));
                return;
            }
            _ => {}
        };

        let proxy_destinations = request.content.get_destinations();
        let mut notify_response = None;
        if proxy_destinations.to_http_proxy {
            notify_response = Some(self.http.borrow_mut().notify(request.clone()));
        }
        if proxy_destinations.to_https_proxy {
            let http_proxy_response = self.https.borrow_mut().notify(request.clone());
            // A failure takes precedence over an earlier response.
            if http_proxy_response.is_failure() || notify_response.is_none() {
                notify_response = Some(http_proxy_response);
            }
        }
        if proxy_destinations.to_tcp_proxy {
            let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
            if tcp_proxy_response.is_failure() || notify_response.is_none() {
                notify_response = Some(tcp_proxy_response);
            }
        }
        if let Some(response) = notify_response {
            push_queue(response);
        }

        match request.content.request_type {
            Some(RequestType::AddHttpListener(listener)) => {
                push_queue(self.notify_add_http_listener(&req_id, listener));
            }
            Some(RequestType::AddHttpsListener(listener)) => {
                push_queue(self.notify_add_https_listener(&req_id, listener));
            }
            Some(RequestType::AddTcpListener(listener)) => {
                push_queue(self.notify_add_tcp_listener(&req_id, listener));
            }
            Some(RequestType::RemoveListener(ref remove)) => {
                debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
                // NOTE(review): decremented before the proxy reports whether the removal
                // succeeded, and without an underflow guard — confirm this is intended.
                self.base_sessions_count -= 1;
                let response = match ListenerType::try_from(remove.proxy) {
                    Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
                    Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
                    Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
                    Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
                };
                push_queue(response);
            }
            Some(RequestType::ActivateListener(ref activate)) => {
                push_queue(self.notify_activate_listener(&req_id, activate));
            }
            Some(RequestType::DeactivateListener(ref deactivate)) => {
                push_queue(self.notify_deactivate_listener(&req_id, deactivate));
            }
            _other_request => {}
        };
    }
1051
1052 fn add_cluster(&mut self, cluster: &Cluster) {
1053 self.backends
1054 .borrow_mut()
1055 .set_load_balancing_policy_for_cluster(
1056 &cluster.cluster_id,
1057 LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
1058 cluster
1059 .load_metric
1060 .and_then(|n| LoadMetric::try_from(n).ok()),
1061 );
1062 }
1063
1064 fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
1065 let new_backend = Backend::new(
1066 &add_backend.backend_id,
1067 add_backend.address.into(),
1068 add_backend.sticky_id.clone(),
1069 add_backend.load_balancing_parameters,
1070 add_backend.backup,
1071 );
1072 self.backends
1073 .borrow_mut()
1074 .add_backend(&add_backend.cluster_id, new_backend);
1075
1076 WorkerResponse::ok(req_id)
1077 }
1078
1079 fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
1080 let address = backend.address.into();
1081 self.backends
1082 .borrow_mut()
1083 .remove_backend(&backend.cluster_id, &address);
1084
1085 WorkerResponse::ok(req_id)
1086 }
1087
1088 fn notify_add_http_listener(
1089 &mut self,
1090 req_id: &str,
1091 listener: HttpListenerConfig,
1092 ) -> WorkerResponse {
1093 debug!("{} add http listener {:?}", req_id, listener);
1094
1095 if self.sessions.borrow().at_capacity() {
1096 return worker_response_error(req_id, "session list is full, cannot add a listener");
1097 }
1098
1099 let mut session_manager = self.sessions.borrow_mut();
1100 let entry = session_manager.slab.vacant_entry();
1101 let token = Token(entry.key());
1102
1103 match self.http.borrow_mut().add_listener(listener, token) {
1104 Ok(_token) => {
1105 entry.insert(Rc::new(RefCell::new(ListenSession {
1106 protocol: Protocol::HTTPListen,
1107 })));
1108 self.base_sessions_count += 1;
1109 WorkerResponse::ok(req_id)
1110 }
1111 Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
1112 }
1113 }
1114
1115 fn notify_add_https_listener(
1116 &mut self,
1117 req_id: &str,
1118 listener: HttpsListenerConfig,
1119 ) -> WorkerResponse {
1120 debug!("{} add https listener {:?}", req_id, listener);
1121
1122 if self.sessions.borrow().at_capacity() {
1123 return worker_response_error(req_id, "session list is full, cannot add a listener");
1124 }
1125
1126 let mut session_manager = self.sessions.borrow_mut();
1127 let entry = session_manager.slab.vacant_entry();
1128 let token = Token(entry.key());
1129
1130 match self
1131 .https
1132 .borrow_mut()
1133 .add_listener(listener.clone(), token)
1134 {
1135 Ok(_token) => {
1136 entry.insert(Rc::new(RefCell::new(ListenSession {
1137 protocol: Protocol::HTTPSListen,
1138 })));
1139 self.base_sessions_count += 1;
1140 WorkerResponse::ok(req_id)
1141 }
1142 Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
1143 }
1144 }
1145
1146 fn notify_add_tcp_listener(
1147 &mut self,
1148 req_id: &str,
1149 listener: CommandTcpListener,
1150 ) -> WorkerResponse {
1151 debug!("{} add tcp listener {:?}", req_id, listener);
1152
1153 if self.sessions.borrow().at_capacity() {
1154 return worker_response_error(req_id, "session list is full, cannot add a listener");
1155 }
1156
1157 let mut session_manager = self.sessions.borrow_mut();
1158 let entry = session_manager.slab.vacant_entry();
1159 let token = Token(entry.key());
1160
1161 match self.tcp.borrow_mut().add_listener(listener, token) {
1162 Ok(_token) => {
1163 entry.insert(Rc::new(RefCell::new(ListenSession {
1164 protocol: Protocol::TCPListen,
1165 })));
1166 self.base_sessions_count += 1;
1167 WorkerResponse::ok(req_id)
1168 }
1169 Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
1170 }
1171 }
1172
    /// Activates the listener at `activate.address` on the proxy named by
    /// `activate.proxy`, then starts accepting on it.
    ///
    /// If the listeners received over the SCM socket contain a file descriptor for
    /// this address, it is wrapped in a mio listener and handed to the proxy;
    /// otherwise the proxy receives `None`.
    ///
    /// Returns an OK response on success, an error response if the activation fails
    /// or if `activate.proxy` is not a known `ListenerType`.
    fn notify_activate_listener(
        &mut self,
        req_id: &str,
        activate: &ActivateListener,
    ) -> WorkerResponse {
        debug!(
            "{} activate {:?} listener {:?}",
            req_id, activate.proxy, activate
        );

        let address: std::net::SocketAddr = activate.address.into();

        match ListenerType::try_from(activate.proxy) {
            Ok(ListenerType::Http) => {
                // NOTE(review): `from_raw_fd` takes ownership of the descriptor; this is
                // sound only if `get_http` gives up the fd it returns — confirm.
                let listener = self
                    .scm_listeners
                    .as_mut()
                    .and_then(|s| s.get_http(&address))
                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });

                // The result is bound first so that the proxy borrow has ended when
                // `self.accept` runs.
                let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
                match activated_token {
                    Ok(token) => {
                        self.accept(ListenToken(token.0), Protocol::HTTPListen);
                        WorkerResponse::ok(req_id)
                    }
                    Err(activate_error) => worker_response_error(
                        req_id,
                        format!("Could not activate HTTP listener: {}", activate_error),
                    ),
                }
            }
            Ok(ListenerType::Https) => {
                // NOTE(review): same ownership assumption as above, for `get_https`.
                let listener = self
                    .scm_listeners
                    .as_mut()
                    .and_then(|s| s.get_https(&address))
                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });

                let activated_token = self
                    .https
                    .borrow_mut()
                    .activate_listener(&address, listener);
                match activated_token {
                    Ok(token) => {
                        self.accept(ListenToken(token.0), Protocol::HTTPSListen);
                        WorkerResponse::ok(req_id)
                    }
                    Err(activate_error) => worker_response_error(
                        req_id,
                        format!("Could not activate HTTPS listener: {}", activate_error),
                    ),
                }
            }
            Ok(ListenerType::Tcp) => {
                // NOTE(review): same ownership assumption as above, for `get_tcp`.
                let listener = self
                    .scm_listeners
                    .as_mut()
                    .and_then(|s| s.get_tcp(&address))
                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });

                let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
                match listener_token {
                    Ok(token) => {
                        self.accept(ListenToken(token.0), Protocol::TCPListen);
                        WorkerResponse::ok(req_id)
                    }
                    Err(activate_error) => worker_response_error(
                        req_id,
                        format!("Could not activate TCP listener: {}", activate_error),
                    ),
                }
            }
            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
        }
    }
1249
1250 fn notify_deactivate_listener(
1251 &mut self,
1252 req_id: &str,
1253 deactivate: &DeactivateListener,
1254 ) -> WorkerResponse {
1255 debug!(
1256 "{} deactivate {:?} listener {:?}",
1257 req_id, deactivate.proxy, deactivate
1258 );
1259
1260 let address: std::net::SocketAddr = deactivate.address.into();
1261
1262 match ListenerType::try_from(deactivate.proxy) {
1263 Ok(ListenerType::Http) => {
1264 let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
1265 {
1266 Ok((token, listener)) => (token, listener),
1267 Err(e) => {
1268 return worker_response_error(
1269 req_id,
1270 format!(
1271 "Couldn't deactivate HTTP listener at address {address:?}: {e}"
1272 ),
1273 )
1274 }
1275 };
1276
1277 if let Err(e) = self.poll.registry().deregister(&mut listener) {
1278 error!(
1279 "error deregistering HTTP listen socket({:?}): {:?}",
1280 deactivate, e
1281 );
1282 }
1283
1284 {
1285 let mut sessions = self.sessions.borrow_mut();
1286 if sessions.slab.contains(token.0) {
1287 sessions.slab.remove(token.0);
1288 info!("removed listen token {:?}", token);
1289 }
1290 }
1291
1292 if deactivate.to_scm {
1293 self.unblock_scm_socket();
1294 let listeners = Listeners {
1295 http: vec![(address, listener.as_raw_fd())],
1296 tls: vec![],
1297 tcp: vec![],
1298 };
1299 info!("sending HTTP listener: {:?}", listeners);
1300 let res = self.scm.send_listeners(&listeners);
1301
1302 self.block_scm_socket();
1303
1304 info!("sent HTTP listener: {:?}", res);
1305 }
1306 WorkerResponse::ok(req_id)
1307 }
1308 Ok(ListenerType::Https) => {
1309 let (token, mut listener) =
1310 match self.https.borrow_mut().give_back_listener(address) {
1311 Ok((token, listener)) => (token, listener),
1312 Err(e) => {
1313 return worker_response_error(
1314 req_id,
1315 format!(
1316 "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
1317 ),
1318 )
1319 }
1320 };
1321 if let Err(e) = self.poll.registry().deregister(&mut listener) {
1322 error!(
1323 "error deregistering HTTPS listen socket({:?}): {:?}",
1324 deactivate, e
1325 );
1326 }
1327 if self.sessions.borrow().slab.contains(token.0) {
1328 self.sessions.borrow_mut().slab.remove(token.0);
1329 info!("removed listen token {:?}", token);
1330 }
1331
1332 if deactivate.to_scm {
1333 self.unblock_scm_socket();
1334 let listeners = Listeners {
1335 http: vec![],
1336 tls: vec![(address, listener.as_raw_fd())],
1337 tcp: vec![],
1338 };
1339 info!("sending HTTPS listener: {:?}", listeners);
1340 let res = self.scm.send_listeners(&listeners);
1341
1342 self.block_scm_socket();
1343
1344 info!("sent HTTPS listener: {:?}", res);
1345 }
1346 WorkerResponse::ok(req_id)
1347 }
1348 Ok(ListenerType::Tcp) => {
1349 let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
1350 {
1351 Ok((token, listener)) => (token, listener),
1352 Err(e) => {
1353 return worker_response_error(
1354 req_id,
1355 format!(
1356 "Could not deactivate TCP listener at address {:?}: {}",
1357 address, e
1358 ),
1359 )
1360 }
1361 };
1362
1363 if let Err(e) = self.poll.registry().deregister(&mut listener) {
1364 error!(
1365 "error deregistering TCP listen socket({:?}): {:?}",
1366 deactivate, e
1367 );
1368 }
1369 if self.sessions.borrow().slab.contains(token.0) {
1370 self.sessions.borrow_mut().slab.remove(token.0);
1371 info!("removed listen token {:?}", token);
1372 }
1373
1374 if deactivate.to_scm {
1375 self.unblock_scm_socket();
1376 let listeners = Listeners {
1377 http: vec![],
1378 tls: vec![],
1379 tcp: vec![(address, listener.as_raw_fd())],
1380 };
1381 info!("sending TCP listener: {:?}", listeners);
1382 let res = self.scm.send_listeners(&listeners);
1383
1384 self.block_scm_socket();
1385
1386 info!("sent TCP listener: {:?}", res);
1387 }
1388 WorkerResponse::ok(req_id)
1389 }
1390 Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
1391 }
1392 }
1393
1394 pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
1396 self.unblock_scm_socket();
1397
1398 let mut http_listeners = self.http.borrow_mut().give_back_listeners();
1399 for &mut (_, ref mut sock) in http_listeners.iter_mut() {
1400 if let Err(e) = self.poll.registry().deregister(sock) {
1401 error!(
1402 "error deregistering HTTP listen socket({:?}): {:?}",
1403 sock, e
1404 );
1405 }
1406 }
1407
1408 let mut https_listeners = self.https.borrow_mut().give_back_listeners();
1409 for &mut (_, ref mut sock) in https_listeners.iter_mut() {
1410 if let Err(e) = self.poll.registry().deregister(sock) {
1411 error!(
1412 "error deregistering HTTPS listen socket({:?}): {:?}",
1413 sock, e
1414 );
1415 }
1416 }
1417
1418 let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
1419 for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
1420 if let Err(e) = self.poll.registry().deregister(sock) {
1421 error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
1422 }
1423 }
1424
1425 let listeners = Listeners {
1427 http: http_listeners
1428 .iter()
1429 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1430 .collect(),
1431 tls: https_listeners
1432 .iter()
1433 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1434 .collect(),
1435 tcp: tcp_listeners
1436 .iter()
1437 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1438 .collect(),
1439 };
1440 info!("sending default listeners: {:?}", listeners);
1441 let res = self.scm.send_listeners(&listeners);
1442
1443 self.block_scm_socket();
1444
1445 info!("sent default listeners: {:?}", res);
1446 res
1447 }
1448
1449 fn block_scm_socket(&mut self) {
1450 if let Err(e) = self.scm.set_blocking(true) {
1451 error!("Could not block scm socket: {}", e);
1452 }
1453 }
1454
1455 fn unblock_scm_socket(&mut self) {
1456 if let Err(e) = self.scm.set_blocking(false) {
1457 error!("Could not unblock scm socket: {}", e);
1458 }
1459 }
1460
1461 pub fn to_session(&self, token: Token) -> SessionToken {
1462 SessionToken(token.0)
1463 }
1464
1465 pub fn from_session(&self, token: SessionToken) -> Token {
1466 Token(token.0)
1467 }
1468
1469 pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
1470 match protocol {
1471 Protocol::TCPListen => loop {
1472 match self.tcp.borrow_mut().accept(token) {
1473 Ok(sock) => self.accept_queue.push_back((
1474 sock,
1475 token,
1476 Protocol::TCPListen,
1477 Instant::now(),
1478 )),
1479 Err(AcceptError::WouldBlock) => {
1480 self.accept_ready.remove(&token);
1481 break;
1482 }
1483 Err(other) => {
1484 error!("error accepting TCP sockets: {:?}", other);
1485 self.accept_ready.remove(&token);
1486 break;
1487 }
1488 }
1489 },
1490 Protocol::HTTPListen => loop {
1491 match self.http.borrow_mut().accept(token) {
1492 Ok(sock) => self.accept_queue.push_back((
1493 sock,
1494 token,
1495 Protocol::HTTPListen,
1496 Instant::now(),
1497 )),
1498 Err(AcceptError::WouldBlock) => {
1499 self.accept_ready.remove(&token);
1500 break;
1501 }
1502 Err(other) => {
1503 error!("error accepting HTTP sockets: {:?}", other);
1504 self.accept_ready.remove(&token);
1505 break;
1506 }
1507 }
1508 },
1509 Protocol::HTTPSListen => loop {
1510 match self.https.borrow_mut().accept(token) {
1511 Ok(sock) => self.accept_queue.push_back((
1512 sock,
1513 token,
1514 Protocol::HTTPSListen,
1515 Instant::now(),
1516 )),
1517 Err(AcceptError::WouldBlock) => {
1518 self.accept_ready.remove(&token);
1519 break;
1520 }
1521 Err(other) => {
1522 error!("error accepting HTTPS sockets: {:?}", other);
1523 self.accept_ready.remove(&token);
1524 break;
1525 }
1526 }
1527 },
1528 _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
1529 }
1530
1531 gauge!("accept_queue.connections", self.accept_queue.len());
1532 }
1533
1534 pub fn create_sessions(&mut self) {
1535 while let Some((sock, token, protocol, timestamp)) = self.accept_queue.pop_back() {
1536 let wait_time = Instant::now() - timestamp;
1537 time!("accept_queue.wait_time", wait_time.as_millis());
1538 if wait_time > self.accept_queue_timeout {
1539 incr!("accept_queue.timeout");
1540 continue;
1541 }
1542
1543 if !self.sessions.borrow_mut().check_limits() {
1544 break;
1545 }
1546
1547 match protocol {
1551 Protocol::TCPListen => {
1552 let proxy = self.tcp.clone();
1553 if self
1554 .tcp
1555 .borrow_mut()
1556 .create_session(sock, token, wait_time, proxy)
1557 .is_err()
1558 {
1559 break;
1560 }
1561 }
1562 Protocol::HTTPListen => {
1563 let proxy = self.http.clone();
1564 if self
1565 .http
1566 .borrow_mut()
1567 .create_session(sock, token, wait_time, proxy)
1568 .is_err()
1569 {
1570 break;
1571 }
1572 }
1573 Protocol::HTTPSListen => {
1574 if self
1575 .https
1576 .borrow_mut()
1577 .create_session(sock, token, wait_time, self.https.clone())
1578 .is_err()
1579 {
1580 break;
1581 }
1582 }
1583 _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
1584 };
1585 self.sessions.borrow_mut().incr();
1586 }
1587
1588 gauge!("accept_queue.connections", self.accept_queue.len());
1589 }
1590
1591 pub fn ready(&mut self, token: Token, events: Ready) {
1592 trace!("PROXY\t{:?} got events: {:?}", token, events);
1593
1594 let session_token = token.0;
1595 if self.sessions.borrow().slab.contains(session_token) {
1596 let protocol = self.sessions.borrow().slab[session_token]
1598 .borrow()
1599 .protocol();
1600 match protocol {
1602 Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
1603 if events.is_readable() {
1605 self.accept_ready.insert(ListenToken(token.0));
1606 if self.sessions.borrow().can_accept {
1607 self.accept(ListenToken(token.0), protocol);
1608 }
1609 return;
1610 }
1611
1612 if events.is_writable() {
1613 error!(
1614 "received writable for listener {:?}, this should not happen",
1615 token
1616 );
1617 return;
1618 }
1619
1620 if events.is_hup() {
1621 error!("should not happen: server {:?} closed", token);
1622 return;
1623 }
1624
1625 unreachable!();
1626 }
1627 _ => {}
1628 }
1629
1630 let session = self.sessions.borrow_mut().slab[session_token].clone();
1631 session.borrow_mut().update_readiness(token, events);
1632 if session.borrow_mut().ready(session.clone()) {
1633 self.kill_session(session);
1634 }
1635 }
1636 }
1637
1638 pub fn timeout(&mut self, token: Token) {
1639 trace!("PROXY\t{:?} got timeout", token);
1640
1641 let session_token = token.0;
1642 if self.sessions.borrow().slab.contains(session_token) {
1643 let session = self.sessions.borrow_mut().slab[session_token].clone();
1644 if session.borrow_mut().timeout(token) {
1645 self.kill_session(session);
1646 }
1647 }
1648 }
1649
1650 pub fn handle_remaining_readiness(&mut self) {
1651 if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
1654 while let Some(token) = self
1655 .accept_ready
1656 .iter()
1657 .next()
1658 .map(|token| ListenToken(token.0))
1659 {
1660 let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
1661 self.accept(token, protocol);
1662 if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
1663 break;
1664 }
1665 }
1666 }
1667 }
1668 fn block_channel(&mut self) {
1669 if let Err(e) = self.channel.blocking() {
1670 error!("Could not block channel: {}", e);
1671 }
1672 }
1673 fn unblock_channel(&mut self) {
1674 if let Err(e) = self.channel.nonblocking() {
1675 error!("Could not block channel: {}", e);
1676 }
1677 }
1678}
1679
1680fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
1683 error!(
1684 "error on request {}, {}",
1685 request_id.to_string(),
1686 error.to_string()
1687 );
1688 WorkerResponse::error(request_id, error)
1689}
1690
/// Session entry stored in the session slab on behalf of a listener.
///
/// It carries only the listener's protocol; its `ProxySession` implementation below holds
/// no connection state.
pub struct ListenSession {
    /// Listener protocol this entry stands for (for example `Protocol::TCPListen`).
    pub protocol: Protocol,
}
1694
1695impl ProxySession for ListenSession {
1696 fn last_event(&self) -> Instant {
1697 Instant::now()
1698 }
1699
1700 fn print_session(&self) {}
1701
1702 fn frontend_token(&self) -> Token {
1703 Token(0)
1704 }
1705
1706 fn protocol(&self) -> Protocol {
1707 self.protocol
1708 }
1709
1710 fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1711 false
1712 }
1713
1714 fn shutting_down(&mut self) -> SessionIsToBeClosed {
1715 false
1716 }
1717
1718 fn update_readiness(&mut self, _token: Token, _events: Ready) {}
1719
1720 fn close(&mut self) {}
1721
1722 fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
1723 error!(
1724 "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
1725 _token, self.protocol
1726 );
1727 false
1728 }
1729}