use std::io;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::jetstream::{ConsumerInfo, ConsumerOwnership, JetStream};
use crate::Message;
use super::{AckPolicy, BatchOptions};
use crossbeam_channel as channel;
/// Shared state behind a `PullSubscription` handle.
///
/// The value is held in an `Arc`, so it is dropped when the last handle goes
/// away; its `Drop` implementation then unsubscribes and, for an owned
/// consumer, deletes the consumer.
#[derive(Debug)]
pub(crate) struct Inner {
/// Identifier passed to the client's `unsubscribe` call on drop.
pid: u64,
/// Channel from which incoming messages are received.
pub(crate) messages: channel::Receiver<Message>,
/// Inbox subject supplied alongside every batch request
/// (presumably the reply subject the server delivers to — confirm against
/// `publish_with_reply_or_headers`).
pub(crate) inbox: String,
/// Ack policy copied from `info.config.ack_policy` at construction time.
pub(crate) consumer_ack_policy: AckPolicy,
/// Consumer description; its `stream_name` and `name` are used to build the
/// batch request subject and to delete the consumer on drop.
pub(crate) info: ConsumerInfo,
/// When equal to `ConsumerOwnership::Yes`, the consumer is deleted on drop.
pub(crate) consumer_ownership: ConsumerOwnership,
/// JetStream context used for publishing requests and consumer deletion.
pub(crate) context: JetStream,
}
impl Drop for Inner {
    /// Tears the subscription down when the shared state is released.
    ///
    /// The subscription is always unsubscribed first. The consumer itself is
    /// deleted only when this subscription owns it. Both steps are
    /// best-effort: any error they report is discarded, since `drop` has no
    /// way to surface it.
    fn drop(&mut self) {
        let _ = self.context.connection.0.client.unsubscribe(self.pid);

        if self.consumer_ownership != ConsumerOwnership::Yes {
            return;
        }

        let _ = self
            .context
            .delete_consumer(&self.info.stream_name, &self.info.name);
    }
}
/// Handle to a pull subscription.
///
/// The handle wraps its state in an `Arc`, so cloning it produces another
/// handle to the same underlying subscription rather than a new one.
#[derive(Clone, Debug)]
pub struct PullSubscription(pub(crate) Arc<Inner>);
impl PullSubscription {
    /// Builds a subscription handle from its already-established parts.
    ///
    /// The ack policy is copied out of `consumer_info.config` so that it can
    /// be read directly from the shared state afterwards.
    pub(crate) fn new(
        pid: u64,
        consumer_info: ConsumerInfo,
        consumer_ownership: ConsumerOwnership,
        inbox: String,
        messages: channel::Receiver<Message>,
        context: JetStream,
    ) -> PullSubscription {
        PullSubscription(Arc::new(Inner {
            pid,
            inbox,
            messages,
            consumer_ownership,
            consumer_ack_policy: consumer_info.config.ack_policy,
            info: consumer_info,
            context,
        }))
    }

