use alloc::sync::Arc;
#[cfg(feature = "builder")]
pub use builder::ContextBuilder;
use derive_more::{Debug as DebugDeriveMore, Display as DisplayDeriveMore};
use num_traits::PrimInt;
use crate::{ZmqResult, ffi::RawContext, zmq_sys_crate};
/// Options that can be set on, or read from, a [`Context`].
///
/// Every variant converts into the raw `ZMQ_*` option identifier of the sys crate through the
/// `From<ContextOption> for i32` implementation in this file. The enum is `#[non_exhaustive]`,
/// so `match`es outside this crate need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ContextOption {
/// Converts to `ZMQ_IO_THREADS`; exposed as an `i32` through `Context::set_io_threads` and
/// `Context::io_threads`.
IoThreads,
/// Converts to `ZMQ_MAX_SOCKETS`; exposed as an `i32` through `Context::set_max_sockets` and
/// `Context::max_sockets`.
MaxSockets,
/// Converts to `ZMQ_THREAD_PRIORITY`. No dedicated accessor is defined in this file; use the
/// generic option methods on `Context`.
ThreadPriority,
/// Converts to `ZMQ_THREAD_SCHED_POLICY`. No dedicated accessor is defined in this file; use
/// the generic option methods on `Context`.
ThreadSchedulingPolicy,
/// Converts to `ZMQ_MAX_MSGSZ`; exposed as an `i32` through `Context::set_max_message_size`
/// and `Context::max_message_size`.
MaxMessageSize,
/// Converts to `ZMQ_THREAD_AFFINITY_CPU_ADD`. No dedicated accessor is defined in this file.
ThreadAffinityCPUAdd,
/// Converts to `ZMQ_THREAD_AFFINITY_CPU_REMOVE`. No dedicated accessor is defined in this
/// file.
ThreadAffinityCPURemove,
/// Converts to `ZMQ_THREAD_NAME_PREFIX`. The tests in this file access it through the
/// string-valued option methods, which only exist with the `draft-api` feature.
ThreadNamePrefix,
/// Converts to `ZMQ_ZERO_COPY_RECV`; only available with the `draft-api` feature and exposed
/// as a `bool` through `Context::set_zero_copy_receiving` / `Context::zero_copy_receiving`.
#[cfg(feature = "draft-api")]
ZeroCopyReceiving,
/// Converts to `ZMQ_IPV6`; exposed as a `bool` through `Context::set_ipv6` and
/// `Context::ipv6`.
IPv6,
/// Converts to `ZMQ_BLOCKY`; exposed as a `bool` through `Context::set_blocky` and
/// `Context::blocky`.
Blocky,
/// Converts to `ZMQ_SOCKET_LIMIT`; only a getter (`Context::socket_limit`) is defined for it
/// in this file.
SocketLimit,
}
/// Translates a [`ContextOption`] into the raw option identifier exported by the sys crate.
impl From<ContextOption> for i32 {
    /// Returns the `ZMQ_*` constant that belongs to `option`, cast to `i32`.
    fn from(option: ContextOption) -> Self {
        // Arms are listed in the declaration order of `ContextOption`, so a newly added variant
        // is easy to spot as missing here.
        match option {
            ContextOption::IoThreads => zmq_sys_crate::ZMQ_IO_THREADS as i32,
            ContextOption::MaxSockets => zmq_sys_crate::ZMQ_MAX_SOCKETS as i32,
            ContextOption::ThreadPriority => zmq_sys_crate::ZMQ_THREAD_PRIORITY as i32,
            ContextOption::ThreadSchedulingPolicy => zmq_sys_crate::ZMQ_THREAD_SCHED_POLICY as i32,
            ContextOption::MaxMessageSize => zmq_sys_crate::ZMQ_MAX_MSGSZ as i32,
            ContextOption::ThreadAffinityCPUAdd => zmq_sys_crate::ZMQ_THREAD_AFFINITY_CPU_ADD as i32,
            ContextOption::ThreadAffinityCPURemove => {
                zmq_sys_crate::ZMQ_THREAD_AFFINITY_CPU_REMOVE as i32
            }
            ContextOption::ThreadNamePrefix => zmq_sys_crate::ZMQ_THREAD_NAME_PREFIX as i32,
            #[cfg(feature = "draft-api")]
            ContextOption::ZeroCopyReceiving => zmq_sys_crate::ZMQ_ZERO_COPY_RECV as i32,
            ContextOption::IPv6 => zmq_sys_crate::ZMQ_IPV6 as i32,
            ContextOption::Blocky => zmq_sys_crate::ZMQ_BLOCKY as i32,
            ContextOption::SocketLimit => zmq_sys_crate::ZMQ_SOCKET_LIMIT as i32,
        }
    }
}
#[cfg(test)]
mod context_option_tests {
    use rstest::*;

