use std::{io, result, fmt};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering}
};
use std::time::Duration;
use std::os::unix::io::{AsRawFd, RawFd};
use std::marker::PhantomData;
use std::cell::Cell;
use queen_io::{
epoll::{Epoll, Token, Ready, EpollOpt, Source},
queue::spsc::Queue
};
use queen_io::poll;
use nson::Message;
use crate::util::lock::{Lock, LockGuard};
use crate::error::{Result, SendError, RecvError};
/// One end of a bidirectional, queue-backed connection created by `Wire::pipe`.
///
/// Each end owns a handle to two queues: one it pushes into (`tx`) and one it
/// pops from (`rx`). The peer holds the same two queues with the roles swapped.
/// Queue items are `Result`s so that, besides payloads, an error marker such as
/// `RecvError::Disconnected` can travel through the same channel.
pub struct Wire<T: Send> {
/// Capacity passed to `pipe`; `send` refuses new items once the number of
/// pending items in `tx` reaches this value.
capacity: usize,
/// Outgoing queue: written by `send` and `close` on this end.
tx: Queue<result::Result<T, RecvError>>,
/// Incoming queue: read by `recv`; its file descriptor is the one exposed
/// for polling and epoll registration.
rx: Queue<result::Result<T, RecvError>>,
/// Closed flag shared (through the `Arc`) by both ends of the pipe.
close: Arc<AtomicBool>,
/// Attribute message shared by both ends, guarded by a `Lock`.
attr: Arc<Lock<Message>>,
/// Number of items successfully sent through this end.
send_num: Cell<usize>,
/// Number of `Ok` items received through this end.
recv_num: Cell<usize>,
/// Raw-pointer marker: suppresses the automatic `Send`/`Sync` impls. `Send` is
/// re-added by an explicit `unsafe impl`; no `Sync` impl is provided.
_not_sync: PhantomData<*const ()>
}
impl<T: Send> fmt::Debug for Wire<T> {
    /// Renders the wire as a `Wire { .. }` debug struct listing its capacity,
    /// closed state, shared attributes and the send/receive counters.
    ///
    /// The attribute lock is taken while the `attr` field is being formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Wire");

        builder.field("capacity", &self.capacity);
        builder.field("close", &self.is_close());

        // Keep the guard alive until `finish` has run.
        let attr_guard = self.attr();
        builder.field("attr", &*attr_guard);

        builder.field("send_num", &self.send_num.get());
        builder.field("recv_num", &self.recv_num.get());

        builder.finish()
    }
}
impl<T: Send> Wire<T> {
    /// Creates a connected pair of wires.
    ///
    /// Two queues are created and cross-wired: the queue the first wire pushes
    /// into (`tx`) is the queue the second wire pops from (`rx`), and vice
    /// versa. Both ends share a single `close` flag and a single `attr`
    /// message.
    ///
    /// `capacity` is forwarded to `Queue::with_cache` for both queues and is
    /// also the limit [`Wire::send`] enforces on pending outgoing items.
    ///
    /// # Errors
    ///
    /// Propagates the error if either underlying queue cannot be created.
    pub fn pipe(capacity: usize, attr: Message) -> Result<(Wire<T>, Wire<T>)> {
        let queue1 = Queue::with_cache(capacity)?;
        let queue2 = Queue::with_cache(capacity)?;

        let close = Arc::new(AtomicBool::new(false));
        let attr = Arc::new(Lock::new(attr));

        let wire1 = Wire {
            capacity,
            tx: queue1.clone(),
            rx: queue2.clone(),
            close: close.clone(),
            attr: attr.clone(),
            send_num: Cell::new(0),
            recv_num: Cell::new(0),
            _not_sync: PhantomData
        };

        let wire2 = Wire {
            capacity,
            tx: queue2,
            rx: queue1,
            close,
            attr,
            send_num: Cell::new(0),
            recv_num: Cell::new(0),
            _not_sync: PhantomData
        };

        Ok((wire1, wire2))
    }

    /// Returns the capacity this wire was created with.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Locks the attribute message shared by both ends and returns the guard.
    ///
    /// NOTE(review): the guard presumably releases the lock when dropped —
    /// confirm against `crate::util::lock`.
    #[inline]
    pub fn attr(&self) -> LockGuard<Message> {
        self.attr.lock()
    }

