rift 0.5.1

Rust Thrift library
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::cmp;
use std::io;

/// Simple transport that contains both a fixed-length
/// internal read buffer and a fixed-length internal write
/// buffer.
///
/// On a `write` bytes are written to the internal write
/// buffer. Writes are no longer accepted once this buffer
/// is full. Callers must `empty_write_buffer` before
/// subsequent writes are accepted.
///
/// You can set readable bytes in the internal read buffer
/// by filling it with `set_readable_bytes`. Callers can
/// then read until the buffer is depleted. No further reads
/// are accepted until the internal read buffer is
/// replenished again.
pub struct TBufferTransport {
    rbuf: Box<[u8]>,
    rpos: usize,
    ridx: usize,
    rcap: usize,
    wbuf: Box<[u8]>,
    wpos: usize,
    wcap: usize,
}

impl TBufferTransport {
    /// Constructs a new, empty `TBufferTransport` with the given
    /// read buffer capacity and write buffer capacity.
    pub fn with_capacity(read_buffer_capacity: usize, write_buffer_capacity: usize) -> TBufferTransport {
        TBufferTransport {
            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
            ridx: 0,
            rpos: 0,
            rcap: read_buffer_capacity,
            wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
            wpos: 0,
            wcap: write_buffer_capacity,
        }
    }

    /// Return a slice containing the bytes held
    /// by the internal read buffer. Returns an empty
    /// slice if no readable bytes are present.
    pub fn read_buffer(&self) -> &[u8] {
        &self.rbuf[..self.ridx]
    }

    // FIXME: do I really need this API call?
    // FIXME: should this simply reset to the last set of readable bytes?
    /// Reset the number of readable bytes to zero.
    /// Subsequent calls to `read` will return nothing.
    pub fn empty_read_buffer(&mut self) {
        self.rpos = 0;
        self.ridx = 0;
    }

    /// Copy bytes from the source buffer `buf` into
    /// the internal read buffer, overwriting any existing
    /// bytes. Returns the number of bytes copied, which is
    /// `min(buf.len(), internal_read_buf.len())`.
    pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
        self.empty_read_buffer();
        let max_bytes = cmp::min(self.rcap, buf.len());
        self.rbuf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
        self.ridx = max_bytes;
        max_bytes
    }

    /// Return a slice containing the bytes held by
    /// the internal write buffer. Returns an empty
    /// slice if no bytes were written.
    pub fn write_buffer(&self) -> &[u8] {
        &self.wbuf[..self.wpos]
    }

    /// Resets the internal write buffer, making it
    /// seem like no bytes were written. Calling
    /// `write_buffer` after this returns an empty
    /// slice.
    pub fn empty_write_buffer(&mut self) {
        self.wpos = 0;
    }
}

impl io::Read for TBufferTransport {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let nread = cmp::min(buf.len(), self.ridx - self.rpos);
        buf[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
        self.rpos += nread;
        Ok(nread)
    }
}

impl io::Write for TBufferTransport {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let nwrite = cmp::min(buf.len(), self.wcap - self.wpos);
        self.wbuf[self.wpos..self.wpos + nwrite].clone_from_slice(&buf[..nwrite]);
        self.wpos += nwrite;
        Ok(nwrite)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(()) // nothing to do on flush
    }
}

