use std::collections::HashMap;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
use std::io;
use http::{StreamId, HttpError, Response, Header, HttpResult};
use http::frame::RawFrame;
use http::transport::TransportStream;
use http::connection::{SendFrame, ReceiveFrame, HttpFrame, HttpConnection};
use http::session::{SessionState, DefaultSessionState, DefaultStream, Stream};
use http::client::{ClientConnection, HttpConnect, ClientStream, RequestStream};
/// A request handed to the service thread, bundled with the channel on which
/// the matching `Response` is delivered back to the caller.
struct AsyncRequest {
    /// Value sent as the `:method` pseudo-header.
    pub method: Vec<u8>,
    /// Value sent as the `:path` pseudo-header.
    pub path: Vec<u8>,
    /// Additional headers, appended after the pseudo-headers.
    pub headers: Vec<Header>,
    /// Request body, if any. With `None` the local end of the stream is
    /// closed as soon as the stream is created.
    pub body: Option<Vec<u8>>,
    /// Sending end of the channel on which the `Response` is delivered.
    tx: Sender<Response>,
}
/// Pairs a real frame sender (`inner`) with the receiving end of a channel of
/// `RawFrame`s. Frames queued through a `ChannelFrameSenderHandle` are pulled
/// off the channel and written out by `send_next`.
struct ChannelFrameSender<S: SendFrame> {
    /// Frames queued by the handle, waiting to be written.
    rx: Receiver<RawFrame>,
    /// The sender that performs the actual write.
    inner: S,
}
impl<S> ChannelFrameSender<S> where S: SendFrame {
fn new(inner: S) -> (ChannelFrameSender<S>, ChannelFrameSenderHandle) {
let (send, recv) = mpsc::channel();
let handle = ChannelFrameSenderHandle { tx: send };
let sender = ChannelFrameSender {
rx: recv,
inner: inner,
};
(sender, handle)
}
fn send_next(&mut self) -> HttpResult<()> {
let frame = try!(
self.rx.recv()
.map_err(|_| {
io::Error::new(io::ErrorKind::Other, "Unable to send frame")
})
);
debug!("Performing the actual send frame IO");
self.inner.send_raw_frame(frame)
}
}
/// The queueing side of a `ChannelFrameSender`: a `SendFrame` implementation
/// that performs no IO itself and only pushes frames onto a channel.
struct ChannelFrameSenderHandle {
    /// Sending end of the frame queue.
    tx: Sender<RawFrame>,
}
impl SendFrame for ChannelFrameSenderHandle {
fn send_raw_frame(&mut self, frame: RawFrame) -> HttpResult<()> {
try!(self.tx.send(frame)
.map_err(|_| {
io::Error::new(io::ErrorKind::Other, "Unable to send frame")
}));
debug!("Queued the frame for sending...");
Ok(())
}
}
/// Pairs a real frame receiver (`inner`) with the sending end of a channel of
/// `HttpFrame`s. Frames read by `read_next` are forwarded to whoever holds the
/// matching `ChannelFrameReceiverHandle`.
struct ChannelFrameReceiver<R: ReceiveFrame> {
    /// Channel on which received frames are forwarded.
    tx: Sender<HttpFrame>,
    /// The receiver that performs the actual read.
    inner: R,
}
impl<R> ChannelFrameReceiver<R> where R: ReceiveFrame {
fn new(inner: R) -> (ChannelFrameReceiver<R>, ChannelFrameReceiverHandle) {
let (send, recv) = mpsc::channel();
let handle = ChannelFrameReceiverHandle { rx: recv };
let receiver = ChannelFrameReceiver {
tx: send,
inner: inner,
};
(receiver, handle)
}
fn read_next(&mut self) -> HttpResult<()> {
let frame = try!(self.inner.recv_frame());
try!(self.tx.send(frame)
.map_err(|_| {
io::Error::new(io::ErrorKind::Other, "Unable to read frame")
}));
Ok(())
}
}
/// The consuming side of a `ChannelFrameReceiver`: a `ReceiveFrame`
/// implementation that performs no IO itself and only pops frames off a
/// channel.
struct ChannelFrameReceiverHandle {
    /// Receiving end of the frame queue.
    rx: Receiver<HttpFrame>,
}
impl ReceiveFrame for ChannelFrameReceiverHandle {
    /// Blocks until a frame is available on the channel and returns it.
    ///
    /// Fails with an "Unable to read frame" IO error when the sending end of
    /// the channel has been dropped.
    fn recv_frame(&mut self) -> HttpResult<HttpFrame> {
        match self.rx.recv() {
            Ok(frame) => Ok(frame),
            Err(_) => {
                let cause = io::Error::new(io::ErrorKind::Other, "Unable to read frame");
                Err(HttpError::from(cause))
            }
        }
    }
}
/// Reasons for which `ClientService::run_once` reports that the service
/// should stop.
enum ClientServiceErr {
    /// No work is left: either the work queue was disconnected or the last
    /// client went away.
    Done,
    /// An HTTP-level error was reported by the underlying connection.
    Http(HttpError),
}
/// Lets `try!` turn an `HttpError` into a `ClientServiceErr::Http`.
impl From<HttpError> for ClientServiceErr {
    fn from(err: HttpError) -> ClientServiceErr {
        ClientServiceErr::Http(err)
    }
}
/// Messages processed one at a time by `ClientService::run_once`.
enum WorkItem {
    /// A client wants this request sent.
    Request(AsyncRequest),
    /// A frame has been forwarded by the reader and is ready to be handled.
    HandleFrame,
    /// A frame has been written out; more request data may be queued.
    SendData,
    /// A new `Client` handle exists.
    NewClient,
    /// A `Client` handle has been dropped.
    ClientLeft,
}
/// State owned by the service thread: the connection, the bookkeeping needed
/// to route responses back to their callers, and the queue of work sent by
/// clients and the IO threads.
struct ClientService {
    /// Id given to the next request's stream; starts at 1 and advances by 2.
    next_stream_id: StreamId,
    /// Number of requests started whose streams have not yet been reported
    /// closed.
    outstanding_reqs: u32,
    /// Maximum number of requests allowed to be outstanding at once.
    limit: u32,
    /// The connection, wired to the channel-backed frame handles.
    conn: ClientConnection<ChannelFrameSenderHandle, ChannelFrameReceiverHandle>,
    /// Maps the stream id of each started request to the channel on which
    /// its response is delivered.
    chans: HashMap<StreamId, Sender<Response>>,
    /// Incoming work items.
    work_queue: Receiver<WorkItem>,
    /// Requests accepted but not yet started, oldest first.
    request_queue: Vec<AsyncRequest>,
    /// Number of live `Client` handles; the service stops when it drops to 0.
    client_count: i32,
    /// Value sent as the `:authority` pseudo-header of every request.
    host: Vec<u8>,
    /// Whether `conn.init()` has already completed successfully.
    initialized: bool,
}
/// Everything `ClientService::new` produces, in order: the service itself,
/// the sending end of its work queue, and the frame receiver and frame sender
/// that perform the actual IO on the transport stream.
struct Service<S: TransportStream>(
    ClientService,
    Sender<WorkItem>,
    ChannelFrameReceiver<S>,
    ChannelFrameSender<S>,
);
impl ClientService {
    /// Builds a service on top of `client_stream`.
    ///
    /// The stream is split in two: the original half is used for reading
    /// frames and the split-off half for writing them. Both are wrapped in
    /// channel-backed adapters so that the connection held by the service
    /// never performs IO itself.
    ///
    /// Returns the service, the sending end of its work queue and the two IO
    /// adapters bundled in a `Service`, or `None` when the stream cannot be
    /// split.
    pub fn new<S>(client_stream: ClientStream<S>) -> Option<Service<S>>
        where S: TransportStream
    {
        let (tx, rx): (Sender<WorkItem>, Receiver<WorkItem>) = mpsc::channel();
        let ClientStream(stream, scheme, host) = client_stream;

        // The signature already allows reporting failure, so a stream that
        // cannot be split yields `None` instead of a panic.
        let sender = match stream.try_split() {
            Ok(sender) => sender,
            Err(_) => return None,
        };
        let receiver = stream;

        let (recv_frame, recv_handle) = ChannelFrameReceiver::new(receiver);
        let (send_frame, send_handle) = ChannelFrameSender::new(sender);

        let conn = ClientConnection::with_connection(
            HttpConnection::new(send_handle, recv_handle, scheme),
            DefaultSessionState::new());

        let service = ClientService {
            next_stream_id: 1,
            outstanding_reqs: 0,
            limit: 3,
            conn: conn,
            chans: HashMap::new(),
            work_queue: rx,
            request_queue: Vec::new(),
            client_count: 0,
            host: host.as_bytes().to_vec(),
            initialized: false,
        };

        Some(Service(service, tx, recv_frame, send_frame))
    }

