meio_connect/
client.rs

1//! The implementation of a WebSocket client.
2
3use crate::talker::{Talker, TalkerCompatible, WsIncoming};
4use anyhow::Error;
5use async_trait::async_trait;
6use futures::channel::mpsc;
7use meio::prelude::{
8    ActionHandler, Actor, Address, InstantAction, InstantActionHandler, LiteTask, StopReceiver,
9};
10use meio_protocol::{Protocol, ProtocolData};
11use std::marker::PhantomData;
12use std::time::{Duration, Instant};
13use thiserror::Error;
14use tokio::net::TcpStream;
15use tokio::time::sleep;
16use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
17
18/// Sender for outgoing messages from a client.
19#[derive(Debug, Clone)]
20pub struct WsSender<T: ProtocolData> {
21    log_target: String,
22    tx: mpsc::UnboundedSender<T>,
23}
24
25impl<T: ProtocolData> WsSender<T> {
26    /// Creates a new instance of the sender.
27    pub(crate) fn new(base_log_target: &str, tx: mpsc::UnboundedSender<T>) -> Self {
28        let log_target = format!("{}::WsSender", base_log_target);
29        Self { log_target, tx }
30    }
31
32    /// Send outgoing message to a server.
33    pub fn send(&self, msg: T) {
34        if let Err(err) = self.tx.unbounded_send(msg) {
35            log::error!(target: &self.log_target, "Can't send a message to ws outgoing client part: {}", err);
36        }
37    }
38}
39
40/// The active status of a connection.
41#[derive(Debug)]
42pub enum WsClientStatus<P: Protocol> {
43    /// Connected to a server.
44    Connected {
45        /// The sender for outgoing messages.
46        sender: WsSender<P::ToServer>,
47    },
48    /// Connection failed. The sender is not valid anymore.
49    Failed {
50        /// The reason of failing.
51        reason: WsFailReason,
52    },
53}
54
55/// The reason of failing of a WebSocket connection.
56#[derive(Error, Debug, Clone)]
57pub enum WsFailReason {
58    /// Connection was closed by a server.
59    #[error("closed by server")]
60    ClosedByServer,
61    /// Connection failed.
62    #[error("connection failed")]
63    ConnectionFailed,
64    /// Server is not available.
65    #[error("server not available")]
66    ServerNotAvailable,
67}
68
69impl<P: Protocol> InstantAction for WsClientStatus<P> {}
70
71/// The client to connect to a WebSocket server.
72///
73/// It's a `LiteTask` attached to an `Actor` that recieves incoming messages and
74/// a `WsSender` instance and uses it send outgoing messages back to a server.
75pub struct WsClient<P, A>
76where
77    A: Actor,
78{
79    log_target: String,
80    url: String,
81    repeat_interval: Option<Duration>,
82    address: Address<A>,
83    _protocol: PhantomData<P>,
84}
85
86impl<P, A> WsClient<P, A>
87where
88    P: Protocol,
89    A: Actor + InstantActionHandler<WsClientStatus<P>> + ActionHandler<WsIncoming<P::ToClient>>,
90{
91    /// Created a new client.
92    ///
93    /// The client connects to `url`, but if connection failed it retries to connect
94    /// in `repeat_interval`. When connection espablished it send incoming data to an
95    /// actor using the `address`.
96    pub fn new(url: String, repeat_interval: Option<Duration>, address: Address<A>) -> Self {
97        let log_target = format!("WsClient::{}", url);
98        Self {
99            log_target,
100            url,
101            repeat_interval,
102            address,
103            _protocol: PhantomData,
104        }
105    }
106}
107
108impl<P, A> TalkerCompatible for WsClient<P, A>
109where
110    P: Protocol,
111    A: Actor + InstantActionHandler<WsClientStatus<P>> + ActionHandler<WsIncoming<P::ToClient>>,
112{
113    type WebSocket = WebSocketStream<MaybeTlsStream<TcpStream>>;
114    type Message = tungstenite::Message;
115    type Error = tungstenite::Error;
116    type Actor = A;
117    type Codec = P::Codec;
118    type Incoming = P::ToClient;
119    type Outgoing = P::ToServer;
120}
121
122#[async_trait]
123impl<P, A> LiteTask for WsClient<P, A>
124where
125    P: Protocol,
126    A: Actor + InstantActionHandler<WsClientStatus<P>> + ActionHandler<WsIncoming<P::ToClient>>,
127{
128    type Output = ();
129
130    fn log_target(&self) -> &str {
131        &self.log_target
132    }
133
134    async fn routine(mut self, stop: StopReceiver) -> Result<Self::Output, Error> {
135        self.connection_routine(stop).await
136    }
137}
138
139impl<P, A> WsClient<P, A>
140where
141    P: Protocol,
142    A: Actor + InstantActionHandler<WsClientStatus<P>> + ActionHandler<WsIncoming<P::ToClient>>,
143{
144    // TODO: Return fail `TermReason` like server does
145    async fn connection_routine(&mut self, mut stop: StopReceiver) -> Result<(), Error> {
146        while stop.is_alive() {
147            log::trace!(target: &self.log_target, "Ws client conencting to: {}", self.url);
148            let res = connect_async(&self.url).await;
149            let mut last_success = Instant::now();
150            let fail_reason;
151            let original_err: Error;
152            match res {
153                Ok((wss, _resp)) => {
154                    log::debug!(target: &self.log_target, "Client connected successfully to: {}", self.url);
155                    last_success = Instant::now();
156                    let (tx, rx) = mpsc::unbounded();
157                    let sender = WsSender::new(&self.log_target, tx);
158                    self.address
159                        .instant(WsClientStatus::<P>::Connected { sender })?;
160                    // Interruptable by a stop
161                    let mut talker = Talker::<Self>::new(
162                        &self.log_target,
163                        self.address.clone(),
164                        wss,
165                        rx,
166                        stop.clone(),
167                    );
168                    let res = talker.routine().await;
169                    match res {
170                        Ok(reason) => {
171                            if reason.is_interrupted() {
172                                log::info!(target: &self.log_target, "Interrupted by a user");
173                                return Ok(());
174                            } else {
175                                log::error!(target: &self.log_target, "Server closed a connection");
176                                fail_reason = WsFailReason::ClosedByServer;
177                                original_err = WsFailReason::ClosedByServer.into();
178                            }
179                        }
180                        Err(err) => {
181                            log::error!(target: &self.log_target, "Ws connecion to {} failed: {}", self.url, err);
182                            fail_reason = WsFailReason::ConnectionFailed;
183                            original_err = err;
184                        }
185                    }
186                }
187                Err(err) => {
188                    log::error!(target: &self.log_target, "Can't connect to {}: {}", self.url, err);
189                    fail_reason = WsFailReason::ServerNotAvailable;
190                    original_err = err.into();
191                }
192            }
193            self.address.instant(WsClientStatus::<P>::Failed {
194                reason: fail_reason.clone(),
195            })?;
196            if let Some(dur) = self.repeat_interval {
197                let elapsed = last_success.elapsed();
198                if elapsed < dur {
199                    let remained = dur - elapsed;
200                    stop.or(sleep(remained)).await?;
201                }
202                log::debug!(target: &self.log_target, "Next attempt to connect to: {}", self.url);
203            } else {
204                // No reconnection required by user
205                return Err(original_err);
206            }
207        }
208        Ok(())
209    }
210}