1use std::{
3 cell::RefCell,
4 collections::{HashSet, VecDeque},
5 io::Error as IoError,
6 os::unix::io::{AsRawFd, FromRawFd},
7 rc::Rc,
8 time::{Duration, Instant},
9};
10
11use mio::{
12 Events, Interest, Poll, Token,
13 net::{TcpListener as MioTcpListener, TcpStream},
14};
15use slab::Slab;
16use sozu_command::{
17 channel::Channel,
18 logging,
19 proto::command::{
20 ActivateListener, AddBackend, CertificatesWithFingerprints, Cluster, ClusterHashes,
21 ClusterInformations, DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig,
22 InitialState, ListenerType, LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration,
23 RemoveBackend, Request, ResponseStatus, ServerConfig,
24 TcpListenerConfig as CommandTcpListener, WorkerRequest, WorkerResponse,
25 request::RequestType, response_content::ContentType,
26 },
27 ready::Ready,
28 scm_socket::{Listeners, ScmSocket, ScmSocketError},
29 state::ConfigState,
30};
31
32use crate::{
33 AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
34 backends::{Backend, BackendMap},
35 features::FEATURES,
36 http, https,
37 metrics::METRICS,
38 pool::Pool,
39 tcp,
40 timer::Timer,
41};
42
/// Retry budget for connections.
///
/// NOTE(review): not used in this part of the file; presumably the number of
/// connection attempts made before giving up on a backend. Confirm at the use site.
pub const CONN_RETRIES: u8 = 3;

/// Channel used by the worker: `WorkerResponse`s are written to it and
/// `WorkerRequest`s are read from it.
pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;

thread_local! {
    /// Per-thread FIFO of responses waiting to be written on the channel.
    /// Filled by `push_queue` and `push_event`.
    pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
}

thread_local! {
    /// Per-thread timer whose entries carry a mio `Token`.
    pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
}
55
56pub fn push_queue(message: WorkerResponse) {
57 QUEUE.with(|queue| {
58 (*queue.borrow_mut()).push_back(message);
59 });
60}
61
62pub fn push_event(event: Event) {
63 QUEUE.with(|queue| {
64 (*queue.borrow_mut()).push_back(WorkerResponse {
65 id: "EVENT".to_string(),
66 message: String::new(),
67 status: ResponseStatus::Processing.into(),
68 content: Some(ContentType::Event(event).into()),
69 });
70 });
71}
72
/// Newtype around a `usize` identifying a listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ListenToken(pub usize);

/// Newtype around a `usize` identifying a session.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SessionToken(pub usize);

impl From<usize> for ListenToken {
    /// Wraps a raw index into a `ListenToken`.
    fn from(index: usize) -> Self {
        Self(index)
    }
}

impl From<ListenToken> for usize {
    /// Extracts the raw index from a `ListenToken`.
    fn from(ListenToken(index): ListenToken) -> Self {
        index
    }
}

impl From<usize> for SessionToken {
    /// Wraps a raw index into a `SessionToken`.
    fn from(index: usize) -> Self {
        Self(index)
    }
}