    /// Blocks until one work item arrives and processes it.
    ///
    /// Returns `Err(ClientServiceErr::Done)` when the work queue is
    /// disconnected or the last client has left, and
    /// `Err(ClientServiceErr::Http(..))` when the connection reports an
    /// error. In either case the caller is expected to stop the service.
    pub fn run_once(&mut self) -> Result<(), ClientServiceErr> {
        let work_item = match self.work_queue.recv() {
            Ok(item) => item,
            Err(_) => return Err(ClientServiceErr::Done),
        };

        match work_item {
            WorkItem::Request(async_req) => {
                debug!("Queuing request");
                self.request_queue.push(async_req);
                self.queue_next_request()
            },
            WorkItem::HandleFrame => {
                // The first frame notification is consumed by the connection
                // initialization; every later one is handled normally.
                if !self.initialized {
                    try!(self.conn.init());
                    self.initialized = true;
                    Ok(())
                } else {
                    self.handle_frame()
                }
            },
            WorkItem::SendData => {
                debug!("Will queue some request data");
                try!(self.conn.send_next_data());
                Ok(())
            },
            WorkItem::NewClient => {
                self.client_count += 1;
                Ok(())
            },
            WorkItem::ClientLeft => {
                self.client_count -= 1;
                if self.client_count == 0 {
                    Err(ClientServiceErr::Done)
                } else {
                    Ok(())
                }
            },
        }
    }

