use super::*;
use log::trace;
use runng_derive::{NngGetOpts, NngSetOpts};
use std::ptr;
/// Wrapper around a raw `nng_stream` pointer.
///
/// The `NngGetOpts`/`NngSetOpts` derives are configured with the
/// `nng_stream_` prefix.
#[derive(Debug, NngGetOpts, NngSetOpts)]
#[prefix = "nng_stream_"]
pub struct NngStream {
// Raw stream pointer; it is passed to `nng_stream_free` when this value is dropped.
stream: *mut nng_stream,
}
/// List of byte buffers used for stream I/O; `as_iov` turns each inner
/// `Vec<u8>` into one `nng_iov` entry.
pub type IoVec = Vec<Vec<u8>>;
/// Builds one `nng_iov` descriptor per buffer in `vec`.
///
/// Each descriptor records the buffer's start address and length. The
/// descriptors only point into `vec`; nothing is copied, so `vec` has to stay
/// alive (and its buffers unmoved) for as long as the descriptors are used.
///
/// NOTE(review): the address is obtained from a shared borrow and cast to a
/// mutable pointer — confirm callers that let NNG write through it are sound.
// `&IoVec` is kept so the signature names the public alias.
#[allow(clippy::ptr_arg)]
fn as_iov(vec: &IoVec) -> Vec<nng_iov> {
    vec.iter()
        .map(|buffer| nng_iov {
            iov_buf: buffer.as_ptr() as *mut _,
            iov_len: buffer.len(),
        })
        .collect()
}
impl NngStream {
    /// Queues an asynchronous send of `iov` on this stream.
    ///
    /// A `SendAioWork` item is pushed onto `queue`. The returned receiver is
    /// the other end of the one-shot channel that work item reports its
    /// result on.
    pub fn send(
        &mut self,
        queue: &mut impl AioWorkQueue,
        iov: IoVec,
    ) -> oneshot::Receiver<Result<usize>> {
        let (tx, rx) = oneshot::channel();
        queue.push_back(Box::new(SendAioWork(self.stream, iov, Some(tx))));
        rx
    }

    /// Queues an asynchronous receive into the buffers of `iov`.
    ///
    /// A `RecvAioWork` item is pushed onto `queue`. The returned receiver is
    /// the other end of the one-shot channel that work item reports its
    /// result on.
    pub fn recv(
        &mut self,
        queue: &mut impl AioWorkQueue,
        iov: IoVec,
    ) -> oneshot::Receiver<Result<IoVec>> {
        let (tx, rx) = oneshot::channel();
        queue.push_back(Box::new(RecvAioWork(self.stream, iov, Some(tx))));
        rx
    }

    /// Calls `nng_stream_close` on the wrapped stream pointer.
    pub fn close(&self) {
        // SAFETY (assumed): `self.stream` is the pointer this wrapper was
        // built with — NOTE(review): confirm it is always a live stream here.
        unsafe {
            nng_stream_close(self.stream);
        }
    }
}
impl Drop for NngStream {
    /// Hands the wrapped stream pointer to `nng_stream_free`.
    fn drop(&mut self) {
        let raw = self.stream;
        // SAFETY (assumed): `raw` has not been freed elsewhere —
        // NOTE(review): confirm against every place an `NngStream` is built.
        unsafe {
            nng_stream_free(raw);
        }
    }
}
/// Work item created by `NngStream::send` for one asynchronous send.
struct SendAioWork(
// Raw stream pointer the send is issued on.
*mut nng_stream,
// Buffers to send; described to the AIO via `as_iov` when the work begins.
IoVec,
// Reply channel; taken (left as `None`) when `finish` reports the result.
Option<oneshot::Sender<Result<usize>>>,
);
impl AioWork for SendAioWork {
    /// Attaches the buffers to `aio` and starts the send.
    ///
    /// # Panics
    ///
    /// Panics if `aio.set_iov` returns an error.
    fn begin(&self, aio: &NngAio) {
        trace!("Sending...");
        // Kept in a local so the descriptor array outlives the send call.
        let iovs = as_iov(&self.1);
        unsafe {
            aio.set_iov(&iovs).unwrap();
            nng_stream_send(self.0, aio.nng_aio());
        }
    }

    /// Reports the outcome over the reply channel: the AIO's count on
    /// success, otherwise the AIO's error.
    ///
    /// # Panics
    ///
    /// Panics if the reply sender has already been taken.
    fn finish(&mut self, aio: &NngAio) {
        unsafe {
            let status = aio.result();
            trace!("Send: {:?}", status);
            let outcome = status.map(|_| aio.aio_count());
            let reply = self.2.take().unwrap();
            // A failed send is only logged.
            if let Err(err) = reply.send(outcome) {
                debug!("Finish failed: {:?}", err);
            }
        }
    }
}
impl NngWrapper for NngStream {
    type NngType = *mut nng_stream;

