Skip to main content

ntex_server/
server.rs

1#![allow(clippy::missing_panics_doc)]
2use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
3use std::task::{Context, Poll, ready};
4use std::{future::Future, io, pin::Pin};
5
6use async_channel::Sender;
7use ntex_rt::signals::Signal;
8
9use crate::manager::ServerCommand;
10
/// State shared between [`Server`] handles through an `Arc`.
#[derive(Debug)]
pub(crate) struct ServerShared {
    /// Pause flag; while it reads `true`, `Server::process` hands items
    /// back to the caller instead of queueing them.
    pub(crate) paused: AtomicBool,
}
15
16/// Server controller
17#[derive(Debug)]
18pub struct Server<T> {
19    shared: Arc<ServerShared>,
20    cmd: Sender<ServerCommand<T>>,
21    stop: Option<oneshot::AsyncReceiver<()>>,
22}
23
24impl<T> Server<T> {
25    pub(crate) fn new(cmd: Sender<ServerCommand<T>>, shared: Arc<ServerShared>) -> Self {
26        Server {
27            cmd,
28            shared,
29            stop: None,
30        }
31    }
32
33    /// Start streaming server building process
34    pub fn builder() -> crate::net::ServerBuilder {
35        crate::net::ServerBuilder::default()
36    }
37
38    pub(crate) fn signal(&self, sig: Signal) {
39        let _ = self.cmd.try_send(ServerCommand::Signal(sig));
40    }
41
42    /// Send item to worker pool
43    pub fn process(&self, item: T) -> Result<(), T> {
44        if self.shared.paused.load(Ordering::Acquire) {
45            Err(item)
46        } else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) {
47            match e.into_inner() {
48                ServerCommand::Item(item) => Err(item),
49                _ => panic!(),
50            }
51        } else {
52            Ok(())
53        }
54    }
55
56    /// Pause accepting incoming connections
57    ///
58    /// If socket contains some pending connection, they might be dropped.
59    /// All opened connection remains active.
60    pub fn pause(&self) -> impl Future<Output = ()> + use<T> {
61        let (tx, rx) = oneshot::channel();
62        let _ = self.cmd.try_send(ServerCommand::Pause(tx));
63        async move {
64            let _ = rx.await;
65        }
66    }
67
68    /// Resume accepting incoming connections
69    pub fn resume(&self) -> impl Future<Output = ()> + use<T> {
70        let (tx, rx) = oneshot::channel();
71        let _ = self.cmd.try_send(ServerCommand::Resume(tx));
72        async move {
73            let _ = rx.await;
74        }
75    }
76
77    /// Stop incoming connection processing, stop all workers and exit.
78    ///
79    /// If server starts with `spawn()` method, then spawned thread get terminated.
80    pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> + use<T> {
81        let (tx, rx) = oneshot::channel();
82        let _ = self.cmd.try_send(ServerCommand::Stop {
83            graceful,
84            completion: Some(tx),
85        });
86        async move {
87            let _ = rx.await;
88        }
89    }
90}
91
92impl<T> Clone for Server<T> {
93    fn clone(&self) -> Self {
94        Self {
95            cmd: self.cmd.clone(),
96            shared: self.shared.clone(),
97            stop: None,
98        }
99    }
100}
101
102impl<T> Future for Server<T> {
103    type Output = io::Result<()>;
104
105    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106        let this = self.get_mut();
107
108        if this.stop.is_none() {
109            let (tx, rx) = oneshot::async_channel();
110            if this.cmd.try_send(ServerCommand::NotifyStopped(tx)).is_err() {
111                return Poll::Ready(Ok(()));
112            }
113            this.stop = Some(rx);
114        }
115
116        let _ = ready!(Pin::new(this.stop.as_mut().unwrap()).poll(cx));
117
118        Poll::Ready(Ok(()))
119    }
120}