    /// Lets the connection handle the next frame, delivers the responses of
    /// any streams that got closed as a result, and uses the freed slots to
    /// start queued requests.
    fn handle_frame(&mut self) -> Result<(), ClientServiceErr> {
        debug!("Handling next frame");
        try!(self.conn.handle_next_frame());
        self.handle_closed();
        self.queue_next_request()
    }

    /// Starts `async_req` on the connection and remembers where its response
    /// has to be delivered.
    ///
    /// A failure to start the request is returned to the caller rather than
    /// panicking, so that the service loop can stop in an orderly way. The
    /// response channel is only registered once the request has actually
    /// been started; on failure it is dropped, which the waiting client
    /// observes as a disconnected receiver.
    fn send_request(&mut self, async_req: AsyncRequest) -> Result<(), ClientServiceErr> {
        let (req, tx) = self.create_request(async_req);
        let stream_id = req.stream.id();
        debug!("Sending new request... id = {}", stream_id);
        try!(self.conn.start_request(req));
        self.chans.insert(stream_id, tx);
        self.outstanding_reqs += 1;
        Ok(())
    }

    /// Turns `async_req` into a `RequestStream` on a freshly allocated stream
    /// id and hands back the response channel that came with the request.
    ///
    /// The header list starts with the `:method`, `:path`, `:authority` and
    /// `:scheme` pseudo-headers, followed by the caller's headers.
    fn create_request(&mut self, async_req: AsyncRequest)
            -> (RequestStream<DefaultStream>, Sender<Response>) {
        let mut headers: Vec<Header> = vec![
            (b":method".to_vec(), async_req.method),
            (b":path".to_vec(), async_req.path),
            (b":authority".to_vec(), self.host.clone()),
            (b":scheme".to_vec(), self.conn.scheme().as_bytes().to_vec()),
        ];
        headers.extend(async_req.headers.into_iter());

        let mut stream = DefaultStream::new(self.next_stream_id);
        // Stream ids advance in steps of two, starting from 1.
        self.next_stream_id += 2;
        match async_req.body {
            Some(body) => stream.set_full_data(body),
            // Nothing to send: the local end is closed right away.
            None => stream.close_local(),
        };

        (
            RequestStream {
                stream: stream,
                headers: headers,
            },
            async_req.tx
        )
    }

    /// Delivers the response carried by the closed `stream` to the client
    /// that issued the corresponding request.
    ///
    /// # Panics
    ///
    /// Panics when no request is registered for the stream, since that means
    /// the service's own bookkeeping is broken.
    fn send_response(&mut self, stream: DefaultStream) {
        let tx = match self.chans.remove(&stream.stream_id) {
            Some(tx) => tx,
            None => panic!("Received a response for an unknown request!"),
        };

        let stream_id = stream.stream_id;
        match stream.headers {
            Some(headers) => {
                // The client may have stopped waiting; that is not an error.
                let _ = tx.send(Response {
                    stream_id: stream_id,
                    headers: headers,
                    body: stream.body,
                });
            },
            None => {
                // The stream closed without any headers, so there is no
                // response to build. Dropping `tx` makes the client's
                // receiver report a disconnect instead of crashing the
                // service thread.
                debug!("Stream {} closed without a response", stream_id);
            },
        }
    }

    /// Collects the streams the connection state reports as closed, delivers
    /// their responses and frees their request slots.
    fn handle_closed(&mut self) {
        let done = self.conn.state.get_closed();
        for stream in done {
            self.send_response(stream);
            self.outstanding_reqs -= 1;
        }
    }

