cyfs_lib/ws/
client.rs

1use super::request::*;
2use super::session::*;
3use super::session_manager::*;
4use cyfs_base::*;
5
6use async_std::net::TcpStream;
7use cyfs_debug::Mutex;
8use futures::future::{AbortHandle, Abortable, Aborted};
9use http_types::Url;
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13// connect的重试间隔
14const WS_CONNECT_RETRY_MIN_INTERVAL_SECS: u64 = 2;
15const WS_CONNECT_RETRY_MAX_INTERVAL_SECS: u64 = 60;
16
17struct WebSocketClientHandles {
18    // 用以唤醒重试等待
19    waker: Option<AbortHandle>,
20
21    // 取消运行
22    running_task: Option<async_std::task::JoinHandle<()>>,
23}
24
25impl Default for WebSocketClientHandles {
26    fn default() -> Self {
27        Self {
28            waker: None,
29            running_task: None,
30        }
31    }
32}
33
34struct WebSocketClientSessionState {
35    session: Option<Arc<WebSocketSession>>,
36    stopped: bool,
37}
38
39#[derive(Clone)]
40pub struct WebSocketClient {
41    service_url: Url,
42    service_addr: SocketAddr,
43
44    session_manager: WebSocketSessionManager,
45
46    // 用以唤醒重试等待
47    handles: Arc<Mutex<WebSocketClientHandles>>,
48
49    session: Arc<Mutex<WebSocketClientSessionState>>,
50}
51
52impl WebSocketClient {
53    pub fn new(service_url: Url, handler: Box<dyn WebSocketRequestHandler>) -> Self {
54        let service_addr = format!(
55            "{}:{}",
56            service_url.host().unwrap(),
57            service_url.port().unwrap()
58        )
59        .parse()
60        .unwrap();
61
62        Self {
63            service_url,
64            service_addr,
65            session_manager: WebSocketSessionManager::new(handler),
66            handles: Arc::new(Mutex::new(WebSocketClientHandles::default())),
67            session: Arc::new(Mutex::new(WebSocketClientSessionState {
68                session: None,
69                stopped: false,
70            })),
71        }
72    }
73
74    pub fn service_addr(&self) -> &SocketAddr {
75        &self.service_addr
76    }
77
78    // 随机选择一个session
79    pub fn select_session(&self) -> Option<Arc<WebSocketSession>> {
80        self.session_manager.select_session()
81    }
82
83    pub fn start(&self) {
84        let this = self.clone();
85        let task = async_std::task::spawn(this.run());
86
87        let mut handles = self.handles.lock().unwrap();
88        assert!(handles.running_task.is_none());
89        handles.running_task = Some(task);
90    }
91
92    async fn run(self) {
93        let mut retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
94
95        loop {
96            info!("ws will connect to {}", self.service_url);
97
98            match self.run_once().await {
99                Ok(_) => {
100                    warn!("ws session complete");
101                    retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
102                }
103                Err(e) => {
104                    if e.code() == BuckyErrorCode::Aborted {
105                        break;
106                    }
107                    error!("ws session complete with error: {}", e);
108                }
109            };
110
111            let (abort_handle, abort_registration) = AbortHandle::new_pair();
112            self.handles.lock().unwrap().waker = Some(abort_handle);
113            let future = Abortable::new(
114                async_std::task::sleep(std::time::Duration::from_secs(retry_interval)),
115                abort_registration,
116            );
117
118            match future.await {
119                Ok(_) => {
120                    retry_interval *= 2;
121                    if retry_interval >= WS_CONNECT_RETRY_MAX_INTERVAL_SECS {
122                        retry_interval = WS_CONNECT_RETRY_MAX_INTERVAL_SECS;
123                    }
124                }
125                Err(Aborted { .. }) => {
126                    retry_interval = WS_CONNECT_RETRY_MIN_INTERVAL_SECS;
127                }
128            };
129        }
130
131        info!("ws client stopped! url={}", self.service_url);
132    }
133
134    // 立即发起一次重试
135    pub fn retry(&self) {
136        if let Some(waker) = self.handles.lock().unwrap().waker.take() {
137            waker.abort();
138        }
139    }
140
141    pub async fn run_once(&self) -> BuckyResult<()> {
142        let tcp_stream = TcpStream::connect(self.service_addr).await.map_err(|e| {
143            let msg = format!("ws connect to {} error: {}", self.service_addr, e);
144            error!("{}", msg);
145
146            BuckyError::from(e)
147        })?;
148
149        let conn_info = (
150            tcp_stream.local_addr().unwrap(),
151            tcp_stream.peer_addr().unwrap(),
152        );
153
154        let session = self
155            .session_manager
156            .new_session(&self.service_url, conn_info)?;
157
158        let stopped = {
159            let mut state = self.session.lock().unwrap();
160            assert!(state.session.is_none());
161            if !state.stopped {
162                state.session = Some(session.clone());
163            } else {
164                warn!(
165                    "ws client run session but already stopped! sid={}",
166                    session.sid()
167                );
168            }
169
170            state.stopped
171        };
172
173        let ret = if !stopped {
174            let ret = self
175                .session_manager
176                .run_client_session(&self.service_url, session, tcp_stream)
177                .await;
178
179            self.session.lock().unwrap().session.take();
180
181            ret
182        } else {
183            Err(BuckyError::from(BuckyErrorCode::Aborted))
184        };
185
186        ret
187    }
188
189    pub async fn stop(&self) {
190        let session = {
191            let mut state = self.session.lock().unwrap();
192            state.stopped = true;
193            state.session.take()
194        };
195
196        if let Some(session) = session {
197            session.stop();
198        }
199
200        let task = self.handles.lock().unwrap().running_task.take();
201        if let Some(task) = task {
202            task.await;
203        }
204
205        self.session_manager.stop();
206    }
207}