    use super::ContextOption;
    use crate::zmq_sys_crate;

    /// Every `ContextOption` must convert to the matching `ZMQ_*` constant of the sys crate.
    #[rstest]
    #[case(ContextOption::IoThreads, zmq_sys_crate::ZMQ_IO_THREADS as i32)]
    #[case(ContextOption::MaxSockets, zmq_sys_crate::ZMQ_MAX_SOCKETS as i32)]
    #[case(ContextOption::ThreadPriority, zmq_sys_crate::ZMQ_THREAD_PRIORITY as i32)]
    #[case(ContextOption::ThreadSchedulingPolicy, zmq_sys_crate::ZMQ_THREAD_SCHED_POLICY as i32)]
    #[case(ContextOption::MaxMessageSize, zmq_sys_crate::ZMQ_MAX_MSGSZ as i32)]
    #[case(ContextOption::ThreadAffinityCPUAdd, zmq_sys_crate::ZMQ_THREAD_AFFINITY_CPU_ADD as i32)]
    #[case(ContextOption::ThreadAffinityCPURemove, zmq_sys_crate::ZMQ_THREAD_AFFINITY_CPU_REMOVE as i32)]
    #[case(ContextOption::ThreadNamePrefix, zmq_sys_crate::ZMQ_THREAD_NAME_PREFIX as i32)]
    #[case(ContextOption::IPv6, zmq_sys_crate::ZMQ_IPV6 as i32)]
    #[case(ContextOption::Blocky, zmq_sys_crate::ZMQ_BLOCKY as i32)]
    #[case(ContextOption::SocketLimit, zmq_sys_crate::ZMQ_SOCKET_LIMIT as i32)]
    #[cfg_attr(feature = "draft-api", case(ContextOption::ZeroCopyReceiving, zmq_sys_crate::ZMQ_ZERO_COPY_RECV as i32))]
    fn context_options_convert_to_i32(#[case] option: ContextOption, #[case] expected: i32) {
        // `Into<i32>` is provided by the blanket impl over `From`, so this exercises the same
        // conversion code path.
        let raw_option = i32::from(option);
        assert_eq!(raw_option, expected);
    }
}
/// Handle to a ZeroMQ context.
///
/// The handle stores the crate's `RawContext` behind an [`Arc`], so cloning a `Context` yields
/// another handle to the same underlying raw context rather than a new one.
///
/// Formatting is intentionally opaque: `Debug` renders as `ZmqContext { ... }` and `Display`
/// renders as `ZmqContext`.
#[derive(DebugDeriveMore, DisplayDeriveMore)]
#[debug("ZmqContext {{ ... }}")]
#[display("ZmqContext")]
pub struct Context {
/// Shared reference to the raw context that all option accessors delegate to.
pub(crate) inner: Arc<RawContext>,
}
// SAFETY: NOTE(review): these impls assert that the `Arc<RawContext>` held by `Context` may be
// moved to, and shared between, threads. `RawContext` is defined outside this file, so that
// cannot be checked from here. libzmq documents its contexts as thread safe — confirm that
// `RawContext` adds no thread-affine state of its own.
unsafe impl Send for Context {}
unsafe impl Sync for Context {}
/// Construction, option access and shutdown for [`Context`].
///
/// All option accessors are thin wrappers: they convert the [`ContextOption`] into its raw
/// identifier and forward the call to the wrapped `RawContext`, returning its result unchanged.
impl Context {
    /// Creates a new context backed by a freshly constructed `RawContext`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `RawContext::new` if the raw context cannot be created.
    pub fn new() -> ZmqResult<Self> {
        Ok(Self::from_raw_context(RawContext::new()?))
    }

