use {
core::{
hash::{Hash, Hasher},
time::Duration,
},
std::{
io::ErrorKind,
os::{
fd::FromRawFd,
linux::net::SocketAddrExt,
unix::net::{SocketAddr, UnixListener, UnixStream},
},
thread,
time::Instant,
},
crate::{Result, SLEEP_DURATION, UdsxUnixStream},
super::Namaste,
};
/// A pair of identifiers — one for reading, one for writing — together with the [`Namaste`] handles (if any) that are
/// currently held for each of them.
///
/// Both handles start out as `None`; they are populated by `read()`/`write()` and cleared again by
/// `drop_read()`/`drop_write()`.
#[derive(Debug)]
pub struct RwNamaste<R, W>
where
    R: AsRef<[u8]>,
    W: AsRef<[u8]>,
{
    /// Bytes identifying the read side.
    read_id: R,
    /// Handle held while the read side is acquired; `None` otherwise.
    read: Option<Namaste>,
    /// Bytes identifying the write side.
    write_id: W,
    /// Handle held while the write side is acquired; `None` otherwise.
    write: Option<Namaste>,
}
impl<R, W> RwNamaste<R, W> where R: AsRef<[u8]>, W: AsRef<[u8]> {
/// Builds a new instance that holds neither the read side nor the write side yet.
///
/// # Errors
///
/// Fails when `read_id` and `write_id` are byte-for-byte equal.
pub fn make(read_id: R, write_id: W) -> Result<Self> {
    // Guard clause: the two IDs must differ.
    if read_id.as_ref() == write_id.as_ref() {
        return Err(err!("Read ID must be different to write ID"));
    }

    Ok(Self { read_id, read: None, write_id, write: None })
}
/// Acquires the read side unless it is already held.
///
/// When a read handle is already stored, this returns `Ok(())` immediately. Otherwise a handle is obtained in one of
/// two ways:
///
/// 1. by creating a fresh [`Namaste`] for the read ID, or
/// 2. if that fails, by connecting to the read ID's abstract socket and receiving one listener descriptor over that
///    connection, which is then handed to `super::spawn_thread()`.
///
/// If neither works, the attempt is repeated. Once a handle is available, the write ID is probed: the handle is stored
/// only when connecting to the write ID fails.
///
/// NOTE(review): the retry happens immediately, without sleeping — confirm that both steps cannot keep failing
/// indefinitely (that would spin).
///
/// # Errors
///
/// - Connecting to the write ID succeeded (the freshly obtained handle is dropped in that case).
/// - Receiving the descriptor or spawning the thread failed.
pub fn read(&mut self) -> Result<()> {
    // Nothing to do when the read side is already held.
    if self.read.is_some() {
        return Ok(());
    }

    let handle = loop {
        if let Ok(namaste) = Namaste::make(&self.read_id) {
            break namaste;
        }

        // Creating failed; try to reach whoever answers on the read ID instead.
        let Ok(stream) = self.connect_to(&self.read_id) else {
            continue;
        };

        // SAFETY: presumably `recv_streams()` yields a descriptor that was just received over `stream` and is owned by
        // nobody else, so the listener may take ownership of it — TODO confirm against `UdsxUnixStream`.
        let listener = unsafe {
            UnixListener::from_raw_fd(stream.recv_streams::<_, _, 1>(super::STREAM_ID)?[usize::MIN])
        };
        break Namaste { _sender: super::spawn_thread(listener)? };
    };

    // Being able to connect to the write ID is treated as "a write is in progress".
    if self.connect_to(&self.write_id).is_ok() {
        return Err(err!("Another write is working"));
    }

    self.read = Some(handle);
    Ok(())
}
/// Calls `read()` repeatedly until it succeeds or `timeout` has elapsed.
///
/// Every error returned by `read()` is treated as "not yet" and triggers another attempt. Between attempts the
/// thread sleeps for `SLEEP_DURATION`, but never longer than the time left before the deadline, so the call does not
/// overshoot `timeout` by a whole sleep interval (and a zero `timeout` results in exactly one attempt, without any
/// sleep).
///
/// # Errors
///
/// - An error built with [`ErrorKind::TimedOut`] when `timeout` elapsed without a successful `read()`.
/// - A generic error if the elapsed time cannot be computed.
pub fn try_read(&mut self, timeout: Duration) -> Result<()> {
    let start = Instant::now();
    loop {
        if self.read().is_ok() {
            return Ok(());
        }

        let Some(elapsed) = Instant::now().checked_duration_since(start) else {
            return Err(err!("Failed calling Instant::checked_duration_since()"));
        };

        // Check the deadline *before* sleeping: sleeping first would block for a full interval even when the timeout
        // has already expired.
        let remaining = timeout.saturating_sub(elapsed);
        if remaining.is_zero() {
            return Err(err!(ErrorKind::TimedOut, "Timed out waiting for a read"));
        }

        // Cap the pause so that the last attempt happens at the deadline, not after it.
        thread::sleep(SLEEP_DURATION.min(remaining));
    }
}
/// Releases the read side: the stored read handle (if any) is taken out and dropped, leaving `None` behind.
pub fn drop_read(&mut self) {
    drop(self.read.take());
}
/// Connects to the Unix socket whose abstract name is `address`.
///
/// # Errors
///
/// Fails when `address` cannot be turned into an abstract socket address, or when the connection attempt fails.
fn connect_to<B>(&self, address: B) -> Result<UnixStream>
where
    B: AsRef<[u8]>,
{
    let target = SocketAddr::from_abstract_name(address)?;
    UnixStream::connect_addr(&target)
}
/// Acquires the write side unless it is already held.
///
/// When a write handle is already stored, this returns `Ok(())` immediately. Otherwise a new [`Namaste`] is created
/// for the write ID, and then the read ID is probed: the new handle is stored only when connecting to the read ID
/// fails.
///
/// # Errors
///
/// - Creating the [`Namaste`] for the write ID failed.
/// - Connecting to the read ID succeeded (the freshly created handle is dropped in that case).
pub fn write(&mut self) -> Result<()> {
    // Nothing to do when the write side is already held.
    if self.write.is_some() {
        return Ok(());
    }

    let handle = Namaste::make(&self.write_id)?;

    // Being able to connect to the read ID is treated as "a read is in progress".
    match self.connect_to(&self.read_id) {
        Err(_) => {
            self.write = Some(handle);
            Ok(())
        },
        Ok(_) => Err(err!("Another read is working")),
    }
}
/// Calls `write()` repeatedly until it succeeds or `timeout` has elapsed.
///
/// Every error returned by `write()` is treated as "not yet" and triggers another attempt. Between attempts the
/// thread sleeps for `SLEEP_DURATION`, but never longer than the time left before the deadline, so the call does not
/// overshoot `timeout` by a whole sleep interval (and a zero `timeout` results in exactly one attempt, without any
/// sleep).
///
/// # Errors
///
/// - An error built with [`ErrorKind::TimedOut`] when `timeout` elapsed without a successful `write()`.
/// - A generic error if the elapsed time cannot be computed.
pub fn try_write(&mut self, timeout: Duration) -> Result<()> {
    let start = Instant::now();
    loop {
        if self.write().is_ok() {
            return Ok(());
        }

        let Some(elapsed) = Instant::now().checked_duration_since(start) else {
            return Err(err!("Failed calling Instant::checked_duration_since()"));
        };

        // Check the deadline *before* sleeping: sleeping first would block for a full interval even when the timeout
        // has already expired.
        let remaining = timeout.saturating_sub(elapsed);
        if remaining.is_zero() {
            return Err(err!(ErrorKind::TimedOut, "Timed out waiting for a write"));
        }

        // Cap the pause so that the last attempt happens at the deadline, not after it.
        thread::sleep(SLEEP_DURATION.min(remaining));
    }
}
/// Releases the write side: the stored write handle (if any) is taken out and dropped, leaving `None` behind.
pub fn drop_write(&mut self) {
    drop(self.write.take());
}
}
/// Cloning copies only the two IDs; the clone never inherits a held read or write handle.
impl<R, W> Clone for RwNamaste<R, W>
where
    R: AsRef<[u8]> + Clone,
    W: AsRef<[u8]> + Clone,
{
    fn clone(&self) -> Self {
        let read_id = self.read_id.clone();
        let write_id = self.write_id.clone();

        // Handles are deliberately reset: the new value starts with nothing acquired.
        Self { read_id, read: None, write_id, write: None }
    }
}
/// Marker impl; equality itself is defined by the `PartialEq` impl, which compares the IDs' bytes.
impl<R, W> Eq for RwNamaste<R, W>
where
    R: AsRef<[u8]>,
    W: AsRef<[u8]>,
{
}
/// Two values are equal when their read IDs and their write IDs contain the same bytes; held handles are ignored.
impl<R, W> PartialEq for RwNamaste<R, W>
where
    R: AsRef<[u8]>,
    W: AsRef<[u8]>,
{
    fn eq(&self, other: &Self) -> bool {
        // Differing read IDs settle the answer; the write IDs are only looked at otherwise.
        if self.read_id.as_ref() != other.read_id.as_ref() {
            return false;
        }

        self.write_id.as_ref() == other.write_id.as_ref()
    }
}
/// Hashes the bytes of the read ID followed by the bytes of the write ID; held handles do not contribute.
impl<R, W> Hash for RwNamaste<R, W>
where
    R: AsRef<[u8]>,
    W: AsRef<[u8]>,
{
    fn hash<H>(&self, h: &mut H)
    where
        H: Hasher,
    {
        <[u8] as Hash>::hash(self.read_id.as_ref(), h);
        <[u8] as Hash>::hash(self.write_id.as_ref(), h);
    }
}