1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use futures::sync::mpsc::UnboundedSender;
use futures::sync::oneshot;
use futures::Future;

use crate::builder::ServerBuilder;
use crate::signals::Signal;

#[derive(Debug)]
pub(crate) enum ServerCommand {
    WorkerDied(usize),
    Pause(oneshot::Sender<()>),
    Resume(oneshot::Sender<()>),
    Signal(Signal),
    /// Whether to try and shut down gracefully
    Stop {
        graceful: bool,
        completion: Option<oneshot::Sender<()>>,
    },
}

#[derive(Debug, Clone)]
pub struct Server(UnboundedSender<ServerCommand>);

impl Server {
    pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
        Server(tx)
    }

    /// Start server building process
    pub fn build() -> ServerBuilder {
        ServerBuilder::default()
    }

    pub(crate) fn signal(&self, sig: Signal) {
        let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
    }

    pub(crate) fn worker_died(&self, idx: usize) {
        let _ = self.0.unbounded_send(ServerCommand::WorkerDied(idx));
    }

    /// Pause accepting incoming connections
    ///
    /// If socket contains some pending connection, they might be dropped.
    /// All opened connection remains active.
    pub fn pause(&self) -> impl Future<Item = (), Error = ()> {
        let (tx, rx) = oneshot::channel();
        let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
        rx.map_err(|_| ())
    }

    /// Resume accepting incoming connections
    pub fn resume(&self) -> impl Future<Item = (), Error = ()> {
        let (tx, rx) = oneshot::channel();
        let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
        rx.map_err(|_| ())
    }

    /// Stop incoming connection processing, stop all workers and exit.
    ///
    /// If server starts with `spawn()` method, then spawned thread get terminated.
    pub fn stop(&self, graceful: bool) -> impl Future<Item = (), Error = ()> {
        let (tx, rx) = oneshot::channel();
        let _ = self.0.unbounded_send(ServerCommand::Stop {
            graceful,
            completion: Some(tx),
        });
        rx.map_err(|_| ())
    }
}