    /// Wraps an existing `RawContext` in a shareable handle.
    pub(crate) fn from_raw_context(raw_context: RawContext) -> Self {
        Self {
            inner: Arc::new(raw_context),
        }
    }

    /// Borrows the underlying `RawContext`.
    pub(crate) fn as_raw(&self) -> &RawContext {
        self.inner.as_ref()
    }

    /// Sets the boolean-valued `option` to `value`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `RawContext` reports.
    pub fn set_option_bool(&self, option: ContextOption, value: bool) -> ZmqResult<()> {
        self.inner.set_ctxopt_bool(option.into(), value)
    }

    /// Sets the integer-valued `option` to `value`.
    ///
    /// Any primitive integer that converts losslessly into `i32` is accepted.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `RawContext` reports.
    pub fn set_option_int<V: PrimInt + Into<i32>>(
        &self,
        option: ContextOption,
        value: V,
    ) -> ZmqResult<()> {
        self.inner.set_ctxopt_int(option.into(), value)
    }

    /// Sets the string-valued `option` to `value` (only with the `draft-api` feature).
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `RawContext` reports.
    #[cfg(feature = "draft-api")]
    pub fn set_option_string<V: AsRef<str>>(
        &self,
        option: ContextOption,
        value: V,
    ) -> ZmqResult<()> {
        self.inner.set_ctxopt_string(option.into(), value.as_ref())
    }

    /// Reads the boolean-valued `option`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `RawContext` reports.
    pub fn get_option_bool(&self, option: ContextOption) -> ZmqResult<bool> {
        // NOTE(review): `get_ctxpt_bool` (no `o`) is the spelling used for the `RawContext`
        // getter here; its siblings are spelled `get_ctxopt_*`. Consider aligning the names in
        // the ffi module.
        self.inner.get_ctxpt_bool(option.into())
    }

    /// Reads the integer-valued `option`, converted into the requested integer type `V`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `RawContext` reports.
    pub fn get_option_int<V: PrimInt + From<i32>>(&self, option: ContextOption) -> ZmqResult<V> {
        self.inner.get_ctxopt_int(option.into())
    }

    /// Reads the string-valued `option` (only with the `draft-api` feature).
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `RawContext` reports.
    #[cfg(feature = "draft-api")]
    pub fn get_option_string(&self, option: ContextOption) -> ZmqResult<String> {
        self.inner.get_ctxopt_string(option.into())
    }

    /// Sets [`ContextOption::Blocky`].
    pub fn set_blocky(&self, value: bool) -> ZmqResult<()> {
        self.set_option_bool(ContextOption::Blocky, value)
    }

    /// Reads [`ContextOption::Blocky`].
    pub fn blocky(&self) -> ZmqResult<bool> {
        self.get_option_bool(ContextOption::Blocky)
    }

    /// Sets [`ContextOption::IoThreads`].
    pub fn set_io_threads(&self, value: i32) -> ZmqResult<()> {
        self.set_option_int::<i32>(ContextOption::IoThreads, value)
    }

    /// Reads [`ContextOption::IoThreads`].
    pub fn io_threads(&self) -> ZmqResult<i32> {
        self.get_option_int::<i32>(ContextOption::IoThreads)
    }

    /// Sets [`ContextOption::MaxMessageSize`].
    pub fn set_max_message_size(&self, value: i32) -> ZmqResult<()> {
        self.set_option_int::<i32>(ContextOption::MaxMessageSize, value)
    }

    /// Reads [`ContextOption::MaxMessageSize`].
    pub fn max_message_size(&self) -> ZmqResult<i32> {
        self.get_option_int::<i32>(ContextOption::MaxMessageSize)
    }

