use crate::{auth::server::AuthServer, error::*};
use libzmq_sys as sys;
use sys::errno;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::{
os::raw::{c_int, c_void},
str, thread,
};
lazy_static! {
// Process-wide context, built with `Ctx::new` the first time it is
// dereferenced. `Ctx::global` hands out handles to it.
static ref GLOBAL_CONTEXT: Ctx = Ctx::new();
}
/// Context options that can be passed to `RawCtx::get` / `RawCtx::set`.
///
/// Each variant converts to the matching `sys::ZMQ_*` constant through the
/// `From<CtxOption> for c_int` impl.
#[derive(Copy, Clone, Debug)]
// NOTE(review): presumably needed because some variants (e.g. `MaxMsgSize`)
// have no user in this file — confirm before removing.
#[allow(dead_code)]
enum CtxOption {
/// Maps to `ZMQ_IO_THREADS`.
IOThreads,
/// Maps to `ZMQ_MAX_SOCKETS`.
MaxSockets,
/// Maps to `ZMQ_MAX_MSGSZ`.
MaxMsgSize,
/// Maps to `ZMQ_SOCKET_LIMIT`.
SocketLimit,
/// Maps to `ZMQ_IPV6`.
IPV6,
/// Maps to `ZMQ_BLOCKY`.
Blocky,
}
impl From<CtxOption> for c_int {
    /// Translates a [`CtxOption`] into the raw option identifier defined by
    /// the `libzmq_sys` bindings, cast to `c_int`.
    fn from(option: CtxOption) -> Self {
        match option {
            CtxOption::IOThreads => sys::ZMQ_IO_THREADS as Self,
            CtxOption::MaxSockets => sys::ZMQ_MAX_SOCKETS as Self,
            CtxOption::MaxMsgSize => sys::ZMQ_MAX_MSGSZ as Self,
            CtxOption::SocketLimit => sys::ZMQ_SOCKET_LIMIT as Self,
            CtxOption::IPV6 => sys::ZMQ_IPV6 as Self,
            CtxOption::Blocky => sys::ZMQ_BLOCKY as Self,
        }
    }
}
/// Thin, copyable wrapper around the raw context pointer returned by
/// `zmq_ctx_new`.
///
/// Equality is pointer equality: two values compare equal when they wrap the
/// same address. Copying the wrapper copies the pointer only; it does not
/// create a new context.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct RawCtx {
// Raw pointer handed to every `sys::zmq_ctx_*` call.
ctx: *mut c_void,
}
impl RawCtx {
fn new() -> Self {
let ctx = unsafe { sys::zmq_ctx_new() };
if ctx.is_null() {
panic!(msg_from_errno(unsafe { sys::zmq_errno() }));
}
Self { ctx }
}
fn get(self, option: CtxOption) -> i32 {
unsafe { sys::zmq_ctx_get(self.ctx, option.into()) }
}
fn set(self, option: CtxOption, value: i32) -> Result<(), Error> {
let rc = unsafe { sys::zmq_ctx_set(self.ctx, option.into(), value) };
if rc == -1 {
let errno = unsafe { sys::zmq_errno() };
match errno {
errno::EINVAL => {
Err(Error::new(ErrorKind::InvalidInput("invalid value")))
}
_ => panic!(msg_from_errno(errno)),
}
} else {
Ok(())
}
}
fn set_bool(self, opt: CtxOption, flag: bool) -> Result<(), Error> {
self.set(opt, flag as i32)
}
fn terminate(self) {
loop {
let rc = unsafe { sys::zmq_ctx_term(self.ctx) };
if rc == 0 {
break;
} else {
let errno = unsafe { sys::zmq_errno() };
match errno {
errno::EINTR => (),
_ => unreachable!(),
}
}
}
}
fn shutdown(self) {
let rc = unsafe { sys::zmq_ctx_shutdown(self.ctx) };
assert_eq!(rc, 0);
}
}
// NOTE(review): the raw pointer makes `RawCtx` neither `Send` nor `Sync` by
// default. These impls presumably rely on libzmq contexts being safe to share
// between threads — confirm against the `zmq_ctx_new` documentation.
unsafe impl Send for RawCtx {}
unsafe impl Sync for RawCtx {}
/// Serializable set of optional context settings.
///
/// A field left as `None` is skipped by [`CtxConfig::apply`], so the
/// corresponding context option keeps whatever value it already has.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CtxConfig {
    /// Value forwarded to `CtxHandle::set_io_threads` when set.
    io_threads: Option<i32>,
    /// Value forwarded to `CtxHandle::set_max_sockets` when set.
    max_sockets: Option<i32>,
}
impl CtxConfig {
    /// Creates a configuration with every option unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a fresh [`Ctx`] and applies this configuration to it.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by [`CtxConfig::apply`]; the
    /// freshly created context is dropped in that case.
    pub fn build(&self) -> Result<Ctx, Error> {
        let ctx = Ctx::new();
        self.apply(ctx.handle())?;
        Ok(ctx)
    }

    /// Applies every option that is set to the context behind `handle`.
    ///
    /// Options are applied in order (`io_threads`, then `max_sockets`).
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first setter error.
    pub fn apply(&self, handle: CtxHandle) -> Result<(), Error> {
        if let Some(value) = self.io_threads {
            handle.set_io_threads(value)?;
        }
        if let Some(value) = self.max_sockets {
            handle.set_max_sockets(value)?;
        }
        Ok(())
    }

    /// Returns the configured number of I/O threads, if any.
    pub fn io_threads(&self) -> Option<i32> {
        self.io_threads
    }

