use std::marker::PhantomData;
use psrdada_sys::*;
use tracing::{debug, error};
use super::Reader;
use crate::iter::DadaIterator;
/// A single readable block obtained from a psrdada `ipcbuf_t`.
///
/// Instances are created by `ReadBlock::new`. When a value is dropped, the
/// `Drop` impl in this file calls `ipcbuf_mark_cleared` on `buf`.
pub struct ReadBlock<'a> {
/// Raw handle to the underlying buffer, copied from the `Reader` that produced this block.
buf: *const ipcbuf_t,
/// Number of bytes already handed out through the `std::io::Read` impl; starts at 0.
bytes_read: usize,
/// The block's contents, built from the pointer and size reported by `ipcbuf_get_next_read`.
bytes: &'a [u8],
/// Zero-sized marker tying `'a` to a shared borrow of an `ipcbuf_t`.
_phantom: PhantomData<&'a ipcbuf_t>,
}
impl ReadBlock<'_> {
    /// Acquires the next readable block from the buffer behind `reader`.
    ///
    /// Returns `None` in two situations:
    /// * `ipcbuf_eod` reports `1` for the buffer (end of data already reached), or
    /// * `ipcbuf_get_next_read` returns a null pointer; in that case
    ///   `ipcbuf_unlock_read` is called before returning, and a failure of that
    ///   call is only logged.
    ///
    /// NOTE(review): the lifetime of the returned block is not tied to the borrow of
    /// `reader` by this signature — confirm callers never let a block outlive its reader.
    pub fn new(reader: &mut Reader) -> Option<Self> {
        // SAFETY: assumes `reader.buf` points to a live, connected `ipcbuf_t` — TODO confirm
        // that `Reader` upholds this for its whole lifetime.
        if unsafe { ipcbuf_eod(reader.buf as *mut _) } == 1 {
            debug!("EOD set - returning None");
            return None;
        }
        debug!("Grabbing next readable block");
        let mut block_size = 0;
        // SAFETY: same assumption on `reader.buf` as above; `block_size` is a valid,
        // exclusively borrowed out-parameter for the duration of the call.
        let ptr =
            unsafe { ipcbuf_get_next_read(reader.buf as *mut _, &mut block_size) } as *const u8;
        // The null check must come before the slice is built: `slice::from_raw_parts`
        // requires a non-null pointer even for a zero-length slice.
        if ptr.is_null() {
            error!("Next block returned NULL");
            // SAFETY: same assumption on `reader.buf` as above.
            if unsafe { ipcbuf_unlock_read(reader.buf as *mut _) } != 0 {
                error!("Error unlocking the read block");
            }
            return None;
        }
        // SAFETY: `ptr` is non-null (checked above) and `u8` has no alignment requirement.
        // Assumes psrdada guarantees `block_size` readable bytes at `ptr` that stay valid
        // until the block is marked cleared — TODO confirm against the psrdada API.
        let bytes = unsafe { std::slice::from_raw_parts(ptr, block_size as usize) };
        Some(Self {
            buf: reader.buf,
            bytes_read: 0,
            bytes,
            _phantom: PhantomData,
        })
    }

    /// Consumes the block. The body is empty; taking `self` by value means the
    /// block is dropped here, which runs its `Drop` impl.
    pub fn done(self) {}

    /// Returns the complete contents of the block, independent of how many bytes
    /// have already been consumed through `std::io::Read`.
    pub fn block(&mut self) -> &[u8] {
        self.bytes
    }
}
impl Drop for ReadBlock<'_> {
    /// Calls `ipcbuf_mark_cleared` on the underlying buffer when the block goes away.
    ///
    /// A non-zero status from the C call cannot be propagated out of `drop`, so it
    /// is only reported through the `error!` log.
    fn drop(&mut self) {
        // SAFETY: assumes `self.buf` still points to the live `ipcbuf_t` this block was
        // read from — TODO confirm the owning reader cannot be torn down first.
        let status = unsafe { ipcbuf_mark_cleared(self.buf as *mut _) };
        if status != 0 {
            error!("Couldn't mark the block as fully read");
        }
    }
}
/// Lets a `Reader` be stepped through one block at a time via `DadaIterator`.
impl DadaIterator for Reader<'_> {
    /// Every step yields a `ReadBlock` parameterised by the lifetime of the
    /// borrow taken in `next`.
    type Item<'next> = ReadBlock<'next> where Self: 'next;

    /// Delegates to `ReadBlock::new`, so this returns `None` exactly when that
    /// constructor does.
    fn next(&mut self) -> Option<Self::Item<'_>> {
        ReadBlock::new(self)
    }
}
impl std::io::Read for ReadBlock<'_> {
    /// Copies the next not-yet-consumed bytes of the block into `buf`.
    ///
    /// At most `buf.len()` bytes are copied, and never more than what is left in
    /// the block. The internal cursor advances by the number of bytes copied, which
    /// is also the returned value. `Ok(0)` is returned when the block is exhausted
    /// or when `buf` is empty. This implementation never returns `Err`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let start = self.bytes_read;
        let remaining = self.bytes.len() - start;
        // Transfer whichever is smaller: what is left in the block or what fits in `buf`.
        let count = remaining.min(buf.len());
        let end = start + count;
        buf[..count].copy_from_slice(&self.bytes[start..end]);
        self.bytes_read = end;
        Ok(count)
    }
}
// Round-trip tests: each one builds a client under a fresh key, writes a few
// bytes through a writer block, drops the writer, and then checks what the read
// side returns.
#[cfg(test)]
mod tests {
use std::io::{Read, Write};
use test_log::test;
use crate::{
builder::DadaClientBuilder,
io::{read::ReadBlock, DadaClient},
iter::DadaIterator,
tests::next_key,
};
// Writes four bytes, commits them, and reads them back through `ReadBlock::new`.
#[test]
fn test_read_write() {
let key = next_key();
let mut client = DadaClientBuilder::new(key).build().unwrap();
let (_, mut dc) = client.split();
let mut writer = dc.writer().unwrap();
let mut block = writer.next().unwrap();
block.write_all(&[0, 1, 2, 3]).unwrap();
block.commit();
// The writer is dropped before a reader is requested from the same client.
drop(writer);
let mut reader = dc.reader().unwrap();
let mut block = ReadBlock::new(&mut reader).unwrap();
assert_eq!(block.block().len(), 4);
assert_eq!(block.block(), &[0, 1, 2, 3]);
block.done();
}
// Writes three bytes into a client built with `buf_size(4)` and never calls
// `mark_eod`; after the block is read, the next `ReadBlock::new` must be `None`.
// NOTE(review): presumably a partially filled block is what signals end of data
// here — confirm against the writer implementation.
#[test]
fn test_read_write_implicit_eod() {
let key = next_key();
let mut client = DadaClientBuilder::new(key).buf_size(4).build().unwrap();
let (_, mut dc) = client.split();
let mut writer = dc.writer().unwrap();
let mut block = writer.next().unwrap();
block.write_all(&[0, 1, 2]).unwrap();
block.commit();
drop(writer);
let mut reader = dc.reader().unwrap();
let mut block = ReadBlock::new(&mut reader).unwrap();
assert_eq!(block.block().len(), 3);
assert_eq!(block.block(), &[0, 1, 2]);
block.done();
// The first block has been consumed; no further block is expected.
let block = ReadBlock::new(&mut reader);
assert!(block.is_none())
}
// Writes four bytes into a client built with `buf_size(4)` and calls `mark_eod`
// before committing; after the block is read, the next `ReadBlock::new` must be `None`.
#[test]
fn test_read_write_explicit_eod() {
let key = next_key();
let mut client = DadaClientBuilder::new(key).buf_size(4).build().unwrap();
let (_, mut dc) = client.split();
let mut writer = dc.writer().unwrap();
let mut block = writer.next().unwrap();
block.write_all(&[0, 1, 2, 3]).unwrap();
block.mark_eod();
block.commit();
drop(writer);
let mut reader = dc.reader().unwrap();
let mut block = ReadBlock::new(&mut reader).unwrap();
assert_eq!(block.block().len(), 4);
assert_eq!(block.block(), &[0, 1, 2, 3]);
block.done();
// The first block has been consumed; no further block is expected.
let block = ReadBlock::new(&mut reader);
assert!(block.is_none())
}
// Same round trip as `test_read_write`, but the block is obtained through
// `reader.next()` instead of calling `ReadBlock::new` directly.
#[test]
fn test_read_write_with_iter() {
let key = next_key();
let mut client = DadaClientBuilder::new(key).build().unwrap();
let (_, mut dc) = client.split();
let mut writer = dc.writer().unwrap();
let mut block = writer.next().unwrap();
block.write_all(&[0, 1, 2, 3]).unwrap();
block.commit();
drop(writer);
let mut reader = dc.reader().unwrap();
let mut block = reader.next().unwrap();
assert_eq!(block.block().len(), 4);
assert_eq!(block.block(), &[0, 1, 2, 3]);
block.done();
}
// Reads the block through its `std::io::Read` impl with `read_exact` into a
// fixed four-byte array.
#[test]
fn test_read_write_with_std_read() {
let key = next_key();
let mut client = DadaClientBuilder::new(key).build().unwrap();
let (_, mut dc) = client.split();
let mut writer = dc.writer().unwrap();
let mut block = writer.next().unwrap();
block.write_all(&[0, 1, 2, 3]).unwrap();
block.commit();
drop(writer);
let mut reader = dc.reader().unwrap();
let mut block = reader.next().unwrap();
let mut buf = [0u8; 4];
block.read_exact(&mut buf).unwrap();
assert_eq!(buf, [0, 1, 2, 3]);
block.done();
}
// Reads the block through its `std::io::Read` impl with `read_to_end` into a
// growable `Vec`, which relies on `read` eventually returning `Ok(0)`.
#[test]
fn test_read_write_with_std_read_to_vec() {
let key = next_key();
let mut client = DadaClientBuilder::new(key).build().unwrap();
let (_, mut dc) = client.split();
let mut writer = dc.writer().unwrap();
let mut block = writer.next().unwrap();
block.write_all(&[0, 1, 2, 3]).unwrap();
block.commit();
drop(writer);
let mut reader = dc.reader().unwrap();
let mut block = ReadBlock::new(&mut reader).unwrap();
let mut buf = vec![];
block.read_to_end(&mut buf).unwrap();
assert_eq!(buf, [0, 1, 2, 3]);
block.done();
}
}