sozu_lib/
server.rs

1//! event loop management
2use std::{
3    cell::RefCell,
4    collections::{HashSet, VecDeque},
5    io::Error as IoError,
6    os::unix::io::{AsRawFd, FromRawFd},
7    rc::Rc,
8    time::{Duration, Instant},
9};
10
11use mio::{
12    Events, Interest, Poll, Token,
13    net::{TcpListener as MioTcpListener, TcpStream},
14};
15use slab::Slab;
16use sozu_command::{
17    channel::Channel,
18    logging,
19    proto::command::{
20        ActivateListener, AddBackend, CertificatesWithFingerprints, Cluster, ClusterHashes,
21        ClusterInformations, DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig,
22        InitialState, ListenerType, LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration,
23        RemoveBackend, Request, ResponseStatus, ServerConfig,
24        TcpListenerConfig as CommandTcpListener, WorkerRequest, WorkerResponse,
25        request::RequestType, response_content::ContentType,
26    },
27    ready::Ready,
28    scm_socket::{Listeners, ScmSocket, ScmSocketError},
29    state::ConfigState,
30};
31
32use crate::{
33    AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
34    backends::{Backend, BackendMap},
35    features::FEATURES,
36    http, https,
37    metrics::METRICS,
38    pool::Pool,
39    tcp,
40    timer::Timer,
41};
42
/// Number of retries to perform on a server after a connection failure
pub const CONN_RETRIES: u8 = 3;

/// Channel between this worker and the main process: the worker writes
/// [`WorkerResponse`]s on it and reads [`WorkerRequest`]s from it.
pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;

// Per-thread queue of responses waiting to be written on the command channel.
// Filled by `push_queue` / `push_event`, drained by `Server::send_queue`.
thread_local! {
  pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
}

// Per-thread timer keyed by mio tokens; the event loop polls it and calls
// `Server::timeout` for each token it yields.
thread_local! {
  pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
}
55
56pub fn push_queue(message: WorkerResponse) {
57    QUEUE.with(|queue| {
58        (*queue.borrow_mut()).push_back(message);
59    });
60}
61
62pub fn push_event(event: Event) {
63    QUEUE.with(|queue| {
64        (*queue.borrow_mut()).push_back(WorkerResponse {
65            id: "EVENT".to_string(),
66            message: String::new(),
67            status: ResponseStatus::Processing.into(),
68            content: Some(ContentType::Event(event).into()),
69        });
70    });
71}
72
/// Index identifying a listener (one listener <=> one token).
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ListenToken(pub usize);

/// Index identifying an entry of the session slab.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SessionToken(pub usize);

impl From<usize> for ListenToken {
    /// Wraps a raw index into a [`ListenToken`].
    fn from(index: usize) -> Self {
        Self(index)
    }
}

impl From<ListenToken> for usize {
    /// Extracts the raw index of a [`ListenToken`].
    fn from(ListenToken(index): ListenToken) -> Self {
        index
    }
}

impl From<usize> for SessionToken {
    /// Wraps a raw index into a [`SessionToken`].
    fn from(index: usize) -> Self {
        Self(index)
    }
}