    /// Returns a copy of the raw stream pointer stored in this wrapper.
    unsafe fn get_nng_type(&self) -> Self::NngType {
        let Self { stream } = self;
        *stream
    }
}
/// Work item created by `NngStream::recv` for one asynchronous receive.
struct RecvAioWork(
// Raw stream pointer the receive is issued on.
*mut nng_stream,
// Receive buffers; described to the AIO via `as_iov` when the work begins,
// and their contents are sent back to the caller on success.
IoVec,
// Reply channel; taken (left as `None`) when `finish` reports the result.
Option<oneshot::Sender<Result<IoVec>>>,
);
impl AioWork for RecvAioWork {
    /// Attaches the receive buffers to `aio` and starts the receive.
    ///
    /// # Panics
    ///
    /// Panics if `aio.set_iov` returns an error.
    fn begin(&self, aio: &NngAio) {
        trace!("Receiving...");
        unsafe {
            let iovs = as_iov(&self.1);
            aio.set_iov(&iovs).unwrap();
            nng_stream_recv(self.0, aio.nng_aio());
        }
    }

    /// Reports the outcome over the reply channel: the receive buffers on
    /// success, otherwise the AIO's error.
    ///
    /// NOTE(review): the AIO's count is not forwarded, so the caller gets the
    /// buffers at their original lengths — confirm that is intended.
    ///
    /// # Panics
    ///
    /// Panics if the reply sender has already been taken.
    fn finish(&mut self, aio: &NngAio) {
        unsafe {
            let res = aio.result();
            trace!("Receive: {:?}", res);
            // The operation is complete, so move the buffers out rather than
            // deep-copying every byte; an empty list is left behind. On error
            // the closure never runs and the buffers stay in place.
            let res = res.map(|_| std::mem::take(&mut self.1));
            // A failed send is only logged.
            if let Err(err) = self.2.take().unwrap().send(res) {
                debug!("Finish failed: {:?}", err);
            }
        }
    }
}
/// Wrapper around a raw `nng_stream_listener` pointer.
///
/// The `NngGetOpts`/`NngSetOpts` derives are configured with the
/// `nng_stream_listener_` prefix.
#[derive(Debug, NngGetOpts, NngSetOpts)]
#[prefix = "nng_stream_listener_"]
pub struct StreamListener {
// Raw listener pointer; it is passed to `nng_stream_listener_free` when this value is dropped.
listener: *mut nng_stream_listener,
}
impl StreamListener {
    /// Allocates a listener for `addr` with `nng_stream_listener_alloc`.
    ///
    /// # Errors
    ///
    /// Returns the error from `to_cstr` if the address cannot be converted,
    /// or the error `nng_int_to_result` produces from the allocation's
    /// return code.
    pub fn alloc(addr: &str) -> Result<Self> {
        let mut raw: *mut nng_stream_listener = ptr::null_mut();
        unsafe {
            // `_cstring` is held until the end of this block so the pointer
            // handed to NNG stays valid for the call.
            let (_cstring, c_addr) = to_cstr(addr)?;
            nng_int_to_result(nng_stream_listener_alloc(&mut raw, c_addr))?;
        }
        Ok(Self { listener: raw })
    }

    /// Not implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    pub fn alloc_url(_url: nng_url) -> Result<Self> {
        unimplemented!()
    }

    /// Calls `nng_stream_listener_listen` and converts its return code with
    /// `nng_int_to_result`.
    pub fn listen(&self) -> Result<()> {
        unsafe { nng_int_to_result(nng_stream_listener_listen(self.listener)) }
    }

    /// Queues an asynchronous accept on this listener.
    ///
    /// An `AcceptAioWork` item is pushed onto `queue`. The returned receiver
    /// is the other end of the one-shot channel that work item reports its
    /// result on.
    pub fn accept(
        &mut self,
        queue: &mut impl AioWorkQueue,
    ) -> oneshot::Receiver<Result<NngStream>> {
        let (tx, rx) = oneshot::channel();
        queue.push_back(Box::new(AcceptAioWork(self.listener, Some(tx))));
        rx
    }

    /// Calls `nng_stream_listener_close` on the wrapped listener pointer.
    pub fn close(&self) {
        // SAFETY (assumed): `self.listener` is the pointer produced by
        // `alloc` — NOTE(review): confirm it is always live here.
        unsafe {
            nng_stream_listener_close(self.listener);
        }
    }
}
impl Drop for StreamListener {
    /// Hands the wrapped listener pointer to `nng_stream_listener_free`.
    fn drop(&mut self) {
        let raw = self.listener;
        // SAFETY (assumed): `raw` has not been freed elsewhere —
        // NOTE(review): confirm against every place a listener is built.
        unsafe {
            nng_stream_listener_free(raw);
        }
    }
}
impl NngWrapper for StreamListener {
    type NngType = *mut nng_stream_listener;

