1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
// See the LICENSE file at the top-level directory of this distribution.
extern crate clap;
extern crate fibers;
extern crate futures;
extern crate handy_async;
extern crate httparse;
use clap::{App, Arg};
use fibers::{Executor, Spawn, ThreadPoolExecutor};
use futures::{Future, Stream};
use handy_async::io::{ReadFrom, WriteInto};
use handy_async::pattern::{self, Branch, Pattern, Window};
use std::io;
fn main() {
let matches = App::new("http_echo_srv")
.arg(
Arg::with_name("PORT")
.short("p")
.takes_value(true)
.default_value("3000"),
)
.arg(Arg::with_name("VERBOSE").short("v"))
.get_matches();
let port = matches.value_of("PORT").unwrap();
let addr = format!("0.0.0.0:{}", port)
.parse()
.expect("Invalid TCP bind address");
let verbose = matches.is_present("VERBOSE");
let executor = ThreadPoolExecutor::new().expect("Cannot create Executor");
let handle = executor.handle();
executor.spawn(
fibers::net::TcpListener::bind(addr)
.and_then(move |listener| {
println!("# Start listening: {}: ", addr);
listener.incoming().for_each(move |(client, addr)| {
if verbose {
println!("# CONNECTED: {}", addr);
}
handle.spawn(
client
.and_then(|client| {
let read_request_pattern = pattern::read::until(|buf, _is_eos| {
// Read header
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut req = httparse::Request::new(&mut headers);
let status = req.parse(buf).map_err(into_io_error)?;
if status.is_partial() {
Ok(None)
} else {
let content_len = get_content_length(req.headers)?;
let content_offset = status.unwrap();
Ok(Some((content_offset, content_len)))
}
})
.and_then(|(mut buf, (content_offset, content_len))| {
// Read content
let read_size = buf.len();
let request_len = content_offset + content_len;
buf.resize(request_len, 0);
let buf = Window::new(buf).skip(read_size);
let pattern = if read_size == request_len {
Branch::A(Ok(buf)) as Branch<_, _>
} else {
Branch::B(buf)
};
pattern.map(move |buf: Window<_>| buf.set_start(content_offset))
});
read_request_pattern.read_from(client).then(|result| {
// Write response
let (client, result) = match result {
Ok((client, content)) => (client, Ok(content)),
Err(error) => {
let (client, io_error) = error.unwrap();
(client, Err(io_error))
}
};
let pattern = Pattern::and_then(result, |content| {
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
content.as_ref().len()
)
.chain(content)
.map(|_| ())
})
.or_else(
|error: io::Error| {
let message = error.to_string();
format!(
"HTTP/1.1 500 OK\r\nContent-Length: {}\r\n\r\n",
message.len()
)
.chain(message)
.map(|_| ())
},
);
pattern.write_into(client).map_err(|e| e.into_error())
})
})
.then(move |r| {
if verbose {
println!("# Client finished: {:?}", r);
}
Ok(())
}),
);
Ok(())
})
})
.then(|r| {
println!("# Listener finished: {:?}", r);
Ok(())
}),
);
executor.run().expect("Execution failed");
}
/// Extracts the request body length from the `Content-Length` header.
///
/// The header name is matched ASCII case-insensitively (without allocating),
/// and optional spaces or tabs surrounding the value are ignored. When the
/// header occurs more than once, the first occurrence is used.
///
/// # Errors
///
/// Returns an `InvalidData` error when no `Content-Length` header is present,
/// and an error produced by `into_io_error` when the value is not valid UTF-8
/// or cannot be parsed as a `usize`.
fn get_content_length(headers: &[httparse::Header]) -> io::Result<usize> {
    let header = headers
        .iter()
        .find(|h| h.name.eq_ignore_ascii_case("content-length"))
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "No content-length header"))?;
    let value = std::str::from_utf8(header.value).map_err(into_io_error)?;
    value
        .trim_matches(|c| c == ' ' || c == '\t')
        .parse::<usize>()
        .map_err(into_io_error)
}
/// Wraps an arbitrary error into an `io::Error` of kind `Other`.
///
/// The error is handed to `io::Error::new` directly, which boxes it itself;
/// boxing it beforehand would store a `Box<Box<E>>` and prevent callers from
/// downcasting the inner error back to `E`.
fn into_io_error<E: std::error::Error + Send + Sync + 'static>(e: E) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}