1use std::future::Future;
2use std::io;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use futures::future::BoxFuture;
8use futures::FutureExt;
9
10use crate::error::RiemannClientError;
11use crate::options::RiemannClientOptions;
12use crate::transport::Transport;
13
14pub(crate) enum ClientState {
15 Connected(Arc<Transport>),
16 Connecting(BoxFuture<'static, Result<Transport, io::Error>>),
17 Disconnected,
18}
19
20pub(crate) struct Inner {
21 pub(crate) options: RiemannClientOptions,
22 pub(crate) state: ClientState,
23}
24
25impl Future for Inner {
26 type Output = Result<Arc<Transport>, RiemannClientError>;
27
28 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
29 match &mut self.state {
30 ClientState::Connected(conn) => Poll::Ready(Ok(conn.clone())),
31 ClientState::Connecting(ref mut f) => match f.poll_unpin(cx) {
32 Poll::Ready(Ok(conn)) => {
33 let connection = Arc::new(conn);
35 self.state = ClientState::Connected(connection.clone());
36 Poll::Ready(Ok(connection))
37 }
38 Poll::Ready(Err(e)) => {
39 self.state = ClientState::Disconnected;
41 Poll::Ready(Err(RiemannClientError::from(e)))
42 }
43 Poll::Pending => {
44 Poll::Pending
46 }
47 },
48 ClientState::Disconnected => {
49 let f = Transport::connect(self.options.clone()).boxed();
50 self.state = ClientState::Connecting(f);
51 cx.waker().clone().wake();
52 Poll::Pending
53 }
54 }
55 }
56}