use portable_atomic::AtomicU64;
use std::io;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use crossbeam_channel as channel;
use crate::jetstream::{AckPolicy, ConsumerInfo, ConsumerOwnership, JetStream};
use crate::message::Message;
use crate::DEFAULT_FLUSH_TIMEOUT;
/// Shared state behind a [`PushSubscription`] handle.
///
/// The `Drop` implementation for this type unsubscribes `sid` on the client
/// and, when `consumer_ownership` is `ConsumerOwnership::Yes`, asks `context`
/// to delete the consumer.
#[derive(Debug)]
pub(crate) struct Inner {
/// Subscription id handed to the client's `unsubscribe`.
pub(crate) sid: Arc<AtomicU64>,
/// Receiving end of the channel on which messages for this subscription arrive.
pub(crate) messages: channel::Receiver<Message>,
/// Stream name, taken from `ConsumerInfo::stream_name` at construction.
pub(crate) stream: String,
/// Consumer name, taken from `ConsumerInfo::name` at construction.
pub(crate) consumer: String,
/// Ack policy from the consumer's configuration; decides whether the
/// `process*` helpers acknowledge messages.
pub(crate) consumer_ack_policy: AckPolicy,
/// `ConsumerInfo::num_pending` as reported when the subscription was created.
pub(crate) num_pending: u64,
/// Whether the consumer is deleted together with this subscription.
pub(crate) consumer_ownership: ConsumerOwnership,
/// JetStream context used for consumer management and to reach the client.
pub(crate) context: JetStream,
}
impl Drop for Inner {
    /// Best-effort teardown: unsubscribes on the client and, if this
    /// subscription owns its consumer, asks the context to delete it.
    /// Errors from either step are ignored.
    fn drop(&mut self) {
        let sid = self.sid.load(Ordering::Relaxed);
        let _ = self.context.connection.0.client.unsubscribe(sid);

        if self.consumer_ownership != ConsumerOwnership::Yes {
            return;
        }
        let _ = self.context.delete_consumer(&self.stream, &self.consumer);
    }
}
/// Handle to a JetStream push subscription.
///
/// The handle is cheap to clone: every clone shares the same [`Inner`]
/// through an `Arc`.
#[derive(Clone, Debug)]
pub struct PushSubscription(pub(crate) Arc<Inner>);
impl PushSubscription {
/// Builds a subscription handle from the subscription id, the consumer
/// description, the ownership flag, the message channel and the JetStream
/// context.
///
/// Stream name, consumer name, ack policy and pending count are copied out
/// of `consumer_info`.
pub(crate) fn new(
    sid: Arc<AtomicU64>,
    consumer_info: ConsumerInfo,
    consumer_ownership: ConsumerOwnership,
    messages: channel::Receiver<Message>,
    context: JetStream,
) -> PushSubscription {
    let inner = Inner {
        sid,
        messages,
        stream: consumer_info.stream_name,
        consumer: consumer_info.name,
        consumer_ack_policy: consumer_info.config.ack_policy,
        num_pending: consumer_info.num_pending,
        consumer_ownership,
        context,
    };
    PushSubscription(Arc::new(inner))
}
/// Handles messages that must not be surfaced to the caller.
///
/// Flow-control messages are answered with an empty payload (the result of
/// the response is ignored). Returns `true` for flow-control and idle
/// heartbeat messages, `false` for everything else.
fn preprocess(&self, message: &Message) -> bool {
    let flow_control = message.is_flow_control();
    if flow_control {
        let _ = message.respond(b"");
    }
    // Short-circuit keeps the heartbeat check off the flow-control path.
    flow_control || message.is_idle_heartbeat()
}
/// Blocks until the next user-visible message arrives.
///
/// Messages consumed by `preprocess` (flow control, idle heartbeats) are
/// skipped. Returns `None` once receiving from the channel fails.
pub fn next(&self) -> Option<Message> {
    loop {
        let candidate = self.0.messages.recv().ok()?;
        if !self.preprocess(&candidate) {
            return Some(candidate);
        }
    }
}
/// Returns the next user-visible message without blocking.
///
/// Messages consumed by `preprocess` (flow control, idle heartbeats) are
/// skipped. Returns `None` as soon as a non-blocking receive yields nothing.
pub fn try_next(&self) -> Option<Message> {
    std::iter::from_fn(|| self.0.messages.try_recv().ok())
        .find(|candidate| !self.preprocess(candidate))
}
/// Waits up to `timeout` for the next user-visible message.
///
/// Messages consumed by `preprocess` are skipped; the time spent waiting for
/// them is subtracted from the remaining budget.
///
/// # Errors
///
/// * `io::ErrorKind::TimedOut` when no message arrived in time.
/// * `io::ErrorKind::Other` when the message channel is disconnected.
pub fn next_timeout(&self, mut timeout: Duration) -> io::Result<Message> {
    loop {
        let wait_started = Instant::now();
        match self.0.messages.recv_timeout(timeout) {
            Ok(message) if self.preprocess(&message) => {
                // Protocol message: keep waiting with whatever budget is left.
                timeout = timeout.saturating_sub(wait_started.elapsed());
            }
            Ok(message) => return Ok(message),
            Err(channel::RecvTimeoutError::Timeout) => {
                return Err(io::Error::new(
                    io::ErrorKind::TimedOut,
                    "next_timeout: timed out",
                ));
            }
            Err(channel::RecvTimeoutError::Disconnected) => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "next_timeout: unsubscribed",
                ));
            }
        }
    }
}
pub fn messages(&self) -> Iter<'_> {
Iter { subscription: self }
}
/// Returns an iterator whose `next` delegates to the blocking
/// `PushSubscription::next`.
pub fn iter(&self) -> Iter<'_> {
    let subscription = self;
    Iter { subscription }
}
/// Returns an iterator whose `next` delegates to the non-blocking
/// `PushSubscription::try_next`.
pub fn try_iter(&self) -> TryIter<'_> {
    let subscription = self;
    TryIter { subscription }
}
/// Returns an iterator that waits at most `timeout` for each message and
/// ends on the first failed wait.
pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
    let subscription = self;
    TimeoutIter {
        to: timeout,
        subscription,
    }
}
/// Spawns a named background thread that feeds every incoming message to
/// `handler`.
///
/// Handler errors are logged and do not stop the loop. The returned
/// [`Handler`] keeps the subscription so it can be unsubscribed later.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
pub fn with_handler<F>(self, handler: F) -> Handler
where
    F: Fn(Message) -> io::Result<()> + Send + 'static,
{
    let thread_name = format!(
        "nats_jetstream_push_subscriber_{}_{}",
        self.0.stream, self.0.consumer,
    );
    // The worker gets its own clone so the original can go into the `Handler`.
    let worker_sub = self.clone();
    let worker = move || {
        while let Some(message) = worker_sub.next() {
            if let Err(e) = handler(message) {
                log::error!("Error in callback! {:?}", e);
            }
        }
    };
    thread::Builder::new()
        .name(thread_name)
        .spawn(worker)
        .expect("threads should be spawnable");
    Handler { subscription: self }
}
pub fn with_process_handler<F>(self, handler: F) -> Handler
where
F: Fn(&Message) -> io::Result<()> + Send + 'static,
{
let consumer_ack_policy = self.0.consumer_ack_policy;
let sub = self.clone();
thread::Builder::new()
.name(format!(
"nats_push_subscriber_{}_{}",
self.0.consumer, self.0.stream
))
.spawn(move || {
for message in &sub {
if let Err(err) = handler(&message) {
log::error!("Error in callback! {:?}", err);
}
if consumer_ack_policy != AckPolicy::None {
if let Err(err) = message.ack() {
log::error!("Error in callback! {:?}", err);
}
}
}
})
.expect("threads should be spawnable");
Handler { subscription: self }
}
/// Waits for the next message, runs `f` on it and acknowledges it.
///
/// The message is acknowledged only when `f` succeeds and the consumer's ack
/// policy is not `AckPolicy::None`.
///
/// # Errors
///
/// * `io::ErrorKind::Other` when the subscription has been closed and no
///   further message can be received (previously this panicked).
/// * Any error returned by `f`; the message is then left unacknowledged.
/// * Any error returned while acknowledging the message.
pub fn process<R, F: Fn(&Message) -> io::Result<R>>(&mut self, f: F) -> io::Result<R> {
    // A closed subscription is a recoverable condition, not a bug: report it
    // the same way `next_timeout` does instead of unwrapping.
    let next = self
        .next()
        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "process: unsubscribed"))?;
    let result = f(&next)?;
    if self.0.consumer_ack_policy != AckPolicy::None {
        next.ack()?;
    }
    Ok(result)
}
/// Waits up to `timeout` for the next message, runs `f` on it and
/// acknowledges it.
///
/// The message is acknowledged only when `f` succeeds and the consumer's ack
/// policy is not `AckPolicy::None`.
///
/// # Errors
///
/// Propagates errors from `next_timeout`, from `f` (the message is then left
/// unacknowledged) and from the acknowledgement itself.
pub fn process_timeout<R, F: Fn(&Message) -> io::Result<R>>(
    &mut self,
    timeout: Duration,
    f: F,
) -> io::Result<R> {
    let message = self.next_timeout(timeout)?;
    let outcome = f(&message)?;
    match self.0.consumer_ack_policy {
        policy if policy != AckPolicy::None => message.ack()?,
        _ => {}
    }
    Ok(outcome)
}
/// Asks the JetStream context for the current description of this
/// subscription's consumer.
pub fn consumer_info(&self) -> io::Result<ConsumerInfo> {
    let inner = &*self.0;
    inner.context.consumer_info(&inner.stream, &inner.consumer)
}
/// Unsubscribes on the client, discards messages already queued locally and,
/// if this subscription owns its consumer, asks the context to delete it.
///
/// # Errors
///
/// Returns the error from the client's `unsubscribe`; a failure to delete
/// the consumer is ignored.
pub fn unsubscribe(self) -> io::Result<()> {
    let inner = &*self.0;
    let sid = inner.sid.load(Ordering::Relaxed);
    inner.context.connection.0.client.unsubscribe(sid)?;

    // Throw away whatever was buffered before the unsubscribe took effect.
    while let Ok(stale) = inner.messages.try_recv() {
        drop(stale);
    }

    if inner.consumer_ownership == ConsumerOwnership::Yes {
        let _ = inner.context.delete_consumer(&inner.stream, &inner.consumer);
    }
    Ok(())
}
pub fn close(self) -> io::Result<()> {
self.unsubscribe()
}
/// Flushes the client (bounded by `DEFAULT_FLUSH_TIMEOUT`), unsubscribes
/// and, if this subscription owns its consumer, asks the context to delete
/// it. Unlike `unsubscribe`, locally queued messages are not discarded.
///
/// # Errors
///
/// Returns the error from the flush or from the client's `unsubscribe`; a
/// failure to delete the consumer is ignored.
pub fn drain(&self) -> io::Result<()> {
    let inner = &*self.0;
    let client = &inner.context.connection.0.client;

    client.flush(DEFAULT_FLUSH_TIMEOUT)?;
    client.unsubscribe(inner.sid.load(Ordering::Relaxed))?;

    if inner.consumer_ownership == ConsumerOwnership::Yes {
        let _ = inner.context.delete_consumer(&inner.stream, &inner.consumer);
    }
    Ok(())
}
}
/// Consuming iteration: the iterator takes ownership of the subscription.
impl IntoIterator for PushSubscription {
    type Item = Message;
    type IntoIter = IntoIter;