impl From<SessionToken> for usize {
    /// Extracts the raw index of a [`SessionToken`].
    fn from(SessionToken(index): SessionToken) -> Self {
        index
    }
}
101
/// Keeps track of the sessions stored in the slab and of the connection limits.
pub struct SessionManager {
    /// Upper bound on `nb_connections`; also used to derive the slab capacity limit
    pub max_connections: usize,
    /// Current number of connections, updated by `incr` and `decr`
    pub nb_connections: usize,
    /// Set to false by `check_limits` when a limit is reached,
    /// set back to true by `decr` once enough connections are gone
    pub can_accept: bool,
    /// Storage for all sessions, indexed by their token
    pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
}
108
109impl SessionManager {
110    pub fn new(
111        slab: Slab<Rc<RefCell<dyn ProxySession>>>,
112        max_connections: usize,
113    ) -> Rc<RefCell<Self>> {
114        Rc::new(RefCell::new(SessionManager {
115            max_connections,
116            nb_connections: 0,
117            can_accept: true,
118            slab,
119        }))
120    }
121
122    /// The slab is considered at capacity if it contains more sessions than twice max_connections
123    pub fn at_capacity(&self) -> bool {
124        self.slab.len() >= 10 + 2 * self.max_connections
125    }
126
127    /// Check the number of connections against max_connections, and the slab capacity.
128    /// Returns false if limits are reached.
129    pub fn check_limits(&mut self) -> bool {
130        if self.nb_connections >= self.max_connections {
131            error!("max number of session connection reached, flushing the accept queue");
132            gauge!("accept_queue.backpressure", 1);
133            self.can_accept = false;
134            return false;
135        }
136
137        if self.at_capacity() {
138            error!("not enough memory to accept another session, flushing the accept queue");
139            error!(
140                "nb_connections: {}, max_connections: {}",
141                self.nb_connections, self.max_connections
142            );
143            gauge!("accept_queue.backpressure", 1);
144            self.can_accept = false;
145
146            return false;
147        }
148
149        true
150    }
151
152    pub fn to_session(token: Token) -> SessionToken {
153        SessionToken(token.0)
154    }
155
156    pub fn incr(&mut self) {
157        self.nb_connections += 1;
158        assert!(self.nb_connections <= self.max_connections);
159        gauge!("client.connections", self.nb_connections);
160    }
161
162    /// Decrements the number of sessions, start accepting new connections
163    /// if the capacity limit of 90% has not been reached.
164    pub fn decr(&mut self) {
165        assert!(self.nb_connections != 0);
166        self.nb_connections -= 1;
167        gauge!("client.connections", self.nb_connections);
168
169        // do not be ready to accept right away, wait until we get back to 10% capacity
170        if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
171            debug!(
172                "nb_connections = {}, max_connections = {}, starting to accept again",
173                self.nb_connections, self.max_connections
174            );
175            gauge!("accept_queue.backpressure", 0);
176            self.can_accept = true;
177        }
178    }
179}
180
/// Errors that can occur while building a [`Server`].
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
    /// `Poll::new()` failed
    #[error("could not create event loop with MIO poll: {0}")]
    CreatePoll(IoError),
    /// The poll registry could not be cloned for one of the proxies
    #[error("could not clone the MIO registry: {0}")]
    CloneRegistry(IoError),
    /// The command channel could not be registered on the poll
    #[error("could not register the channel: {0}")]
    RegisterChannel(IoError),
    /// An operation on the SCM socket failed; `msg` says which one
    #[error("{msg}:{scm_err}")]
    ScmSocket {
        msg: String,
        scm_err: ScmSocketError,
    },
}
195
/// `Server` handles the event loop, the listeners, the sessions and
/// communication with the configuration channel.
///
/// A listener wraps a listen socket, the associated proxying protocols
/// (HTTP, HTTPS and TCP) and the routing configuration for clusters.
/// Listeners handle creating sessions from accepted sockets.
///
/// A session manages a "front" socket for a connected client, and all
/// of the associated data (back socket, protocol state machine, buffers,
/// metrics...).
///
/// `Server` gets configuration updates from the channel (domain/path routes,
/// backend server address...).
///
/// Listeners and sessions are all stored in a slab structure to index them
/// by a [Token], they all have to implement the [ProxySession] trait.
pub struct Server {
    /// built from `ServerConfig::accept_queue_timeout`, expressed in seconds
    accept_queue_timeout: Duration,
    accept_queue: VecDeque<(TcpStream, ListenToken, Protocol, Instant)>,
    accept_ready: HashSet<ListenToken>,
    backends: Rc<RefCell<BackendMap>>,
    /// number of slab entries present when the server was built;
    /// a soft stop is complete once the slab is back to this size
    base_sessions_count: usize,
    /// command channel with the main process, registered under `Token(0)`
    channel: ProxyChannel,
    config_state: ConfigState,
    /// number of consecutive failed `poll()` calls, reset on success
    current_poll_errors: i32,
    http: Rc<RefCell<http::HttpProxy>>,
    https: Rc<RefCell<https::HttpsProxy>>,
    /// slab length last reported while shutting down
    last_sessions_len: usize,
    last_shutting_down_message: Option<Instant>,
    last_zombie_check: Instant,
    loop_start: Instant,
    max_poll_errors: i32, // TODO: make this configurable? this defaults to 10000 for now
    pub poll: Poll,
    poll_timeout: Option<Duration>, // TODO: make this configurable? this defaults to 1000 milliseconds for now
    /// listeners received from the SCM socket when the server was built
    scm_listeners: Option<Listeners>,
    scm: ScmSocket,
    sessions: Rc<RefCell<SessionManager>>,
    /// next date at which the timer wants to be polled, if any
    should_poll_at: Option<Instant>,
    /// id of the soft stop request being processed, if any
    shutting_down: Option<String>,
    tcp: Rc<RefCell<tcp::TcpProxy>>,
    /// built from `ServerConfig::zombie_check_interval`, expressed in seconds
    zombie_check_interval: Duration,
}
238
239impl Server {
240    pub fn try_new_from_config(
241        worker_to_main_channel: ProxyChannel,
242        worker_to_main_scm: ScmSocket,
243        config: ServerConfig,
244        initial_state: InitialState,
245        expects_initial_status: bool,
246    ) -> Result<Self, ServerError> {
247        let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
248        let pool = Rc::new(RefCell::new(Pool::with_capacity(
249            config.min_buffers as usize,
250            config.max_buffers as usize,
251            config.buffer_size as usize,
252        )));
253        let backends = Rc::new(RefCell::new(BackendMap::new()));
254
255        //FIXME: we will use a few entries for the channel, metrics socket and the listeners
256        //FIXME: for HTTP/2, we will have more than 2 entries per session
257        let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
258            Slab::with_capacity(config.slab_capacity() as usize),
259            config.max_connections as usize,
260        );
261        {
262            let mut s = sessions.borrow_mut();
263            let entry = s.slab.vacant_entry();
264            trace!("taking token {:?} for channel", SessionToken(entry.key()));
265            entry.insert(Rc::new(RefCell::new(ListenSession {
266                protocol: Protocol::Channel,
267            })));
268        }
269        {
270            let mut s = sessions.borrow_mut();
271            let entry = s.slab.vacant_entry();
272            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
273            entry.insert(Rc::new(RefCell::new(ListenSession {
274                protocol: Protocol::Timer,
275            })));
276        }
277        {
278            let mut s = sessions.borrow_mut();
279            let entry = s.slab.vacant_entry();
280            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
281            entry.insert(Rc::new(RefCell::new(ListenSession {
282                protocol: Protocol::Metrics,
283            })));
284        }
285
286        Server::new(
287            event_loop,
288            worker_to_main_channel,
289            worker_to_main_scm,
290            sessions,
291            pool,
292            backends,
293            None,
294            None,
295            None,
296            config,
297            Some(initial_state),
298            expects_initial_status,
299        )
300    }
301
    /// Assembles a [`Server`] from already created parts.
    ///
    /// Steps, in order:
    /// - registers the command channel under `Token(0)` and, if there is one,
    ///   the metrics socket under `Token(2)`,
    /// - creates the HTTP, HTTPS and TCP proxies that were not provided,
    ///   each with its own clone of the poll registry,
    /// - replays the requests of `initial_state` (their answers are discarded),
    /// - if `expects_initial_status` is set, waits for a `Status` request on the
    ///   channel and answers it with an ok response,
    /// - receives the listeners from the SCM socket and stores them.
    ///
    /// # Errors
    ///
    /// Returns [`ServerError::RegisterChannel`], [`ServerError::CloneRegistry`]
    /// or [`ServerError::ScmSocket`] depending on the failing step.
    ///
    /// # Panics
    ///
    /// Panics if the metrics socket cannot be registered, or if
    /// `expects_initial_status` is set and the first message read on the
    /// channel is not a `Status` request.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        poll: Poll,
        mut channel: ProxyChannel,
        scm: ScmSocket,
        sessions: Rc<RefCell<SessionManager>>,
        pool: Rc<RefCell<Pool>>,
        backends: Rc<RefCell<BackendMap>>,
        http: Option<http::HttpProxy>,
        https: Option<https::HttpsProxy>,
        tcp: Option<tcp::TcpProxy>,
        server_config: ServerConfig,
        initial_state: Option<InitialState>,
        expects_initial_status: bool,
    ) -> Result<Self, ServerError> {
        FEATURES.with(|_features| {
            // initializing feature flags
        });

        // the command channel is always Token(0), see the event loop in `run`
        poll.registry()
            .register(
                &mut channel,
                Token(0),
                Interest::READABLE | Interest::WRITABLE,
            )
            .map_err(ServerError::RegisterChannel)?;

        // the metrics socket, when there is one, is always Token(2)
        METRICS.with(|metrics| {
            if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
                poll.registry()
                    .register(sock, Token(2), Interest::WRITABLE)
                    .expect("should register the metrics socket");
            }
        });

        // entries already in the slab at this point are not client sessions,
        // remember how many there are to know when a soft stop is complete
        let base_sessions_count = sessions.borrow().slab.len();

        let http = Rc::new(RefCell::new(match http {
            Some(http) => http,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let https = Rc::new(RefCell::new(match https {
            Some(https) => https,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let tcp = Rc::new(RefCell::new(match tcp {
            Some(tcp) => tcp,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let mut server = Server {
            accept_queue_timeout: Duration::from_secs(u64::from(
                server_config.accept_queue_timeout,
            )),
            accept_queue: VecDeque::new(),
            accept_ready: HashSet::new(),
            backends,
            base_sessions_count,
            channel,
            config_state: ConfigState::new(),
            current_poll_errors: 0,
            http,
            https,
            last_sessions_len: 0, // to be reset on server run
            last_shutting_down_message: None,
            last_zombie_check: Instant::now(), // to be reset on server run
            loop_start: Instant::now(),        // to be reset on server run
            max_poll_errors: 10000,            // TODO: make it configurable?
            poll_timeout: Some(Duration::from_millis(1000)), // TODO: make it configurable?
            poll,
            scm_listeners: None,
            scm,
            sessions,
            should_poll_at: None,
            shutting_down: None,
            tcp,
            zombie_check_interval: Duration::from_secs(u64::from(
                server_config.zombie_check_interval,
            )),
        };

        // initialize the worker with the state we got from a file
        if let Some(state) = initial_state {
            for request in state.requests {
                trace!("generating initial config request: {:#?}", request);
                server.notify_proxys(request);
            }

            // do not send back answers to the initialization messages
            QUEUE.with(|queue| {
                (*queue.borrow_mut()).clear();
            });
        }

        if expects_initial_status {
            // the main process sends a Status message, so we can notify it
            // when the initial state is loaded
            server.block_channel();
            let msg = server.channel.read_message();
            debug!("got message: {:?}", msg);

            if let Ok(WorkerRequest {
                id,
                content:
                    Request {
                        request_type: Some(RequestType::Status(_)),
                    },
            }) = msg
            {
                if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
                    error!("Could not send an ok to the main process: {}", e);
                }
            } else {
                panic!(
                    "plz give me a status request first when I start, you sent me this instead: {:?}",
                    msg
                );
            }
            server.unblock_channel();
        }

        // the listeners are received in blocking mode, the socket is switched
        // back to non-blocking right after
        info!("will try to receive listeners");
        server
            .scm
            .set_blocking(true)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to blocking".to_string(),
                scm_err,
            })?;
        let listeners =
            server
                .scm
                .receive_listeners()
                .map_err(|scm_err| ServerError::ScmSocket {
                    msg: "could not receive listeners from the scm socket".to_string(),
                    scm_err,
                })?;
        server
            .scm
            .set_blocking(false)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to unblocking".to_string(),
                scm_err,
            })?;
        info!("received listeners: {:?}", listeners);
        server.scm_listeners = Some(listeners);

        Ok(server)
    }
474
    /// The server runs in a loop until a shutdown is ordered
    ///
    /// Each iteration polls for events, flushes the response queue, dispatches
    /// the events by token (`Token(0)`: command channel, `Token(1)`: timer,
    /// `Token(2)`: metrics socket, any other token: listener or session), fires
    /// due timeouts, creates sessions, runs the zombie check and sends metrics.
    ///
    /// The function returns when the command channel is closed, when a hard
    /// stop was received, or when a soft stop has completed.
    ///
    /// # Panics
    ///
    /// Panics, through `check_for_poll_errors`, when `poll()` failed
    /// `max_poll_errors` times in a row.
    pub fn run(&mut self) {
        let mut events = Events::with_capacity(1024); // TODO: make event capacity configurable?
        self.last_sessions_len = self.sessions.borrow().slab.len();

        self.last_zombie_check = Instant::now();
        self.loop_start = Instant::now();

        loop {
            self.check_for_poll_errors();

            let timeout = self.reset_loop_time_and_get_timeout();

            match self.poll.poll(&mut events, timeout) {
                Ok(_) => self.current_poll_errors = 0,
                Err(error) => {
                    error!("Error while polling events: {:?}", error);
                    self.current_poll_errors += 1;
                    continue;
                }
            }

            let after_epoll = Instant::now();
            time!("epoll_time", (after_epoll - self.loop_start).as_millis());
            self.loop_start = after_epoll;

            self.send_queue();

            for event in events.iter() {
                match event.token() {
                    // this is the command channel
                    Token(0) => {
                        if event.is_error() {
                            error!("error reading from command channel");
                            continue;
                        }
                        if event.is_read_closed() || event.is_write_closed() {
                            error!("command channel was closed");
                            return;
                        }
                        let ready = Ready::from(event);
                        self.channel.handle_events(ready);

                        // loop here because iterations has borrow issues
                        loop {
                            // ask for writability as soon as responses are waiting
                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            //trace!("WORKER[{}] channel readiness={:?}, interest={:?}, queue={} elements",
                            //  line!(), self.channel.readiness, self.channel.interest, self.queue.len());
                            if self.channel.readiness() == Ready::EMPTY {
                                break;
                            }

                            // exit the big loop if the message is HardStop
                            if self.read_channel_messages_and_notify() {
                                return;
                            }

                            // handling the requests may have queued new responses
                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            self.send_queue();
                        }
                    }
                    // timer tick
                    Token(1) => {
                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                            self.timeout(t);
                        }
                    }
                    // metrics socket is writable
                    Token(2) => METRICS.with(|metrics| {
                        (*metrics.borrow_mut()).writable();
                    }),
                    // ListenToken: 1 listener <=> 1 token
                    // ProtocolToken (HTTP/HTTPS/TCP): 1 connection <=> 1 token
                    token => self.ready(token, Ready::from(event)),
                }
            }

            // fire the timeouts if the date computed at the previous iteration is reached
            if let Some(t) = self.should_poll_at.as_ref() {
                if *t <= Instant::now() {
                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                        //info!("polled for timeout: {:?}", t);
                        self.timeout(t);
                    }
                }
            }
            self.handle_remaining_readiness();
            self.create_sessions();

            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());

            self.zombie_check();

            let now = time::OffsetDateTime::now_utc();
            // clear the local metrics drain every plain hour (01:00, 02:00, etc.) to prevent memory overuse
            // TODO: have one-hour-lasting metrics instead
            if now.minute() == 00 && now.second() == 0 {
                METRICS.with(|metrics| {
                    (*metrics.borrow_mut()).clear_local();
                });
            }

            gauge!("client.connections", self.sessions.borrow().nb_connections);
            gauge!("slab.entries", self.sessions.borrow().slab.len());
            METRICS.with(|metrics| {
                (*metrics.borrow_mut()).send_data();
            });

            // a soft stop is in progress: leave once every session is closed
            if self.shutting_down.is_some() && self.shut_down_sessions() {
                return;
            }
        }
    }
