use std::net::TcpStream;
use std::io;
use std::fmt;
use std::error;
use http::{HttpScheme, HttpResult, StreamId, Header, HttpError};
use http::transport::TransportStream;
use http::frame::{SettingsFrame, HttpSetting, Frame};
use http::connection::{
SendFrame, ReceiveFrame,
SendStatus,
HttpConnection,
EndStream,
};
use http::session::{
Session,
Stream, DefaultStream,
DefaultSessionState, SessionState,
};
use http::priority::SimplePrioritizer;
#[cfg(feature="tls")]
pub mod tls;
pub fn write_preface<W: io::Write>(stream: &mut W) -> Result<(), io::Error> {
let preface = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
try!(stream.write_all(preface));
let settings = {
let mut frame = SettingsFrame::new();
frame.add_setting(HttpSetting::EnablePush(0));
frame
};
try!(stream.write_all(&settings.serialize()));
debug!("Sent client preface");
Ok(())
}
pub struct ClientStream<TS: TransportStream>(pub TS, pub HttpScheme, pub String);
pub trait HttpConnectError: error::Error + Send + Sync {}
impl<E> From<E> for HttpError where E: HttpConnectError + 'static {
fn from(e: E) -> HttpError { HttpError::Other(Box::new(e)) }
}
pub trait HttpConnect {
type Stream: TransportStream;
type Err: HttpConnectError + 'static;
fn connect(self) -> Result<ClientStream<Self::Stream>, Self::Err>;
}
pub struct CleartextConnector<'a> {
pub host: &'a str,
pub port: u16,
}
impl<'a> CleartextConnector<'a> {
pub fn new(host: &'a str) -> CleartextConnector {
CleartextConnector { host: host, port: 80 }
}
pub fn with_port(host: &'a str, port: u16) -> CleartextConnector {
CleartextConnector { host: host, port: port }
}
}
#[derive(Debug)]
pub struct CleartextConnectError(io::Error);
impl fmt::Display for CleartextConnectError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Cleartext HTTP/2 connect error: {}", (self as &error::Error).description())
}
}
impl error::Error for CleartextConnectError {
fn description(&self) -> &str {
self.0.description()
}
fn cause(&self) -> Option<&error::Error> {
self.0.cause()
}
}
impl From<io::Error> for CleartextConnectError {
fn from(e: io::Error) -> CleartextConnectError { CleartextConnectError(e) }
}
impl HttpConnectError for CleartextConnectError {}
impl<'a> HttpConnect for CleartextConnector<'a> {
type Stream = TcpStream;
type Err = CleartextConnectError;
fn connect(self) -> Result<ClientStream<TcpStream>, CleartextConnectError> {
let mut stream = try!(TcpStream::connect((self.host, self.port)));
try!(write_preface(&mut stream));
Ok(ClientStream(stream, HttpScheme::Http, self.host.into()))
}
}
pub struct RequestStream<S> where S: Stream {
pub headers: Vec<Header>,
pub stream: S,
}
pub struct ClientConnection<S, R, State=DefaultSessionState<DefaultStream>>
where S: SendFrame, R: ReceiveFrame, State: SessionState {
conn: HttpConnection<S, R>,
pub state: State,
}
impl<S, R, State> ClientConnection<S, R, State>
where S: SendFrame, R: ReceiveFrame, State: SessionState {
pub fn with_connection(conn: HttpConnection<S, R>, state: State)
-> ClientConnection<S, R, State> {
ClientConnection {
conn: conn,
state: state,
}
}
#[inline]
pub fn scheme(&self) -> HttpScheme {
self.conn.scheme
}
pub fn init(&mut self) -> HttpResult<()> {
try!(self.read_preface());
Ok(())
}
fn read_preface(&mut self) -> HttpResult<()> {
let mut session = ClientSession::new(&mut self.state);
self.conn.expect_settings(&mut session)
}
pub fn start_request(&mut self, req: RequestStream<State::Stream>) -> HttpResult<()> {
let end_stream = if req.stream.is_closed_local() { EndStream::Yes } else { EndStream::No };
try!(self.conn.send_headers(req.headers, req.stream.id(), end_stream));
self.state.insert_stream(req.stream);
Ok(())
}
#[inline]
pub fn handle_next_frame(&mut self) -> HttpResult<()> {
let mut session = ClientSession::new(&mut self.state);
self.conn.handle_next_frame(&mut session)
}
pub fn send_next_data(&mut self) -> HttpResult<SendStatus> {
debug!("Sending next data...");
const MAX_CHUNK_SIZE: usize = 8 * 1024;
let mut buf = [0; MAX_CHUNK_SIZE];
let mut prioritizer = SimplePrioritizer::new(&mut self.state, &mut buf);
self.conn.send_next_data(&mut prioritizer)
}
}
pub struct ClientSession<'a, State> where State: SessionState + 'a {
state: &'a mut State,
}
impl<'a, State> ClientSession<'a, State> where State: SessionState + 'a {
#[inline]
pub fn new(state: &'a mut State) -> ClientSession<State> {
ClientSession {
state: state,
}
}
}
impl<'a, State> Session for ClientSession<'a, State> where State: SessionState + 'a {
fn new_data_chunk(&mut self, stream_id: StreamId, data: &[u8]) {
debug!("Data chunk for stream {}", stream_id);
let mut stream = match self.state.get_stream_mut(stream_id) {
None => {
debug!("Received a frame for an unknown stream!");
return;
},
Some(stream) => stream,
};
stream.new_data_chunk(data);
}
fn new_headers(&mut self, stream_id: StreamId, headers: Vec<Header>) {
debug!("Headers for stream {}", stream_id);
let mut stream = match self.state.get_stream_mut(stream_id) {
None => {
debug!("Received a frame for an unknown stream!");
return;
},
Some(stream) => stream,
};
stream.set_headers(headers);
}
fn end_of_stream(&mut self, stream_id: StreamId) {
debug!("End of stream {}", stream_id);
let mut stream = match self.state.get_stream_mut(stream_id) {
None => {
debug!("Received a frame for an unknown stream!");
return;
},
Some(stream) => stream,
};
stream.close()
}
}
#[cfg(test)]
mod tests {
use super::{
ClientSession,
write_preface,
RequestStream,
};
use std::mem;
use http::StreamId;
use http::tests::common::{
TestStream,
build_mock_client_conn,
};
use http::frame::{
SettingsFrame,
DataFrame,
RawFrame,
Frame,
unpack_header,
};
use http::connection::{
HttpFrame,
SendStatus,
};
use http::session::{Session, SessionState, Stream, DefaultSessionState};
#[test]
fn test_init_client_conn() {
let frames = vec![HttpFrame::SettingsFrame(SettingsFrame::new())];
let mut conn = build_mock_client_conn(frames);
conn.init().unwrap();
assert_eq!(conn.conn.receiver.recv_list.len(), 0);
let frame = match conn.conn.sender.sent.remove(0) {
HttpFrame::SettingsFrame(frame) => frame,
_ => panic!("ACK not sent!"),
};
assert!(frame.is_ack());
}
#[test]
fn test_init_client_conn_no_settings() {
let frames = vec![HttpFrame::DataFrame(DataFrame::new(1))];
let mut conn = build_mock_client_conn(frames);
assert!(conn.init().is_err());
}
fn prepare_stream(id: StreamId, data: Option<Vec<u8>>) -> TestStream {
let mut stream = TestStream::new(id);
match data {
None => stream.close_local(),
Some(d) => stream.set_outgoing(d),
};
return stream;
}
#[test]
fn test_client_conn_send_next_data() {
{
let mut conn = build_mock_client_conn(vec![]);
let res = conn.send_next_data().unwrap();
assert_eq!(res, SendStatus::Nothing);
}
{
let mut conn = build_mock_client_conn(vec![]);
conn.state.insert_stream(prepare_stream(1, None));
let res = conn.send_next_data().unwrap();
assert_eq!(res, SendStatus::Nothing);
}
{
let mut conn = build_mock_client_conn(vec![]);
conn.state.insert_stream(prepare_stream(1, Some(vec![1, 2, 3])));
let res = conn.send_next_data().unwrap();
assert_eq!(res, SendStatus::Sent);
let res = conn.send_next_data().unwrap();
assert_eq!(res, SendStatus::Nothing);
}
{
let mut conn = build_mock_client_conn(vec![]);
conn.state.insert_stream(prepare_stream(1, Some(vec![1, 2, 3])));
conn.state.insert_stream(prepare_stream(3, Some(vec![1, 2, 3])));
conn.state.insert_stream(prepare_stream(5, Some(vec![1, 2, 3])));
for _ in 0..3 {
let res = conn.send_next_data().unwrap();
assert_eq!(res, SendStatus::Sent);
}
let res = conn.send_next_data().unwrap();
assert_eq!(res, SendStatus::Nothing);
}
}
#[test]
fn test_client_conn_start_request() {
{
let mut conn = build_mock_client_conn(vec![]);
conn.start_request(RequestStream {
headers: vec![
(b":method".to_vec(), b"GET".to_vec()),
],
stream: prepare_stream(1, None),
}).unwrap();
assert!(conn.state.get_stream_ref(1).is_some());
assert_eq!(conn.conn.sender.sent.len(), 1);
match conn.conn.sender.sent[0] {
HttpFrame::HeadersFrame(ref frame) => {
assert!(frame.is_end_of_stream());
},
_ => panic!("Expected a Headers frame"),
};
}
{
let mut conn = build_mock_client_conn(vec![]);
conn.start_request(RequestStream {
headers: vec![
(b":method".to_vec(), b"POST".to_vec()),
],
stream: prepare_stream(1, Some(vec![1, 2, 3])),
}).unwrap();
assert!(conn.state.get_stream_ref(1).is_some());
assert_eq!(conn.conn.sender.sent.len(), 1);
match conn.conn.sender.sent[0] {
HttpFrame::HeadersFrame(ref frame) => {
assert!(!frame.is_end_of_stream());
},
_ => panic!("Expected a Headers frame"),
};
}
}
#[test]
fn test_client_session_notifies_stream() {
let mut state = DefaultSessionState::<TestStream>::new();
state.insert_stream(Stream::new(1));
{
let mut session = ClientSession::new(&mut state);
session.new_data_chunk(1, &[1, 2, 3]);
}
assert_eq!(state.get_stream_ref(1).unwrap().body, vec![1, 2, 3]);
{
let mut session = ClientSession::new(&mut state);
session.new_data_chunk(1, &[4]);
}
assert_eq!(state.get_stream_ref(1).unwrap().body, vec![1, 2, 3, 4]);
let headers = vec![(b":method".to_vec(), b"GET".to_vec())];
{
let mut session = ClientSession::new(&mut state);
session.new_headers(1, headers.clone());
}
assert_eq!(state.get_stream_ref(1).unwrap().headers.clone().unwrap(),
headers);
state.insert_stream(Stream::new(3));
{
let mut session = ClientSession::new(&mut state);
session.new_data_chunk(3, &[100]);
}
assert_eq!(state.get_stream_ref(3).unwrap().body, vec![100]);
{
let mut session = ClientSession::new(&mut state);
session.end_of_stream(1);
}
assert!(state.get_stream_ref(1).unwrap().is_closed());
assert!(!state.get_stream_ref(3).unwrap().is_closed());
assert_eq!(state.iter().collect::<Vec<_>>().len(), 2);
let closed = state.get_closed();
assert_eq!(closed.len(), 1);
assert_eq!(closed[0].id(), 1);
assert_eq!(state.iter().collect::<Vec<_>>().len(), 1);
}
#[test]
fn test_write_preface() {
fn get_frame_from_buf<F: Frame>(buf: &[u8]) -> (F, usize) {
let headers = unpack_header(unsafe {
assert!(buf.len() >= 9);
mem::transmute(buf.as_ptr())
});
let len = headers.0 as usize;
let raw = RawFrame::from_buf(&buf[..9 + len]).unwrap();
let frame = Frame::from_raw(raw).unwrap();
(frame, len + 9)
}
let mut written: Vec<u8> = Vec::new();
write_preface(&mut written).unwrap();
let preface = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
let frames_buf = &written[preface.len()..];
assert_eq!(preface, &written[..preface.len()]);
let (frame, _): (SettingsFrame, _) = get_frame_from_buf(frames_buf);
assert!(!frame.is_ack());
}
}