cyfs_lib/ws/
session_manager.rs

1use super::request::*;
2use super::session::*;
3use cyfs_base::*;
4use cyfs_debug::Mutex;
5
6use async_std::io::{Read, Write};
7use http_types::Url;
8use rand::Rng;
9use std::collections::HashMap;
10use std::marker::Send;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14struct WebSocketSessionManagerInner {
15    list: HashMap<u32, Arc<WebSocketSession>>,
16    next_sid: u32,
17    handler: Option<Box<dyn WebSocketRequestHandler>>,
18
19    // 带状态的select
20    next_select_index: usize,
21}
22
23impl Drop for WebSocketSessionManagerInner {
24    fn drop(&mut self) {
25        warn!("ws session manager dropped! next_sid={}", self.next_sid);
26    }
27}
28
29impl WebSocketSessionManagerInner {
30    fn new(handler: Box<dyn WebSocketRequestHandler>) -> Self {
31        // sid随机化
32        let mut rng = ::rand::thread_rng();
33        let sid = loop {
34            let ret = rng.gen::<u32>();
35            if ret != u32::MAX {
36                break ret;
37            }
38        };
39
40        info!("ws manager sid start at {}", sid);
41
42        Self {
43            list: HashMap::new(),
44            next_sid: sid,
45            handler: Some(handler),
46            next_select_index: 0,
47        }
48    }
49
50    fn get_session(&self, sid: &u32) -> Option<Arc<WebSocketSession>> {
51        self.list.get(sid).map(|v| v.to_owned())
52    }
53
54    // 随机选择一个session
55    fn select_session(&mut self) -> Option<Arc<WebSocketSession>> {
56        match self.list.len() {
57            0 => None,
58            1 => {
59                let session = self.list.iter().next().unwrap().1.to_owned();
60                if session.is_valid() {
61                    Some(session)
62                } else {
63                    None
64                }
65            }
66            count @ _ => {
67                // 多于一个,那么随机选择一个
68                for _ in 0..count {
69                    let index = self.next_select_index % count;
70                    self.next_select_index += 1;
71
72                    let session = self.list.iter().nth(index).unwrap().1.to_owned();
73                    if session.is_valid() {
74                        return Some(session);
75                    }
76                }
77
78                // 所有session都无效
79                None
80            }
81        }
82    }
83
84    fn new_session(
85        &mut self,
86        source: String,
87        conn_info: (SocketAddr, SocketAddr),
88    ) -> BuckyResult<Arc<WebSocketSession>> {
89        let sid = self.next_sid;
90        self.next_sid += 1;
91        if self.next_sid == u32::MAX {
92            self.next_sid = 0;
93        }
94
95        let handler = self.handler.as_ref().map(|item| item.clone_handler());
96        if handler.is_none() {
97            let msg = format!(
98                "new ws session but request handler is empty! source={}",
99                source
100            );
101            error!("{}", msg);
102            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
103        }
104
105        let session = WebSocketSession::new(
106            sid,
107            source,
108            conn_info,
109            handler.unwrap(),
110        );
111
112        let session = Arc::new(session);
113        if let Some(_) = self.list.insert(sid, session.clone()) {
114            unreachable!();
115        }
116
117        Ok(session)
118    }
119
120    pub fn remove_session(&mut self, sid: u32) -> Option<Arc<WebSocketSession>> {
121        self.list.remove(&sid)
122    }
123}
124
125#[derive(Clone)]
126pub struct WebSocketSessionManager(Arc<Mutex<WebSocketSessionManagerInner>>);
127
128impl WebSocketSessionManager {
129    pub fn new(handler: Box<dyn WebSocketRequestHandler>) -> Self {
130        Self(Arc::new(Mutex::new(WebSocketSessionManagerInner::new(
131            handler,
132        ))))
133    }
134
135    pub fn get_session(&self, sid: &u32) -> Option<Arc<WebSocketSession>> {
136        self.0.lock().unwrap().get_session(sid)
137    }
138
139    pub fn select_session(&self) -> Option<Arc<WebSocketSession>> {
140        self.0.lock().unwrap().select_session()
141    }
142
143    pub fn stop(&self) {
144        let mut inner = self.0.lock().unwrap();
145        assert!(inner.handler.is_some());
146        inner.handler = None;
147    }
148
149    pub fn new_session(
150        &self,
151        service_url: &Url,
152        conn_info: (SocketAddr, SocketAddr),
153    ) -> BuckyResult<Arc<WebSocketSession>> {
154        self.0
155            .lock()
156            .unwrap()
157            .new_session(service_url.to_string(), conn_info)
158    }
159
160    pub async fn run_client_session<S>(
161        &self,
162        service_url: &Url,
163        session: Arc<WebSocketSession>,
164        stream: S,
165    ) -> BuckyResult<()>
166    where
167        S: Read + Write + Unpin + Send + 'static,
168    {
169        let inner = self.0.clone();
170
171        let ret = WebSocketSession::run_client(session.clone(), &service_url, stream).await;
172
173        let current = inner.lock().unwrap().remove_session(session.sid());
174        if current.is_none() {
175            unreachable!("session not exists! sid={}", session.sid());
176        }
177
178        ret
179    }
180
181    pub fn run_server_session<S>(
182        &self,
183        source: String,
184        conn_info: (SocketAddr, SocketAddr),
185        stream: S,
186    ) where
187        S: Read + Write + Unpin + Send + 'static,
188    {
189        let inner = self.0.clone();
190        let session = inner
191            .lock()
192            .unwrap()
193            .new_session(source, conn_info)
194            .unwrap();
195        async_std::task::spawn(async move {
196            let _ = WebSocketSession::run_server(session.clone(), stream).await;
197
198            let ret = inner.lock().unwrap().remove_session(session.sid());
199            if ret.is_none() {
200                unreachable!("session not exists! sid={}", session.sid());
201            }
202        });
203    }
204}