br_web_server/
lib.rs

1mod base64;
2pub mod stream;
3pub mod config;
4mod encoding;
5pub mod request;
6pub mod response;
7pub mod websocket;
8use crate::stream::{Protocol, Scheme};
9use crate::config::{Config};
10use crate::request::{Method, Request, Upgrade};
11use crate::response::Response;
12use crate::websocket::{CloseCode, ErrorCode, Message, Websocket, USERS};
13use log::{error, info, warn};
14use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
15use std::io::{Error};
16use std::net::{IpAddr, SocketAddr, TcpListener};
17use std::sync::{Arc, Mutex};
18use std::time::{Duration, Instant};
19use std::{io, thread};
20use std::fmt::Debug;
21
/// Network service entry point.
///
/// A stateless unit type: all functionality is exposed through associated
/// functions such as `WebServer::new_service`.
#[derive(Debug, Clone)]
pub struct WebServer;
25
26impl WebServer {
27    /// 后台服务器
28    pub fn new_service(config: &Config, factory: fn(out: Websocket) -> Box<dyn Handler>) {
29        loop {
30            match WebServer::service(&config.clone(), factory) {
31                Ok(()) => {}
32                Err(e) => error!("服务器错误: {}[{}]: {}", file!(), line!(), e),
33            }
34            warn!("服务器 1秒后重启");
35            thread::sleep(Duration::from_secs(1));
36        }
37    }
38    fn service(config: &Config, factory: fn(out: Websocket) -> Box<dyn Handler>) -> io::Result<()> {
39        info!("==================== 网络服务 服务信息 ====================");
40        info!("日志记录: {}",if config.log {"开启"} else {"关闭"});
41        info!("调试模式: {}",if config.debug { "开启" } else { "关闭" });
42        info!("地    址: {}", config.host);
43        info!("端 口 号: {}", config.port);
44        info!("服务地址: {}://{}{}",if config.https { "https" } else { "http" },config.host,if config.port > 0 {format!(":{}", config.port)} else {String::new()});
45        info!("根 目 录: {}", config.root_path.to_str().unwrap());
46        info!("访问目录: {}", config.public);
47        info!("运行目录: {}", config.runtime);
48        info!("SSL/TLS: {}",if config.https { "开启" } else { "关闭" });
49
50        if config.https {
51            info!("证书目录KEY: {:?}", config.tls.key);
52            info!("证书目录PEM: {:?}", config.tls.certs);
53        }
54
55        let addrs = [SocketAddr::from((
56            IpAddr::V4(config.host.parse().unwrap()),
57            config.port,
58        ))];
59        let listener = TcpListener::bind(&addrs[..])?;
60        info!("==================== 网络服务 启动成功 ====================");
61
62        let acceptor = Self::ssl(&config.clone())?;
63        for stream in listener.incoming() {
64            match stream {
65                Ok(stream) => {
66                    let config_new = config.clone();
67                    let acceptor_new = acceptor.clone();
68
69                    thread::spawn(move || -> io::Result<()> {
70                        let handle_time = Instant::now();
71
72                        // 设置超时时间
73                        if config_new.write_timeout > 0 {
74                            stream.set_write_timeout(Some(Duration::from_secs(config_new.write_timeout))).unwrap_or_default();
75                        }
76                        if config_new.read_timeout > 0 {
77                            stream.set_read_timeout(Some(Duration::from_secs(config_new.read_timeout))).unwrap_or_default();
78                        }
79
80                        // 获取请求客户端IP
81                        let client_ip = stream.peer_addr().unwrap().ip().to_string();
82                        // 获取服务端IP
83                        let server_ip = stream.local_addr().unwrap().ip().to_string();
84
85
86                        let mut scheme = if config_new.https {
87                            match acceptor_new.accept(stream.try_clone().unwrap()) {
88                                Ok(e) => Scheme::Https(Arc::new(Mutex::new(e)), client_ip.clone()),
89                                Err(_) => return Err(Error::other("加载加密请求失败")),
90                            }
91                        } else {
92                            Scheme::Http(Arc::new(Mutex::new(stream.try_clone().unwrap())), client_ip.clone())
93                        };
94
95                        let mut request = Request::default(config_new.clone(), handle_time);
96                        let mut response = Response::default(config_new.clone(), Arc::new(Mutex::new(scheme.clone())));
97
98                        let mut bytes = vec![];
99                        //  请求行处理
100                        scheme.request_line(&mut bytes, &mut request)?;
101
102                        match request.protocol {
103                            Protocol::HTTP1_0 | Protocol::HTTP1_1 => {
104                                scheme.request_header(&mut bytes, &mut request)?;
105                                scheme.request_body(&mut bytes, &mut request)?;
106                                request.handle(&server_ip, &client_ip);
107
108                                response.set_request(&mut request);
109
110                                match request.upgrade {
111                                    Upgrade::Websocket => {
112                                        let mut websocket = Websocket::new(request.clone(), response.clone());
113                                        if let Ok(()) = websocket.handle(factory) {};
114                                        if USERS.get(&websocket.key.to_string()).is_some() {
115                                            USERS.remove(&websocket.key);
116                                        }
117                                        if response.config.debug {
118                                            let len = USERS.len();
119                                            info!("退出: {}", len);
120                                        }
121                                    }
122                                    Upgrade::Http => {
123                                        match request.method {
124                                            Method::OPTIONS => {
125                                                let websocket = Websocket::new(request.clone(), response.clone());
126                                                factory(websocket).on_options(&mut response);
127                                                response.send()?;
128                                            }
129                                            _ => {
130                                                match request.read_resource() {
131                                                    Ok(e) => response.status(200).file(&e).send()?,
132                                                    Err(_) => {
133                                                        let websocket = Websocket::new(request.clone(), response.clone());
134                                                        let mut factory_new = factory(websocket);
135                                                        factory_new.on_request(request.clone(), &mut response);
136                                                        factory_new.on_response(request.clone(), &mut response);
137                                                        response.send()?;
138                                                    }
139                                                }
140                                            }
141                                        }
142                                    }
143                                    Upgrade::None => {
144                                        response.status(404).send()?;
145                                    }
146                                }
147                            }
148                            Protocol::HTTP2 => {
149                                let websocket = Websocket::new(request.clone(), response.clone());
150                                scheme.http2_handle(&mut bytes, &mut request, &mut response, &server_ip, &client_ip)?;
151                                let mut factory_new = factory(websocket);
152                                factory_new.on_request(request.clone(), &mut response);
153                                factory_new.on_response(request.clone(), &mut response);
154                                scheme.send_http2(&mut response)?;
155                                //loop {
156                                //    scheme.http2_handle(&mut bytes, &mut request, &mut response, &server_ip, &client_ip)?;
157                                //    let mut factory_new = factory(websocket);
158                                //    factory_new.on_request(request.clone(), &mut response);
159                                //    factory_new.on_response(request.clone(), &mut response);
160                                //    scheme.send_http2(&mut response)?;
161                                //    break;
162                                //}
163                            }
164                            Protocol::None => {
165                                return Err(Error::other("未知请求"));
166                            }
167                        }
168
169                        // 日志记录
170                        match request.save_log() {
171                            Ok(_) => {}
172                            Err(_) => {
173                                error!("日志记录错误");
174                            }
175                        };
176                        Ok(())
177                    });
178                }
179                Err(e) => return Err(e),
180            }
181        }
182        Ok(())
183    }
184
185    fn ssl(config: &Config) -> io::Result<Arc<SslAcceptor>> {
186        if config.https {
187            let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls())?;
188            if !config.tls.key.is_file() {
189                return Err(Error::other(
190                    format!("private.key 不存在: {:?}", config.tls.key).as_str(),
191                ));
192            }
193            if !config.tls.certs.is_file() {
194                return Err(Error::other(
195                    format!("certificate.pem 不存在: {:?}", config.tls.certs).as_str(),
196                ));
197            }
198            acceptor.set_private_key_file(config.tls.key.clone(), SslFiletype::PEM)?;
199            acceptor.set_certificate_file(config.tls.certs.clone(), SslFiletype::PEM)?;
200            Ok(Arc::new(acceptor.build()))
201        } else {
202            Ok(Arc::new(SslAcceptor::mozilla_intermediate(SslMethod::tls())?.build()))
203        }
204    }
205}
206
207pub trait HandlerClone {
208    fn clone_box(&self) -> Box<dyn Handler>;
209}
210
211// 实现 HandlerClone for 所有 Handler + Clone 的实现者
212impl<T> HandlerClone for T
213where
214    T: 'static + Handler + Clone,
215{
216    fn clone_box(&self) -> Box<dyn Handler> {
217        Box::new(self.clone())
218    }
219}
220
221// 为 dyn Handler 实现 Clone
222impl Clone for Box<dyn Handler> {
223    fn clone(&self) -> Box<dyn Handler> {
224        self.clone_box()
225    }
226}
227pub trait Handler: Send + Sync + HandlerClone + Debug {
228    /// 请求 处理
229    fn on_request(&mut self, _request: Request, _response: &mut Response) {}
230    /// 预检请求处理 OPTIONS
231    fn on_options(&mut self, response: &mut Response) {
232        if !response.headers.has_key("Access-Control-Allow-Origin") {
233            response.header("Access-Control-Allow-Origin", "*");
234        }
235        // GET,POST,OPTIONS
236        if !response.headers.has_key("Access-Control-Allow-Methods") {
237            response.header("Access-Control-Allow-Methods", "*");
238        }
239        // Content-Type, Authorization, X-Real-IP,X-Forwarded-For
240        if !response.headers.has_key("Access-Control-Allow-Headers") {
241            response.header("Access-Control-Allow-Headers", "Content-Type,Authorization,X-Real-IP,X-Forwarded-For");
242        } else {
243            let headers = response.headers["Access-Control-Allow-Headers"].to_string();
244            response.header("Access-Control-Allow-Headers", format!("Content-Type,Authorization,X-Real-IP,X-Forwarded-For,{}", headers).as_str());
245        }
246        // 允许前端跨域请求时发送和接收 cookie / 凭据
247        if !response.headers.has_key("Access-Control-Expose-Headers") {
248            response.header("Access-Control-Expose-Headers", "true");
249        }
250        // 允许前端访问请求头
251        if !response.headers.has_key("Access-Control-Expose-Headers") {
252            response.header("Access-Control-Expose-Headers", "Content-Disposition");
253        }
254        if !response.headers.has_key("Access-Control-Max-Age") {
255            response.header("Access-Control-Max-Age", "0");
256        }
257    }
258    /// 响应 处理
259    fn on_response(&mut self, request: Request, response: &mut Response) {
260        if response.config.origin.is_empty() {
261            response.header("Access-Control-Allow-Origin", "*");
262        } else {
263            let origin = request.header["origin"].as_str().unwrap_or("");
264            for item in response.config.origin.clone() {
265                if origin.contains(item.as_str()) {
266                    response.header("Access-Control-Allow-Origin", origin);
267                }
268            }
269        }
270        if response.headers.has_key("Content-Disposition") {
271            response.header("Access-Control-Expose-Headers", "Content-Disposition");
272        }
273    }
274
275    fn on_frame(&mut self) -> io::Result<()> {
276        Ok(())
277    }
278    /// 握手监听
279    fn on_open(&mut self) -> io::Result<()> {
280        Ok(())
281    }
282    /// 接收到消息
283    fn on_message(&mut self, _msg: Message) -> io::Result<()> {
284        Ok(())
285    }
286    /// 关闭监听
287    fn on_close(&mut self, _code: CloseCode, _reason: &str) {}
288    /// 错误监听
289    fn on_error(&mut self, _err: ErrorCode) {}
290    /// 关机监听
291    fn on_shutdown(&mut self) {}
292    /// ping
293    fn on_ping(&mut self, _msg: Message) {}
294    /// pong
295    fn on_pong(&mut self, _msg: Message) {}
296}
297