    /// Closes the connection.
    ///
    /// Sets the `close` flag shared by both ends and pushes a
    /// `RecvError::Disconnected` marker onto the outgoing queue so that the
    /// peer's [`Wire::recv`] / [`Wire::wait`] eventually yields it.
    ///
    /// Every call pushes one marker; calling `close` more than once (for
    /// example explicitly and then again on drop) queues several markers.
    #[inline]
    pub fn close(&self) {
        // Publish the flag *before* the marker becomes visible to the peer.
        // Otherwise a peer that has just popped `Disconnected` could still
        // read `is_close() == false` and pass the closed check in `send`.
        // (Assumes the queue hand-off orders the earlier store before the
        // peer's pop, as a release/acquire hand-off would.)
        self.close.store(true, Ordering::Release);
        self.tx.push(Err(RecvError::Disconnected));
    }

    /// Returns `true` once either end has called [`Wire::close`].
    #[inline]
    pub fn is_close(&self) -> bool {
        self.close.load(Ordering::Acquire)
    }

    /// Returns `true` when the number of pending outgoing items has reached
    /// the wire's capacity.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.tx.pending() >= self.capacity
    }

    /// Returns the number of pending items reported by the outgoing queue.
    #[inline]
    pub fn pending(&self) -> usize {
        self.tx.pending()
    }

    /// Queues `data` for the peer.
    ///
    /// # Errors
    ///
    /// * `SendError::Disconnected(data)` if the wire is already closed.
    /// * `SendError::Full(data)` if the outgoing queue is at capacity.
    ///
    /// In both cases the value is handed back to the caller inside the error.
    pub fn send(&self, data: T) -> result::Result<(), SendError<T>> {
        if self.is_close() {
            return Err(SendError::Disconnected(data))
        }

        if self.is_full() {
            return Err(SendError::Full(data))
        }

        self.tx.push(Ok(data));
        self.send_num.set(self.send_num.get() + 1);

        Ok(())
    }

    /// Returns how many items this end has successfully sent.
    #[inline]
    pub fn send_num(&self) -> usize {
        self.send_num.get()
    }

    /// Pops the next item from the incoming queue without blocking.
    ///
    /// # Errors
    ///
    /// * `RecvError::Empty` if nothing is queued.
    /// * Any error the peer queued, e.g. `RecvError::Disconnected` pushed by
    ///   its `close`.
    pub fn recv(&self) -> result::Result<T, RecvError> {
        match self.rx.pop() {
            Some(data) => {
                // Only real payloads count as received; error markers do not.
                if data.is_ok() {
                    self.recv_num.set(self.recv_num.get() + 1);
                }

                data
            },
            None => Err(RecvError::Empty)
        }
    }

    /// Returns how many `Ok` items this end has received.
    #[inline]
    pub fn recv_num(&self) -> usize {
        self.recv_num.get()
    }

    /// Polls the incoming queue's file descriptor for readability and then
    /// receives one item.
    ///
    /// `timeout` is forwarded unchanged to `poll::wait`.
    /// NOTE(review): `None` presumably means "wait indefinitely" — confirm
    /// against `queen_io::poll::wait`.
    ///
    /// # Errors
    ///
    /// * `RecvError::TimedOut` if the poll returned without the descriptor
    ///   being readable.
    /// * `RecvError::Disconnected` if the poll itself failed; the wire is
    ///   closed in that case.
    /// * Whatever [`Wire::recv`] returns once the descriptor is readable.
    pub fn wait(&self, timeout: Option<Duration>) -> result::Result<T, RecvError> {
        match poll::wait(self.rx.as_raw_fd(), poll::Ready::readable(), timeout) {
            Ok(event) => {
                if event.is_readable() {
                    return self.recv()
                }

                Err(RecvError::TimedOut)
            },
            Err(_) => {
                // A failing poll is treated as a broken connection.
                self.close();
                Err(RecvError::Disconnected)
            }
        }
    }
}
impl<T: Send> AsRawFd for Wire<T> {
    /// Exposes the file descriptor of the incoming (`rx`) queue, i.e. the
    /// descriptor to poll when waiting for data from the peer.
    fn as_raw_fd(&self) -> RawFd {
        let incoming = &self.rx;
        incoming.as_raw_fd()
    }
}
impl<T: Send> Drop for Wire<T> {
    /// Closes the wire when this end goes out of scope, so the shared close
    /// flag is set and the peer is sent a disconnect marker.
    fn drop(&mut self) {
        Self::close(self);
    }
}
// The `_not_sync: PhantomData<*const ()>` field removes the automatic `Send`
// and `Sync` impls; this impl deliberately restores `Send` only, so a wire can
// be moved to another thread but never shared by reference between threads.
// NOTE(review): soundness relies on `Queue` and `Lock<Message>` being safe to
// use from whichever single thread currently owns the wire — confirm against
// `queen_io::queue::spsc` and `crate::util::lock`.
unsafe impl<T: Send> Send for Wire<T> {}
impl<T: Send> Source for Wire<T> {
    /// Registers the incoming (`rx`) queue with `epoll` under `token`.
    fn add(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
        let incoming = &self.rx;
        incoming.add(epoll, token, interest, opts)
    }