    /// Sets or clears the configured number of I/O threads.
    pub fn set_io_threads(&mut self, value: Option<i32>) {
        self.io_threads = value;
    }

    /// Returns the configured maximum number of sockets, if any.
    ///
    /// Takes `&self`, like [`CtxConfig::io_threads`]: a getter does not need
    /// exclusive access, and callers holding a `&mut CtxConfig` still work.
    pub fn max_sockets(&self) -> Option<i32> {
        self.max_sockets
    }

    /// Sets or clears the configured maximum number of sockets.
    pub fn set_max_sockets(&mut self, value: Option<i32>) {
        self.max_sockets = value;
    }
}
/// Chainable front-end over a [`CtxConfig`].
///
/// The setter methods store values in the wrapped configuration, and
/// `build` / `apply` use that configuration to configure a context.
#[derive(Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CtxBuilder {
// Configuration being accumulated by the builder methods.
inner: CtxConfig,
}
impl CtxBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn build(&self) -> Result<Ctx, Error> {
let ctx = Ctx::new();
self.apply(ctx.handle())?;
Ok(ctx)
}
pub fn apply(&self, handle: CtxHandle) -> Result<(), Error> {
self.inner.apply(handle)
}
pub fn io_threads(&mut self, value: i32) -> &mut Self {
self.inner.set_io_threads(Some(value));
self
}
pub fn max_sockets(&mut self, value: i32) -> &mut Self {
self.inner.set_max_sockets(Some(value));
self
}
}
/// Copyable, non-owning reference to a context.
///
/// It wraps the same raw pointer as the [`Ctx`] it was obtained from and has
/// no `Drop` impl, so dropping a handle does not terminate the context.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct CtxHandle {
// Raw context shared with the owning `Ctx`.
inner: RawCtx,
}
impl CtxHandle {
pub fn io_threads(self) -> i32 {
self.inner.get(CtxOption::IOThreads)
}
pub fn set_io_threads(self, nb_threads: i32) -> Result<(), Error> {
self.inner.set(CtxOption::IOThreads, nb_threads)
}
pub fn max_sockets(self) -> i32 {
self.inner.get(CtxOption::MaxSockets)
}
pub fn set_max_sockets(self, max: i32) -> Result<(), Error> {
self.inner.set(CtxOption::MaxSockets, max)
}
pub fn shutdown(self) {
self.inner.shutdown()
}
pub(crate) fn as_ptr(self) -> *mut c_void {
self.inner.ctx
}
}
/// Owning wrapper around a context: dropping a `Ctx` terminates the
/// underlying context (see the `Drop` impl).
///
/// Two `Ctx` values compare equal when they wrap the same raw pointer.
#[derive(Eq, PartialEq, Debug)]
pub struct Ctx {
// Raw context terminated when this value is dropped.
inner: RawCtx,
}
impl Ctx {
    /// Creates a new context.
    ///
    /// The context has its `IPV6` option enabled and its `Blocky` option
    /// disabled. An [`AuthServer`] bound to the context is started on a
    /// detached thread.
    ///
    /// # Panics
    ///
    /// Panics if the context cannot be created, if either option cannot be
    /// set, or if the auth server cannot be constructed.
    pub fn new() -> Self {
        let raw = RawCtx::new();
        raw.set_bool(CtxOption::IPV6, true).unwrap();
        raw.set_bool(CtxOption::Blocky, false).unwrap();

        // The auth server only receives a non-owning handle to the context.
        let handle = CtxHandle { inner: raw };
        let mut auth_server = AuthServer::with_ctx(handle).unwrap();
        thread::spawn(move || auth_server.run());

        Self { inner: raw }
    }

    /// Returns a copyable, non-owning handle to this context.
    pub fn handle(&self) -> CtxHandle {
        let raw = self.inner;
        CtxHandle { inner: raw }
    }

    /// Returns a handle to the lazily created process-wide context.
    pub fn global() -> CtxHandle {
        let global: &Ctx = &GLOBAL_CONTEXT;
        global.handle()
    }

    /// Returns the context's `IOThreads` option as reported by libzmq.
    pub fn io_threads(&self) -> i32 {
        self.handle().io_threads()
    }

    /// Sets the context's `IOThreads` option.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when libzmq rejects the value.
    pub fn set_io_threads(&self, nb_threads: i32) -> Result<(), Error> {
        self.handle().set_io_threads(nb_threads)
    }

    /// Returns the context's `MaxSockets` option as reported by libzmq.
    pub fn max_sockets(&self) -> i32 {
        self.handle().max_sockets()
    }

    /// Sets the context's `MaxSockets` option.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when libzmq rejects the value.
    pub fn set_max_sockets(&self, max: i32) -> Result<(), Error> {
        self.handle().set_max_sockets(max)
    }

    /// Returns the context's `SocketLimit` option as reported by libzmq.
    pub fn socket_limit(&self) -> i32 {
        let raw = self.inner;
        raw.get(CtxOption::SocketLimit)
    }

    /// Shuts the context down through `zmq_ctx_shutdown`.
    ///
    /// # Panics
    ///
    /// Panics if libzmq reports a failure.
    pub fn shutdown(&self) {
        self.handle().shutdown()
    }
}
impl Default for Ctx {
fn default() -> Self {
Self::new()
}
}
impl Drop for Ctx {
    /// Terminates the underlying context, retrying for as long as the
    /// termination call is interrupted.
    fn drop(&mut self) {
        // NOTE(review): `CtxHandle` copies obtained from `handle()` keep the
        // same raw pointer after this runs — confirm that none of them can
        // outlive the owning `Ctx`.
        let raw = self.inner;
        raw.terminate();
    }
}