1#![allow(clippy::missing_panics_doc)]
2use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
3use std::task::{Context, Poll, ready};
4use std::{future::Future, io, pin::Pin};
5
6use async_channel::Sender;
7use ntex_rt::signals::Signal;
8
9use crate::manager::ServerCommand;
10
11#[derive(Debug)]
12pub(crate) struct ServerShared {
13 pub(crate) paused: AtomicBool,
14}
15
16#[derive(Debug)]
18pub struct Server<T> {
19 shared: Arc<ServerShared>,
20 cmd: Sender<ServerCommand<T>>,
21 stop: Option<oneshot::AsyncReceiver<()>>,
22}
23
24impl<T> Server<T> {
25 pub(crate) fn new(cmd: Sender<ServerCommand<T>>, shared: Arc<ServerShared>) -> Self {
26 Server {
27 cmd,
28 shared,
29 stop: None,
30 }
31 }
32
33 pub fn builder() -> crate::net::ServerBuilder {
35 crate::net::ServerBuilder::default()
36 }
37
38 pub(crate) fn signal(&self, sig: Signal) {
39 let _ = self.cmd.try_send(ServerCommand::Signal(sig));
40 }
41
42 pub fn process(&self, item: T) -> Result<(), T> {
44 if self.shared.paused.load(Ordering::Acquire) {
45 Err(item)
46 } else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) {
47 match e.into_inner() {
48 ServerCommand::Item(item) => Err(item),
49 _ => panic!(),
50 }
51 } else {
52 Ok(())
53 }
54 }
55
56 pub fn pause(&self) -> impl Future<Output = ()> + use<T> {
61 let (tx, rx) = oneshot::channel();
62 let _ = self.cmd.try_send(ServerCommand::Pause(tx));
63 async move {
64 let _ = rx.await;
65 }
66 }
67
68 pub fn resume(&self) -> impl Future<Output = ()> + use<T> {
70 let (tx, rx) = oneshot::channel();
71 let _ = self.cmd.try_send(ServerCommand::Resume(tx));
72 async move {
73 let _ = rx.await;
74 }
75 }
76
77 pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> + use<T> {
81 let (tx, rx) = oneshot::channel();
82 let _ = self.cmd.try_send(ServerCommand::Stop {
83 graceful,
84 completion: Some(tx),
85 });
86 async move {
87 let _ = rx.await;
88 }
89 }
90}
91
92impl<T> Clone for Server<T> {
93 fn clone(&self) -> Self {
94 Self {
95 cmd: self.cmd.clone(),
96 shared: self.shared.clone(),
97 stop: None,
98 }
99 }
100}
101
102impl<T> Future for Server<T> {
103 type Output = io::Result<()>;
104
105 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106 let this = self.get_mut();
107
108 if this.stop.is_none() {
109 let (tx, rx) = oneshot::async_channel();
110 if this.cmd.try_send(ServerCommand::NotifyStopped(tx)).is_err() {
111 return Poll::Ready(Ok(()));
112 }
113 this.stop = Some(rx);
114 }
115
116 let _ = ready!(Pin::new(this.stop.as_mut().unwrap()).poll(cx));
117
118 Poll::Ready(Ok(()))
119 }
120}