impl From<SessionToken> for usize {
    /// Extracts the raw index from a `SessionToken`.
    fn from(SessionToken(index): SessionToken) -> Self {
        index
    }
}
101
102pub struct SessionManager {
103 pub max_connections: usize,
104 pub nb_connections: usize,
105 pub can_accept: bool,
106 pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
107}
108
109impl SessionManager {
110 pub fn new(
111 slab: Slab<Rc<RefCell<dyn ProxySession>>>,
112 max_connections: usize,
113 ) -> Rc<RefCell<Self>> {
114 Rc::new(RefCell::new(SessionManager {
115 max_connections,
116 nb_connections: 0,
117 can_accept: true,
118 slab,
119 }))
120 }
121
122 pub fn at_capacity(&self) -> bool {
124 self.slab.len() >= 10 + 2 * self.max_connections
125 }
126
127 pub fn check_limits(&mut self) -> bool {
130 if self.nb_connections >= self.max_connections {
131 error!("max number of session connection reached, flushing the accept queue");
132 gauge!("accept_queue.backpressure", 1);
133 self.can_accept = false;
134 return false;
135 }
136
137 if self.at_capacity() {
138 error!("not enough memory to accept another session, flushing the accept queue");
139 error!(
140 "nb_connections: {}, max_connections: {}",
141 self.nb_connections, self.max_connections
142 );
143 gauge!("accept_queue.backpressure", 1);
144 self.can_accept = false;
145
146 return false;
147 }
148
149 true
150 }
151
152 pub fn to_session(token: Token) -> SessionToken {
153 SessionToken(token.0)
154 }
155
156 pub fn incr(&mut self) {
157 self.nb_connections += 1;
158 assert!(self.nb_connections <= self.max_connections);
159 gauge!("client.connections", self.nb_connections);
160 }
161
162 pub fn decr(&mut self) {
165 assert!(self.nb_connections != 0);
166 self.nb_connections -= 1;
167 gauge!("client.connections", self.nb_connections);
168
169 if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
171 debug!(
172 "nb_connections = {}, max_connections = {}, starting to accept again",
173 self.nb_connections, self.max_connections
174 );
175 gauge!("accept_queue.backpressure", 0);
176 self.can_accept = true;
177 }
178 }
179}
180
/// Errors returned while building a [`Server`].
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
    /// `Poll::new()` failed
    #[error("could not create event loop with MIO poll: {0}")]
    CreatePoll(IoError),
    /// `Registry::try_clone()` failed while preparing a proxy
    #[error("could not clone the MIO registry: {0}")]
    CloneRegistry(IoError),
    /// Registering the command channel on the poll registry failed
    #[error("could not register the channel: {0}")]
    RegisterChannel(IoError),
    /// An operation on the SCM socket failed; `msg` says which one
    #[error("{msg}:{scm_err}")]
    ScmSocket {
        msg: String,
        scm_err: ScmSocketError,
    },
}
195
/// State of a worker: the mio poll, the command channel, the proxies and the
/// session bookkeeping driven by [`Server::run`].
pub struct Server {
    /// Built from `ServerConfig::accept_queue_timeout`, in seconds
    accept_queue_timeout: Duration,
    /// Accepted sockets with their listener token, protocol and an `Instant`
    /// (NOTE(review): presumably the accept time, confirm where it is pushed)
    accept_queue: VecDeque<(TcpStream, ListenToken, Protocol, Instant)>,
    /// Set of listener tokens (its users are outside this part of the file)
    accept_ready: HashSet<ListenToken>,
    /// Backend map shared with the proxies, updated on AddBackend / RemoveBackend
    backends: Rc<RefCell<BackendMap>>,
    /// Number of slab entries that are not client sessions: the entries present at
    /// construction plus one per added listener. Shutdown completes when the slab
    /// length is no longer above this value
    base_sessions_count: usize,
    /// Command channel, registered with `Token(0)`
    channel: ProxyChannel,
    /// Configuration state, fed with every request passing through `notify_proxys`
    config_state: ConfigState,
    /// Number of consecutive failed `poll()` calls
    current_poll_errors: i32,
    /// HTTP proxy
    http: Rc<RefCell<http::HttpProxy>>,
    /// HTTPS proxy
    https: Rc<RefCell<https::HttpsProxy>>,
    /// Slab length last observed, used to report progress while shutting down
    last_sessions_len: usize,
    /// Last time sessions were seen closing during a shutdown
    last_shutting_down_message: Option<Instant>,
    /// Last time the zombie check ran
    last_zombie_check: Instant,
    /// Reference instant for the loop timing metrics
    loop_start: Instant,
    /// Number of consecutive `poll()` failures after which the worker panics
    max_poll_errors: i32,
    /// The mio poll instance driving the event loop
    pub poll: Poll,
    /// Upper bound for one `poll()` call
    poll_timeout: Option<Duration>,
    /// Listeners received over the SCM socket at startup
    scm_listeners: Option<Listeners>,
    /// SCM socket on which listeners are received
    scm: ScmSocket,
    /// Session manager shared with the proxies
    sessions: Rc<RefCell<SessionManager>>,
    /// Next date at which the timer wants to be polled
    should_poll_at: Option<Instant>,
    /// Id of the SoftStop request being processed, if any
    shutting_down: Option<String>,
    /// TCP proxy
    tcp: Rc<RefCell<tcp::TcpProxy>>,
    /// Built from `ServerConfig::zombie_check_interval`, in seconds
    zombie_check_interval: Duration,
}
238
239impl Server {
240 pub fn try_new_from_config(
241 worker_to_main_channel: ProxyChannel,
242 worker_to_main_scm: ScmSocket,
243 config: ServerConfig,
244 initial_state: InitialState,
245 expects_initial_status: bool,
246 ) -> Result<Self, ServerError> {
247 let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
248 let pool = Rc::new(RefCell::new(Pool::with_capacity(
249 config.min_buffers as usize,
250 config.max_buffers as usize,
251 config.buffer_size as usize,
252 )));
253 let backends = Rc::new(RefCell::new(BackendMap::new()));
254
255 let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
258 Slab::with_capacity(config.slab_capacity() as usize),
259 config.max_connections as usize,
260 );
261 {
262 let mut s = sessions.borrow_mut();
263 let entry = s.slab.vacant_entry();
264 trace!("taking token {:?} for channel", SessionToken(entry.key()));
265 entry.insert(Rc::new(RefCell::new(ListenSession {
266 protocol: Protocol::Channel,
267 })));
268 }
269 {
270 let mut s = sessions.borrow_mut();
271 let entry = s.slab.vacant_entry();
272 trace!("taking token {:?} for metrics", SessionToken(entry.key()));
273 entry.insert(Rc::new(RefCell::new(ListenSession {
274 protocol: Protocol::Timer,
275 })));
276 }
277 {
278 let mut s = sessions.borrow_mut();
279 let entry = s.slab.vacant_entry();
280 trace!("taking token {:?} for metrics", SessionToken(entry.key()));
281 entry.insert(Rc::new(RefCell::new(ListenSession {
282 protocol: Protocol::Metrics,
283 })));
284 }
285
286 Server::new(
287 event_loop,
288 worker_to_main_channel,
289 worker_to_main_scm,
290 sessions,
291 pool,
292 backends,
293 None,
294 None,
295 None,
296 config,
297 Some(initial_state),
298 expects_initial_status,
299 )
300 }
301
302 #[allow(clippy::too_many_arguments)]
303 pub fn new(
304 poll: Poll,
305 mut channel: ProxyChannel,
306 scm: ScmSocket,
307 sessions: Rc<RefCell<SessionManager>>,
308 pool: Rc<RefCell<Pool>>,
309 backends: Rc<RefCell<BackendMap>>,
310 http: Option<http::HttpProxy>,
311 https: Option<https::HttpsProxy>,
312 tcp: Option<tcp::TcpProxy>,
313 server_config: ServerConfig,
314 initial_state: Option<InitialState>,
315 expects_initial_status: bool,
316 ) -> Result<Self, ServerError> {
317 FEATURES.with(|_features| {
318 });
320
321 poll.registry()
322 .register(
323 &mut channel,
324 Token(0),
325 Interest::READABLE | Interest::WRITABLE,
326 )
327 .map_err(ServerError::RegisterChannel)?;
328
329 METRICS.with(|metrics| {
330 if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
331 poll.registry()
332 .register(sock, Token(2), Interest::WRITABLE)
333 .expect("should register the metrics socket");
334 }
335 });
336
337 let base_sessions_count = sessions.borrow().slab.len();
338
339 let http = Rc::new(RefCell::new(match http {
340 Some(http) => http,
341 None => {
342 let registry = poll
343 .registry()
344 .try_clone()
345 .map_err(ServerError::CloneRegistry)?;
346
347 http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
348 }
349 }));
350
351 let https = Rc::new(RefCell::new(match https {
352 Some(https) => https,
353 None => {
354 let registry = poll
355 .registry()
356 .try_clone()
357 .map_err(ServerError::CloneRegistry)?;
358
359 https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
360 }
361 }));
362
363 let tcp = Rc::new(RefCell::new(match tcp {
364 Some(tcp) => tcp,
365 None => {
366 let registry = poll
367 .registry()
368 .try_clone()
369 .map_err(ServerError::CloneRegistry)?;
370
371 tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
372 }
373 }));
374
375 let mut server = Server {
376 accept_queue_timeout: Duration::from_secs(u64::from(
377 server_config.accept_queue_timeout,
378 )),
379 accept_queue: VecDeque::new(),
380 accept_ready: HashSet::new(),
381 backends,
382 base_sessions_count,
383 channel,
384 config_state: ConfigState::new(),
385 current_poll_errors: 0,
386 http,
387 https,
388 last_sessions_len: 0, last_shutting_down_message: None,
390 last_zombie_check: Instant::now(), loop_start: Instant::now(), max_poll_errors: 10000, poll_timeout: Some(Duration::from_millis(1000)), poll,
395 scm_listeners: None,
396 scm,
397 sessions,
398 should_poll_at: None,
399 shutting_down: None,
400 tcp,
401 zombie_check_interval: Duration::from_secs(u64::from(
402 server_config.zombie_check_interval,
403 )),
404 };
405
406 if let Some(state) = initial_state {
408 for request in state.requests {
409 trace!("generating initial config request: {:#?}", request);
410 server.notify_proxys(request);
411 }
412
413 QUEUE.with(|queue| {
415 (*queue.borrow_mut()).clear();
416 });
417 }
418
419 if expects_initial_status {
420 server.block_channel();
423 let msg = server.channel.read_message();
424 debug!("got message: {:?}", msg);
425
426 if let Ok(WorkerRequest {
427 id,
428 content:
429 Request {
430 request_type: Some(RequestType::Status(_)),
431 },
432 }) = msg
433 {
434 if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
435 error!("Could not send an ok to the main process: {}", e);
436 }
437 } else {
438 panic!(
439 "plz give me a status request first when I start, you sent me this instead: {:?}",
440 msg
441 );
442 }
443 server.unblock_channel();
444 }
445
446 info!("will try to receive listeners");
447 server
448 .scm
449 .set_blocking(true)
450 .map_err(|scm_err| ServerError::ScmSocket {
451 msg: "Could not set the scm socket to blocking".to_string(),
452 scm_err,
453 })?;
454 let listeners =
455 server
456 .scm
457 .receive_listeners()
458 .map_err(|scm_err| ServerError::ScmSocket {
459 msg: "could not receive listeners from the scm socket".to_string(),
460 scm_err,
461 })?;
462 server
463 .scm
464 .set_blocking(false)
465 .map_err(|scm_err| ServerError::ScmSocket {
466 msg: "Could not set the scm socket to unblocking".to_string(),
467 scm_err,
468 })?;
469 info!("received listeners: {:?}", listeners);
470 server.scm_listeners = Some(listeners);
471
472 Ok(server)
473 }
474
    /// Runs the event loop of the worker.
    ///
    /// Each iteration polls for events, flushes queued responses, dispatches the
    /// events by token, polls the timer when it is due, handles the remaining
    /// readiness, creates sessions, runs the zombie check and sends metrics.
    ///
    /// Returns when the command channel is closed, when
    /// `read_channel_messages_and_notify` reports a stop, or when a shutdown in
    /// progress completes (`shut_down_sessions` returns `true`).
    ///
    /// # Panics
    ///
    /// Panics through `check_for_poll_errors` when `poll()` failed
    /// `max_poll_errors` times in a row.
    pub fn run(&mut self) {
        let mut events = Events::with_capacity(1024);
        self.last_sessions_len = self.sessions.borrow().slab.len();

        self.last_zombie_check = Instant::now();
        self.loop_start = Instant::now();

        loop {
            self.check_for_poll_errors();

            let timeout = self.reset_loop_time_and_get_timeout();

            match self.poll.poll(&mut events, timeout) {
                Ok(_) => self.current_poll_errors = 0,
                Err(error) => {
                    error!("Error while polling events: {:?}", error);
                    self.current_poll_errors += 1;
                    continue;
                }
            }

            let after_epoll = Instant::now();
            time!("epoll_time", (after_epoll - self.loop_start).as_millis());
            self.loop_start = after_epoll;

            self.send_queue();

            for event in events.iter() {
                match event.token() {
                    // Token(0): the command channel
                    Token(0) => {
                        if event.is_error() {
                            error!("error reading from command channel");
                            continue;
                        }
                        if event.is_read_closed() || event.is_write_closed() {
                            error!("command channel was closed");
                            return;
                        }
                        let ready = Ready::from(event);
                        self.channel.handle_events(ready);

                        // Alternate between reading requests and writing responses
                        // until the channel has no readiness left
                        loop {
                            // ask for writability as soon as a response is waiting
                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            if self.channel.readiness() == Ready::EMPTY {
                                break;
                            }

                            // `true` means the worker must stop right away
                            if self.read_channel_messages_and_notify() {
                                return;
                            }

                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            self.send_queue();
                        }
                    }
                    // Token(1): the timer, drain every expired timeout
                    Token(1) => {
                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                            self.timeout(t);
                        }
                    }
                    // Token(2): the metrics socket is writable
                    Token(2) => METRICS.with(|metrics| {
                        (*metrics.borrow_mut()).writable();
                    }),
                    // any other token is forwarded to `ready`
                    token => self.ready(token, Ready::from(event)),
                }
            }

            // Poll the timer ourselves when its next date has passed
            if let Some(t) = self.should_poll_at.as_ref() {
                if *t <= Instant::now() {
                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                        self.timeout(t);
                    }
                }
            }
            self.handle_remaining_readiness();
            self.create_sessions();

            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());

            self.zombie_check();

            // Local metrics are cleared when the UTC clock reads minute 0, second 0
            let now = time::OffsetDateTime::now_utc();
            if now.minute() == 00 && now.second() == 0 {
                METRICS.with(|metrics| {
                    (*metrics.borrow_mut()).clear_local();
                });
            }

            gauge!("client.connections", self.sessions.borrow().nb_connections);
            gauge!("slab.entries", self.sessions.borrow().slab.len());
            METRICS.with(|metrics| {
                (*metrics.borrow_mut()).send_data();
            });

            // While shutting down, stop the loop once every session is closed
            if self.shutting_down.is_some() && self.shut_down_sessions() {
                return;
            }
        }
    }