    /// Updates the epoll registration of the incoming (`rx`) queue.
    fn modify(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
        let incoming = &self.rx;
        incoming.modify(epoll, token, interest, opts)
    }

    /// Removes the incoming (`rx`) queue from `epoll`.
    fn delete(&self, epoll: &Epoll) -> io::Result<()> {
        let incoming = &self.rx;
        incoming.delete(epoll)
    }
}
#[cfg(test)]
mod tests {
    use super::Wire;
    use std::thread;
    use std::time::Duration;
    use nson::msg;
    use crate::error::{RecvError, SendError};

    /// Sending succeeds while the peer is alive and fails with
    /// `Disconnected` (returning the value) once the peer has been dropped.
    #[test]
    fn send() {
        let (wire1, wire2) = Wire::<i32>::pipe(2, msg!{}).unwrap();

        assert!(wire1.send(1).is_ok());

        drop(wire2);

        assert!(wire1.send(2).err() == Some(SendError::Disconnected(2)));
    }

    /// A wire of capacity 1 rejects the second un-received item with `Full`.
    #[test]
    fn send_full() {
        let (wire1, _wire2) = Wire::<i32>::pipe(1, msg!{}).unwrap();

        assert!(wire1.send(1).is_ok());
        assert!(wire1.send(2).err() == Some(SendError::Full(2)));
    }

    /// `wait` times out when nothing arrives, returns data once it is sent,
    /// and reports `Disconnected` after the peer has been dropped.
    #[test]
    fn test_wait_timeout() {
        let (wire1, wire2) = Wire::<i32>::pipe(1, msg!{}).unwrap();

        let sender = thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            assert!(wire1.send(1).is_ok());
            thread::sleep(Duration::from_secs(2));
            // `wire1` is dropped here, which closes the connection.
        });

        // Nothing has been sent yet: the first wait must time out.
        let ret = wire2.wait(Some(Duration::from_secs(1)));
        assert!(matches!(ret, Err(RecvError::TimedOut)));

        // The sender delivers its item within this window.
        let ret = wire2.wait(Some(Duration::from_secs(2)));
        assert!(ret.is_ok());

        thread::sleep(Duration::from_secs(2));

        // By now the sender has dropped its end.
        let ret = wire2.wait(Some(Duration::from_secs(2)));
        assert!(matches!(ret, Err(RecvError::Disconnected)));

        // Joining surfaces a panic in the sender and guarantees its drop has
        // completed before the shared close flag is inspected.
        sender.join().unwrap();

        assert!(wire2.is_close());
    }

    /// Both ends increment the shared attribute concurrently; no update may
    /// be lost.
    #[test]
    fn test_modify_attr() {
        let (wire1, wire2) = Wire::<i32>::pipe(1, msg!{"a": 0}).unwrap();

        let worker = thread::spawn(move || {
            for _ in 0..1000 {
                let mut attr = wire1.attr();
                let a = attr.get_i32("a").unwrap();
                attr.insert("a", a + 1);
                drop(attr);
            }
        });

        for _ in 0..1000 {
            let mut attr = wire2.attr();
            let a = attr.get_i32("a").unwrap();
            attr.insert("a", a + 1);
            drop(attr);
        }

        // Wait for the worker deterministically instead of sleeping for a
        // fixed time; this also propagates any panic raised inside it.
        worker.join().unwrap();

        let attr = wire2.attr();
        assert!(attr.get_i32("a").unwrap() == 2000);
    }
}