diffusion 0.9.0

This is the Rust implementation of the Diffusion library. Diffusion is an efficient message-based data distribution library.
Documentation
use crate::{Error, Reader, Result, Writer};

use std::io;
use std::result;
use std::io::{Read, Write};

/// Magic bytes at the very start of every file: `FileWriter::new` writes them
/// and `FileReader::new` rejects any input that does not begin with them.
const FILE_HEADER: &[u8] = b"DFSN";

/// Reads messages back out of a file.
///
/// The input has to be produced by the corresponding writer.
/// Growing files are not supported at the moment: the file must not be
/// modified while it is being read.
#[derive(Debug)]
pub struct FileReader<T: Read> {
    file: T,
}

impl<T> FileReader<T> where T: Read {
    /// returns a new instance of `FileReader`.
    /// It returns error if the file is found corrupted.
    pub fn new(mut file: T) -> Result<FileReader<T>> {
        let mut header = vec![0u8; FILE_HEADER.len()];
        let read_length = file.read(&mut header)?;
        if read_length == FILE_HEADER.len() && &*header.into_boxed_slice() == FILE_HEADER {
            Ok(FileReader { file })
        } else {
            Err(Error::CorruptSegmentHeader)
        }
    }
}

/// Failure modes of the `read_exact` helper.
#[derive(Debug)]
enum ReadExactError {
    /// The source reported end of input before the buffer was full; carries
    /// the number of bytes that were read before that happened.
    Partial(usize),
    /// The source returned an I/O error other than `Interrupted`.
    Other(io::Error),
}

/// Fills `buf` completely from `read`, retrying reads that fail with
/// `Interrupted`.
///
/// Returns `Ok(())` once every byte of `buf` has been filled (immediately, and
/// without touching `read`, when `buf` is empty). If the source signals end of
/// input first, returns `ReadExactError::Partial(n)` where `n` is the number of
/// bytes placed at the start of `buf`. Any other I/O error is returned as
/// `ReadExactError::Other`.
fn read_exact(read: &mut dyn Read, buf: &mut [u8]) -> result::Result<(), ReadExactError> {
    let mut filled = 0usize;
    loop {
        // Everything before `filled` already holds data; only the tail is open.
        let rest = &mut buf[filled..];
        if rest.is_empty() {
            return Ok(());
        }
        match read.read(rest) {
            // Zero bytes means end of input while space is still left.
            Ok(0) => return Err(ReadExactError::Partial(filled)),
            Ok(count) => filled += count,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(ReadExactError::Other(e)),
        }
    }
}

impl<T> Reader for FileReader<T> where T: Read {
    fn read(&mut self) -> Result<Option<Vec<u8>>> {
        // Rust currently does not support constexpr.
        let mut header = [0u8; 4];
        match read_exact(&mut self.file, &mut header) {
            Ok(()) => {
            }
            Err(ReadExactError::Partial(bytes_read)) => {
                if bytes_read == 0 {
                    return Ok(None);
                } else {
                    return Err(Error::CorruptMsgHeader);
                }
            }
            Err(ReadExactError::Other(io_error)) => {
                return Err(io_error.into());
            }
        }
        let body_length_number = i32::from_le_bytes(header);
        let body_length = body_length_number as usize;
        let mut full_buffer = vec![0u8; body_length];
        match read_exact(&mut self.file, &mut full_buffer) {
            Ok(()) => {
                Ok(Some(full_buffer))
            }
            Err(ReadExactError::Partial(bytes_read)) => {
                Err(Error::InsufficientLength(body_length - bytes_read))
            }
            Err(ReadExactError::Other(io_error)) => {
                Err(io_error.into())
            }
        }
    }
}

impl<T> Iterator for FileReader<T> where T: Read {
    type Item = Result<Vec<u8>>;

    /// Yields the result of each `read` call: `Some(Ok(body))` per message,
    /// `Some(Err(_))` when a read fails, and `None` once `read` reports the end
    /// of the input.
    fn next(&mut self) -> Option<Self::Item> {
        // Turns `Result<Option<_>>` into `Option<Result<_>>`.
        self.read().transpose()
    }
}

/// Writes messages into a file.
///
/// A writer always starts a fresh file; appending to an existing file is not
/// supported.
#[derive(Debug)]
pub struct FileWriter<T: Write> {
    file: T,
}

impl<T> FileWriter<T> where T: Write {
    /// returns a new file writer instance.
    /// It returns error if there is IO error during the process.
    pub fn new(mut file: T) -> Result<FileWriter<T>> {
        if file.write(FILE_HEADER)? == FILE_HEADER.len() {
            Ok(FileWriter { file })
        } else {
            Err(Error::CorruptSegmentHeader)
        }
    }

    /// writes multiple buffers as one message
    /// returns `Ok(())` if write is successful.
    pub fn write_multiple(&mut self, bufs: &[&[u8]]) -> Result<()> {
        let total_length_i32 = bufs.iter().fold(0, |sum, buf| sum + buf.len()) as i32;
        let header: [u8; 4] = total_length_i32.to_le_bytes();
        self.file.write_all(&header)?;
        for buf in bufs {
            self.file.write_all(buf)?;
        }
        Ok(())
    }
}

impl<T> Writer for FileWriter<T> where T: Write {
    /// Writes `buf` as one complete message by handing it to `write_multiple`
    /// as a single-element list.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> Result<()> {
        let parts = [buf];
        self.write_multiple(&parts)
    }
}