597
598    fn check_for_poll_errors(&mut self) {
599        if self.current_poll_errors >= self.max_poll_errors {
600            error!(
601                "Something is going very wrong. Last {} poll() calls failed, crashing..",
602                self.current_poll_errors
603            );
604            panic!(
605                "poll() calls failed {} times in a row",
606                self.current_poll_errors
607            );
608        }
609    }
610
611    fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
612        let now = Instant::now();
613        time!("event_loop_time", (now - self.loop_start).as_millis());
614
615        let timeout = match self.should_poll_at.as_ref() {
616            None => self.poll_timeout,
617            Some(i) => {
618                if *i <= now {
619                    self.poll_timeout
620                } else {
621                    let dur = *i - now;
622                    match self.poll_timeout {
623                        None => Some(dur),
624                        Some(t) => {
625                            if t < dur {
626                                Some(t)
627                            } else {
628                                Some(dur)
629                            }
630                        }
631                    }
632                }
633            }
634        };
635
636        self.loop_start = now;
637        timeout
638    }
639
640    /// Returns true if hardstop
641    fn read_channel_messages_and_notify(&mut self) -> bool {
642        if !self.channel.readiness().is_readable() {
643            return false;
644        }
645
646        if let Err(e) = self.channel.readable() {
647            error!("error reading from channel: {:?}", e);
648        }
649
650        loop {
651            let request = self.channel.read_message();
652            debug!("Received request {:?}", request);
653            match request {
654                Ok(request) => match request.content.request_type {
655                    Some(RequestType::HardStop(_)) => {
656                        let req_id = request.id.clone();
657                        self.notify(request);
658                        if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
659                            error!("Could not send ok response to the main process: {}", e);
660                        }
661                        if let Err(e) = self.channel.run() {
662                            error!("Error while running the server channel: {}", e);
663                        }
664                        return true;
665                    }
666                    Some(RequestType::SoftStop(_)) => {
667                        self.shutting_down = Some(request.id.clone());
668                        self.last_sessions_len = self.sessions.borrow().slab.len();
669                        self.notify(request);
670                    }
671                    Some(RequestType::ReturnListenSockets(_)) => {
672                        info!("received ReturnListenSockets order");
673                        match self.return_listen_sockets() {
674                            Ok(_) => push_queue(WorkerResponse::ok(request.id)),
675                            Err(error) => push_queue(worker_response_error(
676                                request.id,
677                                format!("Could not send listeners on scm socket: {error:?}"),
678                            )),
679                        }
680                    }
681                    _ => self.notify(request),
682                },
683                // Not an error per se, occurs when there is nothing to read
684                Err(_) => {
685                    // if the message was too large, we grow the buffer and retry to read if possible
686                    if (self.channel.interest & self.channel.readiness).is_readable() {
687                        if let Err(e) = self.channel.readable() {
688                            error!("error reading from channel: {:?}", e);
689                        }
690                        continue;
691                    }
692                    break;
693                }
694            }
695        }
696        false
697    }
698
699    /// Scans all sessions that have been inactive for longer than the configured interval
700    fn zombie_check(&mut self) {
701        let now = Instant::now();
702        if now - self.last_zombie_check < self.zombie_check_interval {
703            return;
704        }
705        info!("zombie check");
706        self.last_zombie_check = now;
707
708        let mut zombie_tokens = HashSet::new();
709
710        // find the zombie sessions
711        for (_index, session) in self
712            .sessions
713            .borrow_mut()
714            .slab
715            .iter_mut()
716            .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
717        {
718            let session_token = session.borrow().frontend_token();
719            if !zombie_tokens.contains(&session_token) {
720                session.borrow().print_session();
721                zombie_tokens.insert(session_token);
722            }
723        }
724
725        let zombie_count = zombie_tokens.len() as i64;
726        count!("zombies", zombie_count);
727
728        let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
729        info!(
730            "removing {} zombies ({} remaining entries after close)",
731            zombie_count, remaining_count
732        );
733    }
734
735    /// Calls close on targeted sessions, yields the number of entries in the slab
736    /// that were not properly removed
737    fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
738        if tokens.is_empty() {
739            return 0;
740        }
741
742        // close the sessions associated with the tokens
743        for token in &tokens {
744            if self.sessions.borrow().slab.contains(token.0) {
745                let session = { self.sessions.borrow_mut().slab.remove(token.0) };
746                session.borrow_mut().close();
747                self.sessions.borrow_mut().decr();
748            }
749        }
750
751        // find the entries of closed sessions in the session manager (they should not be there)
752        let mut dangling_entries = HashSet::new();
753        for (entry_key, session) in &self.sessions.borrow().slab {
754            if tokens.contains(&session.borrow().frontend_token()) {
755                dangling_entries.insert(entry_key);
756            }
757        }
758
759        // remove these from the session manager
760        let mut dangling_entries_count = 0;
761        for entry_key in dangling_entries {
762            let mut sessions = self.sessions.borrow_mut();
763            if sessions.slab.contains(entry_key) {
764                sessions.slab.remove(entry_key);
765                dangling_entries_count += 1;
766            }
767        }
768        dangling_entries_count
769    }
770
771    /// Order sessions to shut down, check that they are all down
772    fn shut_down_sessions(&mut self) -> bool {
773        let sessions_count = self.sessions.borrow().slab.len();
774        let mut sessions_to_shut_down = HashSet::new();
775
776        for (_key, session) in &self.sessions.borrow().slab {
777            if session.borrow_mut().shutting_down() {
778                sessions_to_shut_down.insert(Token(session.borrow().frontend_token().0));
779            }
780        }
781        let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
782
783        let new_sessions_count = self.sessions.borrow().slab.len();
784
785        if new_sessions_count < sessions_count {
786            let now = Instant::now();
787            if let Some(last) = self.last_shutting_down_message {
788                if (now - last) > Duration::from_secs(5) {
789                    info!(
790                        "closed {} sessions, {} sessions left, base_sessions_count = {}",
791                        sessions_count - new_sessions_count,
792                        new_sessions_count,
793                        self.base_sessions_count
794                    );
795                }
796            }
797            self.last_shutting_down_message = Some(now);
798        }
799
800        if new_sessions_count <= self.base_sessions_count {
801            info!("last session stopped, shutting down!");
802            if let Err(e) = self.channel.run() {
803                error!("Error while running the server channel: {}", e);
804            }
805            // self.block_channel();
806            let id = self
807                .shutting_down
808                .take()
809                .expect("should have shut down correctly"); // panicking here makes sense actually
810
811            debug!("Responding OK to main process for request {}", id);
812
813            let proxy_response = WorkerResponse::ok(id);
814            if let Err(e) = self.channel.write_message(&proxy_response) {
815                error!("Could not write response to the main process: {}", e);
816            }
817            if let Err(e) = self.channel.run() {
818                error!("Error while running the server channel: {}", e);
819            }
820            return true;
821        }
822
823        if new_sessions_count < self.last_sessions_len {
824            info!(
825                "shutting down, {} slab elements remaining (base: {})",
826                new_sessions_count - self.base_sessions_count,
827                self.base_sessions_count
828            );
829            self.last_sessions_len = new_sessions_count;
830        }
831
832        false
833    }
834
835    fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
836        let token = session.borrow().frontend_token();
837        let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
838    }
839
840    fn send_queue(&mut self) {
841        if self.channel.readiness.is_writable() {
842            QUEUE.with(|q| {
843                let mut queue = q.borrow_mut();
844                loop {
845                    if let Some(resp) = queue.pop_front() {
846                        debug!("Sending response {:?}", resp);
847                        if let Err(e) = self.channel.write_message(&resp) {
848                            error!("Could not write message {} on the channel: {}", resp, e);
849                            queue.push_front(resp);
850                        }
851                    }
852
853                    if self.channel.back_buf.available_data() > 0 {
854                        if let Err(e) = self.channel.writable() {
855                            error!("error writing to channel: {:?}", e);
856                        }
857                    }
858
859                    if !self.channel.readiness.is_writable() {
860                        break;
861                    }
862
863                    if self.channel.back_buf.available_data() == 0 && queue.len() == 0 {
864                        break;
865                    }
866                }
867            });
868        }
869    }
870
871    fn notify(&mut self, message: WorkerRequest) {
872        match &message.content.request_type {
873            Some(RequestType::ConfigureMetrics(configuration)) => {
874                match MetricsConfiguration::try_from(*configuration) {
875                    Ok(metrics_config) => {
876                        METRICS.with(|metrics| {
877                            (*metrics.borrow_mut()).configure(&metrics_config);
878                            push_queue(WorkerResponse::ok(message.id));
879                        });
880                    }
881                    Err(e) => {
882                        error!("Error configuring metrics: {}", e);
883                        push_queue(WorkerResponse::error(message.id, e));
884                    }
885                }
886                return;
887            }
888            Some(RequestType::QueryMetrics(query_metrics_options)) => {
889                METRICS.with(|metrics| {
890                    match (*metrics.borrow_mut()).query(query_metrics_options) {
891                        Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
892                        Err(e) => {
893                            error!("Error querying metrics: {}", e);
894                            push_queue(WorkerResponse::error(message.id, e))
895                        }
896                    }
897                });
898                return;
899            }
900            Some(RequestType::Logging(logging_filter)) => {
901                info!(
902                    "{} changing logging filter to {}",
903                    message.id, logging_filter
904                );
905                // there should not be any errors as it was already parsed by the main process
906                let (directives, _errors) = logging::parse_logging_spec(logging_filter);
907                logging::LOGGER.with(|logger| {
908                    logger.borrow_mut().set_directives(directives);
909                });
910                push_queue(WorkerResponse::ok(message.id));
911                return;
912            }
913            Some(RequestType::QueryClustersHashes(_)) => {
914                push_queue(WorkerResponse::ok_with_content(
915                    message.id.clone(),
916                    ContentType::ClusterHashes(ClusterHashes {
917                        map: self.config_state.hash_state(),
918                    })
919                    .into(),
920                ));
921                return;
922            }
923            Some(RequestType::QueryClusterById(cluster_id)) => {
924                push_queue(WorkerResponse::ok_with_content(
925                    message.id.clone(),
926                    ContentType::Clusters(ClusterInformations {
927                        vec: self
928                            .config_state
929                            .cluster_state(cluster_id)
930                            .map_or(vec![], |ci| vec![ci]),
931                    })
932                    .into(),
933                ));
934            }
935            Some(RequestType::QueryClustersByDomain(domain)) => {
936                let cluster_ids = self
937                    .config_state
938                    .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
939                let vec = cluster_ids
940                    .iter()
941                    .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
942                    .collect();
943
944                push_queue(WorkerResponse::ok_with_content(
945                    message.id.clone(),
946                    ContentType::Clusters(ClusterInformations { vec }).into(),
947                ));
948                return;
949            }
950            Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
951                if filters.fingerprint.is_some() {
952                    let certs = self.config_state.get_certificates(filters.clone());
953                    let response = if !certs.is_empty() {
954                        WorkerResponse::ok_with_content(
955                            message.id.clone(),
956                            ContentType::CertificatesWithFingerprints(
957                                CertificatesWithFingerprints { certs },
958                            )
959                            .into(),
960                        )
961                    } else {
962                        worker_response_error(
963                            message.id.clone(),
964                            "Could not find certificate for this fingerprint",
965                        )
966                    };
967                    push_queue(response);
968                    return;
969                }
970                // if all certificates are queried, or filtered by domain name,
971                // the request will be handled by the https proxy
972            }
973            _other_request => {}
974        }
975        self.notify_proxys(message);
976    }
977
978    pub fn notify_proxys(&mut self, request: WorkerRequest) {
979        if let Err(e) = self.config_state.dispatch(&request.content) {
980            error!("Could not execute order on config state: {}", e);
981        }
982
983        let req_id = request.id.clone();
984
985        match request.content.request_type {
986            Some(RequestType::AddCluster(ref cluster)) => {
987                self.add_cluster(cluster);
988                //not returning because the message must still be handled by each proxy
989            }
990            Some(RequestType::AddBackend(ref backend)) => {
991                push_queue(self.add_backend(&req_id, backend));
992                return;
993            }
994            Some(RequestType::RemoveBackend(ref remove_backend)) => {
995                push_queue(self.remove_backend(&req_id, remove_backend));
996                return;
997            }
998            _ => {}
999        };
1000
1001        let proxy_destinations = request.content.get_destinations();
1002        let mut notify_response = None;
1003        if proxy_destinations.to_http_proxy {
1004            notify_response = Some(self.http.borrow_mut().notify(request.clone()));
1005        }
1006        if proxy_destinations.to_https_proxy {
1007            let http_proxy_response = self.https.borrow_mut().notify(request.clone());
1008            if http_proxy_response.is_failure() || notify_response.is_none() {
1009                notify_response = Some(http_proxy_response);
1010            }
1011        }
1012        if proxy_destinations.to_tcp_proxy {
1013            let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
1014            if tcp_proxy_response.is_failure() || notify_response.is_none() {
1015                notify_response = Some(tcp_proxy_response);
1016            }
1017        }
1018        if let Some(response) = notify_response {
1019            push_queue(response);
1020        }
1021
1022        match request.content.request_type {
1023            // special case for adding listeners, because we need to register a listener
1024            Some(RequestType::AddHttpListener(listener)) => {
1025                push_queue(self.notify_add_http_listener(&req_id, listener));
1026            }
1027            Some(RequestType::AddHttpsListener(listener)) => {
1028                push_queue(self.notify_add_https_listener(&req_id, listener));
1029            }
1030            Some(RequestType::AddTcpListener(listener)) => {
1031                push_queue(self.notify_add_tcp_listener(&req_id, listener));
1032            }
1033            Some(RequestType::RemoveListener(ref remove)) => {
1034                debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
1035                self.base_sessions_count -= 1;
1036                let response = match ListenerType::try_from(remove.proxy) {
1037                    Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
1038                    Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
1039                    Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
1040                    Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
1041                };
1042                push_queue(response);
1043            }
1044            Some(RequestType::ActivateListener(ref activate)) => {
1045                push_queue(self.notify_activate_listener(&req_id, activate));
1046            }
1047            Some(RequestType::DeactivateListener(ref deactivate)) => {
1048                push_queue(self.notify_deactivate_listener(&req_id, deactivate));
1049            }
1050            _other_request => {}
1051        };
1052    }
1053
1054    fn add_cluster(&mut self, cluster: &Cluster) {
1055        self.backends
1056            .borrow_mut()
1057            .set_load_balancing_policy_for_cluster(
1058                &cluster.cluster_id,
1059                LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
1060                cluster
1061                    .load_metric
1062                    .and_then(|n| LoadMetric::try_from(n).ok()),
1063            );
1064    }
1065
1066    fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
1067        let new_backend = Backend::new(
1068            &add_backend.backend_id,
1069            add_backend.address.into(),
1070            add_backend.sticky_id.clone(),
1071            add_backend.load_balancing_parameters,
1072            add_backend.backup,
1073        );
1074        self.backends
1075            .borrow_mut()
1076            .add_backend(&add_backend.cluster_id, new_backend);
1077
1078        WorkerResponse::ok(req_id)
1079    }
1080
1081    fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
1082        let address = backend.address.into();
1083        self.backends
1084            .borrow_mut()
1085            .remove_backend(&backend.cluster_id, &address);
1086
1087        WorkerResponse::ok(req_id)
1088    }
1089
1090    fn notify_add_http_listener(
1091        &mut self,
1092        req_id: &str,
1093        listener: HttpListenerConfig,
1094    ) -> WorkerResponse {
1095        debug!("{} add http listener {:?}", req_id, listener);
1096
1097        if self.sessions.borrow().at_capacity() {
1098            return worker_response_error(req_id, "session list is full, cannot add a listener");
1099        }
1100
1101        let mut session_manager = self.sessions.borrow_mut();
1102        let entry = session_manager.slab.vacant_entry();
1103        let token = Token(entry.key());
1104
1105        match self.http.borrow_mut().add_listener(listener, token) {
1106            Ok(_token) => {
1107                entry.insert(Rc::new(RefCell::new(ListenSession {
1108                    protocol: Protocol::HTTPListen,
1109                })));
1110                self.base_sessions_count += 1;
1111                WorkerResponse::ok(req_id)
1112            }
1113            Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
1114        }
1115    }
1116
1117    fn notify_add_https_listener(
1118        &mut self,
1119        req_id: &str,
1120        listener: HttpsListenerConfig,
1121    ) -> WorkerResponse {
1122        debug!("{} add https listener {:?}", req_id, listener);
1123
1124        if self.sessions.borrow().at_capacity() {
1125            return worker_response_error(req_id, "session list is full, cannot add a listener");
1126        }
1127
1128        let mut session_manager = self.sessions.borrow_mut();
1129        let entry = session_manager.slab.vacant_entry();
1130        let token = Token(entry.key());
1131
1132        match self
1133            .https
1134            .borrow_mut()
1135            .add_listener(listener.clone(), token)
1136        {
1137            Ok(_token) => {
1138                entry.insert(Rc::new(RefCell::new(ListenSession {
1139                    protocol: Protocol::HTTPSListen,
1140                })));
1141                self.base_sessions_count += 1;
1142                WorkerResponse::ok(req_id)
1143            }
1144            Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
1145        }
1146    }
1147
1148    fn notify_add_tcp_listener(
1149        &mut self,
1150        req_id: &str,
1151        listener: CommandTcpListener,
1152    ) -> WorkerResponse {
1153        debug!("{} add tcp listener {:?}", req_id, listener);
1154
1155        if self.sessions.borrow().at_capacity() {
1156            return worker_response_error(req_id, "session list is full, cannot add a listener");
1157        }
1158
1159        let mut session_manager = self.sessions.borrow_mut();
1160        let entry = session_manager.slab.vacant_entry();
1161        let token = Token(entry.key());
1162
1163        match self.tcp.borrow_mut().add_listener(listener, token) {
1164            Ok(_token) => {
1165                entry.insert(Rc::new(RefCell::new(ListenSession {
1166                    protocol: Protocol::TCPListen,
1167                })));
1168                self.base_sessions_count += 1;
1169                WorkerResponse::ok(req_id)
1170            }
1171            Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
1172        }
1173    }
1174
1175    fn notify_activate_listener(
1176        &mut self,
1177        req_id: &str,
1178        activate: &ActivateListener,
1179    ) -> WorkerResponse {
1180        debug!(
1181            "{} activate {:?} listener {:?}",
1182            req_id, activate.proxy, activate
1183        );
1184
1185        let address: std::net::SocketAddr = activate.address.into();
1186
1187        match ListenerType::try_from(activate.proxy) {
1188            Ok(ListenerType::Http) => {
1189                let listener = self
1190                    .scm_listeners
1191                    .as_mut()
1192                    .and_then(|s| s.get_http(&address))
1193                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1194
1195                let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
1196                match activated_token {
1197                    Ok(token) => {
1198                        self.accept(ListenToken(token.0), Protocol::HTTPListen);
1199                        WorkerResponse::ok(req_id)
1200                    }
1201                    Err(activate_error) => worker_response_error(
1202                        req_id,
1203                        format!("Could not activate HTTP listener: {}", activate_error),
1204                    ),
1205                }
1206            }
1207            Ok(ListenerType::Https) => {
1208                let listener = self
1209                    .scm_listeners
1210                    .as_mut()
1211                    .and_then(|s| s.get_https(&address))
1212                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1213
1214                let activated_token = self
1215                    .https
1216                    .borrow_mut()
1217                    .activate_listener(&address, listener);
1218                match activated_token {
1219                    Ok(token) => {
1220                        self.accept(ListenToken(token.0), Protocol::HTTPSListen);
1221                        WorkerResponse::ok(req_id)
1222                    }
1223                    Err(activate_error) => worker_response_error(
1224                        req_id,
1225                        format!("Could not activate HTTPS listener: {}", activate_error),
1226                    ),
1227                }
1228            }
1229            Ok(ListenerType::Tcp) => {
1230                let listener = self
1231                    .scm_listeners
1232                    .as_mut()
1233                    .and_then(|s| s.get_tcp(&address))
1234                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
1235
1236                let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
1237                match listener_token {
1238                    Ok(token) => {
1239                        self.accept(ListenToken(token.0), Protocol::TCPListen);
1240                        WorkerResponse::ok(req_id)
1241                    }
1242                    Err(activate_error) => worker_response_error(
1243                        req_id,
1244                        format!("Could not activate TCP listener: {}", activate_error),
1245                    ),
1246                }
1247            }
1248            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
1249        }
1250    }
1251
1252    fn notify_deactivate_listener(
1253        &mut self,
1254        req_id: &str,
1255        deactivate: &DeactivateListener,
1256    ) -> WorkerResponse {
1257        debug!(
1258            "{} deactivate {:?} listener {:?}",
1259            req_id, deactivate.proxy, deactivate
1260        );
1261
1262        let address: std::net::SocketAddr = deactivate.address.into();
1263
1264        match ListenerType::try_from(deactivate.proxy) {
1265            Ok(ListenerType::Http) => {
1266                let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
1267                {
1268                    Ok((token, listener)) => (token, listener),
1269                    Err(e) => {
1270                        return worker_response_error(
1271                            req_id,
1272                            format!(
1273                                "Couldn't deactivate HTTP listener at address {address:?}: {e}"
1274                            ),
1275                        );
1276                    }
1277                };
1278
1279                if let Err(e) = self.poll.registry().deregister(&mut listener) {
1280                    error!(
1281                        "error deregistering HTTP listen socket({:?}): {:?}",
1282                        deactivate, e
1283                    );
1284                }
1285
1286                {
1287                    let mut sessions = self.sessions.borrow_mut();
1288                    if sessions.slab.contains(token.0) {
1289                        sessions.slab.remove(token.0);
1290                        info!("removed listen token {:?}", token);
1291                    }
1292                }
1293
1294                if deactivate.to_scm {
1295                    self.unblock_scm_socket();
1296                    let listeners = Listeners {
1297                        http: vec![(address, listener.as_raw_fd())],
1298                        tls: vec![],
1299                        tcp: vec![],
1300                    };
1301                    info!("sending HTTP listener: {:?}", listeners);
1302                    let res = self.scm.send_listeners(&listeners);
1303
1304                    self.block_scm_socket();
1305
1306                    info!("sent HTTP listener: {:?}", res);
1307                }
1308                WorkerResponse::ok(req_id)
1309            }
1310            Ok(ListenerType::Https) => {
1311                let (token, mut listener) = match self
1312                    .https
1313                    .borrow_mut()
1314                    .give_back_listener(address)
1315                {
1316                    Ok((token, listener)) => (token, listener),
1317                    Err(e) => {
1318                        return worker_response_error(
1319                            req_id,
1320                            format!(
1321                                "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
1322                            ),
1323                        );
1324                    }
1325                };
1326                if let Err(e) = self.poll.registry().deregister(&mut listener) {
1327                    error!(
1328                        "error deregistering HTTPS listen socket({:?}): {:?}",
1329                        deactivate, e
1330                    );
1331                }
1332                if self.sessions.borrow().slab.contains(token.0) {
1333                    self.sessions.borrow_mut().slab.remove(token.0);
1334                    info!("removed listen token {:?}", token);
1335                }
1336
1337                if deactivate.to_scm {
1338                    self.unblock_scm_socket();
1339                    let listeners = Listeners {
1340                        http: vec![],
1341                        tls: vec![(address, listener.as_raw_fd())],
1342                        tcp: vec![],
1343                    };
1344                    info!("sending HTTPS listener: {:?}", listeners);
1345                    let res = self.scm.send_listeners(&listeners);
1346
1347                    self.block_scm_socket();
1348
1349                    info!("sent HTTPS listener: {:?}", res);
1350                }
1351                WorkerResponse::ok(req_id)
1352            }
1353            Ok(ListenerType::Tcp) => {
1354                let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
1355                {
1356                    Ok((token, listener)) => (token, listener),
1357                    Err(e) => {
1358                        return worker_response_error(
1359                            req_id,
1360                            format!(
1361                                "Could not deactivate TCP listener at address {:?}: {}",
1362                                address, e
1363                            ),
1364                        );
1365                    }
1366                };
1367
1368                if let Err(e) = self.poll.registry().deregister(&mut listener) {
1369                    error!(
1370                        "error deregistering TCP listen socket({:?}): {:?}",
1371                        deactivate, e
1372                    );
1373                }
1374                if self.sessions.borrow().slab.contains(token.0) {
1375                    self.sessions.borrow_mut().slab.remove(token.0);
1376                    info!("removed listen token {:?}", token);
1377                }
1378
1379                if deactivate.to_scm {
1380                    self.unblock_scm_socket();
1381                    let listeners = Listeners {
1382                        http: vec![],
1383                        tls: vec![],
1384                        tcp: vec![(address, listener.as_raw_fd())],
1385                    };
1386                    info!("sending TCP listener: {:?}", listeners);
1387                    let res = self.scm.send_listeners(&listeners);
1388
1389                    self.block_scm_socket();
1390
1391                    info!("sent TCP listener: {:?}", res);
1392                }
1393                WorkerResponse::ok(req_id)
1394            }
1395            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
1396        }
1397    }
1398
1399    /// Send all socket addresses and file descriptors of all proxies, via the scm socket
1400    pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
1401        self.unblock_scm_socket();
1402
1403        let mut http_listeners = self.http.borrow_mut().give_back_listeners();
1404        for &mut (_, ref mut sock) in http_listeners.iter_mut() {
1405            if let Err(e) = self.poll.registry().deregister(sock) {
1406                error!(
1407                    "error deregistering HTTP listen socket({:?}): {:?}",
1408                    sock, e
1409                );
1410            }
1411        }
1412
1413        let mut https_listeners = self.https.borrow_mut().give_back_listeners();
1414        for &mut (_, ref mut sock) in https_listeners.iter_mut() {
1415            if let Err(e) = self.poll.registry().deregister(sock) {
1416                error!(
1417                    "error deregistering HTTPS listen socket({:?}): {:?}",
1418                    sock, e
1419                );
1420            }
1421        }
1422
1423        let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
1424        for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
1425            if let Err(e) = self.poll.registry().deregister(sock) {
1426                error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
1427            }
1428        }
1429
1430        // use as_raw_fd because the listeners should be dropped after sending them
1431        let listeners = Listeners {
1432            http: http_listeners
1433                .iter()
1434                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1435                .collect(),
1436            tls: https_listeners
1437                .iter()
1438                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1439                .collect(),
1440            tcp: tcp_listeners
1441                .iter()
1442                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
1443                .collect(),
1444        };
1445        info!("sending default listeners: {:?}", listeners);
1446        let res = self.scm.send_listeners(&listeners);
1447
1448        self.block_scm_socket();
1449
1450        info!("sent default listeners: {:?}", res);
1451        res
1452    }
1453
1454    fn block_scm_socket(&mut self) {
1455        if let Err(e) = self.scm.set_blocking(true) {
1456            error!("Could not block scm socket: {}", e);
1457        }
1458    }
1459
1460    fn unblock_scm_socket(&mut self) {
1461        if let Err(e) = self.scm.set_blocking(false) {
1462            error!("Could not unblock scm socket: {}", e);
1463        }
1464    }
1465
1466    pub fn to_session(&self, token: Token) -> SessionToken {
1467        SessionToken(token.0)
1468    }
1469
1470    pub fn from_session(&self, token: SessionToken) -> Token {
1471        Token(token.0)
1472    }
1473
1474    pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
1475        match protocol {
1476            Protocol::TCPListen => loop {
1477                match self.tcp.borrow_mut().accept(token) {
1478                    Ok(sock) => self.accept_queue.push_back((
1479                        sock,
1480                        token,
1481                        Protocol::TCPListen,
1482                        Instant::now(),
1483                    )),
1484                    Err(AcceptError::WouldBlock) => {
1485                        self.accept_ready.remove(&token);
1486                        break;
1487                    }
1488                    Err(other) => {
1489                        error!("error accepting TCP sockets: {:?}", other);
1490                        self.accept_ready.remove(&token);
1491                        break;
1492                    }
1493                }
1494            },
1495            Protocol::HTTPListen => loop {
1496                match self.http.borrow_mut().accept(token) {
1497                    Ok(sock) => self.accept_queue.push_back((
1498                        sock,
1499                        token,
1500                        Protocol::HTTPListen,
1501                        Instant::now(),
1502                    )),
1503                    Err(AcceptError::WouldBlock) => {
1504                        self.accept_ready.remove(&token);
1505                        break;
1506                    }
1507                    Err(other) => {
1508                        error!("error accepting HTTP sockets: {:?}", other);
1509                        self.accept_ready.remove(&token);
1510                        break;
1511                    }
1512                }
1513            },
1514            Protocol::HTTPSListen => loop {
1515                match self.https.borrow_mut().accept(token) {
1516                    Ok(sock) => self.accept_queue.push_back((
1517                        sock,
1518                        token,
1519                        Protocol::HTTPSListen,
1520                        Instant::now(),
1521                    )),
1522                    Err(AcceptError::WouldBlock) => {
1523                        self.accept_ready.remove(&token);
1524                        break;
1525                    }
1526                    Err(other) => {
1527                        error!("error accepting HTTPS sockets: {:?}", other);
1528                        self.accept_ready.remove(&token);
1529                        break;
1530                    }
1531                }
1532            },
1533            _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
1534        }
1535
1536        gauge!("accept_queue.connections", self.accept_queue.len());
1537    }
1538
1539    pub fn create_sessions(&mut self) {
1540        while let Some((sock, token, protocol, timestamp)) = self.accept_queue.pop_back() {
1541            let wait_time = Instant::now() - timestamp;
1542            time!("accept_queue.wait_time", wait_time.as_millis());
1543            if wait_time > self.accept_queue_timeout {
1544                incr!("accept_queue.timeout");
1545                continue;
1546            }
1547
1548            if !self.sessions.borrow_mut().check_limits() {
1549                break;
1550            }
1551
1552            //FIXME: check the timestamp
1553            //TODO: create_session should return the session and
1554            // the server should insert it in the the SessionManager
1555            match protocol {
1556                Protocol::TCPListen => {
1557                    let proxy = self.tcp.clone();
1558                    if self
1559                        .tcp
1560                        .borrow_mut()
1561                        .create_session(sock, token, wait_time, proxy)
1562                        .is_err()
1563                    {
1564                        break;
1565                    }
1566                }
1567                Protocol::HTTPListen => {
1568                    let proxy = self.http.clone();
1569                    if self
1570                        .http
1571                        .borrow_mut()
1572                        .create_session(sock, token, wait_time, proxy)
1573                        .is_err()
1574                    {
1575                        break;
1576                    }
1577                }
1578                Protocol::HTTPSListen => {
1579                    if self
1580                        .https
1581                        .borrow_mut()
1582                        .create_session(sock, token, wait_time, self.https.clone())
1583                        .is_err()
1584                    {
1585                        break;
1586                    }
1587                }
1588                _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
1589            };
1590            self.sessions.borrow_mut().incr();
1591        }
1592
1593        gauge!("accept_queue.connections", self.accept_queue.len());
1594    }
1595
1596    pub fn ready(&mut self, token: Token, events: Ready) {
1597        trace!("PROXY\t{:?} got events: {:?}", token, events);
1598
1599        let session_token = token.0;
1600        if self.sessions.borrow().slab.contains(session_token) {
1601            //info!("sessions contains {:?}", session_token);
1602            let protocol = self.sessions.borrow().slab[session_token]
1603                .borrow()
1604                .protocol();
1605            //info!("protocol: {:?}", protocol);
1606            match protocol {
1607                Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
1608                    //info!("PROTOCOL IS LISTEN");
1609                    if events.is_readable() {
1610                        self.accept_ready.insert(ListenToken(token.0));
1611                        if self.sessions.borrow().can_accept {
1612                            self.accept(ListenToken(token.0), protocol);
1613                        }
1614                        return;
1615                    }
1616
1617                    if events.is_writable() {
1618                        error!(
1619                            "received writable for listener {:?}, this should not happen",
1620                            token
1621                        );
1622                        return;
1623                    }
1624
1625                    if events.is_hup() {
1626                        error!("should not happen: server {:?} closed", token);
1627                        return;
1628                    }
1629
1630                    unreachable!();
1631                }
1632                _ => {}
1633            }
1634
1635            let session = self.sessions.borrow_mut().slab[session_token].clone();
1636            session.borrow_mut().update_readiness(token, events);
1637            if session.borrow_mut().ready(session.clone()) {
1638                self.kill_session(session);
1639            }
1640        }
1641    }
1642
1643    pub fn timeout(&mut self, token: Token) {
1644        trace!("PROXY\t{:?} got timeout", token);
1645
1646        let session_token = token.0;
1647        if self.sessions.borrow().slab.contains(session_token) {
1648            let session = self.sessions.borrow_mut().slab[session_token].clone();
1649            if session.borrow_mut().timeout(token) {
1650                self.kill_session(session);
1651            }
1652        }
1653    }
1654
1655    pub fn handle_remaining_readiness(&mut self) {
1656        // try to accept again after handling all session events,
1657        // since we might have released a few session slots
1658        if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
1659            while let Some(token) = self
1660                .accept_ready
1661                .iter()
1662                .next()
1663                .map(|token| ListenToken(token.0))
1664            {
1665                let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
1666                self.accept(token, protocol);
1667                if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
1668                    break;
1669                }
1670            }
1671        }
1672    }
1673    fn block_channel(&mut self) {
1674        if let Err(e) = self.channel.blocking() {
1675            error!("Could not block channel: {}", e);
1676        }
1677    }
1678    fn unblock_channel(&mut self) {
1679        if let Err(e) = self.channel.nonblocking() {
1680            error!("Could not block channel: {}", e);
1681        }
1682    }
1683}
1684
1685/// log the error together with the request id
1686/// create a WorkerResponse
1687fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
1688    error!(
1689        "error on request {}, {}",
1690        request_id.to_string(),
1691        error.to_string()
1692    );
1693    WorkerResponse::error(request_id, error)
1694}
1695
1696pub struct ListenSession {
1697    pub protocol: Protocol,
1698}
1699
1700impl ProxySession for ListenSession {
1701    fn last_event(&self) -> Instant {
1702        Instant::now()
1703    }
1704
1705    fn print_session(&self) {}
1706
1707    fn frontend_token(&self) -> Token {
1708        Token(0)
1709    }
1710
1711    fn protocol(&self) -> Protocol {
1712        self.protocol
1713    }
1714
1715    fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1716        false
1717    }
1718
1719    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1720        false
1721    }
1722
1723    fn update_readiness(&mut self, _token: Token, _events: Ready) {}
1724
1725    fn close(&mut self) {}
1726
1727    fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
1728        error!(
1729            "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
1730            _token, self.protocol
1731        );
1732        false
1733    }
1734}