    /// Sets [`ContextOption::MaxSockets`].
    pub fn set_max_sockets(&self, value: i32) -> ZmqResult<()> {
        self.set_option_int::<i32>(ContextOption::MaxSockets, value)
    }

    /// Reads [`ContextOption::MaxSockets`].
    pub fn max_sockets(&self) -> ZmqResult<i32> {
        self.get_option_int::<i32>(ContextOption::MaxSockets)
    }

    /// Reads [`ContextOption::SocketLimit`]; no setter is provided for this option.
    pub fn socket_limit(&self) -> ZmqResult<i32> {
        self.get_option_int::<i32>(ContextOption::SocketLimit)
    }

    /// Sets [`ContextOption::IPv6`].
    pub fn set_ipv6(&self, value: bool) -> ZmqResult<()> {
        self.set_option_bool(ContextOption::IPv6, value)
    }

    /// Reads [`ContextOption::IPv6`].
    pub fn ipv6(&self) -> ZmqResult<bool> {
        self.get_option_bool(ContextOption::IPv6)
    }

    /// Sets [`ContextOption::ZeroCopyReceiving`] (only with the `draft-api` feature).
    #[cfg(feature = "draft-api")]
    pub fn set_zero_copy_receiving(&self, value: bool) -> ZmqResult<()> {
        self.set_option_bool(ContextOption::ZeroCopyReceiving, value)
    }

    /// Reads [`ContextOption::ZeroCopyReceiving`] (only with the `draft-api` feature).
    #[cfg(feature = "draft-api")]
    pub fn zero_copy_receiving(&self) -> ZmqResult<bool> {
        self.get_option_bool(ContextOption::ZeroCopyReceiving)
    }

    /// Forwards to `RawContext::shutdown` on the wrapped raw context.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `RawContext` reports.
    pub fn shutdown(&self) -> ZmqResult<()> {
        self.inner.shutdown()
    }
}
/// Cloning a [`Context`] produces another handle to the same `RawContext`.
impl Clone for Context {
    /// Bumps the reference count of the shared raw context; no new context is created.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
#[cfg(feature = "builder")]
mod builder {
    use derive_builder::Builder;
    use serde::{Deserialize, Serialize};

    use crate::{ZmqResult, context::Context};

