use std::marker::PhantomData;
use psrdada_sys::*;
use tracing::{debug, error};
use crate::{
client::{DataClient, HeaderClient},
errors::{PsrdadaError, PsrdadaResult},
};
mod private {
pub struct Token;
}
pub trait DadaClient {
fn buf(&mut self, _: private::Token) -> *const ipcbuf_t;
fn state(&mut self) -> State {
unsafe { *self.buf(private::Token) }.state.into()
}
fn reader(&mut self) -> PsrdadaResult<Reader> {
Reader::new(self)
}
fn writer(&mut self) -> PsrdadaResult<Writer> {
Writer::new(self)
}
}
pub struct Writer<'a> {
buf: *const ipcbuf_t,
_phantom: PhantomData<&'a ipcbuf_t>,
}
impl Writer<'_> {
fn lock(&mut self) -> PsrdadaResult<()> {
debug!("Locking buffer for writing");
if unsafe { ipcbuf_lock_write(self.buf as *mut _) } != 0 {
error!("Couldn't lock buffer for writing");
Err(PsrdadaError::DadaLockingError)
} else {
Ok(())
}
}
fn unlock(&mut self) -> PsrdadaResult<()> {
debug!("Unlocking buffer from writing");
if unsafe { ipcbuf_unlock_write(self.buf as *mut _) } != 0 {
error!("Couldn't unlock buffer from writing");
Err(PsrdadaError::DadaLockingError)
} else {
Ok(())
}
}
fn new<T: DadaClient + ?Sized>(client: &mut T) -> PsrdadaResult<Self> {
let mut writer = Self {
buf: client.buf(private::Token),
_phantom: PhantomData,
};
writer.lock()?;
Ok(writer)
}
}
impl Drop for Writer<'_> {
fn drop(&mut self) {
let _ = self.unlock();
}
}
pub struct Reader<'a> {
buf: *const ipcbuf_t,
_phantom: PhantomData<&'a ipcbuf_t>,
}
impl Reader<'_> {
fn lock(&mut self) -> PsrdadaResult<()> {
debug!("Locking buffer for reading");
if unsafe { ipcbuf_lock_read(self.buf as *mut _) } != 0 {
error!("Couldn't lock buffer for reading");
Err(PsrdadaError::DadaLockingError)
} else {
Ok(())
}
}
fn unlock(&mut self) -> PsrdadaResult<()> {
debug!("Unlocking buffer from reading");
if unsafe { ipcbuf_unlock_read(self.buf as *mut _) } != 0 {
error!("Couldn't unlock buffer from reading");
Err(PsrdadaError::DadaLockingError)
} else {
Ok(())
}
}
fn new<T: DadaClient + ?Sized>(client: &mut T) -> PsrdadaResult<Self> {
let mut reader = Self {
buf: client.buf(private::Token),
_phantom: PhantomData,
};
reader.lock()?;
Ok(reader)
}
}
impl Drop for Reader<'_> {
fn drop(&mut self) {
let _ = self.unlock();
}
}
impl DadaClient for HeaderClient<'_> {
fn buf(&mut self, _: private::Token) -> *const ipcbuf_t {
self.buf
}
}
impl DadaClient for DataClient<'_> {
fn buf(&mut self, _: private::Token) -> *const ipcbuf_t {
self.buf
}
}
pub mod read;
pub mod write;
#[repr(i32)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum State {
Disconnected = 0, Connected = 1, Writer = 2, Writing = 3, WriteChange = 4, Reader = 5, Reading = 6, ReadStop = 7, Viewing = 8, ViewStop = 9, }
impl From<i32> for State {
fn from(value: i32) -> Self {
match value {
0 => State::Disconnected,
1 => State::Connected,
2 => State::Writer,
3 => State::Writing,
4 => State::WriteChange,
5 => State::Reader,
6 => State::Reading,
7 => State::ReadStop,
8 => State::Viewing,
9 => State::ViewStop,
_ => unreachable!(),
}
}
}
#[cfg(test)]
mod tests {
use std::io::{Read, Write};
use test_log::test;
use crate::{
builder::DadaClientBuilder, client::HduClient, io::DadaClient, iter::DadaIterator,
tests::next_key,
};
#[test]
fn test_read_write_many() {
let key = next_key();
let mut client = DadaClientBuilder::new(key)
.num_bufs(4)
.buf_size(4)
.build()
.unwrap();
let (_, mut dc) = client.split();
let mut writer = dc.writer().unwrap();
for i in 0..4 {
let mut block = writer.next().unwrap();
block.write_all(&[0, 1, 2, 3]).unwrap();
if i == 3 {
block.mark_eod();
}
}
drop(writer);
let mut reader = dc.reader().unwrap();
let mut buf = [0u8; 4];
while let Some(mut block) = reader.next() {
block.read_exact(&mut buf).unwrap();
assert_eq!(buf, [0, 1, 2, 3]);
}
}
#[test]
fn test_multithreaded_read_write_many() {
let key = next_key();
let mut client = DadaClientBuilder::new(key)
.num_bufs(4)
.buf_size(4)
.build()
.unwrap();
let handle = std::thread::spawn(move || {
let mut client = HduClient::connect(key).unwrap();
let (_, mut dc) = client.split();
let mut reader = dc.reader().unwrap();
let mut buf = [0u8; 4];
while let Some(mut block) = reader.next() {
block.read_exact(&mut buf).unwrap();
assert_eq!(buf, [0, 1, 2, 3]);
}
});
let (_, mut dc) = client.split();
let mut writer = dc.writer().unwrap();
for i in 0..4 {
let mut block = writer.next().unwrap();
block.write_all(&[0, 1, 2, 3]).unwrap();
if i == 3 {
block.mark_eod();
}
}
handle.join().unwrap();
}
#[test]
fn test_read_to_vec() {}
}