1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use futures::{FutureExt, SinkExt, StreamExt};
use serde_json::Value;
use tokio::{select, task::JoinHandle};
use tokio_tungstenite::tungstenite::{Message, Utf8Bytes};
use crate::{Client, NotisResponse, RPCResponse, Result};
impl Client {
/// Starts the main websocket reconnection, listening and channel communication for requests and subcriptions.
pub(crate) async fn run(self) {
let mut retry_attempts = -1;
let mut close_connection = false;
loop {
if close_connection {
{
*self.client_connected.lock().await = false;
}
tracing::debug!("Closed connection");
if let Err(e) = self.close_all_subscriptions().await {
tracing::error!(?e);
}
break;
}
let ws = match Self::connect_websocket(self.uri.clone(), &self.config).await {
Ok(ws) => {
{
*self.client_connected.lock().await = true;
}
// first connection is -1
if retry_attempts != -1 {
self.channels.on_reconnect.send(self.clone()).unwrap();
}
retry_attempts = 0;
ws
}
Err(e) => {
if retry_attempts > self.config.max_retry_attempts {
tracing::error!("failed to reconnect after max_retry_attempts");
break;
}
tracing::debug!("failed to reconnect, {e:?}");
{
*self.client_connected.lock().await = false;
}
tokio::time::sleep(self.config.retry_connection).await;
retry_attempts += 1;
continue;
}
};
let (mut write, mut read) = ws.split();
// response
let _self = self.clone();
let handle_res: JoinHandle<Result<()>> = tokio::spawn(async move {
while let Some(msg) = read.next().await {
if let Ok(msg) = msg {
let response: Value = match msg {
Message::Text(data) => serde_json::from_str(data.as_str())?,
Message::Binary(data) => serde_json::from_slice(&data)?,
Message::Close(_) => return Ok(()),
_ => continue,
};
tracing::trace!(?response);
// if we got params, its a notis
if response.get("params").is_some() {
let data: NotisResponse = serde_json::from_value(response)?;
if _self.channels.notis.receiver_count() > 0 {
_self.channels.notis.send(data)?;
}
// otherwise normal request response
} else {
let data: RPCResponse = serde_json::from_value(response)?;
if _self.channels.response.receiver_count() > 0 {
_self.channels.response.send(data)?;
} else {
tracing::debug!("Ignored res send, no receivers");
}
}
}
}
// if we are here, we lost connection
if let Err(e) = _self.channels.on_disconnect.send(_self.clone()) {
tracing::debug!(?e);
};
Ok(())
});
// request
let mut _req = self.channels.request.subscribe();
let handle_req: JoinHandle<Result<()>> = tokio::spawn(async move {
while let Ok(req) = _req.recv().await {
tracing::debug!("sending {req:?}");
write
.send(Message::Text(Utf8Bytes::from(serde_json::to_string(&req)?)))
.await?;
}
Ok(())
});
// needed so we can on demand close down this entire function.
let _close = self.channels.close.1.clone();
let close_h = tokio::spawn(async move {
if _close.lock().await.recv().await.is_some() {
return true;
}
false
});
select! {
result = handle_res.fuse() => {
if let Ok(Err(e)) = result {
tracing::error!(?e);
}
},
result = handle_req.fuse() => {
if let Ok(Err(e)) = result {
tracing::error!(?e);
}
},
close_ws = close_h.fuse() => {
close_connection = close_ws.unwrap_or(false);
continue;
}
};
tracing::debug!(
"either req or res handles were finished, probably lost websocket connection"
);
}
}
}