scrappy_server/
server.rs

1use std::future::Future;
2use std::io;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures::channel::mpsc::UnboundedSender;
7use futures::channel::oneshot;
8use futures::FutureExt;
9
10use crate::builder::ServerBuilder;
11use crate::signals::Signal;
12
13#[derive(Debug)]
14pub(crate) enum ServerCommand {
15    WorkerFaulted(usize),
16    Pause(oneshot::Sender<()>),
17    Resume(oneshot::Sender<()>),
18    Signal(Signal),
19    /// Whether to try and shut down gracefully
20    Stop {
21        graceful: bool,
22        completion: Option<oneshot::Sender<()>>,
23    },
24    /// Notify of server stop
25    Notify(oneshot::Sender<()>),
26}
27
28#[derive(Debug)]
29pub struct Server(
30    UnboundedSender<ServerCommand>,
31    Option<oneshot::Receiver<()>>,
32);
33
34impl Server {
35    pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
36        Server(tx, None)
37    }
38
39    /// Start server building process
40    pub fn build() -> ServerBuilder {
41        ServerBuilder::default()
42    }
43
44    pub(crate) fn signal(&self, sig: Signal) {
45        let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
46    }
47
48    pub(crate) fn worker_faulted(&self, idx: usize) {
49        let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx));
50    }
51
52    /// Pause accepting incoming connections
53    ///
54    /// If socket contains some pending connection, they might be dropped.
55    /// All opened connection remains active.
56    pub fn pause(&self) -> impl Future<Output = ()> {
57        let (tx, rx) = oneshot::channel();
58        let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
59        rx.map(|_| ())
60    }
61
62    /// Resume accepting incoming connections
63    pub fn resume(&self) -> impl Future<Output = ()> {
64        let (tx, rx) = oneshot::channel();
65        let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
66        rx.map(|_| ())
67    }
68
69    /// Stop incoming connection processing, stop all workers and exit.
70    ///
71    /// If server starts with `spawn()` method, then spawned thread get terminated.
72    pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
73        let (tx, rx) = oneshot::channel();
74        let _ = self.0.unbounded_send(ServerCommand::Stop {
75            graceful,
76            completion: Some(tx),
77        });
78        rx.map(|_| ())
79    }
80}
81
82impl Clone for Server {
83    fn clone(&self) -> Self {
84        Self(self.0.clone(), None)
85    }
86}
87
88impl Future for Server {
89    type Output = io::Result<()>;
90
91    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92        let this = self.get_mut();
93
94        if this.1.is_none() {
95            let (tx, rx) = oneshot::channel();
96            if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() {
97                return Poll::Ready(Ok(()));
98            }
99            this.1 = Some(rx);
100        }
101
102        match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
103            Poll::Pending => Poll::Pending,
104            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
105            Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
106        }
107    }
108}