    /// Requests a batch of messages and returns an iterator over them.
    ///
    /// `batch` is converted into [`BatchOptions`] and sent with
    /// `request_batch`. The returned [`BatchIter`] is limited to the `batch`
    /// count of those options and waits for each message.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `request_batch` if the request could not
    /// be sent.
    pub fn fetch<I: Into<BatchOptions>>(&self, batch: I) -> io::Result<BatchIter<'_>> {
        let batch_options: BatchOptions = batch.into();
        self.request_batch(batch_options)?;
        Ok(BatchIter {
            batch_size: batch_options.batch,
            processed: 0,
            subscription: self,
        })
    }

    /// Requests a batch of messages and returns an iterator that waits at
    /// most `timeout` for each of them.
    ///
    /// Works like `fetch`, except that every item is produced through
    /// `next_timeout` and is therefore an `io::Result`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `request_batch` if the request could not
    /// be sent.
    pub fn timeout_fetch<I: Into<BatchOptions>>(
        &self,
        batch: I,
        timeout: Duration,
    ) -> io::Result<TimeoutBatchIter<'_>> {
        let batch_options: BatchOptions = batch.into();
        self.request_batch(batch_options)?;
        Ok(TimeoutBatchIter {
            timeout,
            batch_size: batch_options.batch,
            processed: 0,
            subscription: self,
        })
    }

    /// Requests a batch and runs `handler` on every message received.
    ///
    /// After `handler` returns `Ok` for a message, that message is
    /// acknowledged exactly once, unless the consumer's ack policy is
    /// `AckPolicy::None`, in which case nothing is acknowledged.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced by the batch request,
    /// by `handler`, or by acknowledging a message. Messages handled before
    /// the failure stay acknowledged.
    pub fn fetch_with_handler<F, I>(&self, batch: I, mut handler: F) -> io::Result<()>
    where
        F: FnMut(&Message) -> io::Result<()>,
        I: Into<BatchOptions> + Copy,
    {
        let consumer_ack_policy = self.0.consumer_ack_policy;
        for message in self.fetch(batch)? {
            handler(&message)?;
            // One ack per handled message is enough for every policy that
            // acknowledges at all; sending a second ack for the same message
            // would only add redundant traffic.
            if consumer_ack_policy != AckPolicy::None {
                message.ack()?;
            }
        }
        Ok(())
    }

    /// Waits for the next message on the subscription.
    ///
    /// Returns `None` when the channel can no longer deliver messages, or
    /// when the received message reports `is_no_messages()` or
    /// `is_request_timeout()`.
    pub fn next(&self) -> Option<Message> {
        self.preprocess(self.0.messages.recv().ok())
    }

    /// Returns the next message only if one is already available.
    ///
    /// Returns `None` when nothing is queued, when the channel can no longer
    /// deliver messages, or when the received message reports
    /// `is_no_messages()` or `is_request_timeout()`.
    pub fn try_next(&self) -> Option<Message> {
        self.preprocess(self.0.messages.try_recv().ok())
    }

    /// Waits up to `timeout` for the next message.
    ///
    /// Messages reporting `is_no_messages()` are skipped; the time already
    /// spent waiting is deducted from the remaining timeout before waiting
    /// again.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::TimedOut` if nothing arrives within the timeout.
    /// * `io::ErrorKind::Other` if the received message reports
    ///   `is_request_timeout()`, or if the channel is disconnected.
    pub fn next_timeout(&self, mut timeout: Duration) -> io::Result<Message> {
        loop {
            let start = Instant::now();
            match self.0.messages.recv_timeout(timeout) {
                Ok(message) if message.is_no_messages() => {
                    // Not a real message: keep waiting with whatever is left
                    // of the caller's time budget.
                    timeout = timeout.saturating_sub(start.elapsed());
                }
                Ok(message) if message.is_request_timeout() => {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "next_timeout: Pull Request timed out",
                    ));
                }
                Ok(message) => return Ok(message),
                Err(channel::RecvTimeoutError::Timeout) => {
                    return Err(io::Error::new(
                        io::ErrorKind::TimedOut,
                        "next_timeout: timed out",
                    ));
                }
                Err(channel::RecvTimeoutError::Disconnected) => {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "next_timeout: unsubscribed",
                    ));
                }
            }
        }
    }

    /// Sends a request for a batch of messages without waiting for them.
    ///
    /// The request is published to
    /// `<api prefix>CONSUMER.MSG.NEXT.<stream name>.<consumer name>` with the
    /// JSON-serialized [`BatchOptions`] as payload and this subscription's
    /// inbox as the accompanying subject.
    ///
    /// # Errors
    ///
    /// Returns an error if the options cannot be serialized or the request
    /// cannot be published.
    pub fn request_batch<I: Into<BatchOptions>>(&self, batch: I) -> io::Result<()> {
        let batch_opts: BatchOptions = batch.into();
        let subject = format!(
            "{}CONSUMER.MSG.NEXT.{}.{}",
            self.0.context.api_prefix(),
            self.0.info.stream_name,
            self.0.info.name,
        );
        let request = serde_json::to_vec(&batch_opts)?;
        self.0.context.connection.publish_with_reply_or_headers(
            &subject,
            Some(self.0.inbox.as_str()),
            None,
            request,
        )?;
        Ok(())
    }

    /// Returns an iterator that produces messages by calling `next`.
    pub fn iter(&self) -> Iter<'_> {
        Iter { subscription: self }
    }

    /// Drops messages that only carry a status: those reporting
    /// `is_no_messages()` or `is_request_timeout()` become `None`.
    fn preprocess(&self, message: Option<Message>) -> Option<Message> {
        message.filter(|message| !message.is_no_messages() && !message.is_request_timeout())
    }
}
pub struct Iter<'a> {
subscription: &'a PullSubscription,
}
impl<'a> Iterator for Iter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
/// Iterator over a [`PullSubscription`] that waits a bounded time per item.
///
/// Each step calls [`PullSubscription::next_timeout`] with the stored
/// timeout and always yields `Some`, so failures (including timeouts) show up
/// as `Err` items and the iterator never finishes on its own.
pub struct TimeoutIter<'a> {
    /// Subscription the messages are taken from.
    subscription: &'a PullSubscription,
    /// Timeout handed to `next_timeout` for every item.
    timeout: Duration,
}

