use http::{HttpResult, HttpError};
use http::connection::{
DataChunk,
EndStream,
};
use http::session::{
SessionState,
StreamDataChunk, StreamDataError,
Stream,
};
pub trait DataPrioritizer {
fn get_next_chunk(&mut self) -> HttpResult<Option<DataChunk>>;
}
pub struct SimplePrioritizer<'a, 'b, State> where State: SessionState + 'a {
state: &'a mut State,
buf: &'b mut [u8],
}
impl<'a, 'b, State> SimplePrioritizer<'a, 'b, State> where State: SessionState +'a {
pub fn new(state: &'a mut State, buf: &'b mut [u8]) -> SimplePrioritizer<'a, 'b, State> {
SimplePrioritizer {
state: state,
buf: buf,
}
}
}
impl<'a, 'b, State> DataPrioritizer for SimplePrioritizer<'a, 'b, State>
where State: SessionState +'a {
fn get_next_chunk(&mut self) -> HttpResult<Option<DataChunk>> {
for stream in self.state.iter().filter(|s| !s.is_closed_local()) {
let res = stream.get_data_chunk(self.buf);
match res {
Ok(StreamDataChunk::Last(total)) => {
return Ok(Some(DataChunk::new_borrowed(
&self.buf[..total], stream.id(), EndStream::Yes)));
},
Ok(StreamDataChunk::Chunk(total)) => {
return Ok(Some(DataChunk::new_borrowed(
&self.buf[..total], stream.id(), EndStream::No)));
},
Ok(StreamDataChunk::Unavailable) => {
},
Err(StreamDataError::Closed) => {
stream.close_local();
},
Err(StreamDataError::Other(e)) => {
return Err(HttpError::Other(e));
},
};
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::{DataPrioritizer, SimplePrioritizer};
use http::session::{DefaultSessionState, SessionState, Stream};
use http::tests::common::TestStream;
#[test]
fn test_simple_prioritizer() {
fn prepare_state() -> DefaultSessionState<TestStream> {
DefaultSessionState::new()
}
{
let mut buf = [0; 5];
let mut state = prepare_state();
let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);
let chunk = prioritizer.get_next_chunk().unwrap();
assert!(chunk.is_none());
}
{
let mut buf = [0; 5];
let mut state = prepare_state();
let mut stream = TestStream::new(1);
stream.set_outgoing(vec![1, 2, 3]);
state.insert_stream(stream);
let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);
{
let chunk = prioritizer.get_next_chunk().unwrap().unwrap();
assert_eq!(chunk.data, vec![1, 2, 3]);
}
assert!(prioritizer.get_next_chunk().unwrap().is_none());
}
{
let mut buf = [0; 2];
let mut state = prepare_state();
let mut stream = TestStream::new(1);
stream.set_outgoing(vec![1, 2, 3]);
state.insert_stream(stream);
let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);
{
let chunk = prioritizer.get_next_chunk().unwrap().unwrap();
assert_eq!(chunk.data, vec![1, 2]);
}
{
let chunk = prioritizer.get_next_chunk().unwrap().unwrap();
assert_eq!(chunk.data, vec![3]);
}
assert!(prioritizer.get_next_chunk().unwrap().is_none());
}
{
let mut buf = [0; 10];
let mut state = prepare_state();
for id in 0..3 {
let mut stream = TestStream::new(id);
stream.set_outgoing(vec![1, 2, 3]);
state.insert_stream(stream);
}
for _ in 0..3 {
{
let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);
let chunk = prioritizer.get_next_chunk().unwrap().unwrap();
assert_eq!(chunk.data, vec![1, 2, 3]);
}
for b in buf.iter_mut() { *b = 0; }
}
let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);
assert!(prioritizer.get_next_chunk().unwrap().is_none());
}
}
}