597
598 fn check_for_poll_errors(&mut self) {
599 if self.current_poll_errors >= self.max_poll_errors {
600 error!(
601 "Something is going very wrong. Last {} poll() calls failed, crashing..",
602 self.current_poll_errors
603 );
604 panic!(
605 "poll() calls failed {} times in a row",
606 self.current_poll_errors
607 );
608 }
609 }
610
611 fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
612 let now = Instant::now();
613 time!("event_loop_time", (now - self.loop_start).as_millis());
614
615 let timeout = match self.should_poll_at.as_ref() {
616 None => self.poll_timeout,
617 Some(i) => {
618 if *i <= now {
619 self.poll_timeout
620 } else {
621 let dur = *i - now;
622 match self.poll_timeout {
623 None => Some(dur),
624 Some(t) => {
625 if t < dur {
626 Some(t)
627 } else {
628 Some(dur)
629 }
630 }
631 }
632 }
633 }
634 };
635
636 self.loop_start = now;
637 timeout
638 }
639
640 fn read_channel_messages_and_notify(&mut self) -> bool {
642 if !self.channel.readiness().is_readable() {
643 return false;
644 }
645
646 if let Err(e) = self.channel.readable() {
647 error!("error reading from channel: {:?}", e);
648 }
649
650 loop {
651 let request = self.channel.read_message();
652 debug!("Received request {:?}", request);
653 match request {
654 Ok(request) => match request.content.request_type {
655 Some(RequestType::HardStop(_)) => {
656 let req_id = request.id.clone();
657 self.notify(request);
658 if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
659 error!("Could not send ok response to the main process: {}", e);
660 }
661 if let Err(e) = self.channel.run() {
662 error!("Error while running the server channel: {}", e);
663 }
664 return true;
665 }
666 Some(RequestType::SoftStop(_)) => {
667 self.shutting_down = Some(request.id.clone());
668 self.last_sessions_len = self.sessions.borrow().slab.len();
669 self.notify(request);
670 }
671 Some(RequestType::ReturnListenSockets(_)) => {
672 info!("received ReturnListenSockets order");
673 match self.return_listen_sockets() {
674 Ok(_) => push_queue(WorkerResponse::ok(request.id)),
675 Err(error) => push_queue(worker_response_error(
676 request.id,
677 format!("Could not send listeners on scm socket: {error:?}"),
678 )),
679 }
680 }
681 _ => self.notify(request),
682 },
683 Err(_) => {
685 if (self.channel.interest & self.channel.readiness).is_readable() {
687 if let Err(e) = self.channel.readable() {
688 error!("error reading from channel: {:?}", e);
689 }
690 continue;
691 }
692 break;
693 }
694 }
695 }
696 false
697 }
698
699 fn zombie_check(&mut self) {
701 let now = Instant::now();
702 if now - self.last_zombie_check < self.zombie_check_interval {
703 return;
704 }
705 info!("zombie check");
706 self.last_zombie_check = now;
707
708 let mut zombie_tokens = HashSet::new();
709
710 for (_index, session) in self
712 .sessions
713 .borrow_mut()
714 .slab
715 .iter_mut()
716 .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
717 {
718 let session_token = session.borrow().frontend_token();
719 if !zombie_tokens.contains(&session_token) {
720 session.borrow().print_session();
721 zombie_tokens.insert(session_token);
722 }
723 }
724
725 let zombie_count = zombie_tokens.len() as i64;
726 count!("zombies", zombie_count);
727
728 let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
729 info!(
730 "removing {} zombies ({} remaining entries after close)",
731 zombie_count, remaining_count
732 );
733 }
734
735 fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
738 if tokens.is_empty() {
739 return 0;
740 }
741
742 for token in &tokens {
744 if self.sessions.borrow().slab.contains(token.0) {
745 let session = { self.sessions.borrow_mut().slab.remove(token.0) };
746 session.borrow_mut().close();
747 self.sessions.borrow_mut().decr();
748 }
749 }
750
751 let mut dangling_entries = HashSet::new();
753 for (entry_key, session) in &self.sessions.borrow().slab {
754 if tokens.contains(&session.borrow().frontend_token()) {
755 dangling_entries.insert(entry_key);
756 }
757 }
758
759 let mut dangling_entries_count = 0;
761 for entry_key in dangling_entries {
762 let mut sessions = self.sessions.borrow_mut();
763 if sessions.slab.contains(entry_key) {
764 sessions.slab.remove(entry_key);
765 dangling_entries_count += 1;
766 }
767 }
768 dangling_entries_count
769 }
770
771 fn shut_down_sessions(&mut self) -> bool {
773 let sessions_count = self.sessions.borrow().slab.len();
774 let mut sessions_to_shut_down = HashSet::new();
775
776 for (_key, session) in &self.sessions.borrow().slab {
777 if session.borrow_mut().shutting_down() {
778 sessions_to_shut_down.insert(Token(session.borrow().frontend_token().0));
779 }
780 }
781 let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
782
783 let new_sessions_count = self.sessions.borrow().slab.len();
784
785 if new_sessions_count < sessions_count {
786 let now = Instant::now();
787 if let Some(last) = self.last_shutting_down_message {
788 if (now - last) > Duration::from_secs(5) {
789 info!(
790 "closed {} sessions, {} sessions left, base_sessions_count = {}",
791 sessions_count - new_sessions_count,
792 new_sessions_count,
793 self.base_sessions_count
794 );
795 }
796 }
797 self.last_shutting_down_message = Some(now);
798 }
799
800 if new_sessions_count <= self.base_sessions_count {
801 info!("last session stopped, shutting down!");
802 if let Err(e) = self.channel.run() {
803 error!("Error while running the server channel: {}", e);
804 }
805 let id = self
807 .shutting_down
808 .take()
809 .expect("should have shut down correctly"); debug!("Responding OK to main process for request {}", id);
812
813 let proxy_response = WorkerResponse::ok(id);
814 if let Err(e) = self.channel.write_message(&proxy_response) {
815 error!("Could not write response to the main process: {}", e);
816 }
817 if let Err(e) = self.channel.run() {
818 error!("Error while running the server channel: {}", e);
819 }
820 return true;
821 }
822
823 if new_sessions_count < self.last_sessions_len {
824 info!(
825 "shutting down, {} slab elements remaining (base: {})",
826 new_sessions_count - self.base_sessions_count,
827 self.base_sessions_count
828 );
829 self.last_sessions_len = new_sessions_count;
830 }
831
832 false
833 }
834
835 fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
836 let token = session.borrow().frontend_token();
837 let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
838 }
839
840 fn send_queue(&mut self) {
841 if self.channel.readiness.is_writable() {
842 QUEUE.with(|q| {
843 let mut queue = q.borrow_mut();
844 loop {
845 if let Some(resp) = queue.pop_front() {
846 debug!("Sending response {:?}", resp);
847 if let Err(e) = self.channel.write_message(&resp) {
848 error!("Could not write message {} on the channel: {}", resp, e);
849 queue.push_front(resp);
850 }
851 }
852
853 if self.channel.back_buf.available_data() > 0 {
854 if let Err(e) = self.channel.writable() {
855 error!("error writing to channel: {:?}", e);
856 }
857 }
858
859 if !self.channel.readiness.is_writable() {
860 break;
861 }
862
863 if self.channel.back_buf.available_data() == 0 && queue.len() == 0 {
864 break;
865 }
866 }
867 });
868 }
869 }
870
871 fn notify(&mut self, message: WorkerRequest) {
872 match &message.content.request_type {
873 Some(RequestType::ConfigureMetrics(configuration)) => {
874 match MetricsConfiguration::try_from(*configuration) {
875 Ok(metrics_config) => {
876 METRICS.with(|metrics| {
877 (*metrics.borrow_mut()).configure(&metrics_config);
878 push_queue(WorkerResponse::ok(message.id));
879 });
880 }
881 Err(e) => {
882 error!("Error configuring metrics: {}", e);
883 push_queue(WorkerResponse::error(message.id, e));
884 }
885 }
886 return;
887 }
888 Some(RequestType::QueryMetrics(query_metrics_options)) => {
889 METRICS.with(|metrics| {
890 match (*metrics.borrow_mut()).query(query_metrics_options) {
891 Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
892 Err(e) => {
893 error!("Error querying metrics: {}", e);
894 push_queue(WorkerResponse::error(message.id, e))
895 }
896 }
897 });
898 return;
899 }
900 Some(RequestType::Logging(logging_filter)) => {
901 info!(
902 "{} changing logging filter to {}",
903 message.id, logging_filter
904 );
905 let (directives, _errors) = logging::parse_logging_spec(logging_filter);
907 logging::LOGGER.with(|logger| {
908 logger.borrow_mut().set_directives(directives);
909 });
910 push_queue(WorkerResponse::ok(message.id));
911 return;
912 }
913 Some(RequestType::QueryClustersHashes(_)) => {
914 push_queue(WorkerResponse::ok_with_content(
915 message.id.clone(),
916 ContentType::ClusterHashes(ClusterHashes {
917 map: self.config_state.hash_state(),
918 })
919 .into(),
920 ));
921 return;
922 }
923 Some(RequestType::QueryClusterById(cluster_id)) => {
924 push_queue(WorkerResponse::ok_with_content(
925 message.id.clone(),
926 ContentType::Clusters(ClusterInformations {
927 vec: self
928 .config_state
929 .cluster_state(cluster_id)
930 .map_or(vec![], |ci| vec![ci]),
931 })
932 .into(),
933 ));
934 }
935 Some(RequestType::QueryClustersByDomain(domain)) => {
936 let cluster_ids = self
937 .config_state
938 .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
939 let vec = cluster_ids
940 .iter()
941 .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
942 .collect();
943
944 push_queue(WorkerResponse::ok_with_content(
945 message.id.clone(),
946 ContentType::Clusters(ClusterInformations { vec }).into(),
947 ));
948 return;
949 }
950 Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
951 if filters.fingerprint.is_some() {
952 let certs = self.config_state.get_certificates(filters.clone());
953 let response = if !certs.is_empty() {
954 WorkerResponse::ok_with_content(
955 message.id.clone(),
956 ContentType::CertificatesWithFingerprints(
957 CertificatesWithFingerprints { certs },
958 )
959 .into(),
960 )
961 } else {
962 worker_response_error(
963 message.id.clone(),
964 "Could not find certificate for this fingerprint",
965 )
966 };
967 push_queue(response);
968 return;
969 }
970 }
973 _other_request => {}
974 }
975 self.notify_proxys(message);
976 }
977
978 pub fn notify_proxys(&mut self, request: WorkerRequest) {
979 if let Err(e) = self.config_state.dispatch(&request.content) {
980 error!("Could not execute order on config state: {}", e);
981 }
982
983 let req_id = request.id.clone();
984
985 match request.content.request_type {
986 Some(RequestType::AddCluster(ref cluster)) => {
987 self.add_cluster(cluster);
988 }
990 Some(RequestType::AddBackend(ref backend)) => {
991 push_queue(self.add_backend(&req_id, backend));
992 return;
993 }
994 Some(RequestType::RemoveBackend(ref remove_backend)) => {
995 push_queue(self.remove_backend(&req_id, remove_backend));
996 return;
997 }
998 _ => {}
999 };
1000
1001 let proxy_destinations = request.content.get_destinations();
1002 let mut notify_response = None;
1003 if proxy_destinations.to_http_proxy {
1004 notify_response = Some(self.http.borrow_mut().notify(request.clone()));
1005 }
1006 if proxy_destinations.to_https_proxy {
1007 let http_proxy_response = self.https.borrow_mut().notify(request.clone());
1008 if http_proxy_response.is_failure() || notify_response.is_none() {
1009 notify_response = Some(http_proxy_response);
1010 }
1011 }
1012 if proxy_destinations.to_tcp_proxy {
1013 let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
1014 if tcp_proxy_response.is_failure() || notify_response.is_none() {
1015 notify_response = Some(tcp_proxy_response);
1016 }
1017 }
1018 if let Some(response) = notify_response {
1019 push_queue(response);
1020 }
1021
1022 match request.content.request_type {
1023 Some(RequestType::AddHttpListener(listener)) => {
1025 push_queue(self.notify_add_http_listener(&req_id, listener));
1026 }
1027 Some(RequestType::AddHttpsListener(listener)) => {
1028 push_queue(self.notify_add_https_listener(&req_id, listener));
1029 }
1030 Some(RequestType::AddTcpListener(listener)) => {
1031 push_queue(self.notify_add_tcp_listener(&req_id, listener));
1032 }
1033 Some(RequestType::RemoveListener(ref remove)) => {
1034 debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
1035 self.base_sessions_count -= 1;
1036 let response = match ListenerType::try_from(remove.proxy) {
1037 Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
1038 Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
1039 Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
1040 Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
1041 };
1042 push_queue(response);
1043 }
1044 Some(RequestType::ActivateListener(ref activate)) => {
1045 push_queue(self.notify_activate_listener(&req_id, activate));
1046 }
1047 Some(RequestType::DeactivateListener(ref deactivate)) => {
1048 push_queue(self.notify_deactivate_listener(&req_id, deactivate));
1049 }
1050 _other_request => {}
1051 };
1052 }
1053
1054 fn add_cluster(&mut self, cluster: &Cluster) {
1055 self.backends
1056 .borrow_mut()
1057 .set_load_balancing_policy_for_cluster(
1058 &cluster.cluster_id,
1059 LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
1060 cluster
1061 .load_metric
1062 .and_then(|n| LoadMetric::try_from(n).ok()),
1063 );
1064 }
1065
1066 fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
1067 let new_backend = Backend::new(
1068 &add_backend.backend_id,
1069 add_backend.address.into(),
1070 add_backend.sticky_id.clone(),
1071 add_backend.load_balancing_parameters,
1072 add_backend.backup,
1073 );
1074 self.backends
1075 .borrow_mut()
1076 .add_backend(&add_backend.cluster_id, new_backend);
1077
1078 WorkerResponse::ok(req_id)
1079 }
1080
1081 fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
1082 let address = backend.address.into();
1083 self.backends
1084 .borrow_mut()
1085 .remove_backend(&backend.cluster_id, &address);
1086
1087 WorkerResponse::ok(req_id)
1088 }
1089
1090 fn notify_add_http_listener(
1091 &mut self,
1092 req_id: &str,
1093 listener: HttpListenerConfig,
1094 ) -> WorkerResponse {
1095 debug!("{} add http listener {:?}", req_id, listener);
1096
1097 if self.sessions.borrow().at_capacity() {
1098 return worker_response_error(req_id, "session list is full, cannot add a listener");
1099 }
1100
1101 let mut session_manager = self.sessions.borrow_mut();
1102 let entry = session_manager.slab.vacant_entry();
1103 let token = Token(entry.key());
1104
1105 match self.http.borrow_mut().add_listener(listener, token) {
1106 Ok(_token) => {
1107 entry.insert(Rc::new(RefCell::new(ListenSession {
1108 protocol: Protocol::HTTPListen,
1109 })));
1110 self.base_sessions_count += 1;
1111 WorkerResponse::ok(req_id)
1112 }
1113 Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
1114 }
1115 }
1116
1117 fn notify_add_https_listener(
1118 &mut self,
1119 req_id: &str,
1120 listener: HttpsListenerConfig,
1121 ) -> WorkerResponse {
1122 debug!("{} add https listener {:?}", req_id, listener);
1123
1124 if self.sessions.borrow().at_capacity() {
1125 return worker_response_error(req_id, "session list is full, cannot add a listener");
1126 }
1127
1128 let mut session_manager = self.sessions.borrow_mut();
1129 let entry = session_manager.slab.vacant_entry();
1130 let token = Token(entry.key());
1131
1132 match self
1133 .https
1134 .borrow_mut()
1135 .add_listener(listener.clone(), token)
1136 {
1137 Ok(_token) => {
1138 entry.insert(Rc::new(RefCell::new(ListenSession {
1139 protocol: Protocol::HTTPSListen,
1140 })));
1141 self.base_sessions_count += 1;
1142 WorkerResponse::ok(req_id)
1143 }
1144 Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
1145 }
1146 }
1147
1148 fn notify_add_tcp_listener(
1149 &mut self,
1150 req_id: &str,
1151 listener: CommandTcpListener,
1152 ) -> WorkerResponse {
1153 debug!("{} add tcp listener {:?}", req_id, listener);
1154
1155 if self.sessions.borrow().at_capacity() {
1156 return worker_response_error(req_id, "session list is full, cannot add a listener");
1157 }
1158
1159 let mut session_manager = self.sessions.borrow_mut();
1160 let entry = session_manager.slab.vacant_entry();
1161 let token = Token(entry.key());
1162
1163 match self.tcp.borrow_mut().add_listener(listener, token) {
1164 Ok(_token) => {
1165 entry.insert(Rc::new(RefCell::new(ListenSession {
1166 protocol: Protocol::TCPListen,
1167 })));
1168 self.base_sessions_count += 1;
1169 WorkerResponse::ok(req_id)
1170 }
1171 Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
1172 }
1173 }
1174
1175 fn notify_activate_listener(
1176 &mut self,
1177 req_id: &str,
1178 activate: &ActivateListener,
1179 ) -> WorkerResponse {
1180 debug!(
1181 "{} activate {:?} listener {:?}",
1182 req_id, activate.proxy, activate
1183 );
1184
1185 let address: std::net::SocketAddr = activate.address.into();
1186
1187 match ListenerType::try_from(activate.proxy) {
1188 Ok(ListenerType::Http) => {
1189 let listener = self
1190 .scm_listeners
1191 .as_mut()
1192 .and_then(|s| s.get_http(&address))
1193 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1194
1195 let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
1196 match activated_token {
1197 Ok(token) => {
1198 self.accept(ListenToken(token.0), Protocol::HTTPListen);
1199 WorkerResponse::ok(req_id)
1200 }
1201 Err(activate_error) => worker_response_error(
1202 req_id,
1203 format!("Could not activate HTTP listener: {}", activate_error),
1204 ),
1205 }
1206 }
1207 Ok(ListenerType::Https) => {
1208 let listener = self
1209 .scm_listeners
1210 .as_mut()
1211 .and_then(|s| s.get_https(&address))
1212 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1213
1214 let activated_token = self
1215 .https
1216 .borrow_mut()
1217 .activate_listener(&address, listener);
1218 match activated_token {
1219 Ok(token) => {
1220 self.accept(ListenToken(token.0), Protocol::HTTPSListen);
1221 WorkerResponse::ok(req_id)
1222 }
1223 Err(activate_error) => worker_response_error(
1224 req_id,
1225 format!("Could not activate HTTPS listener: {}", activate_error),
1226 ),
1227 }
1228 }
1229 Ok(ListenerType::Tcp) => {
1230 let listener = self
1231 .scm_listeners
1232 .as_mut()
1233 .and_then(|s| s.get_tcp(&address))
1234 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1235
1236 let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
1237 match listener_token {
1238 Ok(token) => {
1239 self.accept(ListenToken(token.0), Protocol::TCPListen);
1240 WorkerResponse::ok(req_id)
1241 }
1242 Err(activate_error) => worker_response_error(
1243 req_id,
1244 format!("Could not activate TCP listener: {}", activate_error),
1245 ),
1246 }
1247 }
1248 Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
1249 }
1250 }
1251
1252 fn notify_deactivate_listener(
1253 &mut self,
1254 req_id: &str,
1255 deactivate: &DeactivateListener,
1256 ) -> WorkerResponse {
1257 debug!(
1258 "{} deactivate {:?} listener {:?}",
1259 req_id, deactivate.proxy, deactivate
1260 );
1261
1262 let address: std::net::SocketAddr = deactivate.address.into();
1263
1264 match ListenerType::try_from(deactivate.proxy) {
1265 Ok(ListenerType::Http) => {
1266 let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
1267 {
1268 Ok((token, listener)) => (token, listener),
1269 Err(e) => {
1270 return worker_response_error(
1271 req_id,
1272 format!(
1273 "Couldn't deactivate HTTP listener at address {address:?}: {e}"
1274 ),
1275 );
1276 }
1277 };
1278
1279 if let Err(e) = self.poll.registry().deregister(&mut listener) {
1280 error!(
1281 "error deregistering HTTP listen socket({:?}): {:?}",
1282 deactivate, e
1283 );
1284 }
1285
1286 {
1287 let mut sessions = self.sessions.borrow_mut();
1288 if sessions.slab.contains(token.0) {
1289 sessions.slab.remove(token.0);
1290 info!("removed listen token {:?}", token);
1291 }
1292 }
1293
1294 if deactivate.to_scm {
1295 self.unblock_scm_socket();
1296 let listeners = Listeners {
1297 http: vec![(address, listener.as_raw_fd())],
1298 tls: vec![],
1299 tcp: vec![],
1300 };
1301 info!("sending HTTP listener: {:?}", listeners);
1302 let res = self.scm.send_listeners(&listeners);
1303
1304 self.block_scm_socket();
1305
1306 info!("sent HTTP listener: {:?}", res);
1307 }
1308 WorkerResponse::ok(req_id)
1309 }
1310 Ok(ListenerType::Https) => {
1311 let (token, mut listener) = match self
1312 .https
1313 .borrow_mut()
1314 .give_back_listener(address)
1315 {
1316 Ok((token, listener)) => (token, listener),
1317 Err(e) => {
1318 return worker_response_error(
1319 req_id,
1320 format!(
1321 "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
1322 ),
1323 );
1324 }
1325 };
1326 if let Err(e) = self.poll.registry().deregister(&mut listener) {
1327 error!(
1328 "error deregistering HTTPS listen socket({:?}): {:?}",
1329 deactivate, e
1330 );
1331 }
1332 if self.sessions.borrow().slab.contains(token.0) {
1333 self.sessions.borrow_mut().slab.remove(token.0);
1334 info!("removed listen token {:?}", token);
1335 }
1336
1337 if deactivate.to_scm {
1338 self.unblock_scm_socket();
1339 let listeners = Listeners {
1340 http: vec![],
1341 tls: vec![(address, listener.as_raw_fd())],
1342 tcp: vec![],
1343 };
1344 info!("sending HTTPS listener: {:?}", listeners);
1345 let res = self.scm.send_listeners(&listeners);
1346
1347 self.block_scm_socket();
1348
1349 info!("sent HTTPS listener: {:?}", res);
1350 }
1351 WorkerResponse::ok(req_id)
1352 }
1353 Ok(ListenerType::Tcp) => {
1354 let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
1355 {
1356 Ok((token, listener)) => (token, listener),
1357 Err(e) => {
1358 return worker_response_error(
1359 req_id,
1360 format!(
1361 "Could not deactivate TCP listener at address {:?}: {}",
1362 address, e
1363 ),
1364 );
1365 }
1366 };
1367
1368 if let Err(e) = self.poll.registry().deregister(&mut listener) {
1369 error!(
1370 "error deregistering TCP listen socket({:?}): {:?}",
1371 deactivate, e
1372 );
1373 }
1374 if self.sessions.borrow().slab.contains(token.0) {
1375 self.sessions.borrow_mut().slab.remove(token.0);
1376 info!("removed listen token {:?}", token);
1377 }
1378
1379 if deactivate.to_scm {
1380 self.unblock_scm_socket();
1381 let listeners = Listeners {
1382 http: vec![],
1383 tls: vec![],
1384 tcp: vec![(address, listener.as_raw_fd())],
1385 };
1386 info!("sending TCP listener: {:?}", listeners);
1387 let res = self.scm.send_listeners(&listeners);
1388
1389 self.block_scm_socket();
1390
1391 info!("sent TCP listener: {:?}", res);
1392 }
1393 WorkerResponse::ok(req_id)
1394 }
1395 Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
1396 }
1397 }
1398
1399 pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
1401 self.unblock_scm_socket();
1402
1403 let mut http_listeners = self.http.borrow_mut().give_back_listeners();
1404 for &mut (_, ref mut sock) in http_listeners.iter_mut() {
1405 if let Err(e) = self.poll.registry().deregister(sock) {
1406 error!(
1407 "error deregistering HTTP listen socket({:?}): {:?}",
1408 sock, e
1409 );
1410 }
1411 }
1412
1413 let mut https_listeners = self.https.borrow_mut().give_back_listeners();
1414 for &mut (_, ref mut sock) in https_listeners.iter_mut() {
1415 if let Err(e) = self.poll.registry().deregister(sock) {
1416 error!(
1417 "error deregistering HTTPS listen socket({:?}): {:?}",
1418 sock, e
1419 );
1420 }
1421 }
1422
1423 let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
1424 for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
1425 if let Err(e) = self.poll.registry().deregister(sock) {
1426 error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
1427 }
1428 }
1429
1430 let listeners = Listeners {
1432 http: http_listeners
1433 .iter()
1434 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1435 .collect(),
1436 tls: https_listeners
1437 .iter()
1438 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1439 .collect(),
1440 tcp: tcp_listeners
1441 .iter()
1442 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1443 .collect(),
1444 };
1445 info!("sending default listeners: {:?}", listeners);
1446 let res = self.scm.send_listeners(&listeners);
1447
1448 self.block_scm_socket();
1449
1450 info!("sent default listeners: {:?}", res);
1451 res
1452 }
1453
1454 fn block_scm_socket(&mut self) {
1455 if let Err(e) = self.scm.set_blocking(true) {
1456 error!("Could not block scm socket: {}", e);
1457 }
1458 }
1459
1460 fn unblock_scm_socket(&mut self) {
1461 if let Err(e) = self.scm.set_blocking(false) {
1462 error!("Could not unblock scm socket: {}", e);
1463 }
1464 }
1465
1466 pub fn to_session(&self, token: Token) -> SessionToken {
1467 SessionToken(token.0)
1468 }
1469
1470 pub fn from_session(&self, token: SessionToken) -> Token {
1471 Token(token.0)
1472 }
1473
1474 pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
1475 match protocol {
1476 Protocol::TCPListen => loop {
1477 match self.tcp.borrow_mut().accept(token) {
1478 Ok(sock) => self.accept_queue.push_back((
1479 sock,
1480 token,
1481 Protocol::TCPListen,
1482 Instant::now(),
1483 )),
1484 Err(AcceptError::WouldBlock) => {
1485 self.accept_ready.remove(&token);
1486 break;
1487 }
1488 Err(other) => {
1489 error!("error accepting TCP sockets: {:?}", other);
1490 self.accept_ready.remove(&token);
1491 break;
1492 }
1493 }
1494 },
1495 Protocol::HTTPListen => loop {
1496 match self.http.borrow_mut().accept(token) {
1497 Ok(sock) => self.accept_queue.push_back((
1498 sock,
1499 token,
1500 Protocol::HTTPListen,
1501 Instant::now(),
1502 )),
1503 Err(AcceptError::WouldBlock) => {
1504 self.accept_ready.remove(&token);
1505 break;
1506 }
1507 Err(other) => {
1508 error!("error accepting HTTP sockets: {:?}", other);
1509 self.accept_ready.remove(&token);
1510 break;
1511 }
1512 }
1513 },
1514 Protocol::HTTPSListen => loop {
1515 match self.https.borrow_mut().accept(token) {
1516 Ok(sock) => self.accept_queue.push_back((
1517 sock,
1518 token,
1519 Protocol::HTTPSListen,
1520 Instant::now(),
1521 )),
1522 Err(AcceptError::WouldBlock) => {
1523 self.accept_ready.remove(&token);
1524 break;
1525 }
1526 Err(other) => {
1527 error!("error accepting HTTPS sockets: {:?}", other);
1528 self.accept_ready.remove(&token);
1529 break;
1530 }
1531 }
1532 },
1533 _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
1534 }
1535
1536 gauge!("accept_queue.connections", self.accept_queue.len());
1537 }
1538
1539 pub fn create_sessions(&mut self) {
1540 while let Some((sock, token, protocol, timestamp)) = self.accept_queue.pop_back() {
1541 let wait_time = Instant::now() - timestamp;
1542 time!("accept_queue.wait_time", wait_time.as_millis());
1543 if wait_time > self.accept_queue_timeout {
1544 incr!("accept_queue.timeout");
1545 continue;
1546 }
1547
1548 if !self.sessions.borrow_mut().check_limits() {
1549 break;
1550 }
1551
1552 match protocol {
1556 Protocol::TCPListen => {
1557 let proxy = self.tcp.clone();
1558 if self
1559 .tcp
1560 .borrow_mut()
1561 .create_session(sock, token, wait_time, proxy)
1562 .is_err()
1563 {
1564 break;
1565 }
1566 }
1567 Protocol::HTTPListen => {
1568 let proxy = self.http.clone();
1569 if self
1570 .http
1571 .borrow_mut()
1572 .create_session(sock, token, wait_time, proxy)
1573 .is_err()
1574 {
1575 break;
1576 }
1577 }
1578 Protocol::HTTPSListen => {
1579 if self
1580 .https
1581 .borrow_mut()
1582 .create_session(sock, token, wait_time, self.https.clone())
1583 .is_err()
1584 {
1585 break;
1586 }
1587 }
1588 _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
1589 };
1590 self.sessions.borrow_mut().incr();
1591 }
1592
1593 gauge!("accept_queue.connections", self.accept_queue.len());
1594 }
1595
1596 pub fn ready(&mut self, token: Token, events: Ready) {
1597 trace!("PROXY\t{:?} got events: {:?}", token, events);
1598
1599 let session_token = token.0;
1600 if self.sessions.borrow().slab.contains(session_token) {
1601 let protocol = self.sessions.borrow().slab[session_token]
1603 .borrow()
1604 .protocol();
1605 match protocol {
1607 Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
1608 if events.is_readable() {
1610 self.accept_ready.insert(ListenToken(token.0));
1611 if self.sessions.borrow().can_accept {
1612 self.accept(ListenToken(token.0), protocol);
1613 }
1614 return;
1615 }
1616
1617 if events.is_writable() {
1618 error!(
1619 "received writable for listener {:?}, this should not happen",
1620 token
1621 );
1622 return;
1623 }
1624
1625 if events.is_hup() {
1626 error!("should not happen: server {:?} closed", token);
1627 return;
1628 }
1629
1630 unreachable!();
1631 }
1632 _ => {}
1633 }
1634
1635 let session = self.sessions.borrow_mut().slab[session_token].clone();
1636 session.borrow_mut().update_readiness(token, events);
1637 if session.borrow_mut().ready(session.clone()) {
1638 self.kill_session(session);
1639 }
1640 }
1641 }
1642
1643 pub fn timeout(&mut self, token: Token) {
1644 trace!("PROXY\t{:?} got timeout", token);
1645
1646 let session_token = token.0;
1647 if self.sessions.borrow().slab.contains(session_token) {
1648 let session = self.sessions.borrow_mut().slab[session_token].clone();
1649 if session.borrow_mut().timeout(token) {
1650 self.kill_session(session);
1651 }
1652 }
1653 }
1654
1655 pub fn handle_remaining_readiness(&mut self) {
1656 if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
1659 while let Some(token) = self
1660 .accept_ready
1661 .iter()
1662 .next()
1663 .map(|token| ListenToken(token.0))
1664 {
1665 let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
1666 self.accept(token, protocol);
1667 if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
1668 break;
1669 }
1670 }
1671 }
1672 }
1673 fn block_channel(&mut self) {
1674 if let Err(e) = self.channel.blocking() {
1675 error!("Could not block channel: {}", e);
1676 }
1677 }
1678 fn unblock_channel(&mut self) {
1679 if let Err(e) = self.channel.nonblocking() {
1680 error!("Could not block channel: {}", e);
1681 }
1682 }
1683}
1684
1685fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
1688 error!(
1689 "error on request {}, {}",
1690 request_id.to_string(),
1691 error.to_string()
1692 );
1693 WorkerResponse::error(request_id, error)
1694}
1695
1696pub struct ListenSession {
1697 pub protocol: Protocol,
1698}
1699
1700impl ProxySession for ListenSession {
1701 fn last_event(&self) -> Instant {
1702 Instant::now()
1703 }
1704
1705 fn print_session(&self) {}
1706
1707 fn frontend_token(&self) -> Token {
1708 Token(0)
1709 }
1710
1711 fn protocol(&self) -> Protocol {
1712 self.protocol
1713 }
1714
1715 fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1716 false
1717 }
1718
1719 fn shutting_down(&mut self) -> SessionIsToBeClosed {
1720 false
1721 }
1722
1723 fn update_readiness(&mut self, _token: Token, _events: Ready) {}
1724
1725 fn close(&mut self) {}
1726
1727 fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
1728 error!(
1729 "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
1730 _token, self.protocol
1731 );
1732 false
1733 }
1734}