    // Template from which `derive_builder` generates the public `ContextBuilder`.
    //
    // Only the generated builder is used by this module: `apply` reads each builder field as an
    // `Option` and skips unset ones. The generated build function is skipped (`build_fn(skip)`)
    // in favour of the hand-written `ContextBuilder::build` below.
    //
    // NOTE(review): because the generated build function is skipped, the `default = ...` values
    // below presumably only mirror the defaults of the underlying context (the default-settings
    // test asserts exactly these values on an untouched builder) — confirm.
    #[derive(Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Builder)]
    #[builder(
        pattern = "owned",
        name = "ContextBuilder",
        public,
        build_fn(skip, error = "ZmqError"),
        derive(PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)
    )]
    #[builder_struct_attr(doc = "Builder for [`Context`].\n\n")]
    // The struct is never constructed directly; it only exists to drive the derive.
    #[allow(dead_code)]
    struct ContextConfig {
        /// Value applied through `Context::set_blocky`.
        #[builder(default = true)]
        blocky: bool,
        /// Value applied through `Context::set_io_threads`.
        #[builder(setter(into), default = 1)]
        io_threads: i32,
        /// Value applied through `Context::set_max_message_size`.
        #[builder(setter(into), default = "i32::MAX")]
        max_message_size: i32,
        /// Value applied through `Context::set_zero_copy_receiving` (`draft-api` only).
        #[cfg(feature = "draft-api")]
        #[builder(default = true)]
        zero_copy_receiving: bool,
        /// Value applied through `Context::set_max_sockets`.
        #[builder(setter(into), default = 1023)]
        max_sockets: i32,
        /// Value applied through `Context::set_ipv6`.
        #[builder(default = false)]
        ipv6: bool,
    }

    impl ContextBuilder {
        /// Applies every option that was explicitly set on this builder to `context`.
        ///
        /// Options that were never set on the builder are left untouched on `context`.
        ///
        /// # Errors
        ///
        /// Stops at, and returns, the first error reported by one of the `Context` setters;
        /// options applied before the failure remain set.
        pub fn apply(self, context: &Context) -> ZmqResult<()> {
            if let Some(blocky) = self.blocky {
                context.set_blocky(blocky)?;
            }
            if let Some(io_threads) = self.io_threads {
                context.set_io_threads(io_threads)?;
            }
            if let Some(max_msg_size) = self.max_message_size {
                context.set_max_message_size(max_msg_size)?;
            }
            if let Some(max_sockets) = self.max_sockets {
                context.set_max_sockets(max_sockets)?;
            }
            if let Some(ipv6) = self.ipv6 {
                context.set_ipv6(ipv6)?;
            }
            #[cfg(feature = "draft-api")]
            if let Some(zero_copy_receiving) = self.zero_copy_receiving {
                context.set_zero_copy_receiving(zero_copy_receiving)?;
            }
            Ok(())
        }

        /// Creates a new [`Context`] and applies this builder's options to it.
        ///
        /// # Errors
        ///
        /// Returns the error from `Context::new` or from [`ContextBuilder::apply`].
        pub fn build(self) -> ZmqResult<Context> {
            let context = Context::new()?;
            self.apply(&context)?;
            Ok(context)
        }
    }

    #[cfg(test)]
    mod context_builder_tests {
        use super::ContextBuilder;
        use crate::prelude::ZmqResult;

        /// An untouched builder must leave the context at its default option values.
        #[test]
        fn context_builder_with_default_settings() -> ZmqResult<()> {
            let context = ContextBuilder::default().build()?;
            assert!(context.blocky()?);
            assert_eq!(context.max_message_size()?, i32::MAX);
            assert!(!context.ipv6()?);
            assert_eq!(context.max_sockets()?, 1023);
            assert_eq!(context.io_threads()?, 1);
            Ok(())
        }

        /// Every explicitly set option must be visible on the built context.
        #[test]
        fn context_builder_with_custom_settings() -> ZmqResult<()> {
            // Each value differs from the default asserted in the default-settings test, so a
            // setter that is silently dropped makes this test fail.
            let context = ContextBuilder::default()
                .blocky(false)
                .max_message_size(42)
                .ipv6(true)
                .max_sockets(21)
                .io_threads(2)
                .build()?;
            assert!(!context.blocky()?);
            assert_eq!(context.max_message_size()?, 42);
            assert!(context.ipv6()?);
            assert_eq!(context.max_sockets()?, 21);
            assert_eq!(context.io_threads()?, 2);
            Ok(())
        }

        /// The draft-only zero-copy option must be forwarded to the built context.
        #[cfg(feature = "draft-api")]
        #[test]
        fn context_builder_with_draft_api_settings() -> ZmqResult<()> {
            let context = ContextBuilder::default()
                .zero_copy_receiving(false)
                .build()?;
            assert!(!context.zero_copy_receiving()?);
            Ok(())
        }
    }
}
#[cfg(test)]
mod context_tests {
    use rstest::*;

    use super::Context;
    #[cfg(feature = "draft-api")]
    use crate::prelude::ContextOption;
    use crate::prelude::{ZmqError, ZmqResult};

    /// `blocky` must read back exactly what was written.
    #[rstest]
    #[case(true)]
    #[case(false)]
    fn context_with_blocky_option(#[case] option_value: bool) -> ZmqResult<()> {
        let ctx = Context::new()?;
        ctx.set_blocky(option_value)?;
        let read_back = ctx.blocky()?;
        assert_eq!(read_back, option_value);
        Ok(())
    }

