use super::Transport;
use {Error, Type};
use std::collections::VecDeque;
use std::io::prelude::*;
use std::io::Cursor;
use std::mem;
/// Integer type of the length prefix that precedes every packet on the wire.
pub type PacketSize = u32;
/// Progress of the length-prefixed packet decoder.
#[derive(Clone, Debug)]
enum State
{
/// Collecting the length prefix; holds the prefix bytes received so far.
AwaitingSize(Vec<u8>),
/// The length prefix has been decoded and the payload is being collected.
AwaitingPacket {
/// Payload length decoded from the prefix.
size: PacketSize,
/// Payload bytes received so far.
received_data: Vec<u8>,
},
}
/// A transport that frames each packet as a `PacketSize` length prefix
/// followed by the raw packet bytes.
#[derive(Clone, Debug)]
pub struct Simple
{
/// Where the decoder currently is within the incoming byte stream.
state: State,
/// Fully received packets that have not been handed to the caller yet,
/// oldest first.
packets: VecDeque<Vec<u8>>,
}
impl Simple
{
pub fn new() -> Self {
Simple {
state: State::AwaitingSize(Vec::new()),
packets: VecDeque::new(),
}
}
}
impl Transport for Simple
{
    /// Reads from `read` and advances the decoder, queueing every packet that
    /// becomes complete.
    ///
    /// Reading continues until a read returns fewer bytes than were asked
    /// for, at which point the partial data is kept in `self.state` so that a
    /// later call can resume where this one stopped.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from `read` fails. Bytes received before
    /// the failing read are retained.
    fn process_data(&mut self,
                    read: &mut Read) -> Result<(), Error> {
        // Upper bound on the scratch buffer used for one read, so that a
        // length prefix supplied by the peer cannot force a huge allocation
        // before any payload has actually arrived.
        const MAX_READ_CHUNK: usize = 64 * 1024;

        loop {
            // The state is updated in place (instead of working on a clone)
            // so partially received payloads are not copied on every pass.
            // `Some(state)` requests a transition; `None` means the reader
            // has no more data for now.
            let next_state = match self.state {
                State::AwaitingSize(ref mut size_bytes) => {
                    let remaining_bytes = mem::size_of::<PacketSize>() - size_bytes.len();
                    let mut received_bytes = vec![0u8; remaining_bytes];
                    let bytes_read = read.read(&mut received_bytes)?;
                    size_bytes.extend_from_slice(&received_bytes[..bytes_read]);

                    if size_bytes.len() == mem::size_of::<PacketSize>() {
                        let mut size_buffer = Cursor::new(mem::replace(size_bytes, Vec::new()));
                        // The cursor holds exactly one encoded `PacketSize`.
                        let size = PacketSize::read(&mut size_buffer).unwrap();
                        Some(State::AwaitingPacket { size: size, received_data: Vec::new() })
                    } else {
                        None
                    }
                },
                State::AwaitingPacket { size, ref mut received_data } => {
                    let size = size as usize;
                    let remaining_bytes = size - received_data.len();

                    // A zero-length packet has no payload to wait for, so the
                    // read is skipped and the packet completes immediately.
                    let mut filled_chunk = false;
                    if remaining_bytes > 0 {
                        let chunk_len = ::std::cmp::min(remaining_bytes, MAX_READ_CHUNK);
                        let mut chunk = vec![0u8; chunk_len];
                        let bytes_read = read.read(&mut chunk)?;
                        received_data.extend_from_slice(&chunk[..bytes_read]);
                        filled_chunk = bytes_read == chunk_len;
                    }

                    if received_data.len() == size {
                        self.packets.push_back(mem::replace(received_data, Vec::new()));
                        Some(State::AwaitingSize(Vec::new()))
                    } else if filled_chunk {
                        // The scratch buffer was filled completely, so more
                        // of this payload may be available right away.
                        continue;
                    } else {
                        None
                    }
                },
            };

            match next_state {
                Some(state) => self.state = state,
                None => break,
            }
        }

        Ok(())
    }

    /// Writes `packet` to `write`, preceded by its length as a `PacketSize`.
    ///
    /// # Errors
    ///
    /// Returns an error if the packet length does not fit in `PacketSize`
    /// (nothing is written in that case) or if writing to `write` fails.
    fn send_raw_packet(&mut self,
                       write: &mut Write,
                       packet: &[u8]) -> Result<(), Error> {
        let size = packet.len() as PacketSize;

        // Reject lengths that do not survive the round trip through
        // `PacketSize`; a truncated prefix would corrupt the framing.
        if size as usize != packet.len() {
            return Err(From::from(::std::io::Error::new(
                ::std::io::ErrorKind::InvalidInput,
                "packet is too large for its length prefix")));
        }

        size.write(write)?;
        // `write_all` is required: a plain `write` may accept only part of
        // the payload.
        write.write_all(packet)?;

        Ok(())
    }

    /// Removes and returns the oldest fully received packet, or `None` when
    /// no complete packet is queued.
    fn receive_raw_packet(&mut self) -> Result<Option<Vec<u8>>, Error> {
        Ok(self.packets.pop_front())
    }
}
// Tests for the `Simple` transport, written with the `describe!` test DSL
// macro (defined outside this file).
#[cfg(test)]
mod test
{
pub use super::Simple;
pub use std::io::Cursor;
pub use wire::stream::Transport;
describe! simple_transport {
// Shared fixtures: a five-byte payload and the bytes expected on the wire,
// i.e. the four length bytes 00 00 00 05 followed by the payload itself.
before_each {
let data: Vec<u8> = vec![5, 4, 3, 2, 1];
let expected_data = &[
0x00, 0x00, 0x00, 0x05, 0x05, 0x04, 0x03, 0x02, 0x01, ];
let mut transport = Simple::new();
}
describe! writing {
it "serializes the data with a 32-bit length prefix and then the raw data" {
// Send into an in-memory buffer and compare the bytes that were written.
let mut buffer = Cursor::new(Vec::new());
transport.send_raw_packet(&mut buffer, &data).unwrap();
let written_data = buffer.into_inner();
assert_eq!(&written_data, &expected_data);
}
}
describe! reading {
it "successfully handles data with a 32-bit length prefix" {
// Feed the framed bytes in and expect the original payload back out.
let mut buffer = Cursor::new(&expected_data);
transport.process_data(&mut buffer).unwrap();
let read_data = transport.receive_raw_packet().ok().unwrap().unwrap();
assert_eq!(&read_data, &data);
}
}
}
}