use ::regex::Regex;
use ::std::sync::Arc;
#[derive(Clone, Debug)]
pub struct NSQDeflateLevel {
pub(crate) level: u8,
}
impl NSQDeflateLevel {
pub fn new(level: u8) -> Option<NSQDeflateLevel> {
if !(1..=9).contains(&level) {
None
} else {
Some(NSQDeflateLevel { level })
}
}
pub fn get(&self) -> u8 {
self.level
}
}
#[derive(Clone)]
pub struct NSQConfigSharedTLS {
pub(crate) required: bool,
pub(crate) client_config: Option<Arc<rustls::ClientConfig>>,
pub(crate) domain_name: String,
}
impl NSQConfigSharedTLS {
pub fn new<S: Into<String>>(domain: S) -> Self {
NSQConfigSharedTLS {
required: true,
client_config: None,
domain_name: domain.into(),
}
}
pub fn set_required(mut self, required: bool) -> Self {
self.required = required;
self
}
pub fn set_client_config(
mut self,
client_config: Arc<rustls::ClientConfig>,
) -> Self {
self.client_config = Some(client_config);
self
}
}
#[derive(Debug, Clone)]
pub enum NSQConfigSharedCompression {
Deflate(NSQDeflateLevel),
Snappy,
}
#[derive(Clone)]
pub struct NSQConfigShared {
pub(crate) backoff_max_wait: std::time::Duration,
pub(crate) backoff_healthy_after: std::time::Duration,
pub(crate) compression: Option<NSQConfigSharedCompression>,
pub(crate) tls: Option<NSQConfigSharedTLS>,
pub(crate) credentials: Option<Vec<u8>>,
pub(crate) client_id: Option<String>,
pub(crate) write_timeout: Option<std::time::Duration>,
pub(crate) read_timeout: Option<std::time::Duration>,
pub(crate) hostname: Option<String>,
pub(crate) user_agent: Option<String>,
pub(crate) flush_interval: std::time::Duration,
}
impl NSQConfigShared {
pub fn new() -> Self {
NSQConfigShared {
backoff_max_wait: std::time::Duration::new(60, 0),
backoff_healthy_after: std::time::Duration::new(45, 0),
compression: None,
tls: None,
credentials: None,
client_id: None,
write_timeout: Some(std::time::Duration::new(10, 0)),
read_timeout: Some(std::time::Duration::new(60, 0)),
hostname: None,
user_agent: None,
flush_interval: std::time::Duration::from_millis(250),
}
}
pub fn set_backoff_max_wait(
mut self,
duration: std::time::Duration,
) -> Self {
self.backoff_max_wait = duration;
self
}
pub fn set_backoff_healthy_after(
mut self,
duration: std::time::Duration,
) -> Self {
self.backoff_healthy_after = duration;
self
}
pub fn set_compression(
mut self,
compression: NSQConfigSharedCompression,
) -> Self {
self.compression = Some(compression);
self
}
pub fn set_credentials(mut self, credentials: Vec<u8>) -> Self {
self.credentials = Some(credentials);
self
}
pub fn set_tls(mut self, tls: NSQConfigSharedTLS) -> Self {
self.tls = Some(tls);
self
}
pub fn set_client_id<S: Into<String>>(mut self, client_id: S) -> Self {
self.client_id = Some(client_id.into());
self
}
pub fn set_write_timeout(
mut self,
duration: Option<std::time::Duration>,
) -> Self {
self.write_timeout = duration;
self
}
pub fn set_read_timeout(
mut self,
duration: Option<std::time::Duration>,
) -> Self {
self.read_timeout = duration;
self
}
pub fn set_hostname<S: Into<String>>(mut self, hostname: S) -> Self {
self.hostname = Some(hostname.into());
self
}
pub fn set_user_agent<S: Into<String>>(mut self, user_agent: S) -> Self {
self.user_agent = Some(user_agent.into());
self
}
pub fn set_flush_interval(mut self, duration: std::time::Duration) -> Self {
self.flush_interval = duration;
self
}
}
impl Default for NSQConfigShared {
fn default() -> Self {
Self::new()
}
}
lazy_static! {
static ref NAMEREGEX: Regex =
Regex::new(r"^[\.a-zA-Z0-9_-]+(#ephemeral)?$").unwrap();
}
fn is_valid_name(name: &str) -> bool {
if name.is_empty() || name.len() > 64 {
return false;
}
NAMEREGEX.is_match(name)
}
#[derive(Clone, Debug)]
pub struct NSQTopic {
pub(crate) topic: String,
}
impl NSQTopic {
pub fn new<S: Into<String>>(topic: S) -> Option<Arc<Self>> {
let topic = topic.into();
if is_valid_name(&topic) {
Some(Arc::new(Self { topic }))
} else {
None
}
}
}
#[derive(Clone, Debug)]
pub struct NSQChannel {
pub(crate) channel: String,
}
impl NSQChannel {
pub fn new<S: Into<String>>(channel: S) -> Option<Arc<Self>> {
let channel = channel.into();
if is_valid_name(&channel) {
Some(Arc::new(Self { channel }))
} else {
None
}
}
}
#[derive(Clone, Debug, Copy)]
pub struct NSQSampleRate {
pub(crate) rate: u8,
}
impl NSQSampleRate {
pub fn new(rate: u8) -> Option<NSQSampleRate> {
if !(1..=100).contains(&rate) {
None
} else {
Some(NSQSampleRate { rate })
}
}
pub fn get(&self) -> u8 {
self.rate
}
}
#[derive(Clone)]
pub struct NSQDConfig {
pub address: String,
pub subscribe: Option<(Arc<NSQTopic>, Arc<NSQChannel>)>,
pub shared: NSQConfigShared,
pub sample_rate: Option<NSQSampleRate>,
pub max_requeue_delay: std::time::Duration,
pub base_requeue_delay: std::time::Duration,
}