1use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
2use std::task::{ready, Context, Poll};
3use std::{future::Future, io, pin::Pin};
4
5use async_channel::Sender;
6
7use crate::{manager::ServerCommand, signals::Signal};
8
/// State shared between every [`Server`] handle.
///
/// Handles hold this behind an `Arc`, so all clones observe the same flag.
#[derive(Debug)]
pub(crate) struct ServerShared {
    /// While `true`, [`Server::process`] hands items straight back to the
    /// caller instead of queueing them. The flag is only read in this file;
    /// it is written elsewhere in the crate.
    pub(crate) paused: AtomicBool,
}
13
14#[derive(Debug)]
16pub struct Server<T> {
17 shared: Arc<ServerShared>,
18 cmd: Sender<ServerCommand<T>>,
19 stop: Option<oneshot::Receiver<()>>,
20}
21
22impl<T> Server<T> {
23 pub(crate) fn new(cmd: Sender<ServerCommand<T>>, shared: Arc<ServerShared>) -> Self {
24 Server {
25 cmd,
26 shared,
27 stop: None,
28 }
29 }
30
31 pub fn build() -> crate::net::ServerBuilder {
33 crate::net::ServerBuilder::default()
34 }
35
36 pub(crate) fn signal(&self, sig: Signal) {
37 let _ = self.cmd.try_send(ServerCommand::Signal(sig));
38 }
39
40 pub fn process(&self, item: T) -> Result<(), T> {
42 if self.shared.paused.load(Ordering::Acquire) {
43 Err(item)
44 } else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) {
45 match e.into_inner() {
46 ServerCommand::Item(item) => Err(item),
47 _ => panic!(),
48 }
49 } else {
50 Ok(())
51 }
52 }
53
54 pub fn pause(&self) -> impl Future<Output = ()> {
59 let (tx, rx) = oneshot::channel();
60 let _ = self.cmd.try_send(ServerCommand::Pause(tx));
61 async move {
62 let _ = rx.await;
63 }
64 }
65
66 pub fn resume(&self) -> impl Future<Output = ()> {
68 let (tx, rx) = oneshot::channel();
69 let _ = self.cmd.try_send(ServerCommand::Resume(tx));
70 async move {
71 let _ = rx.await;
72 }
73 }
74
75 pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
79 let (tx, rx) = oneshot::channel();
80 let _ = self.cmd.try_send(ServerCommand::Stop {
81 graceful,
82 completion: Some(tx),
83 });
84 async move {
85 let _ = rx.await;
86 }
87 }
88}
89
90impl<T> Clone for Server<T> {
91 fn clone(&self) -> Self {
92 Self {
93 cmd: self.cmd.clone(),
94 shared: self.shared.clone(),
95 stop: None,
96 }
97 }
98}
99
100impl<T> Future for Server<T> {
101 type Output = io::Result<()>;
102
103 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
104 let this = self.get_mut();
105
106 if this.stop.is_none() {
107 let (tx, rx) = oneshot::channel();
108 if this.cmd.try_send(ServerCommand::NotifyStopped(tx)).is_err() {
109 return Poll::Ready(Ok(()));
110 }
111 this.stop = Some(rx);
112 }
113
114 let _ = ready!(Pin::new(this.stop.as_mut().unwrap()).poll(cx));
115
116 Poll::Ready(Ok(()))
117 }
118}