cyfs_lib/ws/
session_manager.rs1use super::request::*;
2use super::session::*;
3use cyfs_base::*;
4use cyfs_debug::Mutex;
5
6use async_std::io::{Read, Write};
7use http_types::Url;
8use rand::Rng;
9use std::collections::HashMap;
10use std::marker::Send;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14struct WebSocketSessionManagerInner {
15 list: HashMap<u32, Arc<WebSocketSession>>,
16 next_sid: u32,
17 handler: Option<Box<dyn WebSocketRequestHandler>>,
18
19 next_select_index: usize,
21}
22
23impl Drop for WebSocketSessionManagerInner {
24 fn drop(&mut self) {
25 warn!("ws session manager dropped! next_sid={}", self.next_sid);
26 }
27}
28
29impl WebSocketSessionManagerInner {
30 fn new(handler: Box<dyn WebSocketRequestHandler>) -> Self {
31 let mut rng = ::rand::thread_rng();
33 let sid = loop {
34 let ret = rng.gen::<u32>();
35 if ret != u32::MAX {
36 break ret;
37 }
38 };
39
40 info!("ws manager sid start at {}", sid);
41
42 Self {
43 list: HashMap::new(),
44 next_sid: sid,
45 handler: Some(handler),
46 next_select_index: 0,
47 }
48 }
49
50 fn get_session(&self, sid: &u32) -> Option<Arc<WebSocketSession>> {
51 self.list.get(sid).map(|v| v.to_owned())
52 }
53
54 fn select_session(&mut self) -> Option<Arc<WebSocketSession>> {
56 match self.list.len() {
57 0 => None,
58 1 => {
59 let session = self.list.iter().next().unwrap().1.to_owned();
60 if session.is_valid() {
61 Some(session)
62 } else {
63 None
64 }
65 }
66 count @ _ => {
67 for _ in 0..count {
69 let index = self.next_select_index % count;
70 self.next_select_index += 1;
71
72 let session = self.list.iter().nth(index).unwrap().1.to_owned();
73 if session.is_valid() {
74 return Some(session);
75 }
76 }
77
78 None
80 }
81 }
82 }
83
84 fn new_session(
85 &mut self,
86 source: String,
87 conn_info: (SocketAddr, SocketAddr),
88 ) -> BuckyResult<Arc<WebSocketSession>> {
89 let sid = self.next_sid;
90 self.next_sid += 1;
91 if self.next_sid == u32::MAX {
92 self.next_sid = 0;
93 }
94
95 let handler = self.handler.as_ref().map(|item| item.clone_handler());
96 if handler.is_none() {
97 let msg = format!(
98 "new ws session but request handler is empty! source={}",
99 source
100 );
101 error!("{}", msg);
102 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
103 }
104
105 let session = WebSocketSession::new(
106 sid,
107 source,
108 conn_info,
109 handler.unwrap(),
110 );
111
112 let session = Arc::new(session);
113 if let Some(_) = self.list.insert(sid, session.clone()) {
114 unreachable!();
115 }
116
117 Ok(session)
118 }
119
120 pub fn remove_session(&mut self, sid: u32) -> Option<Arc<WebSocketSession>> {
121 self.list.remove(&sid)
122 }
123}
124
125#[derive(Clone)]
126pub struct WebSocketSessionManager(Arc<Mutex<WebSocketSessionManagerInner>>);
127
128impl WebSocketSessionManager {
129 pub fn new(handler: Box<dyn WebSocketRequestHandler>) -> Self {
130 Self(Arc::new(Mutex::new(WebSocketSessionManagerInner::new(
131 handler,
132 ))))
133 }
134
135 pub fn get_session(&self, sid: &u32) -> Option<Arc<WebSocketSession>> {
136 self.0.lock().unwrap().get_session(sid)
137 }
138
139 pub fn select_session(&self) -> Option<Arc<WebSocketSession>> {
140 self.0.lock().unwrap().select_session()
141 }
142
143 pub fn stop(&self) {
144 let mut inner = self.0.lock().unwrap();
145 assert!(inner.handler.is_some());
146 inner.handler = None;
147 }
148
149 pub fn new_session(
150 &self,
151 service_url: &Url,
152 conn_info: (SocketAddr, SocketAddr),
153 ) -> BuckyResult<Arc<WebSocketSession>> {
154 self.0
155 .lock()
156 .unwrap()
157 .new_session(service_url.to_string(), conn_info)
158 }
159
160 pub async fn run_client_session<S>(
161 &self,
162 service_url: &Url,
163 session: Arc<WebSocketSession>,
164 stream: S,
165 ) -> BuckyResult<()>
166 where
167 S: Read + Write + Unpin + Send + 'static,
168 {
169 let inner = self.0.clone();
170
171 let ret = WebSocketSession::run_client(session.clone(), &service_url, stream).await;
172
173 let current = inner.lock().unwrap().remove_session(session.sid());
174 if current.is_none() {
175 unreachable!("session not exists! sid={}", session.sid());
176 }
177
178 ret
179 }
180
181 pub fn run_server_session<S>(
182 &self,
183 source: String,
184 conn_info: (SocketAddr, SocketAddr),
185 stream: S,
186 ) where
187 S: Read + Write + Unpin + Send + 'static,
188 {
189 let inner = self.0.clone();
190 let session = inner
191 .lock()
192 .unwrap()
193 .new_session(source, conn_info)
194 .unwrap();
195 async_std::task::spawn(async move {
196 let _ = WebSocketSession::run_server(session.clone(), stream).await;
197
198 let ret = inner.lock().unwrap().remove_session(session.sid());
199 if ret.is_none() {
200 unreachable!("session not exists! sid={}", session.sid());
201 }
202 });
203 }
204}