use std::collections::HashMap;
use std::error::Error;
use std::io::Read;
use std::io::Cursor;
use std::iter::FromIterator;
use http::{StreamId, Header};
pub trait Session {
fn new_data_chunk(&mut self, stream_id: StreamId, data: &[u8]);
fn new_headers(&mut self, stream_id: StreamId, headers: Vec<Header>);
fn end_of_stream(&mut self, stream_id: StreamId);
}
pub struct StreamIter<'a, S: Stream + 'a>(Box<Iterator<Item=&'a mut S> + 'a>);
impl<'a, S> Iterator for StreamIter<'a, S> where S: Stream + 'a {
type Item = &'a mut S;
#[inline]
fn next(&mut self) -> Option<&'a mut S> { self.0.next() }
}
pub trait SessionState {
type Stream: Stream;
fn insert_stream(&mut self, stream: Self::Stream);
fn get_stream_ref(&self, stream_id: StreamId) -> Option<&Self::Stream>;
fn get_stream_mut(&mut self, stream_id: StreamId) -> Option<&mut Self::Stream>;
fn remove_stream(&mut self, stream_id: StreamId) -> Option<Self::Stream>;
fn iter(&mut self) -> StreamIter<Self::Stream>;
fn get_closed(&mut self) -> Vec<Self::Stream> {
let ids: Vec<_> = self.iter()
.filter_map(|s| {
if s.is_closed() { Some(s.id()) } else { None }
})
.collect();
FromIterator::from_iter(ids.into_iter().map(|i| self.remove_stream(i).unwrap()))
}
}
pub struct DefaultSessionState<S> where S: Stream {
streams: HashMap<StreamId, S>,
}
impl<S> DefaultSessionState<S> where S: Stream {
pub fn new() -> DefaultSessionState<S> {
DefaultSessionState {
streams: HashMap::new(),
}
}
}
impl<S> SessionState for DefaultSessionState<S> where S: Stream {
type Stream = S;
#[inline]
fn insert_stream(&mut self, stream: Self::Stream) {
self.streams.insert(stream.id(), stream);
}
#[inline]
fn get_stream_ref(&self, stream_id: StreamId) -> Option<&Self::Stream> {
self.streams.get(&stream_id)
}
#[inline]
fn get_stream_mut(&mut self, stream_id: StreamId) -> Option<&mut Self::Stream> {
self.streams.get_mut(&stream_id)
}
#[inline]
fn remove_stream(&mut self, stream_id: StreamId) -> Option<Self::Stream> {
self.streams.remove(&stream_id)
}
#[inline]
fn iter(&mut self) -> StreamIter<S> {
StreamIter(Box::new(self.streams.iter_mut().map(|(_, s)| s)))
}
}
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum StreamState {
Idle,
ReservedLocal,
ReservedRemote,
Open,
HalfClosedRemote,
HalfClosedLocal,
Closed,
}
#[derive(Debug)]
pub enum StreamDataError {
Closed,
Other(Box<Error + Send + Sync>),
}
impl<E> From<E> for StreamDataError where E: Error + Send + Sync + 'static {
fn from(err: E) -> StreamDataError { StreamDataError::Other(Box::new(err)) }
}
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum StreamDataChunk {
Chunk(usize),
Last(usize),
Unavailable,
}
pub trait Stream {
fn new(stream_id: StreamId) -> Self;
fn new_data_chunk(&mut self, data: &[u8]);
fn set_headers(&mut self, headers: Vec<Header>);
fn set_state(&mut self, state: StreamState);
fn get_data_chunk(&mut self, buf: &mut [u8]) -> Result<StreamDataChunk, StreamDataError>;
fn id(&self) -> StreamId;
fn state(&self) -> StreamState;
fn close(&mut self) { self.set_state(StreamState::Closed); }
fn close_local(&mut self) {
let next = match self.state() {
StreamState::HalfClosedRemote => StreamState::Closed,
_ => StreamState::HalfClosedLocal,
};
self.set_state(next);
}
fn close_remote(&mut self) {
let next = match self.state() {
StreamState::HalfClosedLocal => StreamState::Closed,
_ => StreamState::HalfClosedRemote,
};
self.set_state(next);
}
fn is_closed(&self) -> bool { self.state() == StreamState::Closed }
fn is_closed_local(&self) -> bool {
match self.state() {
StreamState::HalfClosedLocal | StreamState::Closed => true,
_ => false,
}
}
fn is_closed_remote(&self) -> bool {
match self.state() {
StreamState::HalfClosedRemote | StreamState::Closed => true,
_ => false,
}
}
}
#[derive(Clone)]
pub struct DefaultStream {
pub stream_id: StreamId,
pub headers: Option<Vec<Header>>,
pub body: Vec<u8>,
pub state: StreamState,
data: Option<Cursor<Vec<u8>>>,
}
impl DefaultStream {
pub fn new(stream_id: StreamId) -> DefaultStream {
DefaultStream {
stream_id: stream_id,
headers: None,
body: Vec::new(),
state: StreamState::Open,
data: None,
}
}
#[inline]
pub fn set_full_data(&mut self, data: Vec<u8>) {
self.data = Some(Cursor::new(data));
}
}
impl Stream for DefaultStream {
fn new(stream_id: StreamId) -> DefaultStream {
DefaultStream::new(stream_id)
}
fn new_data_chunk(&mut self, data: &[u8]) {
self.body.extend(data.to_vec().into_iter());
}
fn set_headers(&mut self, headers: Vec<Header>) {
self.headers = Some(headers);
}
fn set_state(&mut self, state: StreamState) { self.state = state; }
fn id(&self) -> StreamId {
self.stream_id
}
fn state(&self) -> StreamState { self.state }
fn get_data_chunk(&mut self, buf: &mut [u8]) -> Result<StreamDataChunk, StreamDataError> {
if self.is_closed_local() {
return Err(StreamDataError::Closed);
}
let chunk = match self.data.as_mut() {
None => StreamDataChunk::Unavailable,
Some(d) => {
let read = d.read(buf).unwrap();
if (d.position() as usize) == d.get_ref().len() {
StreamDataChunk::Last(read)
} else {
StreamDataChunk::Chunk(read)
}
}
};
match chunk {
StreamDataChunk::Last(_) => self.close_local(),
_ => {},
};
Ok(chunk)
}
}
#[cfg(test)]
mod tests {
use super::{
Stream,
DefaultSessionState,
DefaultStream,
StreamDataChunk, StreamDataError,
SessionState,
};
use http::tests::common::TestStream;
#[test]
fn test_default_session_state() {
fn new_mock_state() -> DefaultSessionState<TestStream> { DefaultSessionState::new() }
{
let mut state = new_mock_state();
state.insert_stream(Stream::new(1));
assert_eq!(state.get_stream_ref(1).unwrap().id(), 1);
}
{
let mut state = new_mock_state();
state.insert_stream(Stream::new(101));
let stream = state.remove_stream(101).unwrap();
assert_eq!(101, stream.id());
}
{
let mut state = new_mock_state();
state.insert_stream(Stream::new(1));
assert!(state.get_stream_ref(3).is_none());
}
{
let mut state = new_mock_state();
state.insert_stream(Stream::new(1));
state.insert_stream(Stream::new(7));
state.insert_stream(Stream::new(3));
let mut streams: Vec<_> = state.iter().collect();
streams.sort_by(|s1, s2| s1.id().cmp(&s2.id()));
assert_eq!(vec![1, 3, 7], streams.into_iter().map(|s| s.id()).collect::<Vec<_>>());
}
{
let mut state = new_mock_state();
assert_eq!(state.iter().collect::<Vec<_>>().len(), 0);
}
{
let mut state = new_mock_state();
state.insert_stream(Stream::new(1));
state.insert_stream(Stream::new(7));
state.insert_stream(Stream::new(3));
state.get_stream_mut(1).unwrap().close();
state.get_stream_mut(7).unwrap().close();
let mut closed = state.get_closed();
assert_eq!(state.streams.len(), 1);
assert_eq!(closed.len(), 2);
closed.sort_by(|s1, s2| s1.id().cmp(&s2.id()));
assert_eq!(vec![1, 7], closed.into_iter().map(|s| s.id()).collect::<Vec<_>>());
}
}
#[test]
fn test_default_stream_get_data() {
let mut buf = Vec::with_capacity(2);
unsafe { buf.set_len(2); }
{
let mut stream = DefaultStream::new(1);
let res = stream.get_data_chunk(&mut buf).ok().unwrap();
assert_eq!(res, StreamDataChunk::Unavailable);
}
{
let mut stream = DefaultStream::new(1);
stream.close();
let res = stream.get_data_chunk(&mut buf).err().unwrap();
assert!(match res {
StreamDataError::Closed => true,
_ => false,
});
}
{
let mut stream = DefaultStream::new(1);
stream.close_local();
let res = stream.get_data_chunk(&mut buf).err().unwrap();
assert!(match res {
StreamDataError::Closed => true,
_ => false,
});
}
{
let mut stream = DefaultStream::new(1);
stream.set_full_data(vec![1, 2, 3, 4]);
let res = stream.get_data_chunk(&mut buf).ok().unwrap();
assert_eq!(res, StreamDataChunk::Chunk(2));
assert_eq!(buf, vec![1, 2]);
let res = stream.get_data_chunk(&mut buf).ok().unwrap();
assert_eq!(res, StreamDataChunk::Last(2));
assert_eq!(buf, vec![3, 4]);
let res = stream.get_data_chunk(&mut buf).err().unwrap();
assert!(match res {
StreamDataError::Closed => true,
_ => false,
});
}
{
let mut stream = DefaultStream::new(1);
stream.set_full_data(vec![1, 2, 3, 4, 5]);
let res = stream.get_data_chunk(&mut buf).ok().unwrap();
assert_eq!(res, StreamDataChunk::Chunk(2));
assert_eq!(buf, vec![1, 2]);
let res = stream.get_data_chunk(&mut buf).ok().unwrap();
assert_eq!(res, StreamDataChunk::Chunk(2));
assert_eq!(buf, vec![3, 4]);
let res = stream.get_data_chunk(&mut buf).ok().unwrap();
assert_eq!(res, StreamDataChunk::Last(1));
assert_eq!(&buf[..1], &vec![5][..]);
}
{
let mut stream = DefaultStream::new(1);
stream.set_full_data(vec![]);
let res = stream.get_data_chunk(&mut buf).ok().unwrap();
assert_eq!(res, StreamDataChunk::Last(0));
}
}
}