lapin 0.36.2

AMQP client library
Documentation
use amq_protocol::frame::{BackToTheBuffer, GenError, GenResult, WriteContext};
use std::{cmp, io};

#[derive(Debug, PartialEq, Clone)]
pub(crate) struct Buffer {
    memory: Vec<u8>,
    capacity: usize,
    position: usize,
    end: usize,
}

pub(crate) struct Checkpoint(usize);

impl Buffer {
    pub(crate) fn with_capacity(capacity: usize) -> Buffer {
        Buffer {
            memory: vec![0; capacity],
            capacity,
            position: 0,
            end: 0,
        }
    }

    pub(crate) fn checkpoint(&self) -> Checkpoint {
        Checkpoint(self.end)
    }

    pub(crate) fn rollback(&mut self, checkpoint: Checkpoint) {
        self.end = checkpoint.0;
    }

    pub(crate) fn grow(&mut self, new_size: usize) -> bool {
        if self.capacity >= new_size {
            return false;
        }

        self.memory.resize(new_size, 0);
        self.capacity = new_size;
        true
    }

    pub(crate) fn available_data(&self) -> usize {
        self.end - self.position
    }

    pub(crate) fn available_space(&self) -> usize {
        self.capacity - self.end
    }

    pub(crate) fn consume(&mut self, count: usize) -> usize {
        let cnt = cmp::min(count, self.available_data());
        self.position += cnt;
        cnt
    }

    pub(crate) fn fill(&mut self, count: usize) -> usize {
        let cnt = cmp::min(count, self.available_space());
        self.end += cnt;
        cnt
    }

    pub(crate) fn data(&self) -> &[u8] {
        &self.memory[self.position..self.end]
    }

    pub(crate) fn space(&mut self) -> &mut [u8] {
        &mut self.memory[self.end..self.capacity]
    }

    pub(crate) fn shift(&mut self) {
        let length = self.end - self.position;
        if length > self.position {
            return;
        } else {
            let (start, end) = self.memory.split_at_mut(self.position);
            start[..length].copy_from_slice(&end[..length]);
        }
        self.position = 0;
        self.end = length;
    }

    pub(crate) fn shift_unless_available(&mut self, size: usize) {
        if self.available_space() < size {
            self.shift();
        }
    }
}

impl io::Write for &mut Buffer {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        let amt = self.space().write(data)?;
        self.fill(amt);
        Ok(amt)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl BackToTheBuffer for &mut Buffer {
    fn reserve_write_use<
        Tmp,
        Gen: Fn(WriteContext<Self>) -> Result<(WriteContext<Self>, Tmp), GenError>,
        Before: Fn(WriteContext<Self>, Tmp) -> GenResult<Self>,
    >(
        mut s: WriteContext<Self>,
        reserved: usize,
        gen: &Gen,
        before: &Before,
    ) -> Result<WriteContext<Self>, GenError> {
        if s.write.available_space() < reserved {
            return Err(GenError::BufferTooSmall(
                reserved - s.write.available_space(),
            ));
        }
        let start = s.write.end;
        s.write.end += reserved;
        gen(s).and_then(|(s, tmp)| {
            let end = s.write.end;
            s.write.end = start;
            before(s, tmp).and_then(|s| {
                s.write.end = end;
                Ok(s)
            })
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    #[test]
    fn fill_and_consume() {
        let mut b = Buffer::with_capacity(10);
        assert_eq!(b.available_data(), 0);
        assert_eq!(b.available_space(), 10);
        let res = b.space().write(&b"abcd"[..]).map(|size| {
            b.fill(size);
            size
        });
        assert_eq!(res.ok(), Some(4));
        assert_eq!(b.available_data(), 4);
        assert_eq!(b.available_space(), 6);

        assert_eq!(b.data(), &b"abcd"[..]);

        b.consume(2);
        assert_eq!(b.available_data(), 2);
        assert_eq!(b.available_space(), 6);
        assert_eq!(b.data(), &b"cd"[..]);

        b.shift();
        assert_eq!(b.available_data(), 2);
        assert_eq!(b.available_space(), 8);
        assert_eq!(b.data(), &b"cd"[..]);

        assert_eq!(
            b.space()
                .write(&b"efghijklmnop"[..])
                .map(|size| {
                    b.fill(size);
                    size
                })
                .ok(),
            Some(8)
        );
        assert_eq!(b.available_data(), 10);
        assert_eq!(b.available_space(), 0);
        assert_eq!(b.data(), &b"cdefghijkl"[..]);
        b.shift();
        assert_eq!(b.available_data(), 10);
        assert_eq!(b.available_space(), 0);
        assert_eq!(b.data(), &b"cdefghijkl"[..]);
    }
}