    /// Non-negative I/O thread counts must round-trip.
    #[rstest]
    #[case(0)]
    #[case(1)]
    #[case(42)]
    #[case(i32::MAX)]
    fn context_with_io_threads(#[case] option_value: i32) -> ZmqResult<()> {
        let ctx = Context::new()?;
        ctx.set_io_threads(option_value)?;
        let read_back = ctx.io_threads()?;
        assert_eq!(read_back, option_value);
        Ok(())
    }

    /// A negative I/O thread count must be rejected as an invalid argument.
    #[test]
    fn context_with_invalid_io_threads() -> ZmqResult<()> {
        let ctx = Context::new()?;
        let outcome = ctx.set_io_threads(-1);
        assert!(matches!(outcome, Err(err) if err == ZmqError::InvalidArgument));
        Ok(())
    }

    /// Non-negative maximum message sizes must round-trip.
    #[rstest]
    #[case(0)]
    #[case(1)]
    #[case(42)]
    #[case(i32::MAX)]
    fn context_with_max_message_size(#[case] option_value: i32) -> ZmqResult<()> {
        let ctx = Context::new()?;
        ctx.set_max_message_size(option_value)?;
        let read_back = ctx.max_message_size()?;
        assert_eq!(read_back, option_value);
        Ok(())
    }

    /// A negative maximum message size must be rejected as an invalid argument.
    #[test]
    fn context_with_invalid_max_message_size() -> ZmqResult<()> {
        let ctx = Context::new()?;
        let outcome = ctx.set_max_message_size(-1);
        assert!(matches!(outcome, Err(err) if err == ZmqError::InvalidArgument));
        Ok(())
    }

    /// Positive socket maxima must round-trip.
    #[rstest]
    #[case(1)]
    #[case(42)]
    #[case(i32::MAX)]
    fn context_with_max_sockets(#[case] option_value: i32) -> ZmqResult<()> {
        let ctx = Context::new()?;
        ctx.set_max_sockets(option_value)?;
        let read_back = ctx.max_sockets()?;
        assert_eq!(read_back, option_value);
        Ok(())
    }

    /// Zero and negative socket maxima must be rejected as invalid arguments.
    #[rstest]
    #[case(0)]
    #[case(-1)]
    fn context_with_invalid_max_sockets(#[case] option_value: i32) -> ZmqResult<()> {
        let ctx = Context::new()?;
        let outcome = ctx.set_max_sockets(option_value);
        assert!(matches!(outcome, Err(err) if err == ZmqError::InvalidArgument));
        Ok(())
    }

    /// The socket limit reported by a fresh context is expected to be 65535.
    #[test]
    fn context_socket_limit() -> ZmqResult<()> {
        let ctx = Context::new()?;
        let limit = ctx.socket_limit()?;
        assert_eq!(limit, 65535);
        Ok(())
    }

    /// `ipv6` must read back exactly what was written.
    #[rstest]
    #[case(true)]
    #[case(false)]
    fn context_with_ipv6(#[case] option_value: bool) -> ZmqResult<()> {
        let ctx = Context::new()?;
        ctx.set_ipv6(option_value)?;
        let read_back = ctx.ipv6()?;
        assert_eq!(read_back, option_value);
        Ok(())
    }

    /// `zero_copy_receiving` must read back exactly what was written (draft API only).
    #[cfg(feature = "draft-api")]
    #[rstest]
    #[case(true)]
    #[case(false)]
    fn context_with_zero_copy_receiving(#[case] option_value: bool) -> ZmqResult<()> {
        let ctx = Context::new()?;
        ctx.set_zero_copy_receiving(option_value)?;
        let read_back = ctx.zero_copy_receiving()?;
        assert_eq!(read_back, option_value);
        Ok(())
    }

    /// The thread name prefix must round-trip through the string option API (draft API only).
    #[cfg(feature = "draft-api")]
    #[test]
    fn context_with_threadname_prefix() -> ZmqResult<()> {
        let ctx = Context::new()?;
        ctx.set_option_string(ContextOption::ThreadNamePrefix, "asdf")?;
        let prefix = ctx.get_option_string(ContextOption::ThreadNamePrefix)?;
        assert_eq!(prefix, "asdf");
        Ok(())
    }
}