Skip to main content

ntex_server/
server.rs

1#![allow(clippy::missing_panics_doc)]
2use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
3use std::task::{Context, Poll, ready};
4use std::{future::Future, io, pin::Pin};
5
6use async_channel::Sender;
7
8use crate::{manager::ServerCommand, signals::Signal};
9
10#[derive(Debug)]
11pub(crate) struct ServerShared {
12    pub(crate) paused: AtomicBool,
13}
14
15/// Server controller
16#[derive(Debug)]
17pub struct Server<T> {
18    shared: Arc<ServerShared>,
19    cmd: Sender<ServerCommand<T>>,
20    stop: Option<oneshot::Receiver<()>>,
21}
22
23impl<T> Server<T> {
24    pub(crate) fn new(cmd: Sender<ServerCommand<T>>, shared: Arc<ServerShared>) -> Self {
25        Server {
26            cmd,
27            shared,
28            stop: None,
29        }
30    }
31
32    /// Start streaming server building process
33    pub fn builder() -> crate::net::ServerBuilder {
34        crate::net::ServerBuilder::default()
35    }
36
37    pub(crate) fn signal(&self, sig: Signal) {
38        let _ = self.cmd.try_send(ServerCommand::Signal(sig));
39    }
40
41    /// Send item to worker pool
42    pub fn process(&self, item: T) -> Result<(), T> {
43        if self.shared.paused.load(Ordering::Acquire) {
44            Err(item)
45        } else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) {
46            match e.into_inner() {
47                ServerCommand::Item(item) => Err(item),
48                _ => panic!(),
49            }
50        } else {
51            Ok(())
52        }
53    }
54
55    /// Pause accepting incoming connections
56    ///
57    /// If socket contains some pending connection, they might be dropped.
58    /// All opened connection remains active.
59    pub fn pause(&self) -> impl Future<Output = ()> + use<T> {
60        let (tx, rx) = oneshot::channel();
61        let _ = self.cmd.try_send(ServerCommand::Pause(tx));
62        async move {
63            let _ = rx.await;
64        }
65    }
66
67    /// Resume accepting incoming connections
68    pub fn resume(&self) -> impl Future<Output = ()> + use<T> {
69        let (tx, rx) = oneshot::channel();
70        let _ = self.cmd.try_send(ServerCommand::Resume(tx));
71        async move {
72            let _ = rx.await;
73        }
74    }
75
76    /// Stop incoming connection processing, stop all workers and exit.
77    ///
78    /// If server starts with `spawn()` method, then spawned thread get terminated.
79    pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> + use<T> {
80        let (tx, rx) = oneshot::channel();
81        let _ = self.cmd.try_send(ServerCommand::Stop {
82            graceful,
83            completion: Some(tx),
84        });
85        async move {
86            let _ = rx.await;
87        }
88    }
89}
90
91impl<T> Clone for Server<T> {
92    fn clone(&self) -> Self {
93        Self {
94            cmd: self.cmd.clone(),
95            shared: self.shared.clone(),
96            stop: None,
97        }
98    }
99}
100
101impl<T> Future for Server<T> {
102    type Output = io::Result<()>;
103
104    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105        let this = self.get_mut();
106
107        if this.stop.is_none() {
108            let (tx, rx) = oneshot::channel();
109            if this.cmd.try_send(ServerCommand::NotifyStopped(tx)).is_err() {
110                return Poll::Ready(Ok(()));
111            }
112            this.stop = Some(rx);
113        }
114
115        let _ = ready!(Pin::new(this.stop.as_mut().unwrap()).poll(cx));
116
117        Poll::Ready(Ok(()))
118    }
119}