    /// Starts queued requests, oldest first, for as long as the number of
    /// outstanding requests stays below the limit.
    ///
    /// All free slots are filled in one call, so requests do not linger in
    /// the queue when several slots are available at once.
    fn queue_next_request(&mut self) -> Result<(), ClientServiceErr> {
        if self.outstanding_reqs < self.limit {
            debug!("Not over the limit yet. Checking for more requests...");
        }
        while self.outstanding_reqs < self.limit && !self.request_queue.is_empty() {
            let async_req = self.request_queue.remove(0);
            try!(self.send_request(async_req));
        }
        Ok(())
    }
}
/// A handle through which requests are submitted to the service thread.
///
/// Cloning a handle registers one more client with the service and dropping a
/// handle unregisters it.
pub struct Client {
    /// Sending end of the service's work queue.
    sender: Sender<WorkItem>,
}
impl Clone for Client {
    /// Creates another handle to the same service and tells the service that
    /// one more client exists.
    ///
    /// When the service has already stopped, the notification cannot be
    /// delivered. That is deliberately ignored rather than turned into a
    /// panic: `clone` has no way to report an error, and the returned handle
    /// stays safe to use, since its `request` calls return `None`.
    fn clone(&self) -> Client {
        let _ = self.sender.send(WorkItem::NewClient);
        Client {
            sender: self.sender.clone(),
        }
    }
}
impl Drop for Client {
    /// Tells the service that this client is gone. A failed send means the
    /// service is no longer receiving, so there is nobody left to notify and
    /// the result is discarded.
    fn drop(&mut self) {
        self.sender.send(WorkItem::ClientLeft).ok();
    }
}
impl Client {
    /// Connects using `connector` and starts the three threads that drive the
    /// connection: the service thread, a thread writing queued frames and a
    /// thread reading incoming frames.
    ///
    /// Returns `None` when the connection cannot be established, when the
    /// resulting stream cannot be split, or when the service cannot be set
    /// up.
    pub fn with_connector<C, S>(connector: C) -> Option<Client>
        where C: HttpConnect<Stream=S>, S: TransportStream + Send + 'static
    {
        // Every setup failure is reported as `None`; the signature already
        // promises that, so nothing in here panics on an IO problem.
        let client_stream = match connector.connect().ok() {
            Some(client_stream) => client_stream,
            None => return None,
        };
        // An extra handle to the socket, kept so the service thread can
        // close it on shutdown.
        let mut sck = match client_stream.0.try_split() {
            Ok(sck) => sck,
            Err(_) => return None,
        };

        let service = match ClientService::new(client_stream) {
            Some(service) => service,
            None => return None,
        };
        let Service(mut service, work_tx, mut recv_frame, mut send_frame) = service;

        // Register the handle that is about to be returned.
        if work_tx.send(WorkItem::NewClient).is_err() {
            return None;
        }

        let read_notify = work_tx.clone();
        let sender_work_queue = work_tx.clone();

        thread::spawn(move || {
            while let Ok(_) = service.run_once() {}
            debug!("Service thread halting");
            // NOTE(review): closing the socket presumably unblocks the reader
            // thread -- confirm. A close failure is only logged, because
            // panicking in a thread that is finishing anyway helps nobody.
            if sck.close().is_err() {
                debug!("Failed to close the socket");
            }
        });

        thread::spawn(move || {
            while let Ok(_) = send_frame.send_next() {
                // Once the service thread is gone the work queue is closed;
                // that is a normal shutdown, so stop quietly.
                if sender_work_queue.send(WorkItem::SendData).is_err() {
                    break;
                }
            }
            debug!("Sender thread halting");
        });

        thread::spawn(move || {
            while let Ok(_) = recv_frame.read_next() {
                // Same as for the sender thread: a closed work queue means
                // the service has stopped.
                if read_notify.send(WorkItem::HandleFrame).is_err() {
                    break;
                }
            }
            debug!("Reader thread halting");
        });

        Some(Client {
            sender: work_tx,
        })
    }

    /// Queues a request with the given method, path, extra headers and
    /// optional body.
    ///
    /// Returns the receiver on which the `Response` is delivered, or `None`
    /// when the service is no longer accepting work.
    pub fn request(&self, method: &[u8], path: &[u8], headers: &[Header], body: Option<Vec<u8>>)
            -> Option<Receiver<Response>> {
        let (resp_tx, resp_rx): (Sender<Response>, Receiver<Response>) = mpsc::channel();
        let request = AsyncRequest {
            method: method.to_vec(),
            path: path.to_vec(),
            headers: headers.to_vec(),
            body: body,
            tx: resp_tx,
        };

        match self.sender.send(WorkItem::Request(request)) {
            Ok(_) => Some(resp_rx),
            Err(_) => None,
        }
    }

    /// Queues a `GET` request for `path`; see `request`.
    pub fn get(&self, path: &[u8], headers: &[Header]) -> Option<Receiver<Response>> {
        self.request(b"GET", path, headers, None)
    }

    /// Queues a `POST` request for `path` with the given `body`; see
    /// `request`.
    pub fn post(&self, path: &[u8], headers: &[Header], body: Vec<u8>)
            -> Option<Receiver<Response>> {
        self.request(b"POST", path, headers, Some(body))
    }
}