rustmann/
state.rs

1use std::future::Future;
2use std::io;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use futures::future::BoxFuture;
8use futures::FutureExt;
9
10use crate::error::RiemannClientError;
11use crate::options::RiemannClientOptions;
12use crate::transport::Transport;
13
14pub(crate) enum ClientState {
15    Connected(Arc<Transport>),
16    Connecting(BoxFuture<'static, Result<Transport, io::Error>>),
17    Disconnected,
18}
19
20pub(crate) struct Inner {
21    pub(crate) options: RiemannClientOptions,
22    pub(crate) state: ClientState,
23}
24
25impl Future for Inner {
26    type Output = Result<Arc<Transport>, RiemannClientError>;
27
28    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
29        match &mut self.state {
30            ClientState::Connected(conn) => Poll::Ready(Ok(conn.clone())),
31            ClientState::Connecting(ref mut f) => match f.poll_unpin(cx) {
32                Poll::Ready(Ok(conn)) => {
33                    // connected
34                    let connection = Arc::new(conn);
35                    self.state = ClientState::Connected(connection.clone());
36                    Poll::Ready(Ok(connection))
37                }
38                Poll::Ready(Err(e)) => {
39                    // failed to connect, reset to disconnected
40                    self.state = ClientState::Disconnected;
41                    Poll::Ready(Err(RiemannClientError::from(e)))
42                }
43                Poll::Pending => {
44                    // still connecting
45                    Poll::Pending
46                }
47            },
48            ClientState::Disconnected => {
49                let f = Transport::connect(self.options.clone()).boxed();
50                self.state = ClientState::Connecting(f);
51                cx.waker().clone().wake();
52                Poll::Pending
53            }
54        }
55    }
56}