impl<'a> Iterator for TimeoutIter<'a> {
    type Item = io::Result<Message>;

    fn next(&mut self) -> Option<io::Result<Message>> {
        let outcome = self.subscription.next_timeout(self.timeout);
        Some(outcome)
    }
}
/// Iterator over one requested batch that waits for each message.
///
/// At most `batch_size` calls are made to [`PullSubscription::next`]. Every
/// call counts against the batch, even one that returns `None`.
pub struct BatchIter<'a> {
    /// Upper bound on the number of messages requested from the subscription.
    batch_size: usize,
    /// Number of requests already made to the subscription.
    processed: usize,
    /// Subscription the messages are taken from.
    subscription: &'a PullSubscription,
}

impl<'a> Iterator for BatchIter<'a> {
    type Item = Message;

    fn next(&mut self) -> Option<Message> {
        if self.processed < self.batch_size {
            self.processed += 1;
            return self.subscription.next();
        }
        None
    }
}
/// Iterator over one requested batch that waits only for the first message.
///
/// The first item comes from [`PullSubscription::next`]; every later item
/// comes from [`PullSubscription::try_next`], so the iterator yields `None`
/// as soon as no further message is immediately available. At most
/// `batch_size` requests are made to the subscription; a batch size of zero
/// yields nothing and never touches the subscription.
pub struct TryBatchIter<'a> {
    /// Upper bound on the number of messages requested from the subscription.
    batch_size: usize,
    /// Number of requests already made to the subscription.
    processed: usize,
    /// Subscription the messages are taken from.
    subscription: &'a PullSubscription,
}

impl<'a> Iterator for TryBatchIter<'a> {
    type Item = Message;

    fn next(&mut self) -> Option<Self::Item> {
        // Check the limit before anything else so that an empty batch does
        // not wait for (and hand out) a message it never asked for.
        if self.processed >= self.batch_size {
            return None;
        }

        let is_first = self.processed == 0;
        self.processed += 1;

        if is_first {
            self.subscription.next()
        } else {
            self.subscription.try_next()
        }
    }
}
/// Iterator over one requested batch that waits a bounded time per message.
///
/// Produces at most `batch_size` items, each obtained from
/// [`PullSubscription::next_timeout`] with the stored timeout. Failures
/// (including timeouts) are yielded as `Err` items and count against the
/// batch like any other item.
pub struct TimeoutBatchIter<'a> {
    /// Upper bound on the number of items produced.
    batch_size: usize,
    /// Number of items produced so far.
    processed: usize,
    /// Timeout handed to `next_timeout` for every item.
    timeout: Duration,
    /// Subscription the messages are taken from.
    subscription: &'a PullSubscription,
}

impl<'a> Iterator for TimeoutBatchIter<'a> {
    type Item = io::Result<Message>;

    fn next(&mut self) -> Option<io::Result<Message>> {
        if self.processed < self.batch_size {
            self.processed += 1;
            let outcome = self.subscription.next_timeout(self.timeout);
            return Some(outcome);
        }
        None
    }
}
impl From<usize> for BatchOptions {
fn from(batch: usize) -> Self {
BatchOptions {
batch,
expires: None,
no_wait: false,
}
}
}