1#![allow(clippy::missing_panics_doc)]
2use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
3use std::task::{Context, Poll, ready};
4use std::{future::Future, io, pin::Pin};
5
6use async_channel::Sender;
7
8use crate::{manager::ServerCommand, signals::Signal};
9
/// State shared by reference between [`Server`] handles.
///
/// Each handle keeps this behind an `Arc`, and cloning a handle clones the
/// `Arc`, so every clone observes the same flag.
#[derive(Debug)]
pub(crate) struct ServerShared {
    /// While `true`, `Server::process` hands submitted items straight back to
    /// the caller instead of queueing them.
    ///
    // NOTE(review): the writer of this flag is not visible in this file;
    // presumably the component handling pause/resume commands — confirm.
    pub(crate) paused: AtomicBool,
}
14
15#[derive(Debug)]
17pub struct Server<T> {
18 shared: Arc<ServerShared>,
19 cmd: Sender<ServerCommand<T>>,
20 stop: Option<oneshot::Receiver<()>>,
21}
22
23impl<T> Server<T> {
24 pub(crate) fn new(cmd: Sender<ServerCommand<T>>, shared: Arc<ServerShared>) -> Self {
25 Server {
26 cmd,
27 shared,
28 stop: None,
29 }
30 }
31
32 pub fn builder() -> crate::net::ServerBuilder {
34 crate::net::ServerBuilder::default()
35 }
36
37 pub(crate) fn signal(&self, sig: Signal) {
38 let _ = self.cmd.try_send(ServerCommand::Signal(sig));
39 }
40
41 pub fn process(&self, item: T) -> Result<(), T> {
43 if self.shared.paused.load(Ordering::Acquire) {
44 Err(item)
45 } else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) {
46 match e.into_inner() {
47 ServerCommand::Item(item) => Err(item),
48 _ => panic!(),
49 }
50 } else {
51 Ok(())
52 }
53 }
54
55 pub fn pause(&self) -> impl Future<Output = ()> + use<T> {
60 let (tx, rx) = oneshot::channel();
61 let _ = self.cmd.try_send(ServerCommand::Pause(tx));
62 async move {
63 let _ = rx.await;
64 }
65 }
66
67 pub fn resume(&self) -> impl Future<Output = ()> + use<T> {
69 let (tx, rx) = oneshot::channel();
70 let _ = self.cmd.try_send(ServerCommand::Resume(tx));
71 async move {
72 let _ = rx.await;
73 }
74 }
75
76 pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> + use<T> {
80 let (tx, rx) = oneshot::channel();
81 let _ = self.cmd.try_send(ServerCommand::Stop {
82 graceful,
83 completion: Some(tx),
84 });
85 async move {
86 let _ = rx.await;
87 }
88 }
89}
90
91impl<T> Clone for Server<T> {
92 fn clone(&self) -> Self {
93 Self {
94 cmd: self.cmd.clone(),
95 shared: self.shared.clone(),
96 stop: None,
97 }
98 }
99}
100
101impl<T> Future for Server<T> {
102 type Output = io::Result<()>;
103
104 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105 let this = self.get_mut();
106
107 if this.stop.is_none() {
108 let (tx, rx) = oneshot::channel();
109 if this.cmd.try_send(ServerCommand::NotifyStopped(tx)).is_err() {
110 return Poll::Ready(Ok(()));
111 }
112 this.stop = Some(rx);
113 }
114
115 let _ = ready!(Pin::new(this.stop.as_mut().unwrap()).poll(cx));
116
117 Poll::Ready(Ok(()))
118 }
119}