1use crate::talker::{Talker, TalkerCompatible, WsIncoming};
4use anyhow::Error;
5use async_trait::async_trait;
6use futures::channel::mpsc;
7use meio::prelude::{
8 ActionHandler, Actor, Address, InstantAction, InstantActionHandler, LiteTask, StopReceiver,
9};
10use meio_protocol::{Protocol, ProtocolData};
11use std::marker::PhantomData;
12use std::time::{Duration, Instant};
13use thiserror::Error;
14use tokio::net::TcpStream;
15use tokio::time::sleep;
16use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
17
18#[derive(Debug, Clone)]
20pub struct WsSender<T: ProtocolData> {
21 log_target: String,
22 tx: mpsc::UnboundedSender<T>,
23}
24
25impl<T: ProtocolData> WsSender<T> {
26 pub(crate) fn new(base_log_target: &str, tx: mpsc::UnboundedSender<T>) -> Self {
28 let log_target = format!("{}::WsSender", base_log_target);
29 Self { log_target, tx }
30 }
31
32 pub fn send(&self, msg: T) {
34 if let Err(err) = self.tx.unbounded_send(msg) {
35 log::error!(target: &self.log_target, "Can't send a message to ws outgoing client part: {}", err);
36 }
37 }
38}
39
40#[derive(Debug)]
42pub enum WsClientStatus<P: Protocol> {
43 Connected {
45 sender: WsSender<P::ToServer>,
47 },
48 Failed {
50 reason: WsFailReason,
52 },
53}
54
55#[derive(Error, Debug, Clone)]
57pub enum WsFailReason {
58 #[error("closed by server")]
60 ClosedByServer,
61 #[error("connection failed")]
63 ConnectionFailed,
64 #[error("server not available")]
66 ServerNotAvailable,
67}
68
69impl<P: Protocol> InstantAction for WsClientStatus<P> {}
70
71pub struct WsClient<P, A>
76where
77 A: Actor,
78{
79 log_target: String,
80 url: String,
81 repeat_interval: Option<Duration>,
82 address: Address<A>,
83 _protocol: PhantomData<P>,
84}
85
86impl<P, A> WsClient<P, A>
87where
88 P: Protocol,
89 A: Actor + InstantActionHandler<WsClientStatus<P>> + ActionHandler<WsIncoming<P::ToClient>>,
90{
91 pub fn new(url: String, repeat_interval: Option<Duration>, address: Address<A>) -> Self {
97 let log_target = format!("WsClient::{}", url);
98 Self {
99 log_target,
100 url,
101 repeat_interval,
102 address,
103 _protocol: PhantomData,
104 }
105 }
106}
107
108impl<P, A> TalkerCompatible for WsClient<P, A>
109where
110 P: Protocol,
111 A: Actor + InstantActionHandler<WsClientStatus<P>> + ActionHandler<WsIncoming<P::ToClient>>,
112{
113 type WebSocket = WebSocketStream<MaybeTlsStream<TcpStream>>;
114 type Message = tungstenite::Message;
115 type Error = tungstenite::Error;
116 type Actor = A;
117 type Codec = P::Codec;
118 type Incoming = P::ToClient;
119 type Outgoing = P::ToServer;
120}
121
122#[async_trait]
123impl<P, A> LiteTask for WsClient<P, A>
124where
125 P: Protocol,
126 A: Actor + InstantActionHandler<WsClientStatus<P>> + ActionHandler<WsIncoming<P::ToClient>>,
127{
128 type Output = ();
129
130 fn log_target(&self) -> &str {
131 &self.log_target
132 }
133
134 async fn routine(mut self, stop: StopReceiver) -> Result<Self::Output, Error> {
135 self.connection_routine(stop).await
136 }
137}
138
139impl<P, A> WsClient<P, A>
140where
141 P: Protocol,
142 A: Actor + InstantActionHandler<WsClientStatus<P>> + ActionHandler<WsIncoming<P::ToClient>>,
143{
144 async fn connection_routine(&mut self, mut stop: StopReceiver) -> Result<(), Error> {
146 while stop.is_alive() {
147 log::trace!(target: &self.log_target, "Ws client conencting to: {}", self.url);
148 let res = connect_async(&self.url).await;
149 let mut last_success = Instant::now();
150 let fail_reason;
151 let original_err: Error;
152 match res {
153 Ok((wss, _resp)) => {
154 log::debug!(target: &self.log_target, "Client connected successfully to: {}", self.url);
155 last_success = Instant::now();
156 let (tx, rx) = mpsc::unbounded();
157 let sender = WsSender::new(&self.log_target, tx);
158 self.address
159 .instant(WsClientStatus::<P>::Connected { sender })?;
160 let mut talker = Talker::<Self>::new(
162 &self.log_target,
163 self.address.clone(),
164 wss,
165 rx,
166 stop.clone(),
167 );
168 let res = talker.routine().await;
169 match res {
170 Ok(reason) => {
171 if reason.is_interrupted() {
172 log::info!(target: &self.log_target, "Interrupted by a user");
173 return Ok(());
174 } else {
175 log::error!(target: &self.log_target, "Server closed a connection");
176 fail_reason = WsFailReason::ClosedByServer;
177 original_err = WsFailReason::ClosedByServer.into();
178 }
179 }
180 Err(err) => {
181 log::error!(target: &self.log_target, "Ws connecion to {} failed: {}", self.url, err);
182 fail_reason = WsFailReason::ConnectionFailed;
183 original_err = err;
184 }
185 }
186 }
187 Err(err) => {
188 log::error!(target: &self.log_target, "Can't connect to {}: {}", self.url, err);
189 fail_reason = WsFailReason::ServerNotAvailable;
190 original_err = err.into();
191 }
192 }
193 self.address.instant(WsClientStatus::<P>::Failed {
194 reason: fail_reason.clone(),
195 })?;
196 if let Some(dur) = self.repeat_interval {
197 let elapsed = last_success.elapsed();
198 if elapsed < dur {
199 let remained = dur - elapsed;
200 stop.or(sleep(remained)).await?;
201 }
202 log::debug!(target: &self.log_target, "Next attempt to connect to: {}", self.url);
203 } else {
204 return Err(original_err);
206 }
207 }
208 Ok(())
209 }
210}