1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use super::request::*;
use super::session::*;
use super::session_manager::*;
use cyfs_base::{BuckyError, BuckyResult};
use async_std::net::TcpStream;
use http_types::Url;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use futures::future::{AbortHandle, Abortable};
/// Initial delay, in seconds, between reconnect attempts; the retry loop also
/// falls back to this value after a session completes without error or after
/// the retry wait is aborted.
const WS_CONNECT_RETRY_MIN_INTERVAL_SECS: u64 = 2;
/// Upper bound, in seconds, for the reconnect delay as it is doubled by the
/// retry loop.
const WS_CONNECT_RETRY_MAX_INTERVAL_SECS: u64 = 60;
/// WebSocket client that keeps (re)connecting to a single service endpoint.
///
/// Cloning yields a handle that shares the same `waker` slot (it is behind an
/// `Arc`), so a clone can interrupt the retry wait of a running loop.
#[derive(Clone)]
pub struct WebSocketClient {
/// URL of the remote service; logged before each connect and handed to the
/// session manager when a client session is started.
service_url: Url,
/// Socket address the TCP connection is made to; parsed from the host and
/// port of `service_url` at construction time.
service_addr: SocketAddr,
/// Manager that runs client sessions over established TCP streams and from
/// which sessions are selected.
session_manager: WebSocketSessionManager,
/// Abort handle for the most recently started retry wait, if one was stored;
/// taken and aborted by `retry()`.
waker: Arc<Mutex<Option<AbortHandle>>>,
}
impl WebSocketClient {
    /// Creates a client for the service at `service_url`.
    ///
    /// `handler` is passed to the `WebSocketSessionManager` created for this
    /// client. The target socket address is built from the URL's host and
    /// port; when the URL carries no explicit port, the scheme's well-known
    /// default port is used instead of panicking.
    ///
    /// # Panics
    ///
    /// Panics if the URL has no host, if no port can be determined for its
    /// scheme, or if `host:port` does not parse as a `SocketAddr` (for
    /// example when the host is a domain name rather than an IP literal).
    pub fn new(service_url: Url, handler: Box<dyn WebSocketRequestHandler>) -> Self {
        let host = service_url
            .host()
            .expect("ws service url must contain a host");
        // `port()` is `None` whenever the URL relies on the scheme default
        // (e.g. `ws://127.0.0.1/`), so fall back to the known default port.
        let port = service_url
            .port_or_known_default()
            .expect("ws service url must contain a port or use a scheme with a default port");
        let service_addr: SocketAddr = format!("{}:{}", host, port)
            .parse()
            .expect("ws service url host must be an ip address literal");

        Self {
            service_url,
            service_addr,
            session_manager: WebSocketSessionManager::new(handler),
            waker: Arc::new(Mutex::new(None)),
        }
    }

    /// Returns the socket address this client connects to.
    pub fn service_addr(&self) -> &SocketAddr {
        &self.service_addr
    }

    /// Delegates to the session manager's `select_session` and returns its result.
    pub fn select_session(&self) -> Option<Arc<WebSocketSession>> {
        self.session_manager.select_session()
    }

    /// Spawns a detached async task that drives [`run`](Self::run) on a clone
    /// of this client.
    pub fn start(&self) {
        let this = self.clone();
        async_std::task::spawn(async move {
            this.run().await;
        });
    }

    /// Connects and runs sessions in an endless loop, waiting between attempts.
    ///
    /// The wait starts at `WS_CONNECT_RETRY_MIN_INTERVAL_SECS`, doubles after
    /// every wait that runs to completion (capped at
    /// `WS_CONNECT_RETRY_MAX_INTERVAL_SECS`), and is reset to the minimum when
    /// a session ends without error or when the wait is aborted via
    /// [`retry`](Self::retry). This function never returns.
    pub async fn run(self) {
        let mut retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
        loop {
            info!("will ws connect to {}", self.service_url);

            match self.run_once().await {
                Ok(_) => {
                    warn!("ws session complete");
                    retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
                }
                Err(e) => {
                    error!("ws session complete with error: {}", e);
                }
            };

            // Publish the abort handle before sleeping so `retry()` can cut
            // the wait short. The lock guard is a temporary dropped at the end
            // of the statement, so it is never held across an `.await`.
            let (abort_handle, abort_registration) = AbortHandle::new_pair();
            *self.waker.lock().unwrap() = Some(abort_handle);

            let future = Abortable::new(
                async_std::task::sleep(std::time::Duration::from_secs(retry_interval)),
                abort_registration,
            );

            match future.await {
                Ok(_) => {
                    // The full wait elapsed: back off exponentially, capped.
                    retry_interval = retry_interval
                        .saturating_mul(2)
                        .min(WS_CONNECT_RETRY_MAX_INTERVAL_SECS);
                }
                Err(futures::future::Aborted { .. }) => {
                    // Woken by `retry()`: reconnect now and restart the backoff.
                    retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
                }
            };
        }
    }

    /// Aborts the stored retry wait, if any, so that a [`run`](Self::run)
    /// loop currently sleeping on it proceeds to its next connect attempt.
    ///
    /// The handle is taken out of the slot, so a second call without a new
    /// wait having been registered does nothing.
    pub fn retry(&self) {
        if let Some(waker) = self.waker.lock().unwrap().take() {
            waker.abort();
        }
    }

    /// Opens one TCP connection to the service address and runs a client
    /// session on it until the session manager's future completes.
    ///
    /// # Errors
    ///
    /// Returns an error if the TCP connect fails, if the local or peer
    /// address of the connected stream cannot be queried, or if
    /// `run_client_session` returns an error.
    pub async fn run_once(&self) -> BuckyResult<()> {
        let tcp_stream = TcpStream::connect(self.service_addr).await.map_err(|e| {
            let msg = format!("ws connect to {} error: {}", self.service_addr, e);
            error!("{}", msg);
            BuckyError::from(e)
        })?;

        // Address lookups are fallible I/O; propagate instead of panicking so
        // a failure here does not tear down the reconnect loop in `run`.
        let local_addr = tcp_stream.local_addr().map_err(|e| {
            let msg = format!(
                "ws get local addr for connection to {} error: {}",
                self.service_addr, e
            );
            error!("{}", msg);
            BuckyError::from(e)
        })?;
        let peer_addr = tcp_stream.peer_addr().map_err(|e| {
            let msg = format!(
                "ws get peer addr for connection to {} error: {}",
                self.service_addr, e
            );
            error!("{}", msg);
            BuckyError::from(e)
        })?;
        let conn_info = (local_addr, peer_addr);

        self.session_manager
            .run_client_session(&self.service_url, conn_info, tcp_stream)
            .await
    }
}