#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::TBufferTransport;

    #[test]
    fn must_empty_write_buffer() {
        let mut t = TBufferTransport::with_capacity(0, 1);

        let bytes_to_write: [u8; 1] = [0x01];
        let result = t.write(&bytes_to_write);
        assert_eq!(result.unwrap(), 1);
        assert_eq!(&t.write_buffer(), &bytes_to_write);

        t.empty_write_buffer();
        assert_eq!(t.write_buffer().len(), 0);
    }

    #[test]
    fn must_accept_writes_after_buffer_emptied() {
        let mut t = TBufferTransport::with_capacity(0, 2);

        let bytes_to_write: [u8; 2] = [0x01, 0x02];

        // first write (all bytes written)
        let result = t.write(&bytes_to_write);
        assert_eq!(result.unwrap(), 2);
        assert_eq!(&t.write_buffer(), &bytes_to_write);

        // try write again (nothing should be written)
        let result = t.write(&bytes_to_write);
        assert_eq!(result.unwrap(), 0);
        assert_eq!(&t.write_buffer(), &bytes_to_write); // still the same as before

        // now reset the buffer
        t.empty_write_buffer();
        assert_eq!(t.write_buffer().len(), 0);

        // now try write again - the write should succeed
        let result = t.write(&bytes_to_write);
        assert_eq!(result.unwrap(), 2);
        assert_eq!(&t.write_buffer(), &bytes_to_write);
    }

    #[test]
    fn must_accept_multiple_writes_until_buffer_is_full() {
        let mut t = TBufferTransport::with_capacity(0, 10);

        // first write (all bytes written)
        let bytes_to_write_0: [u8; 2] = [0x01, 0x41];
        let write_0_result = t.write(&bytes_to_write_0);
        assert_eq!(write_0_result.unwrap(), 2);
        assert_eq!(t.write_buffer(), &bytes_to_write_0);

        // second write (all bytes written, starting at index 2)
        let bytes_to_write_1: [u8; 7] = [0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
        let write_1_result = t.write(&bytes_to_write_1);
        assert_eq!(write_1_result.unwrap(), 7);
        assert_eq!(&t.write_buffer()[2..], &bytes_to_write_1);

        // third write (only 1 byte written - that's all we have space for)
        let bytes_to_write_2: [u8; 3] = [0xBF, 0xDA, 0x98];
        let write_2_result = t.write(&bytes_to_write_2);
        assert_eq!(write_2_result.unwrap(), 1);
        assert_eq!(&t.write_buffer()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?!

        // fourth write (no writes are accepted)
        let bytes_to_write_3: [u8; 3] = [0xBF, 0xAA, 0xFD];
        let write_3_result = t.write(&bytes_to_write_3);
        assert_eq!(write_3_result.unwrap(), 0);

        // check the full write buffer
        let mut expected: Vec<u8> = Vec::with_capacity(10);
        expected.extend_from_slice(&bytes_to_write_0);
        expected.extend_from_slice(&bytes_to_write_1);
        expected.extend_from_slice(&bytes_to_write_2[0..1]);
        assert_eq!(t.write_buffer(), &expected[..]);
    }

    #[test]
    fn must_empty_read_buffer() {
        let mut t = TBufferTransport::with_capacity(1, 0);

        let bytes_to_read: [u8; 1] = [0x01];
        let result = t.set_readable_bytes(&bytes_to_read);
        assert_eq!(result, 1);
        assert_eq!(&t.read_buffer(), &bytes_to_read);

        t.empty_read_buffer();
        assert_eq!(t.read_buffer().len(), 0);
    }

    #[test]
    fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
        let mut t = TBufferTransport::with_capacity(1, 0);

        let bytes_to_read_0: [u8; 1] = [0x01];
        let result = t.set_readable_bytes(&bytes_to_read_0);
        assert_eq!(result, 1);
        assert_eq!(&t.read_buffer(), &bytes_to_read_0);

        t.empty_read_buffer();
        assert_eq!(t.read_buffer().len(), 0);

        let bytes_to_read_1: [u8; 1] = [0x02];
        let result = t.set_readable_bytes(&bytes_to_read_1);
        assert_eq!(result, 1);
        assert_eq!(&t.read_buffer(), &bytes_to_read_1);
    }

    #[test]
    fn must_accept_multiple_reads_until_all_bytes_read() {
        let mut t = TBufferTransport::with_capacity(10, 0);

        let readable_bytes: [u8; 10] = [0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];

        // check that we're able to set the bytes to be read
        let result = t.set_readable_bytes(&readable_bytes);
        assert_eq!(result, 10);
        assert_eq!(&t.read_buffer(), &readable_bytes);

        // first read
        let mut read_buf_0 = vec![0; 5];
        let read_result = t.read(&mut read_buf_0);
        assert_eq!(read_result.unwrap(), 5);
        assert_eq!(read_buf_0.as_slice(), &(readable_bytes[0..5]));

        // second read
        let mut read_buf_1 = vec![0; 4];
        let read_result = t.read(&mut read_buf_1);
        assert_eq!(read_result.unwrap(), 4);
        assert_eq!(read_buf_1.as_slice(), &(readable_bytes[5..9]));

        // third read (only 1 byte remains to be read)
        let mut read_buf_2 = vec![0; 3];
        let read_result = t.read(&mut read_buf_2);
        assert_eq!(read_result.unwrap(), 1);
        read_buf_2.truncate(1); // FIXME: does the caller have to do this?
        assert_eq!(read_buf_2.as_slice(), &(readable_bytes[9..]));

        // fourth read (nothing should be readable)
        let mut read_buf_3 = vec![0; 10];
        let read_result = t.read(&mut read_buf_3);
        assert_eq!(read_result.unwrap(), 0);
        read_buf_3.truncate(0);

        // check that all the bytes we received match the original (again!)
        let mut bytes_read = Vec::with_capacity(10);
        bytes_read.extend_from_slice(&read_buf_0);
        bytes_read.extend_from_slice(&read_buf_1);
        bytes_read.extend_from_slice(&read_buf_2);
        bytes_read.extend_from_slice(&read_buf_3);
        assert_eq!(&bytes_read, &readable_bytes);
    }

    #[test]
    fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
        let mut t = TBufferTransport::with_capacity(3, 0);

        let readable_bytes_0: [u8; 3] = [0x02, 0xAB, 0x33];

        // check that we're able to set the bytes to be read
        let result = t.set_readable_bytes(&readable_bytes_0);
        assert_eq!(result, 3);
        assert_eq!(&t.read_buffer(), &readable_bytes_0);

        let mut read_buf = vec![0; 4];

        // drain the read buffer
        let read_result = t.read(&mut read_buf);
        assert_eq!(read_result.unwrap(), 3);
        assert_eq!(t.read_buffer(), &read_buf[0..3]);

        // check that a subsequent read fails
        let read_result = t.read(&mut read_buf);
        assert_eq!(read_result.unwrap(), 0);

        // we don't modify the read buffer on failure
        let mut expected_bytes = Vec::with_capacity(4);
        expected_bytes.extend_from_slice(&readable_bytes_0);
        expected_bytes.push(0x00);
        assert_eq!(&read_buf, &expected_bytes);

        // replenish the read buffer again
        let readable_bytes_1: [u8; 2] = [0x91, 0xAA];

        // check that we're able to set the bytes to be read
        let result = t.set_readable_bytes(&readable_bytes_1);
        assert_eq!(result, 2);
        assert_eq!(&t.read_buffer(), &readable_bytes_1);

        // read again
        let read_result = t.read(&mut read_buf);
        assert_eq!(read_result.unwrap(), 2);
        assert_eq!(t.read_buffer(), &read_buf[0..2]);
    }
}