    fn into_iter(self) -> IntoIter {
        let subscription = self;
        IntoIter { subscription }
    }
}
/// Borrowing iteration, which allows `for message in &subscription`.
impl<'a> IntoIterator for &'a PushSubscription {
    type Item = Message;
    type IntoIter = Iter<'a>;

    fn into_iter(self) -> Iter<'a> {
        let subscription = self;
        Iter { subscription }
    }
}
/// Returned by `PushSubscription::with_handler` and
/// `PushSubscription::with_process_handler`; keeps the subscription so it
/// can be unsubscribed later.
pub struct Handler {
// The subscription whose messages are processed on the background thread.
subscription: PushSubscription,
}
impl Handler {
pub fn unsubscribe(self) -> io::Result<()> {
self.subscription.unsubscribe()
}
}
/// Non-blocking iterator over a borrowed [`PushSubscription`]; each step
/// calls `PushSubscription::try_next`.
pub struct TryIter<'a> {
// Subscription the messages are pulled from.
subscription: &'a PushSubscription,
}
impl<'a> Iterator for TryIter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.try_next()
}
}
/// Blocking iterator over a borrowed [`PushSubscription`]; each step calls
/// `PushSubscription::next`.
pub struct Iter<'a> {
// Subscription the messages are pulled from.
subscription: &'a PushSubscription,
}
impl<'a> Iterator for Iter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
/// Blocking iterator that owns its [`PushSubscription`]; each step calls
/// `PushSubscription::next`.
pub struct IntoIter {
// Subscription the messages are pulled from.
subscription: PushSubscription,
}
impl Iterator for IntoIter {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
/// Iterator over a borrowed [`PushSubscription`] that waits a bounded time
/// for each message; each step calls `PushSubscription::next_timeout`.
pub struct TimeoutIter<'a> {
// Subscription the messages are pulled from.
subscription: &'a PushSubscription,
// Maximum wait applied to every individual step.
to: Duration,
}
impl<'a> Iterator for TimeoutIter<'a> {
    type Item = Message;

    /// Yields the next message, or `None` when waiting failed for any reason
    /// (timeout or closed subscription).
    fn next(&mut self) -> Option<Self::Item> {
        match self.subscription.next_timeout(self.to) {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }
}