pub mod aio;
pub mod bus;
pub mod pair;
pub mod pair_stream;
pub mod pull;
pub mod pull_stream;
pub mod push;
pub mod reply;
pub mod reply_stream;
pub mod request;
pub mod simple;
pub mod stream;
pub use self::aio::*;
pub use self::bus::*;
pub use self::pair::*;
pub use self::pair_stream::*;
pub use self::pull::*;
pub use self::pull_stream::*;
pub use self::push::*;
pub use self::reply::*;
pub use self::reply_stream::*;
pub use self::request::*;
pub use self::simple::*;
pub use self::stream::*;
use crate::{msg::NngMsg, *};
use futures::{
channel::{mpsc, oneshot},
future,
sink::SinkExt,
Sink,
};
use futures_util::future::FutureExt;
use log::debug;
use runng_sys::*;
use std::collections::VecDeque;
/// A context type that can be constructed from an [`NngSocket`].
///
/// Used as the associated `ContextType` of [`AsyncSocket`], whose
/// `create_async` calls [`AsyncContext::new`] with a clone of the socket.
pub trait AsyncContext: Sized {
/// Builds the context from `socket`, or returns an error if construction fails.
fn new(socket: NngSocket) -> Result<Self>;
}
/// A context type that can be constructed from an [`NngSocket`] and a buffer size.
///
/// Used as the associated `ContextType` of [`AsyncStream`], whose
/// `create_async_stream` forwards its `buffer` argument to
/// [`AsyncStreamContext::new`].
pub trait AsyncStreamContext: Sized {
/// Builds the context from `socket` and `buffer`, or returns an error if
/// construction fails.
// NOTE(review): `buffer` is presumably the capacity of an internal channel/queue
// of messages; confirm against the implementors.
fn new(socket: NngSocket, buffer: usize) -> Result<Self>;
}
/// A [`Socket`] from which an [`AsyncContext`] can be created.
pub trait AsyncSocket: Socket {
    /// The context type produced by [`AsyncSocket::create_async`].
    type ContextType: AsyncContext;

    /// Creates a new context of type `Self::ContextType`.
    ///
    /// The underlying socket handle is cloned and handed to
    /// [`AsyncContext::new`]; any error from the constructor is returned
    /// unchanged.
    fn create_async(&self) -> Result<Self::ContextType> {
        Self::ContextType::new(self.socket().clone())
    }
}
/// A [`Socket`] from which an [`AsyncStreamContext`] can be created.
pub trait AsyncStream: Socket {
    /// The context type produced by [`AsyncStream::create_async_stream`].
    type ContextType: AsyncStreamContext;

    /// Creates a new context of type `Self::ContextType`.
    ///
    /// The underlying socket handle is cloned and handed, together with
    /// `buffer`, to [`AsyncStreamContext::new`]; any error from the
    /// constructor is returned unchanged.
    fn create_async_stream(&self, buffer: usize) -> Result<Self::ContextType> {
        Self::ContextType::new(self.socket().clone(), buffer)
    }
}
/// Attempts to hand `message` to `sender` without waiting for capacity.
///
/// * On success nothing else happens.
/// * If the channel reports that it is disconnected, the undelivered message
///   is logged at debug level and the sender is closed. Closing drives the
///   sink's close future to completion on the current thread and panics if
///   that future yields an error.
/// * Any other send failure is only logged at debug level; the message is
///   discarded along with the error.
fn try_signal_complete(sender: &mut mpsc::Sender<Result<NngMsg>>, message: Result<NngMsg>) {
    let failure = match sender.try_send(message) {
        Ok(()) => return,
        Err(failure) => failure,
    };

    if !failure.is_disconnected() {
        debug!("mpsc::send failed {}", failure);
        return;
    }

    // Recover the message from the error so it can be included in the log.
    let undelivered = failure.into_inner();
    debug!("mpsc::disconnected {:?}", undelivered);
    futures::executor::block_on(sender.close()).unwrap();
}
/// Boxed `'static` future that resolves to a `Result<NngMsg>`.
pub type AsyncMsg = future::BoxFuture<'static, Result<NngMsg>>;
/// Boxed `'static` future that resolves to a `Result<()>`.
pub type AsyncUnit = future::BoxFuture<'static, Result<()>>;
/// Pairs incoming message results with requests for them.
///
/// `push_back` delivers a result to the oldest waiting request if there is
/// one and otherwise stores it in `ready`; `pop_front` returns a stored
/// result if there is one and otherwise registers a new waiter.
#[derive(Debug, Default)]
struct WorkQueue {
// Senders for requests made (via `pop_front`) while `ready` was empty, oldest first.
waiting: VecDeque<oneshot::Sender<Result<NngMsg>>>,
// Results pushed (via `push_back`) while nobody was waiting, oldest first.
ready: VecDeque<Result<NngMsg>>,
}
impl WorkQueue {
    /// Delivers `message` to the oldest waiting request, or stores it in
    /// `ready` when no request is waiting.
    ///
    /// If the waiting request's receiving half is gone, the send fails and
    /// the message is dropped after being logged at debug level.
    // NOTE(review): on a failed send the message is neither offered to the
    // next waiter nor stored in `ready` — confirm this loss is intended.
    fn push_back(&mut self, message: Result<NngMsg>) {
        match self.waiting.pop_front() {
            Some(waiter) => {
                if let Err(err) = waiter.send(message) {
                    debug!("Dropping message: {:?}", err);
                }
            }
            None => self.ready.push_back(message),
        }
    }

    /// Returns a future for the next message result.
    ///
    /// When a result is already stored the future is immediately ready with
    /// it. Otherwise a oneshot channel is created, its sender is appended to
    /// `waiting`, and the returned future resolves to the receiver's output
    /// passed through `result::flatten_result`.
    fn pop_front(&mut self) -> AsyncMsg {
        if let Some(stored) = self.ready.pop_front() {
            return Box::pin(future::ready(stored));
        }

        let (tx, rx) = oneshot::channel();
        self.waiting.push_back(tx);
        Box::pin(rx.map(result::flatten_result))
    }
}
/// Shorthand bound for any [`Sink`] that accepts `Result<NngMsg>` items and
/// whose error type is [`mpsc::SendError`].
trait NngSink: Sink<Result<NngMsg>, Error = mpsc::SendError> {}
// Blanket impl: every type satisfying the bound above is automatically an `NngSink`.
impl<T: Sink<Result<NngMsg>, Error = mpsc::SendError>> NngSink for T {}
/// A unit of work that operates on an [`NngAio`] in two steps.
// NOTE(review): presumably `begin` starts an asynchronous operation on the aio and
// `finish` is invoked once that operation has completed — confirm against the
// implementors and the code that drives them.
pub trait AioWork {
/// First step of the work; receives the aio by shared reference and does not
/// mutate `self`.
fn begin(&self, aio: &NngAio);
/// Second step of the work; may mutate `self`.
fn finish(&mut self, aio: &NngAio);
}
/// An owned, dynamically dispatched [`AioWork`] item.
pub type AioWorkRequest = Box<dyn AioWork>;
/// A queue that accepts boxed [`AioWork`] items.
pub trait AioWorkQueue {
/// Appends `obj` to the queue.
// NOTE(review): when and in what order queued work is executed is decided by the
// implementor and is not visible here.
fn push_back(&mut self, obj: AioWorkRequest);
}