1use std::future::Future;
2use std::io;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures::channel::mpsc::UnboundedSender;
7use futures::channel::oneshot;
8use futures::FutureExt;
9
10use crate::builder::ServerBuilder;
11use crate::signals::Signal;
12
13#[derive(Debug)]
14pub(crate) enum ServerCommand {
15 WorkerFaulted(usize),
16 Pause(oneshot::Sender<()>),
17 Resume(oneshot::Sender<()>),
18 Signal(Signal),
19 Stop {
21 graceful: bool,
22 completion: Option<oneshot::Sender<()>>,
23 },
24 Notify(oneshot::Sender<()>),
26}
27
28#[derive(Debug)]
29pub struct Server(
30 UnboundedSender<ServerCommand>,
31 Option<oneshot::Receiver<()>>,
32);
33
34impl Server {
35 pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
36 Server(tx, None)
37 }
38
39 pub fn build() -> ServerBuilder {
41 ServerBuilder::default()
42 }
43
44 pub(crate) fn signal(&self, sig: Signal) {
45 let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
46 }
47
48 pub(crate) fn worker_faulted(&self, idx: usize) {
49 let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx));
50 }
51
52 pub fn pause(&self) -> impl Future<Output = ()> {
57 let (tx, rx) = oneshot::channel();
58 let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
59 rx.map(|_| ())
60 }
61
62 pub fn resume(&self) -> impl Future<Output = ()> {
64 let (tx, rx) = oneshot::channel();
65 let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
66 rx.map(|_| ())
67 }
68
69 pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
73 let (tx, rx) = oneshot::channel();
74 let _ = self.0.unbounded_send(ServerCommand::Stop {
75 graceful,
76 completion: Some(tx),
77 });
78 rx.map(|_| ())
79 }
80}
81
82impl Clone for Server {
83 fn clone(&self) -> Self {
84 Self(self.0.clone(), None)
85 }
86}
87
88impl Future for Server {
89 type Output = io::Result<()>;
90
91 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92 let this = self.get_mut();
93
94 if this.1.is_none() {
95 let (tx, rx) = oneshot::channel();
96 if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() {
97 return Poll::Ready(Ok(()));
98 }
99 this.1 = Some(rx);
100 }
101
102 match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
103 Poll::Pending => Poll::Pending,
104 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
105 Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
106 }
107 }
108}