sozu_lib/
server.rs

1//! event loop management
2use std::{
3    cell::RefCell,
4    collections::{HashSet, VecDeque},
5    io::Error as IoError,
6    os::unix::io::{AsRawFd, FromRawFd},
7    rc::Rc,
8    time::{Duration, Instant},
9};
10
11use mio::{
12    net::{TcpListener as MioTcpListener, TcpStream},
13    Events, Interest, Poll, Token,
14};
15use slab::Slab;
16
17use sozu_command::{
18    channel::Channel,
19    logging,
20    proto::command::{
21        request::RequestType, response_content::ContentType, ActivateListener, AddBackend,
22        CertificatesWithFingerprints, Cluster, ClusterHashes, ClusterInformations,
23        DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig, InitialState,
24        ListenerType, LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend,
25        Request, ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener,
26        WorkerRequest, WorkerResponse,
27    },
28    ready::Ready,
29    scm_socket::{Listeners, ScmSocket, ScmSocketError},
30    state::ConfigState,
31};
32
33use crate::{
34    backends::{Backend, BackendMap},
35    features::FEATURES,
36    http, https,
37    metrics::METRICS,
38    pool::Pool,
39    tcp,
40    timer::Timer,
41    AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
42};
43
44// Number of retries to perform on a server after a connection failure
45pub const CONN_RETRIES: u8 = 3;
46
47pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;
48
49thread_local! {
50  pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
51}
52
53thread_local! {
54  pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
55}
56
57pub fn push_queue(message: WorkerResponse) {
58    QUEUE.with(|queue| {
59        (*queue.borrow_mut()).push_back(message);
60    });
61}
62
63pub fn push_event(event: Event) {
64    QUEUE.with(|queue| {
65        (*queue.borrow_mut()).push_back(WorkerResponse {
66            id: "EVENT".to_string(),
67            message: String::new(),
68            status: ResponseStatus::Processing.into(),
69            content: Some(ContentType::Event(event).into()),
70        });
71    });
72}
73
74#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
75pub struct ListenToken(pub usize);
76#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
77pub struct SessionToken(pub usize);
78
79impl From<usize> for ListenToken {
80    fn from(val: usize) -> ListenToken {
81        ListenToken(val)
82    }
83}
84
85impl From<ListenToken> for usize {
86    fn from(val: ListenToken) -> usize {
87        val.0
88    }
89}
90
91impl From<usize> for SessionToken {
92    fn from(val: usize) -> SessionToken {
93        SessionToken(val)
94    }
95}
96
97impl From<SessionToken> for usize {
98    fn from(val: SessionToken) -> usize {
99        val.0
100    }
101}
102
103pub struct SessionManager {
104    pub max_connections: usize,
105    pub nb_connections: usize,
106    pub can_accept: bool,
107    pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
108}
109
110impl SessionManager {
111    pub fn new(
112        slab: Slab<Rc<RefCell<dyn ProxySession>>>,
113        max_connections: usize,
114    ) -> Rc<RefCell<Self>> {
115        Rc::new(RefCell::new(SessionManager {
116            max_connections,
117            nb_connections: 0,
118            can_accept: true,
119            slab,
120        }))
121    }
122
123    /// The slab is considered at capacity if it contains more sessions than twice max_connections
124    pub fn at_capacity(&self) -> bool {
125        self.slab.len() >= 10 + 2 * self.max_connections
126    }
127
128    /// Check the number of connections against max_connections, and the slab capacity.
129    /// Returns false if limits are reached.
130    pub fn check_limits(&mut self) -> bool {
131        if self.nb_connections >= self.max_connections {
132            error!("max number of session connection reached, flushing the accept queue");
133            gauge!("accept_queue.backpressure", 1);
134            self.can_accept = false;
135            return false;
136        }
137
138        if self.at_capacity() {
139            error!("not enough memory to accept another session, flushing the accept queue");
140            error!(
141                "nb_connections: {}, max_connections: {}",
142                self.nb_connections, self.max_connections
143            );
144            gauge!("accept_queue.backpressure", 1);
145            self.can_accept = false;
146
147            return false;
148        }
149
150        true
151    }
152
153    pub fn to_session(token: Token) -> SessionToken {
154        SessionToken(token.0)
155    }
156
157    pub fn incr(&mut self) {
158        self.nb_connections += 1;
159        assert!(self.nb_connections <= self.max_connections);
160        gauge!("client.connections", self.nb_connections);
161    }
162
163    /// Decrements the number of sessions, start accepting new connections
164    /// if the capacity limit of 90% has not been reached.
165    pub fn decr(&mut self) {
166        assert!(self.nb_connections != 0);
167        self.nb_connections -= 1;
168        gauge!("client.connections", self.nb_connections);
169
170        // do not be ready to accept right away, wait until we get back to 10% capacity
171        if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
172            debug!(
173                "nb_connections = {}, max_connections = {}, starting to accept again",
174                self.nb_connections, self.max_connections
175            );
176            gauge!("accept_queue.backpressure", 0);
177            self.can_accept = true;
178        }
179    }
180}
181
182#[derive(thiserror::Error, Debug)]
183pub enum ServerError {
184    #[error("could not create event loop with MIO poll: {0}")]
185    CreatePoll(IoError),
186    #[error("could not clone the MIO registry: {0}")]
187    CloneRegistry(IoError),
188    #[error("could not register the channel: {0}")]
189    RegisterChannel(IoError),
190    #[error("{msg}:{scm_err}")]
191    ScmSocket {
192        msg: String,
193        scm_err: ScmSocketError,
194    },
195}
196
197/// `Server` handles the event loop, the listeners, the sessions and
198/// communication with the configuration channel.
199///
200/// A listener wraps a listen socket, the associated proxying protocols
201/// (HTTP, HTTPS and TCP) and the routing configuration for clusters.
202/// Listeners handle creating sessions from accepted sockets.
203///
204/// A session manages a "front" socket for a connected client, and all
205/// of the associated data (back socket, protocol state machine, buffers,
206/// metrics...).
207///
208/// `Server` gets configuration updates from the channel (domIN/path routes,
209/// backend server address...).
210///
211/// Listeners and sessions are all stored in a slab structure to index them
212/// by a [Token], they all have to implement the [ProxySession] trait.
213pub struct Server {
214    accept_queue_timeout: Duration,
215    accept_queue: VecDeque<(TcpStream, ListenToken, Protocol, Instant)>,
216    accept_ready: HashSet<ListenToken>,
217    backends: Rc<RefCell<BackendMap>>,
218    base_sessions_count: usize,
219    channel: ProxyChannel,
220    config_state: ConfigState,
221    current_poll_errors: i32,
222    http: Rc<RefCell<http::HttpProxy>>,
223    https: Rc<RefCell<https::HttpsProxy>>,
224    last_sessions_len: usize,
225    last_shutting_down_message: Option<Instant>,
226    last_zombie_check: Instant,
227    loop_start: Instant,
228    max_poll_errors: i32, // TODO: make this configurable? this defaults to 10000 for now
229    pub poll: Poll,
230    poll_timeout: Option<Duration>, // TODO: make this configurable? this defaults to 1000 milliseconds for now
231    scm_listeners: Option<Listeners>,
232    scm: ScmSocket,
233    sessions: Rc<RefCell<SessionManager>>,
234    should_poll_at: Option<Instant>,
235    shutting_down: Option<String>,
236    tcp: Rc<RefCell<tcp::TcpProxy>>,
237    zombie_check_interval: Duration,
238}
239
240impl Server {
241    pub fn try_new_from_config(
242        worker_to_main_channel: ProxyChannel,
243        worker_to_main_scm: ScmSocket,
244        config: ServerConfig,
245        initial_state: InitialState,
246        expects_initial_status: bool,
247    ) -> Result<Self, ServerError> {
248        let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
249        let pool = Rc::new(RefCell::new(Pool::with_capacity(
250            config.min_buffers as usize,
251            config.max_buffers as usize,
252            config.buffer_size as usize,
253        )));
254        let backends = Rc::new(RefCell::new(BackendMap::new()));
255
256        //FIXME: we will use a few entries for the channel, metrics socket and the listeners
257        //FIXME: for HTTP/2, we will have more than 2 entries per session
258        let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
259            Slab::with_capacity(config.slab_capacity() as usize),
260            config.max_connections as usize,
261        );
262        {
263            let mut s = sessions.borrow_mut();
264            let entry = s.slab.vacant_entry();
265            trace!("taking token {:?} for channel", SessionToken(entry.key()));
266            entry.insert(Rc::new(RefCell::new(ListenSession {
267                protocol: Protocol::Channel,
268            })));
269        }
270        {
271            let mut s = sessions.borrow_mut();
272            let entry = s.slab.vacant_entry();
273            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
274            entry.insert(Rc::new(RefCell::new(ListenSession {
275                protocol: Protocol::Timer,
276            })));
277        }
278        {
279            let mut s = sessions.borrow_mut();
280            let entry = s.slab.vacant_entry();
281            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
282            entry.insert(Rc::new(RefCell::new(ListenSession {
283                protocol: Protocol::Metrics,
284            })));
285        }
286
287        Server::new(
288            event_loop,
289            worker_to_main_channel,
290            worker_to_main_scm,
291            sessions,
292            pool,
293            backends,
294            None,
295            None,
296            None,
297            config,
298            Some(initial_state),
299            expects_initial_status,
300        )
301    }
302
303    #[allow(clippy::too_many_arguments)]
304    pub fn new(
305        poll: Poll,
306        mut channel: ProxyChannel,
307        scm: ScmSocket,
308        sessions: Rc<RefCell<SessionManager>>,
309        pool: Rc<RefCell<Pool>>,
310        backends: Rc<RefCell<BackendMap>>,
311        http: Option<http::HttpProxy>,
312        https: Option<https::HttpsProxy>,
313        tcp: Option<tcp::TcpProxy>,
314        server_config: ServerConfig,
315        initial_state: Option<InitialState>,
316        expects_initial_status: bool,
317    ) -> Result<Self, ServerError> {
318        FEATURES.with(|_features| {
319            // initializing feature flags
320        });
321
322        poll.registry()
323            .register(
324                &mut channel,
325                Token(0),
326                Interest::READABLE | Interest::WRITABLE,
327            )
328            .map_err(ServerError::RegisterChannel)?;
329
330        METRICS.with(|metrics| {
331            if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
332                poll.registry()
333                    .register(sock, Token(2), Interest::WRITABLE)
334                    .expect("should register the metrics socket");
335            }
336        });
337
338        let base_sessions_count = sessions.borrow().slab.len();
339
340        let http = Rc::new(RefCell::new(match http {
341            Some(http) => http,
342            None => {
343                let registry = poll
344                    .registry()
345                    .try_clone()
346                    .map_err(ServerError::CloneRegistry)?;
347
348                http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
349            }
350        }));
351
352        let https = Rc::new(RefCell::new(match https {
353            Some(https) => https,
354            None => {
355                let registry = poll
356                    .registry()
357                    .try_clone()
358                    .map_err(ServerError::CloneRegistry)?;
359
360                https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
361            }
362        }));
363
364        let tcp = Rc::new(RefCell::new(match tcp {
365            Some(tcp) => tcp,
366            None => {
367                let registry = poll
368                    .registry()
369                    .try_clone()
370                    .map_err(ServerError::CloneRegistry)?;
371
372                tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
373            }
374        }));
375
376        let mut server = Server {
377            accept_queue_timeout: Duration::from_secs(u64::from(
378                server_config.accept_queue_timeout,
379            )),
380            accept_queue: VecDeque::new(),
381            accept_ready: HashSet::new(),
382            backends,
383            base_sessions_count,
384            channel,
385            config_state: ConfigState::new(),
386            current_poll_errors: 0,
387            http,
388            https,
389            last_sessions_len: 0, // to be reset on server run
390            last_shutting_down_message: None,
391            last_zombie_check: Instant::now(), // to be reset on server run
392            loop_start: Instant::now(),        // to be reset on server run
393            max_poll_errors: 10000,            // TODO: make it configurable?
394            poll_timeout: Some(Duration::from_millis(1000)), // TODO: make it configurable?
395            poll,
396            scm_listeners: None,
397            scm,
398            sessions,
399            should_poll_at: None,
400            shutting_down: None,
401            tcp,
402            zombie_check_interval: Duration::from_secs(u64::from(
403                server_config.zombie_check_interval,
404            )),
405        };
406
407        // initialize the worker with the state we got from a file
408        if let Some(state) = initial_state {
409            for request in state.requests {
410                trace!("generating initial config request: {:#?}", request);
411                server.notify_proxys(request);
412            }
413
414            // do not send back answers to the initialization messages
415            QUEUE.with(|queue| {
416                (*queue.borrow_mut()).clear();
417            });
418        }
419
420        if expects_initial_status {
421            // the main process sends a Status message, so we can notify it
422            // when the initial state is loaded
423            server.block_channel();
424            let msg = server.channel.read_message();
425            debug!("got message: {:?}", msg);
426
427            if let Ok(WorkerRequest {
428                id,
429                content:
430                    Request {
431                        request_type: Some(RequestType::Status(_)),
432                    },
433            }) = msg
434            {
435                if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
436                    error!("Could not send an ok to the main process: {}", e);
437                }
438            } else {
439                panic!("plz give me a status request first when I start, you sent me this instead: {:?}", msg);
440            }
441            server.unblock_channel();
442        }
443
444        info!("will try to receive listeners");
445        server
446            .scm
447            .set_blocking(true)
448            .map_err(|scm_err| ServerError::ScmSocket {
449                msg: "Could not set the scm socket to blocking".to_string(),
450                scm_err,
451            })?;
452        let listeners =
453            server
454                .scm
455                .receive_listeners()
456                .map_err(|scm_err| ServerError::ScmSocket {
457                    msg: "could not receive listeners from the scm socket".to_string(),
458                    scm_err,
459                })?;
460        server
461            .scm
462            .set_blocking(false)
463            .map_err(|scm_err| ServerError::ScmSocket {
464                msg: "Could not set the scm socket to unblocking".to_string(),
465                scm_err,
466            })?;
467        info!("received listeners: {:?}", listeners);
468        server.scm_listeners = Some(listeners);
469
470        Ok(server)
471    }
472
473    /// The server runs in a loop until a shutdown is ordered
474    pub fn run(&mut self) {
475        let mut events = Events::with_capacity(1024); // TODO: make event capacity configurable?
476        self.last_sessions_len = self.sessions.borrow().slab.len();
477
478        self.last_zombie_check = Instant::now();
479        self.loop_start = Instant::now();
480
481        loop {
482            self.check_for_poll_errors();
483
484            let timeout = self.reset_loop_time_and_get_timeout();
485
486            match self.poll.poll(&mut events, timeout) {
487                Ok(_) => self.current_poll_errors = 0,
488                Err(error) => {
489                    error!("Error while polling events: {:?}", error);
490                    self.current_poll_errors += 1;
491                    continue;
492                }
493            }
494
495            let after_epoll = Instant::now();
496            time!("epoll_time", (after_epoll - self.loop_start).as_millis());
497            self.loop_start = after_epoll;
498
499            self.send_queue();
500
501            for event in events.iter() {
502                match event.token() {
503                    // this is the command channel
504                    Token(0) => {
505                        if event.is_error() {
506                            error!("error reading from command channel");
507                            continue;
508                        }
509                        if event.is_read_closed() || event.is_write_closed() {
510                            error!("command channel was closed");
511                            return;
512                        }
513                        let ready = Ready::from(event);
514                        self.channel.handle_events(ready);
515
516                        // loop here because iterations has borrow issues
517                        loop {
518                            QUEUE.with(|queue| {
519                                if !(*queue.borrow()).is_empty() {
520                                    self.channel.interest.insert(Ready::WRITABLE);
521                                }
522                            });
523
524                            //trace!("WORKER[{}] channel readiness={:?}, interest={:?}, queue={} elements",
525                            //  line!(), self.channel.readiness, self.channel.interest, self.queue.len());
526                            if self.channel.readiness() == Ready::EMPTY {
527                                break;
528                            }
529
530                            // exit the big loop if the message is HardStop
531                            if self.read_channel_messages_and_notify() {
532                                return;
533                            }
534
535                            QUEUE.with(|queue| {
536                                if !(*queue.borrow()).is_empty() {
537                                    self.channel.interest.insert(Ready::WRITABLE);
538                                }
539                            });
540
541                            self.send_queue();
542                        }
543                    }
544                    // timer tick
545                    Token(1) => {
546                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
547                            self.timeout(t);
548                        }
549                    }
550                    // metrics socket is writable
551                    Token(2) => METRICS.with(|metrics| {
552                        (*metrics.borrow_mut()).writable();
553                    }),
554                    // ListenToken: 1 listener <=> 1 token
555                    // ProtocolToken (HTTP/HTTPS/TCP): 1 connection <=> 1 token
556                    token => self.ready(token, Ready::from(event)),
557                }
558            }
559
560            if let Some(t) = self.should_poll_at.as_ref() {
561                if *t <= Instant::now() {
562                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
563                        //info!("polled for timeout: {:?}", t);
564                        self.timeout(t);
565                    }
566                }
567            }
568            self.handle_remaining_readiness();
569            self.create_sessions();
570
571            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());
572
573            self.zombie_check();
574
575            let now = time::OffsetDateTime::now_utc();
576            // clear the local metrics drain every plain hour (01:00, 02:00, etc.) to prevent memory overuse
577            // TODO: have one-hour-lasting metrics instead
578            if now.minute() == 00 && now.second() == 0 {
579                METRICS.with(|metrics| {
580                    (*metrics.borrow_mut()).clear_local();
581                });
582            }
583
584            gauge!("client.connections", self.sessions.borrow().nb_connections);
585            gauge!("slab.entries", self.sessions.borrow().slab.len());
586            METRICS.with(|metrics| {
587                (*metrics.borrow_mut()).send_data();
588            });
589
590            if self.shutting_down.is_some() && self.shut_down_sessions() {
591                return;
592            }
593        }
594    }
595
596    fn check_for_poll_errors(&mut self) {
597        if self.current_poll_errors >= self.max_poll_errors {
598            error!(
599                "Something is going very wrong. Last {} poll() calls failed, crashing..",
600                self.current_poll_errors
601            );
602            panic!(
603                "poll() calls failed {} times in a row",
604                self.current_poll_errors
605            );
606        }
607    }
608
609    fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
610        let now = Instant::now();
611        time!("event_loop_time", (now - self.loop_start).as_millis());
612
613        let timeout = match self.should_poll_at.as_ref() {
614            None => self.poll_timeout,
615            Some(i) => {
616                if *i <= now {
617                    self.poll_timeout
618                } else {
619                    let dur = *i - now;
620                    match self.poll_timeout {
621                        None => Some(dur),
622                        Some(t) => {
623                            if t < dur {
624                                Some(t)
625                            } else {
626                                Some(dur)
627                            }
628                        }
629                    }
630                }
631            }
632        };
633
634        self.loop_start = now;
635        timeout
636    }
637
638    /// Returns true if hardstop
639    fn read_channel_messages_and_notify(&mut self) -> bool {
640        if !self.channel.readiness().is_readable() {
641            return false;
642        }
643
644        if let Err(e) = self.channel.readable() {
645            error!("error reading from channel: {:?}", e);
646        }
647
648        loop {
649            let request = self.channel.read_message();
650            debug!("Received request {:?}", request);
651            match request {
652                Ok(request) => match request.content.request_type {
653                    Some(RequestType::HardStop(_)) => {
654                        let req_id = request.id.clone();
655                        self.notify(request);
656                        if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
657                            error!("Could not send ok response to the main process: {}", e);
658                        }
659                        if let Err(e) = self.channel.run() {
660                            error!("Error while running the server channel: {}", e);
661                        }
662                        return true;
663                    }
664                    Some(RequestType::SoftStop(_)) => {
665                        self.shutting_down = Some(request.id.clone());
666                        self.last_sessions_len = self.sessions.borrow().slab.len();
667                        self.notify(request);
668                    }
669                    Some(RequestType::ReturnListenSockets(_)) => {
670                        info!("received ReturnListenSockets order");
671                        match self.return_listen_sockets() {
672                            Ok(_) => push_queue(WorkerResponse::ok(request.id)),
673                            Err(error) => push_queue(worker_response_error(
674                                request.id,
675                                format!("Could not send listeners on scm socket: {error:?}"),
676                            )),
677                        }
678                    }
679                    _ => self.notify(request),
680                },
681                // Not an error per se, occurs when there is nothing to read
682                Err(_) => {
683                    // if the message was too large, we grow the buffer and retry to read if possible
684                    if (self.channel.interest & self.channel.readiness).is_readable() {
685                        if let Err(e) = self.channel.readable() {
686                            error!("error reading from channel: {:?}", e);
687                        }
688                        continue;
689                    }
690                    break;
691                }
692            }
693        }
694        false
695    }
696
697    /// Scans all sessions that have been inactive for longer than the configured interval
698    fn zombie_check(&mut self) {
699        let now = Instant::now();
700        if now - self.last_zombie_check < self.zombie_check_interval {
701            return;
702        }
703        info!("zombie check");
704        self.last_zombie_check = now;
705
706        let mut zombie_tokens = HashSet::new();
707
708        // find the zombie sessions
709        for (_index, session) in self
710            .sessions
711            .borrow_mut()
712            .slab
713            .iter_mut()
714            .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
715        {
716            let session_token = session.borrow().frontend_token();
717            if !zombie_tokens.contains(&session_token) {
718                session.borrow().print_session();
719                zombie_tokens.insert(session_token);
720            }
721        }
722
723        let zombie_count = zombie_tokens.len() as i64;
724        count!("zombies", zombie_count);
725
726        let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
727        info!(
728            "removing {} zombies ({} remaining entries after close)",
729            zombie_count, remaining_count
730        );
731    }
732
733    /// Calls close on targeted sessions, yields the number of entries in the slab
734    /// that were not properly removed
735    fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
736        if tokens.is_empty() {
737            return 0;
738        }
739
740        // close the sessions associated with the tokens
741        for token in &tokens {
742            if self.sessions.borrow().slab.contains(token.0) {
743                let session = { self.sessions.borrow_mut().slab.remove(token.0) };
744                session.borrow_mut().close();
745                self.sessions.borrow_mut().decr();
746            }
747        }
748
749        // find the entries of closed sessions in the session manager (they should not be there)
750        let mut dangling_entries = HashSet::new();
751        for (entry_key, session) in &self.sessions.borrow().slab {
752            if tokens.contains(&session.borrow().frontend_token()) {
753                dangling_entries.insert(entry_key);
754            }
755        }
756
757        // remove these from the session manager
758        let mut dangling_entries_count = 0;
759        for entry_key in dangling_entries {
760            let mut sessions = self.sessions.borrow_mut();
761            if sessions.slab.contains(entry_key) {
762                sessions.slab.remove(entry_key);
763                dangling_entries_count += 1;
764            }
765        }
766        dangling_entries_count
767    }
768
769    /// Order sessions to shut down, check that they are all down
770    fn shut_down_sessions(&mut self) -> bool {
771        let sessions_count = self.sessions.borrow().slab.len();
772        let mut sessions_to_shut_down = HashSet::new();
773
774        for (_key, session) in &self.sessions.borrow().slab {
775            if session.borrow_mut().shutting_down() {
776                sessions_to_shut_down.insert(Token(session.borrow().frontend_token().0));
777            }
778        }
779        let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
780
781        let new_sessions_count = self.sessions.borrow().slab.len();
782
783        if new_sessions_count < sessions_count {
784            let now = Instant::now();
785            if let Some(last) = self.last_shutting_down_message {
786                if (now - last) > Duration::from_secs(5) {
787                    info!(
788                        "closed {} sessions, {} sessions left, base_sessions_count = {}",
789                        sessions_count - new_sessions_count,
790                        new_sessions_count,
791                        self.base_sessions_count
792                    );
793                }
794            }
795            self.last_shutting_down_message = Some(now);
796        }
797
798        if new_sessions_count <= self.base_sessions_count {
799            info!("last session stopped, shutting down!");
800            if let Err(e) = self.channel.run() {
801                error!("Error while running the server channel: {}", e);
802            }
803            // self.block_channel();
804            let id = self
805                .shutting_down
806                .take()
807                .expect("should have shut down correctly"); // panicking here makes sense actually
808
809            debug!("Responding OK to main process for request {}", id);
810
811            let proxy_response = WorkerResponse::ok(id);
812            if let Err(e) = self.channel.write_message(&proxy_response) {
813                error!("Could not write response to the main process: {}", e);
814            }
815            if let Err(e) = self.channel.run() {
816                error!("Error while running the server channel: {}", e);
817            }
818            return true;
819        }
820
821        if new_sessions_count < self.last_sessions_len {
822            info!(
823                "shutting down, {} slab elements remaining (base: {})",
824                new_sessions_count - self.base_sessions_count,
825                self.base_sessions_count
826            );
827            self.last_sessions_len = new_sessions_count;
828        }
829
830        false
831    }
832
833    fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
834        let token = session.borrow().frontend_token();
835        let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
836    }
837
838    fn send_queue(&mut self) {
839        if self.channel.readiness.is_writable() {
840            QUEUE.with(|q| {
841                let mut queue = q.borrow_mut();
842                loop {
843                    if let Some(resp) = queue.pop_front() {
844                        debug!("Sending response {:?}", resp);
845                        if let Err(e) = self.channel.write_message(&resp) {
846                            error!("Could not write message {} on the channel: {}", resp, e);
847                            queue.push_front(resp);
848                        }
849                    }
850
851                    if self.channel.back_buf.available_data() > 0 {
852                        if let Err(e) = self.channel.writable() {
853                            error!("error writing to channel: {:?}", e);
854                        }
855                    }
856
857                    if !self.channel.readiness.is_writable() {
858                        break;
859                    }
860
861                    if self.channel.back_buf.available_data() == 0 && queue.len() == 0 {
862                        break;
863                    }
864                }
865            });
866        }
867    }
868
869    fn notify(&mut self, message: WorkerRequest) {
870        match &message.content.request_type {
871            Some(RequestType::ConfigureMetrics(configuration)) => {
872                match MetricsConfiguration::try_from(*configuration) {
873                    Ok(metrics_config) => {
874                        METRICS.with(|metrics| {
875                            (*metrics.borrow_mut()).configure(&metrics_config);
876                            push_queue(WorkerResponse::ok(message.id));
877                        });
878                    }
879                    Err(e) => {
880                        error!("Error configuring metrics: {}", e);
881                        push_queue(WorkerResponse::error(message.id, e));
882                    }
883                }
884                return;
885            }
886            Some(RequestType::QueryMetrics(query_metrics_options)) => {
887                METRICS.with(|metrics| {
888                    match (*metrics.borrow_mut()).query(query_metrics_options) {
889                        Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
890                        Err(e) => {
891                            error!("Error querying metrics: {}", e);
892                            push_queue(WorkerResponse::error(message.id, e))
893                        }
894                    }
895                });
896                return;
897            }
898            Some(RequestType::Logging(logging_filter)) => {
899                info!(
900                    "{} changing logging filter to {}",
901                    message.id, logging_filter
902                );
903                // there should not be any errors as it was already parsed by the main process
904                let (directives, _errors) = logging::parse_logging_spec(logging_filter);
905                logging::LOGGER.with(|logger| {
906                    logger.borrow_mut().set_directives(directives);
907                });
908                push_queue(WorkerResponse::ok(message.id));
909                return;
910            }
911            Some(RequestType::QueryClustersHashes(_)) => {
912                push_queue(WorkerResponse::ok_with_content(
913                    message.id.clone(),
914                    ContentType::ClusterHashes(ClusterHashes {
915                        map: self.config_state.hash_state(),
916                    })
917                    .into(),
918                ));
919                return;
920            }
921            Some(RequestType::QueryClusterById(cluster_id)) => {
922                push_queue(WorkerResponse::ok_with_content(
923                    message.id.clone(),
924                    ContentType::Clusters(ClusterInformations {
925                        vec: self
926                            .config_state
927                            .cluster_state(cluster_id)
928                            .map_or(vec![], |ci| vec![ci]),
929                    })
930                    .into(),
931                ));
932            }
933            Some(RequestType::QueryClustersByDomain(domain)) => {
934                let cluster_ids = self
935                    .config_state
936                    .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
937                let vec = cluster_ids
938                    .iter()
939                    .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
940                    .collect();
941
942                push_queue(WorkerResponse::ok_with_content(
943                    message.id.clone(),
944                    ContentType::Clusters(ClusterInformations { vec }).into(),
945                ));
946                return;
947            }
948            Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
949                if filters.fingerprint.is_some() {
950                    let certs = self.config_state.get_certificates(filters.clone());
951                    let response = if !certs.is_empty() {
952                        WorkerResponse::ok_with_content(
953                            message.id.clone(),
954                            ContentType::CertificatesWithFingerprints(
955                                CertificatesWithFingerprints { certs },
956                            )
957                            .into(),
958                        )
959                    } else {
960                        worker_response_error(
961                            message.id.clone(),
962                            "Could not find certificate for this fingerprint",
963                        )
964                    };
965                    push_queue(response);
966                    return;
967                }
968                // if all certificates are queried, or filtered by domain name,
969                // the request will be handled by the https proxy
970            }
971            _other_request => {}
972        }
973        self.notify_proxys(message);
974    }
975
976    pub fn notify_proxys(&mut self, request: WorkerRequest) {
977        if let Err(e) = self.config_state.dispatch(&request.content) {
978            error!("Could not execute order on config state: {}", e);
979        }
980
981        let req_id = request.id.clone();
982
983        match request.content.request_type {
984            Some(RequestType::AddCluster(ref cluster)) => {
985                self.add_cluster(cluster);
986                //not returning because the message must still be handled by each proxy
987            }
988            Some(RequestType::AddBackend(ref backend)) => {
989                push_queue(self.add_backend(&req_id, backend));
990                return;
991            }
992            Some(RequestType::RemoveBackend(ref remove_backend)) => {
993                push_queue(self.remove_backend(&req_id, remove_backend));
994                return;
995            }
996            _ => {}
997        };
998
999        let proxy_destinations = request.content.get_destinations();
1000        let mut notify_response = None;
1001        if proxy_destinations.to_http_proxy {
1002            notify_response = Some(self.http.borrow_mut().notify(request.clone()));
1003        }
1004        if proxy_destinations.to_https_proxy {
1005            let http_proxy_response = self.https.borrow_mut().notify(request.clone());
1006            if http_proxy_response.is_failure() || notify_response.is_none() {
1007                notify_response = Some(http_proxy_response);
1008            }
1009        }
1010        if proxy_destinations.to_tcp_proxy {
1011            let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
1012            if tcp_proxy_response.is_failure() || notify_response.is_none() {
1013                notify_response = Some(tcp_proxy_response);
1014            }
1015        }
1016        if let Some(response) = notify_response {
1017            push_queue(response);
1018        }
1019
1020        match request.content.request_type {
1021            // special case for adding listeners, because we need to register a listener
1022            Some(RequestType::AddHttpListener(listener)) => {
1023                push_queue(self.notify_add_http_listener(&req_id, listener));
1024            }
1025            Some(RequestType::AddHttpsListener(listener)) => {
1026                push_queue(self.notify_add_https_listener(&req_id, listener));
1027            }
1028            Some(RequestType::AddTcpListener(listener)) => {
1029                push_queue(self.notify_add_tcp_listener(&req_id, listener));
1030            }
1031            Some(RequestType::RemoveListener(ref remove)) => {
1032                debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
1033                self.base_sessions_count -= 1;
1034                let response = match ListenerType::try_from(remove.proxy) {
1035                    Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
1036                    Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
1037                    Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
1038                    Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
1039                };
1040                push_queue(response);
1041            }
1042            Some(RequestType::ActivateListener(ref activate)) => {
1043                push_queue(self.notify_activate_listener(&req_id, activate));
1044            }
1045            Some(RequestType::DeactivateListener(ref deactivate)) => {
1046                push_queue(self.notify_deactivate_listener(&req_id, deactivate));
1047            }
1048            _other_request => {}
1049        };
1050    }
1051
1052    fn add_cluster(&mut self, cluster: &Cluster) {
1053        self.backends
1054            .borrow_mut()
1055            .set_load_balancing_policy_for_cluster(
1056                &cluster.cluster_id,
1057                LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
1058                cluster
1059                    .load_metric
1060                    .and_then(|n| LoadMetric::try_from(n).ok()),
1061            );
1062    }
1063
1064    fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
1065        let new_backend = Backend::new(
1066            &add_backend.backend_id,
1067            add_backend.address.into(),
1068            add_backend.sticky_id.clone(),
1069            add_backend.load_balancing_parameters,
1070            add_backend.backup,
1071        );
1072        self.backends
1073            .borrow_mut()
1074            .add_backend(&add_backend.cluster_id, new_backend);
1075
1076        WorkerResponse::ok(req_id)
1077    }
1078
1079    fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
1080        let address = backend.address.into();
1081        self.backends
1082            .borrow_mut()
1083            .remove_backend(&backend.cluster_id, &address);
1084
1085        WorkerResponse::ok(req_id)
1086    }
1087
1088    fn notify_add_http_listener(
1089        &mut self,
1090        req_id: &str,
1091        listener: HttpListenerConfig,
1092    ) -> WorkerResponse {
1093        debug!("{} add http listener {:?}", req_id, listener);
1094
1095        if self.sessions.borrow().at_capacity() {
1096            return worker_response_error(req_id, "session list is full, cannot add a listener");
1097        }
1098
1099        let mut session_manager = self.sessions.borrow_mut();
1100        let entry = session_manager.slab.vacant_entry();
1101        let token = Token(entry.key());
1102
1103        match self.http.borrow_mut().add_listener(listener, token) {
1104            Ok(_token) => {
1105                entry.insert(Rc::new(RefCell::new(ListenSession {
1106                    protocol: Protocol::HTTPListen,
1107                })));
1108                self.base_sessions_count += 1;
1109                WorkerResponse::ok(req_id)
1110            }
1111            Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
1112        }
1113    }
1114
1115    fn notify_add_https_listener(
1116        &mut self,
1117        req_id: &str,
1118        listener: HttpsListenerConfig,
1119    ) -> WorkerResponse {
1120        debug!("{} add https listener {:?}", req_id, listener);
1121
1122        if self.sessions.borrow().at_capacity() {
1123            return worker_response_error(req_id, "session list is full, cannot add a listener");
1124        }
1125
1126        let mut session_manager = self.sessions.borrow_mut();
1127        let entry = session_manager.slab.vacant_entry();
1128        let token = Token(entry.key());
1129
1130        match self
1131            .https
1132            .borrow_mut()
1133            .add_listener(listener.clone(), token)
1134        {
1135            Ok(_token) => {
1136                entry.insert(Rc::new(RefCell::new(ListenSession {
1137                    protocol: Protocol::HTTPSListen,
1138                })));
1139                self.base_sessions_count += 1;
1140                WorkerResponse::ok(req_id)
1141            }
1142            Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
1143        }
1144    }
1145
1146    fn notify_add_tcp_listener(
1147        &mut self,
1148        req_id: &str,
1149        listener: CommandTcpListener,
1150    ) -> WorkerResponse {
1151        debug!("{} add tcp listener {:?}", req_id, listener);
1152
1153        if self.sessions.borrow().at_capacity() {
1154            return worker_response_error(req_id, "session list is full, cannot add a listener");
1155        }
1156
1157        let mut session_manager = self.sessions.borrow_mut();
1158        let entry = session_manager.slab.vacant_entry();
1159        let token = Token(entry.key());
1160
1161        match self.tcp.borrow_mut().add_listener(listener, token) {
1162            Ok(_token) => {
1163                entry.insert(Rc::new(RefCell::new(ListenSession {
1164                    protocol: Protocol::TCPListen,
1165                })));
1166                self.base_sessions_count += 1;
1167                WorkerResponse::ok(req_id)
1168            }
1169            Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
1170        }
1171    }
1172
1173    fn notify_activate_listener(
1174        &mut self,
1175        req_id: &str,
1176        activate: &ActivateListener,
1177    ) -> WorkerResponse {
1178        debug!(
1179            "{} activate {:?} listener {:?}",
1180            req_id, activate.proxy, activate
1181        );
1182
1183        let address: std::net::SocketAddr = activate.address.into();
1184
1185        match ListenerType::try_from(activate.proxy) {
1186            Ok(ListenerType::Http) => {
1187                let listener = self
1188                    .scm_listeners
1189                    .as_mut()
1190                    .and_then(|s| s.get_http(&address))
1191                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1192
1193                let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
1194                match activated_token {
1195                    Ok(token) => {
1196                        self.accept(ListenToken(token.0), Protocol::HTTPListen);
1197                        WorkerResponse::ok(req_id)
1198                    }
1199                    Err(activate_error) => worker_response_error(
1200                        req_id,
1201                        format!("Could not activate HTTP listener: {}", activate_error),
1202                    ),
1203                }
1204            }
1205            Ok(ListenerType::Https) => {
1206                let listener = self
1207                    .scm_listeners
1208                    .as_mut()
1209                    .and_then(|s| s.get_https(&address))
1210                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1211
1212                let activated_token = self
1213                    .https
1214                    .borrow_mut()
1215                    .activate_listener(&address, listener);
1216                match activated_token {
1217                    Ok(token) => {
1218                        self.accept(ListenToken(token.0), Protocol::HTTPSListen);
1219                        WorkerResponse::ok(req_id)
1220                    }
1221                    Err(activate_error) => worker_response_error(
1222                        req_id,
1223                        format!("Could not activate HTTPS listener: {}", activate_error),
1224                    ),
1225                }
1226            }
1227            Ok(ListenerType::Tcp) => {
1228                let listener = self
1229                    .scm_listeners
1230                    .as_mut()
1231                    .and_then(|s| s.get_tcp(&address))
1232                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1233
1234                let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
1235                match listener_token {
1236                    Ok(token) => {
1237                        self.accept(ListenToken(token.0), Protocol::TCPListen);
1238                        WorkerResponse::ok(req_id)
1239                    }
1240                    Err(activate_error) => worker_response_error(
1241                        req_id,
1242                        format!("Could not activate TCP listener: {}", activate_error),
1243                    ),
1244                }
1245            }
1246            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
1247        }
1248    }
1249
1250    fn notify_deactivate_listener(
1251        &mut self,
1252        req_id: &str,
1253        deactivate: &DeactivateListener,
1254    ) -> WorkerResponse {
1255        debug!(
1256            "{} deactivate {:?} listener {:?}",
1257            req_id, deactivate.proxy, deactivate
1258        );
1259
1260        let address: std::net::SocketAddr = deactivate.address.into();
1261
1262        match ListenerType::try_from(deactivate.proxy) {
1263            Ok(ListenerType::Http) => {
1264                let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
1265                {
1266                    Ok((token, listener)) => (token, listener),
1267                    Err(e) => {
1268                        return worker_response_error(
1269                            req_id,
1270                            format!(
1271                                "Couldn't deactivate HTTP listener at address {address:?}: {e}"
1272                            ),
1273                        )
1274                    }
1275                };
1276
1277                if let Err(e) = self.poll.registry().deregister(&mut listener) {
1278                    error!(
1279                        "error deregistering HTTP listen socket({:?}): {:?}",
1280                        deactivate, e
1281                    );
1282                }
1283
1284                {
1285                    let mut sessions = self.sessions.borrow_mut();
1286                    if sessions.slab.contains(token.0) {
1287                        sessions.slab.remove(token.0);
1288                        info!("removed listen token {:?}", token);
1289                    }
1290                }
1291
1292                if deactivate.to_scm {
1293                    self.unblock_scm_socket();
1294                    let listeners = Listeners {
1295                        http: vec![(address, listener.as_raw_fd())],
1296                        tls: vec![],
1297                        tcp: vec![],
1298                    };
1299                    info!("sending HTTP listener: {:?}", listeners);
1300                    let res = self.scm.send_listeners(&listeners);
1301
1302                    self.block_scm_socket();
1303
1304                    info!("sent HTTP listener: {:?}", res);
1305                }
1306                WorkerResponse::ok(req_id)
1307            }
1308            Ok(ListenerType::Https) => {
1309                let (token, mut listener) =
1310                    match self.https.borrow_mut().give_back_listener(address) {
1311                        Ok((token, listener)) => (token, listener),
1312                        Err(e) => {
1313                            return worker_response_error(
1314                                req_id,
1315                                format!(
1316                                "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
1317                            ),
1318                            )
1319                        }
1320                    };
1321                if let Err(e) = self.poll.registry().deregister(&mut listener) {
1322                    error!(
1323                        "error deregistering HTTPS listen socket({:?}): {:?}",
1324                        deactivate, e
1325                    );
1326                }
1327                if self.sessions.borrow().slab.contains(token.0) {
1328                    self.sessions.borrow_mut().slab.remove(token.0);
1329                    info!("removed listen token {:?}", token);
1330                }
1331
1332                if deactivate.to_scm {
1333                    self.unblock_scm_socket();
1334                    let listeners = Listeners {
1335                        http: vec![],
1336                        tls: vec![(address, listener.as_raw_fd())],
1337                        tcp: vec![],
1338                    };
1339                    info!("sending HTTPS listener: {:?}", listeners);
1340                    let res = self.scm.send_listeners(&listeners);
1341
1342                    self.block_scm_socket();
1343
1344                    info!("sent HTTPS listener: {:?}", res);
1345                }
1346                WorkerResponse::ok(req_id)
1347            }
1348            Ok(ListenerType::Tcp) => {
1349                let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
1350                {
1351                    Ok((token, listener)) => (token, listener),
1352                    Err(e) => {
1353                        return worker_response_error(
1354                            req_id,
1355                            format!(
1356                                "Could not deactivate TCP listener at address {:?}: {}",
1357                                address, e
1358                            ),
1359                        )
1360                    }
1361                };
1362
1363                if let Err(e) = self.poll.registry().deregister(&mut listener) {
1364                    error!(
1365                        "error deregistering TCP listen socket({:?}): {:?}",
1366                        deactivate, e
1367                    );
1368                }
1369                if self.sessions.borrow().slab.contains(token.0) {
1370                    self.sessions.borrow_mut().slab.remove(token.0);
1371                    info!("removed listen token {:?}", token);
1372                }
1373
1374                if deactivate.to_scm {
1375                    self.unblock_scm_socket();
1376                    let listeners = Listeners {
1377                        http: vec![],
1378                        tls: vec![],
1379                        tcp: vec![(address, listener.as_raw_fd())],
1380                    };
1381                    info!("sending TCP listener: {:?}", listeners);
1382                    let res = self.scm.send_listeners(&listeners);
1383
1384                    self.block_scm_socket();
1385
1386                    info!("sent TCP listener: {:?}", res);
1387                }
1388                WorkerResponse::ok(req_id)
1389            }
1390            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
1391        }
1392    }
1393
1394    /// Send all socket addresses and file descriptors of all proxies, via the scm socket
1395    pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
1396        self.unblock_scm_socket();
1397
1398        let mut http_listeners = self.http.borrow_mut().give_back_listeners();
1399        for &mut (_, ref mut sock) in http_listeners.iter_mut() {
1400            if let Err(e) = self.poll.registry().deregister(sock) {
1401                error!(
1402                    "error deregistering HTTP listen socket({:?}): {:?}",
1403                    sock, e
1404                );
1405            }
1406        }
1407
1408        let mut https_listeners = self.https.borrow_mut().give_back_listeners();
1409        for &mut (_, ref mut sock) in https_listeners.iter_mut() {
1410            if let Err(e) = self.poll.registry().deregister(sock) {
1411                error!(
1412                    "error deregistering HTTPS listen socket({:?}): {:?}",
1413                    sock, e
1414                );
1415            }
1416        }
1417
1418        let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
1419        for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
1420            if let Err(e) = self.poll.registry().deregister(sock) {
1421                error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
1422            }
1423        }
1424
1425        // use as_raw_fd because the listeners should be dropped after sending them
1426        let listeners = Listeners {
1427            http: http_listeners
1428                .iter()
1429                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1430                .collect(),
1431            tls: https_listeners
1432                .iter()
1433                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1434                .collect(),
1435            tcp: tcp_listeners
1436                .iter()
1437                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1438                .collect(),
1439        };
1440        info!("sending default listeners: {:?}", listeners);
1441        let res = self.scm.send_listeners(&listeners);
1442
1443        self.block_scm_socket();
1444
1445        info!("sent default listeners: {:?}", res);
1446        res
1447    }
1448
1449    fn block_scm_socket(&mut self) {
1450        if let Err(e) = self.scm.set_blocking(true) {
1451            error!("Could not block scm socket: {}", e);
1452        }
1453    }
1454
1455    fn unblock_scm_socket(&mut self) {
1456        if let Err(e) = self.scm.set_blocking(false) {
1457            error!("Could not unblock scm socket: {}", e);
1458        }
1459    }
1460
1461    pub fn to_session(&self, token: Token) -> SessionToken {
1462        SessionToken(token.0)
1463    }
1464
1465    pub fn from_session(&self, token: SessionToken) -> Token {
1466        Token(token.0)
1467    }
1468
1469    pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
1470        match protocol {
1471            Protocol::TCPListen => loop {
1472                match self.tcp.borrow_mut().accept(token) {
1473                    Ok(sock) => self.accept_queue.push_back((
1474                        sock,
1475                        token,
1476                        Protocol::TCPListen,
1477                        Instant::now(),
1478                    )),
1479                    Err(AcceptError::WouldBlock) => {
1480                        self.accept_ready.remove(&token);
1481                        break;
1482                    }
1483                    Err(other) => {
1484                        error!("error accepting TCP sockets: {:?}", other);
1485                        self.accept_ready.remove(&token);
1486                        break;
1487                    }
1488                }
1489            },
1490            Protocol::HTTPListen => loop {
1491                match self.http.borrow_mut().accept(token) {
1492                    Ok(sock) => self.accept_queue.push_back((
1493                        sock,
1494                        token,
1495                        Protocol::HTTPListen,
1496                        Instant::now(),
1497                    )),
1498                    Err(AcceptError::WouldBlock) => {
1499                        self.accept_ready.remove(&token);
1500                        break;
1501                    }
1502                    Err(other) => {
1503                        error!("error accepting HTTP sockets: {:?}", other);
1504                        self.accept_ready.remove(&token);
1505                        break;
1506                    }
1507                }
1508            },
1509            Protocol::HTTPSListen => loop {
1510                match self.https.borrow_mut().accept(token) {
1511                    Ok(sock) => self.accept_queue.push_back((
1512                        sock,
1513                        token,
1514                        Protocol::HTTPSListen,
1515                        Instant::now(),
1516                    )),
1517                    Err(AcceptError::WouldBlock) => {
1518                        self.accept_ready.remove(&token);
1519                        break;
1520                    }
1521                    Err(other) => {
1522                        error!("error accepting HTTPS sockets: {:?}", other);
1523                        self.accept_ready.remove(&token);
1524                        break;
1525                    }
1526                }
1527            },
1528            _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
1529        }
1530
1531        gauge!("accept_queue.connections", self.accept_queue.len());
1532    }
1533
1534    pub fn create_sessions(&mut self) {
1535        while let Some((sock, token, protocol, timestamp)) = self.accept_queue.pop_back() {
1536            let wait_time = Instant::now() - timestamp;
1537            time!("accept_queue.wait_time", wait_time.as_millis());
1538            if wait_time > self.accept_queue_timeout {
1539                incr!("accept_queue.timeout");
1540                continue;
1541            }
1542
1543            if !self.sessions.borrow_mut().check_limits() {
1544                break;
1545            }
1546
1547            //FIXME: check the timestamp
1548            //TODO: create_session should return the session and
1549            // the server should insert it in the the SessionManager
1550            match protocol {
1551                Protocol::TCPListen => {
1552                    let proxy = self.tcp.clone();
1553                    if self
1554                        .tcp
1555                        .borrow_mut()
1556                        .create_session(sock, token, wait_time, proxy)
1557                        .is_err()
1558                    {
1559                        break;
1560                    }
1561                }
1562                Protocol::HTTPListen => {
1563                    let proxy = self.http.clone();
1564                    if self
1565                        .http
1566                        .borrow_mut()
1567                        .create_session(sock, token, wait_time, proxy)
1568                        .is_err()
1569                    {
1570                        break;
1571                    }
1572                }
1573                Protocol::HTTPSListen => {
1574                    if self
1575                        .https
1576                        .borrow_mut()
1577                        .create_session(sock, token, wait_time, self.https.clone())
1578                        .is_err()
1579                    {
1580                        break;
1581                    }
1582                }
1583                _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
1584            };
1585            self.sessions.borrow_mut().incr();
1586        }
1587
1588        gauge!("accept_queue.connections", self.accept_queue.len());
1589    }
1590
1591    pub fn ready(&mut self, token: Token, events: Ready) {
1592        trace!("PROXY\t{:?} got events: {:?}", token, events);
1593
1594        let session_token = token.0;
1595        if self.sessions.borrow().slab.contains(session_token) {
1596            //info!("sessions contains {:?}", session_token);
1597            let protocol = self.sessions.borrow().slab[session_token]
1598                .borrow()
1599                .protocol();
1600            //info!("protocol: {:?}", protocol);
1601            match protocol {
1602                Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
1603                    //info!("PROTOCOL IS LISTEN");
1604                    if events.is_readable() {
1605                        self.accept_ready.insert(ListenToken(token.0));
1606                        if self.sessions.borrow().can_accept {
1607                            self.accept(ListenToken(token.0), protocol);
1608                        }
1609                        return;
1610                    }
1611
1612                    if events.is_writable() {
1613                        error!(
1614                            "received writable for listener {:?}, this should not happen",
1615                            token
1616                        );
1617                        return;
1618                    }
1619
1620                    if events.is_hup() {
1621                        error!("should not happen: server {:?} closed", token);
1622                        return;
1623                    }
1624
1625                    unreachable!();
1626                }
1627                _ => {}
1628            }
1629
1630            let session = self.sessions.borrow_mut().slab[session_token].clone();
1631            session.borrow_mut().update_readiness(token, events);
1632            if session.borrow_mut().ready(session.clone()) {
1633                self.kill_session(session);
1634            }
1635        }
1636    }
1637
1638    pub fn timeout(&mut self, token: Token) {
1639        trace!("PROXY\t{:?} got timeout", token);
1640
1641        let session_token = token.0;
1642        if self.sessions.borrow().slab.contains(session_token) {
1643            let session = self.sessions.borrow_mut().slab[session_token].clone();
1644            if session.borrow_mut().timeout(token) {
1645                self.kill_session(session);
1646            }
1647        }
1648    }
1649
1650    pub fn handle_remaining_readiness(&mut self) {
1651        // try to accept again after handling all session events,
1652        // since we might have released a few session slots
1653        if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
1654            while let Some(token) = self
1655                .accept_ready
1656                .iter()
1657                .next()
1658                .map(|token| ListenToken(token.0))
1659            {
1660                let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
1661                self.accept(token, protocol);
1662                if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
1663                    break;
1664                }
1665            }
1666        }
1667    }
1668    fn block_channel(&mut self) {
1669        if let Err(e) = self.channel.blocking() {
1670            error!("Could not block channel: {}", e);
1671        }
1672    }
1673    fn unblock_channel(&mut self) {
1674        if let Err(e) = self.channel.nonblocking() {
1675            error!("Could not block channel: {}", e);
1676        }
1677    }
1678}
1679
1680/// log the error together with the request id
1681/// create a WorkerResponse
1682fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
1683    error!(
1684        "error on request {}, {}",
1685        request_id.to_string(),
1686        error.to_string()
1687    );
1688    WorkerResponse::error(request_id, error)
1689}
1690
1691pub struct ListenSession {
1692    pub protocol: Protocol,
1693}
1694
1695impl ProxySession for ListenSession {
1696    fn last_event(&self) -> Instant {
1697        Instant::now()
1698    }
1699
1700    fn print_session(&self) {}
1701
1702    fn frontend_token(&self) -> Token {
1703        Token(0)
1704    }
1705
1706    fn protocol(&self) -> Protocol {
1707        self.protocol
1708    }
1709
1710    fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1711        false
1712    }
1713
1714    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1715        false
1716    }
1717
1718    fn update_readiness(&mut self, _token: Token, _events: Ready) {}
1719
1720    fn close(&mut self) {}
1721
1722    fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
1723        error!(
1724            "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
1725            _token, self.protocol
1726        );
1727        false
1728    }
1729}