1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
use std::io::{Write, Result, Error};
use std::sync::atomic::{AtomicUsize, Ordering};
use https::{status::StatusCode,
header::{CONTENT_LENGTH}};
use flate2::{Compression, write::GzEncoder};
use bytes::BufMut;
use tcp::{Socket, SocketHandle, SocketEvent,
utils::{SocketContext, Ready}};
use log::debug;
use crate::{service::HttpService,
request::HttpRequest,
response::HttpResponse,
utils::{HttpRecvResult, ContentEncode}};
use crate::service::ServiceFactory;
// Http连接唯一ID分配器
static HTTP_CONNECT_UID_ALLOCATOR: AtomicUsize = AtomicUsize::new(0);
/*
* Http连接
*/
pub struct HttpConnect<S: Socket, H: ServiceFactory<S, Service = HS>, HS: HttpService<S>> {
uid: usize, //唯一ID
handle: SocketHandle<S>, //当前连接的Tcp连接句柄
host: H, //当前连接对应的主机
service: HS, //当前连接的服务
keep_alive: usize, //连接保持时间
}
unsafe impl<S: Socket, H: ServiceFactory<S, Service = HS>, HS: HttpService<S, >> Send for HttpConnect<S, H, HS> {}
unsafe impl<S: Socket, H: ServiceFactory<S, Service = HS>, HS: HttpService<S>> Sync for HttpConnect<S, H, HS> {}
impl<S: Socket, H: ServiceFactory<S, Service = HS>, HS: HttpService<S>> Clone for HttpConnect<S, H, HS> {
// 重新分配当前主机的服务,并复制其它资源
fn clone(&self) -> Self {
HttpConnect {
uid: self.uid,
handle: self.handle.clone(),
host: self.host.clone(),
service: self.get_host().new_service(),
keep_alive: self.keep_alive,
}
}
}
impl<S: Socket, H: ServiceFactory<S, Service = HS>, HS: HttpService<S>> Drop for HttpConnect<S, H, HS> {
fn drop(&mut self) {
debug!("Drop http connect, http_uid: {:?}, token: {:?}, tcp_uid: {:?}, remote: {:?}, local: {:?}, closed: {:?}",
self.uid,
self.handle.get_token(),
self.handle.get_uid(),
self.handle.get_remote(),
self.handle.get_local(),
self.handle.is_closed());
}
}
/*
* Http连接同步方法
*/
impl<S: Socket, H: ServiceFactory<S, Service = HS>, HS: HttpService<S>> HttpConnect<S, H, HS> {
/// 构建指定Tcp连接句柄、异步任务等待队列、Http服务和Http连接保持时长的Http连接
pub fn new(handle: SocketHandle<S>,
host: H,
service: HS,
keep_alive: usize) -> Self {
HttpConnect {
uid: HTTP_CONNECT_UID_ALLOCATOR.fetch_add(1, Ordering::Relaxed),
handle,
host,
service,
keep_alive,
}
}
/// 获取Http连接的唯一ID
pub fn get_uid(&self) -> usize {
self.uid
}
/// 获取Http连接对应的主机
pub fn get_host(&self) -> &H {
&self.host
}
/// 异步回应指定的Http响应
pub fn reply<B>(&self, buf: B) -> Result<()>
where B: AsRef<[u8]> + Send + 'static {
//首先回应本次Http请求
self.handle.write_ready(buf)
}
/// 更新Http连接的超时时长
pub fn update_timeout(&self) {
let mut event = SocketEvent::empty();
event.set::<usize>(self.keep_alive);
self.handle.set_timeout(self.keep_alive, event);
}
/// 抛出服务器端指定错误,将通过本次Http请求的响应返回给客户端,并关闭当前Http连接
pub fn throw(&self,
mut resp: HttpResponse,
mut error: StatusCode,
reason: Error) -> Result<()> {
if !error.is_server_error() {
//不是服务器端错误,则强制为服务器端未知错误
error = StatusCode::INTERNAL_SERVER_ERROR;
}
//设置错误原因
let body = reason.to_string().into_bytes();
let len = body.len();
//设置启始行和响应头
resp.header(CONTENT_LENGTH.as_str(), len.to_string().as_str());
resp.status(error.as_u16());
let header: Vec<u8> = resp.into();
//首先回应本次Http请求
self.reply(vec![header, body].concat())?;
//关闭当前Http连接
self.close(Err(reason))
}
/// 关闭当前Http连接
pub fn close(&self, reason: Result<()>) -> Result<()> {
//TODO 需要处理Http连接关闭时的相关问题...
self.handle.close(reason)
}
}
/*
* Http连接异步方法
*/
impl<S: Socket, H: ServiceFactory<S, Service = HS>, HS: HttpService<S>> HttpConnect<S, H, HS> {
/// 运行连接上的服务
pub async fn run_service(&mut self,
req: HttpRequest<S>) {
self.update_timeout(); //在调用服务前,更新当前Http连接的超时时长
match self.service.call(req).await {
Err(e) => {
//服务调用异常
let resp = HttpResponse::empty();
let _ = self.throw(resp, StatusCode::INTERNAL_SERVER_ERROR, e.into());
},
Ok(resp) => {
//服务调用完成,则序列化响应,并回应本次Http请求
if resp.is_stream() {
//流响应
let content_encoding = resp.get_content_encode_by_stream().clone();
let (header_buf, body) = resp.into_header_and_body();
//首先发送响应头
if let Err(e) = self.reply(header_buf) {
//发送响应头错误,则关闭连接并退出流响应,避免继续等待响应体导致资源悬挂
let _ = self.close(Err(e));
return;
} else {
//发送响应头成功
if let Some(body) = body {
//响应体存在,则异步获取响应体流
loop {
match body.next().await {
HttpRecvResult::Err(e) => {
//获取Http响应体错误,则关闭连接并退出流响应
let _ = self.close(Err(e));
break;
},
HttpRecvResult::Ok(Some((_index, part))) => {
//获取到的是Http响应体块的后继,则立即向对端发送
match encode_content(&content_encoding, part) {
Err(e) => {
//编码响应体块的后续失败,则关闭连接并退出流响应
let _ = self.close(Err(e));
break;
},
Ok(encoded) => {
//编码响应体块的后续成功
let mut buf = Vec::with_capacity(encoded.len() + 16);
buf.put((format!("{:x}", encoded.len()) + "\r\n").as_bytes());
buf.put(encoded.as_slice());
buf.put("\r\n".as_bytes());
if let Err(e) = self.reply(buf) {
//写响应体块失败,则关闭连接并退出流响应
let _ = self.close(Err(e));
break;
}
},
}
},
HttpRecvResult::Ok(None) => {
//获取到的是Http响应体块的尾部,则发送流响应结束帧,并退出循环
if let Err(e) = self.reply("0\r\n\r\n") {
let _ = self.close(Err(e));
}
break;
},
HttpRecvResult::Fin(_) => {
//获取到的是Http响应体块的尾部,则发送流响应结束帧,并退出循环
if let Err(e) = self.reply("0\r\n\r\n") {
let _ = self.close(Err(e));
}
break;
},
}
}
} else {
//响应体不存在,则立即发送流响应结束帧
if let Err(e) = self.reply("0\r\n\r\n") {
let _ = self.close(Err(e));
}
}
}
} else {
//块响应
let buf: Vec<u8> = resp.into();
if let Err(e) = self.reply(buf) {
//回应错误,则立即抛出回应异常
let resp = HttpResponse::empty();
let _ = self.throw(resp, StatusCode::INTERNAL_SERVER_ERROR, e);
}
}
},
}
}
}
// 编码Http内容
fn encode_content(encoding: &ContentEncode,
content: Vec<u8>) -> Result<Vec<u8>> {
match encoding {
ContentEncode::Gzip(level) => {
//接受gzip编码
let gzip = new_gzip(Vec::new(),
Compression::new(*level));
match encode_gzip(gzip, content.as_slice()) {
Err(e) => {
//编码错误,则立即返回错误
Err(e)
},
Ok(output) => {
//编码成功
Ok(output)
},
}
},
_ => {
//暂时不支持其它内容编码,则忽略
Ok(content)
}
}
}
// 创建指定流压缩级别的gzip编码器
fn new_gzip(writer: Vec<u8>, level: Compression) -> GzEncoder<Vec<u8>> {
GzEncoder::new(writer, level)
}
// 进行gzip编码
fn encode_gzip(mut gzip: GzEncoder<Vec<u8>>, input: &[u8]) -> Result<Vec<u8>> {
if let Err(e) = gzip.write_all(input) {
//写入失败,则返回错误
return Err(e);
}
gzip.finish()
}