    /// Returns a copy of the raw listener pointer stored in this wrapper.
    unsafe fn get_nng_type(&self) -> Self::NngType {
        let Self { listener } = self;
        *listener
    }
}
/// Work item created by `StreamListener::accept` for one asynchronous accept.
struct AcceptAioWork(
// Raw listener pointer the accept is issued on.
*mut nng_stream_listener,
// Reply channel; taken (left as `None`) when `finish` reports the result.
Option<oneshot::Sender<Result<NngStream>>>,
);
impl AioWork for AcceptAioWork {
    /// Starts the accept on the listener using `aio`.
    fn begin(&self, aio: &NngAio) {
        trace!("Accepting...");
        let listener = self.0;
        unsafe {
            nng_stream_listener_accept(listener, aio.nng_aio());
        }
    }

    /// Reports the outcome over the reply channel: on success, an `NngStream`
    /// wrapping the AIO's output 0; otherwise the AIO's error.
    ///
    /// # Panics
    ///
    /// Panics if the reply sender has already been taken.
    fn finish(&mut self, aio: &NngAio) {
        unsafe {
            let status = aio.result();
            trace!("Accept: {:?}", status);
            // Output 0 is only read when the operation succeeded.
            let accepted = status.map(|_| NngStream {
                stream: aio.get_output(0) as *mut nng_stream,
            });
            let reply = self.1.take().unwrap();
            // A failed send is only logged.
            if let Err(err) = reply.send(accepted) {
                debug!("Finish failed: {:?}", err);
            }
        }
    }
}
/// Wrapper around a raw `nng_stream_dialer` pointer.
///
/// The `NngGetOpts`/`NngSetOpts` derives are configured with the
/// `nng_stream_dialer_` prefix.
#[derive(Debug, NngGetOpts, NngSetOpts)]
#[prefix = "nng_stream_dialer_"]
pub struct StreamDialer {
// Raw dialer pointer; it is passed to `nng_stream_dialer_free` when this value is dropped.
dialer: *mut nng_stream_dialer,
}
impl StreamDialer {
    /// Allocates a dialer for `addr` with `nng_stream_dialer_alloc`.
    ///
    /// # Errors
    ///
    /// Returns the error from `to_cstr` if the address cannot be converted,
    /// or the error `nng_int_to_result` produces from the allocation's
    /// return code.
    pub fn alloc(addr: &str) -> Result<Self> {
        let mut raw: *mut nng_stream_dialer = ptr::null_mut();
        unsafe {
            // `_cstring` is held until the end of this block so the pointer
            // handed to NNG stays valid for the call.
            let (_cstring, c_addr) = to_cstr(addr)?;
            nng_int_to_result(nng_stream_dialer_alloc(&mut raw, c_addr))?;
        }
        Ok(Self { dialer: raw })
    }

    /// Not implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    pub fn alloc_url(_url: nng_url) -> Result<Self> {
        unimplemented!()
    }

    /// Queues an asynchronous dial on this dialer.
    ///
    /// A `DialAioWork` item is pushed onto `queue`. The returned receiver is
    /// the other end of the one-shot channel that work item reports its
    /// result on.
    pub fn dial(&mut self, queue: &mut impl AioWorkQueue) -> oneshot::Receiver<Result<NngStream>> {
        let (tx, rx) = oneshot::channel();
        let dial = DialAioWork(self.dialer, Some(tx));
        queue.push_back(Box::new(dial));
        rx
    }

    /// Calls `nng_stream_dialer_close` on the wrapped dialer pointer.
    pub fn close(&self) {
        // SAFETY (assumed): `self.dialer` is the pointer produced by `alloc`
        // — NOTE(review): confirm it is always live here.
        unsafe {
            nng_stream_dialer_close(self.dialer);
        }
    }
}
impl Drop for StreamDialer {
    /// Hands the wrapped dialer pointer to `nng_stream_dialer_free`.
    fn drop(&mut self) {
        let raw = self.dialer;
        // SAFETY (assumed): `raw` has not been freed elsewhere —
        // NOTE(review): confirm against every place a dialer is built.
        unsafe {
            nng_stream_dialer_free(raw);
        }
    }
}
impl NngWrapper for StreamDialer {
    type NngType = *mut nng_stream_dialer;

    /// Returns a copy of the raw dialer pointer stored in this wrapper.
    unsafe fn get_nng_type(&self) -> Self::NngType {
        let Self { dialer } = self;
        *dialer
    }
}
/// Work item created by `StreamDialer::dial` for one asynchronous dial.
struct DialAioWork(
// Raw dialer pointer the dial is issued on.
*mut nng_stream_dialer,
// Reply channel; taken (left as `None`) when `finish` reports the result.
Option<oneshot::Sender<Result<NngStream>>>,
);
impl AioWork for DialAioWork {
fn begin(&self, aio: &NngAio) {
unsafe {
nng_stream_dialer_dial(self.0, aio.nng_aio());
}
}
fn finish(&mut self, aio: &NngAio) {
unsafe {
let res = aio.result();
let res = match res {
Ok(()) => {
let ptr = aio.get_output(0);
let stream = NngStream {
stream: ptr as *mut nng_stream,
};
Ok(stream)
}
Err(err) => Err(err),
};
if let Err(err) = self.1.take().unwrap().send(res) {
debug!("Finish failed: {:?}", err);
}
}
}
}