1use super::request::*;
2use super::session::*;
3use super::session_manager::*;
4use cyfs_base::*;
5
6use async_std::net::TcpStream;
7use cyfs_debug::Mutex;
8use futures::future::{AbortHandle, Abortable, Aborted};
9use http_types::Url;
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13const WS_CONNECT_RETRY_MIN_INTERVAL_SECS: u64 = 2;
15const WS_CONNECT_RETRY_MAX_INTERVAL_SECS: u64 = 60;
16
17struct WebSocketClientHandles {
18 waker: Option<AbortHandle>,
20
21 running_task: Option<async_std::task::JoinHandle<()>>,
23}
24
25impl Default for WebSocketClientHandles {
26 fn default() -> Self {
27 Self {
28 waker: None,
29 running_task: None,
30 }
31 }
32}
33
34struct WebSocketClientSessionState {
35 session: Option<Arc<WebSocketSession>>,
36 stopped: bool,
37}
38
39#[derive(Clone)]
40pub struct WebSocketClient {
41 service_url: Url,
42 service_addr: SocketAddr,
43
44 session_manager: WebSocketSessionManager,
45
46 handles: Arc<Mutex<WebSocketClientHandles>>,
48
49 session: Arc<Mutex<WebSocketClientSessionState>>,
50}
51
52impl WebSocketClient {
53 pub fn new(service_url: Url, handler: Box<dyn WebSocketRequestHandler>) -> Self {
54 let service_addr = format!(
55 "{}:{}",
56 service_url.host().unwrap(),
57 service_url.port().unwrap()
58 )
59 .parse()
60 .unwrap();
61
62 Self {
63 service_url,
64 service_addr,
65 session_manager: WebSocketSessionManager::new(handler),
66 handles: Arc::new(Mutex::new(WebSocketClientHandles::default())),
67 session: Arc::new(Mutex::new(WebSocketClientSessionState {
68 session: None,
69 stopped: false,
70 })),
71 }
72 }
73
74 pub fn service_addr(&self) -> &SocketAddr {
75 &self.service_addr
76 }
77
78 pub fn select_session(&self) -> Option<Arc<WebSocketSession>> {
80 self.session_manager.select_session()
81 }
82
83 pub fn start(&self) {
84 let this = self.clone();
85 let task = async_std::task::spawn(this.run());
86
87 let mut handles = self.handles.lock().unwrap();
88 assert!(handles.running_task.is_none());
89 handles.running_task = Some(task);
90 }
91
92 async fn run(self) {
93 let mut retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
94
95 loop {
96 info!("ws will connect to {}", self.service_url);
97
98 match self.run_once().await {
99 Ok(_) => {
100 warn!("ws session complete");
101 retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
102 }
103 Err(e) => {
104 if e.code() == BuckyErrorCode::Aborted {
105 break;
106 }
107 error!("ws session complete with error: {}", e);
108 }
109 };
110
111 let (abort_handle, abort_registration) = AbortHandle::new_pair();
112 self.handles.lock().unwrap().waker = Some(abort_handle);
113 let future = Abortable::new(
114 async_std::task::sleep(std::time::Duration::from_secs(retry_interval)),
115 abort_registration,
116 );
117
118 match future.await {
119 Ok(_) => {
120 retry_interval *= 2;
121 if retry_interval >= WS_CONNECT_RETRY_MAX_INTERVAL_SECS {
122 retry_interval = WS_CONNECT_RETRY_MAX_INTERVAL_SECS;
123 }
124 }
125 Err(Aborted { .. }) => {
126 retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
127 }
128 };
129 }
130
131 info!("ws client stopped! url={}", self.service_url);
132 }
133
134 pub fn retry(&self) {
136 if let Some(waker) = self.handles.lock().unwrap().waker.take() {
137 waker.abort();
138 }
139 }
140
141 pub async fn run_once(&self) -> BuckyResult<()> {
142 let tcp_stream = TcpStream::connect(self.service_addr).await.map_err(|e| {
143 let msg = format!("ws connect to {} error: {}", self.service_addr, e);
144 error!("{}", msg);
145
146 BuckyError::from(e)
147 })?;
148
149 let conn_info = (
150 tcp_stream.local_addr().unwrap(),
151 tcp_stream.peer_addr().unwrap(),
152 );
153
154 let session = self
155 .session_manager
156 .new_session(&self.service_url, conn_info)?;
157
158 let stopped = {
159 let mut state = self.session.lock().unwrap();
160 assert!(state.session.is_none());
161 if !state.stopped {
162 state.session = Some(session.clone());
163 } else {
164 warn!(
165 "ws client run session but already stopped! sid={}",
166 session.sid()
167 );
168 }
169
170 state.stopped
171 };
172
173 let ret = if !stopped {
174 let ret = self
175 .session_manager
176 .run_client_session(&self.service_url, session, tcp_stream)
177 .await;
178
179 self.session.lock().unwrap().session.take();
180
181 ret
182 } else {
183 Err(BuckyError::from(BuckyErrorCode::Aborted))
184 };
185
186 ret
187 }
188
189 pub async fn stop(&self) {
190 let session = {
191 let mut state = self.session.lock().unwrap();
192 state.stopped = true;
193 state.session.take()
194 };
195
196 if let Some(session) = session {
197 session.stop();
198 }
199
200 let task = self.handles.lock().unwrap().running_task.take();
201 if let Some(task) = task {
202 task.await;
203 }
204
205 self.session_manager.stop();
206 }
207}