ntex_server/
server.rs

1use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
2use std::task::{ready, Context, Poll};
3use std::{future::Future, io, pin::Pin};
4
5use async_channel::Sender;
6
7use crate::{manager::ServerCommand, signals::Signal};
8
9#[derive(Debug)]
10pub(crate) struct ServerShared {
11    pub(crate) paused: AtomicBool,
12}
13
14/// Server controller
15#[derive(Debug)]
16pub struct Server<T> {
17    shared: Arc<ServerShared>,
18    cmd: Sender<ServerCommand<T>>,
19    stop: Option<oneshot::Receiver<()>>,
20}
21
22impl<T> Server<T> {
23    pub(crate) fn new(cmd: Sender<ServerCommand<T>>, shared: Arc<ServerShared>) -> Self {
24        Server {
25            cmd,
26            shared,
27            stop: None,
28        }
29    }
30
31    /// Start streaming server building process
32    pub fn build() -> crate::net::ServerBuilder {
33        crate::net::ServerBuilder::default()
34    }
35
36    pub(crate) fn signal(&self, sig: Signal) {
37        let _ = self.cmd.try_send(ServerCommand::Signal(sig));
38    }
39
40    /// Send item to worker pool
41    pub fn process(&self, item: T) -> Result<(), T> {
42        if self.shared.paused.load(Ordering::Acquire) {
43            Err(item)
44        } else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) {
45            match e.into_inner() {
46                ServerCommand::Item(item) => Err(item),
47                _ => panic!(),
48            }
49        } else {
50            Ok(())
51        }
52    }
53
54    /// Pause accepting incoming connections
55    ///
56    /// If socket contains some pending connection, they might be dropped.
57    /// All opened connection remains active.
58    pub fn pause(&self) -> impl Future<Output = ()> {
59        let (tx, rx) = oneshot::channel();
60        let _ = self.cmd.try_send(ServerCommand::Pause(tx));
61        async move {
62            let _ = rx.await;
63        }
64    }
65
66    /// Resume accepting incoming connections
67    pub fn resume(&self) -> impl Future<Output = ()> {
68        let (tx, rx) = oneshot::channel();
69        let _ = self.cmd.try_send(ServerCommand::Resume(tx));
70        async move {
71            let _ = rx.await;
72        }
73    }
74
75    /// Stop incoming connection processing, stop all workers and exit.
76    ///
77    /// If server starts with `spawn()` method, then spawned thread get terminated.
78    pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
79        let (tx, rx) = oneshot::channel();
80        let _ = self.cmd.try_send(ServerCommand::Stop {
81            graceful,
82            completion: Some(tx),
83        });
84        async move {
85            let _ = rx.await;
86        }
87    }
88}
89
90impl<T> Clone for Server<T> {
91    fn clone(&self) -> Self {
92        Self {
93            cmd: self.cmd.clone(),
94            shared: self.shared.clone(),
95            stop: None,
96        }
97    }
98}
99
100impl<T> Future for Server<T> {
101    type Output = io::Result<()>;
102
103    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
104        let this = self.get_mut();
105
106        if this.stop.is_none() {
107            let (tx, rx) = oneshot::channel();
108            if this.cmd.try_send(ServerCommand::NotifyStopped(tx)).is_err() {
109                return Poll::Ready(Ok(()));
110            }
111            this.stop = Some(rx);
112        }
113
114        let _ = ready!(Pin::new(this.stop.as_mut().unwrap()).poll(cx));
115
116        Poll::Ready(Ok(()))
117    }
118}