use bytes::Bytes;
use futures_util::{
future::{BoxFuture, Either},
FutureExt, StreamExt,
};
#[cfg(feature = "server_2_11")]
use time::{serde::rfc3339, OffsetDateTime};
#[cfg(feature = "server_2_10")]
use std::collections::HashMap;
use std::{future, pin::Pin, task::Poll, time::Duration};
use tokio::{task::JoinHandle, time::Sleep};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace};
use crate::{
connection::State,
error::Error,
jetstream::{self, Context},
StatusCode, SubscribeError, Subscriber,
};
use crate::subject::Subject;
#[cfg(feature = "server_2_11")]
use super::PriorityPolicy;
use super::{
backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
StreamError, StreamErrorKind,
};
use jetstream::consumer;
impl Consumer<Config> {
    /// Returns a continuous [`Stream`] of messages for this pull consumer,
    /// driven by repeated pull requests (batch of 200, 30s expiry, 15s idle
    /// heartbeat). Use [`Consumer::stream`] to customize these parameters.
    pub async fn messages(&self) -> Result<Stream, StreamError> {
        Stream::stream(
            BatchConfig {
                batch: 200,
                expires: Some(Duration::from_secs(30)),
                no_wait: false,
                max_bytes: 0,
                idle_heartbeat: Duration::from_secs(15),
                min_pending: None,
                min_ack_pending: None,
                group: None,
                #[cfg(feature = "server_2_12")]
                priority: None,
            },
            self,
        )
        .await
    }

    /// Builder for a continuous message [`Stream`] with custom pull parameters.
    pub fn stream(&self) -> StreamBuilder<'_> {
        StreamBuilder::new(self)
    }

    /// Publishes a raw pull request for this consumer, directing replies to
    /// `inbox`.
    ///
    /// Low-level API: the caller is responsible for having subscribed to
    /// `inbox` beforehand, otherwise replies are lost.
    pub async fn request_batch<I: Into<BatchConfig>>(
        &self,
        batch: I,
        inbox: Subject,
    ) -> Result<(), BatchRequestError> {
        debug!("sending batch");
        // `$JS.<prefix>.CONSUMER.MSG.NEXT.<stream>.<consumer>` pull subject.
        let subject = format!(
            "{}.CONSUMER.MSG.NEXT.{}.{}",
            self.context.prefix, self.info.stream_name, self.info.name
        );
        let payload = serde_json::to_vec(&batch.into())
            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
        self.context
            .client
            .publish_with_reply(subject, inbox, payload.into())
            .await
            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
        debug!("batch request sent");
        Ok(())
    }

    /// Builder for a one-shot, no-wait fetch ([`Batch`]) that ends as soon as
    /// the server reports no more pending messages.
    pub fn fetch(&self) -> FetchBuilder<'_> {
        FetchBuilder::new(self)
    }

    /// Builder for a single batch ([`Batch`]) that waits until the requested
    /// number of messages arrive or the request expires.
    pub fn batch(&self) -> BatchBuilder<'_> {
        BatchBuilder::new(self)
    }

    /// Returns a [`Sequence`]: an endless stream of [`Batch`]es of up to
    /// `batch` messages each, issuing one pull request (60s expiry) per batch.
    pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
        let context = self.context.clone();
        let subject = format!(
            "{}.CONSUMER.MSG.NEXT.{}.{}",
            self.context.prefix, self.info.stream_name, self.info.name
        );
        // Serialize the request once; every batch reuses the same payload.
        let request = serde_json::to_vec(&BatchConfig {
            batch,
            expires: Some(Duration::from_secs(60)),
            ..Default::default()
        })
        .map(Bytes::from)
        .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
        Ok(Sequence {
            context,
            subject,
            request,
            pending_messages: batch,
            next: None,
        })
    }
}
/// A single in-flight pull request: a stream that yields the replies to one
/// batch request and then terminates.
pub struct Batch {
    /// Messages still expected before this batch is exhausted.
    pending_messages: usize,
    /// Subscription on the reply inbox the server delivers into.
    subscriber: Subscriber,
    context: Context,
    /// Client-side guard timer (request expiry + grace), if the request expires.
    timeout: Option<Pin<Box<Sleep>>>,
    /// Set once the stream has ended; subsequent polls return `None`.
    terminated: bool,
}
impl Batch {
    /// Issues a single pull request described by `batch` and returns a
    /// [`Batch`] that drains the replies from a fresh inbox subscription.
    async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
        // Subscribe to the reply inbox *before* publishing the request so no
        // reply can slip through in between.
        let reply_inbox = Subject::from(consumer.context.client.new_inbox());
        let subscriber = consumer
            .context
            .client
            .subscribe(reply_inbox.clone())
            .await?;
        consumer
            .request_batch(batch.clone(), reply_inbox.clone())
            .await?;
        // Guard timer: the request expiry plus a small grace period, giving
        // the server's own timeout status message a chance to arrive first.
        let timeout = match batch.expires {
            Some(expires) => Some(Box::pin(tokio::time::sleep(
                expires.saturating_add(Duration::from_secs(5)),
            ))),
            None => None,
        };
        Ok(Batch {
            context: consumer.context.clone(),
            subscriber,
            pending_messages: batch.batch,
            timeout,
            terminated: false,
        })
    }
}
impl futures_util::Stream for Batch {
    type Item = Result<jetstream::Message, crate::Error>;

    /// Yields messages until the batch count is reached, the server reports a
    /// timeout / no-messages status, or the client-side guard timer fires.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        if self.terminated {
            return Poll::Ready(None);
        }
        // All requested messages have been yielded; the batch is complete.
        if self.pending_messages == 0 {
            self.terminated = true;
            return Poll::Ready(None);
        }
        // Client-side guard against a server that never sends the timeout
        // status (e.g. the request was lost across a reconnect).
        if let Some(sleep) = self.timeout.as_mut() {
            match sleep.poll_unpin(cx) {
                Poll::Ready(_) => {
                    debug!("batch timeout timer triggered");
                    self.terminated = true;
                    return Poll::Ready(None);
                }
                Poll::Pending => (),
            }
        }
        match self.subscriber.receiver.poll_recv(cx) {
            Poll::Ready(maybe_message) => match maybe_message {
                Some(message) => match message.status.unwrap_or(StatusCode::OK) {
                    StatusCode::TIMEOUT => {
                        debug!("received timeout. Iterator done");
                        self.terminated = true;
                        Poll::Ready(None)
                    }
                    StatusCode::IDLE_HEARTBEAT => {
                        debug!("received heartbeat");
                        // `poll_recv` returned `Ready`, so no waker is
                        // registered with the channel anymore. Ask for an
                        // immediate re-poll instead of returning a bare
                        // `Pending`, which could otherwise stall the stream
                        // until (or forever, when `timeout` is `None`) the
                        // guard timer fires.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    StatusCode::NOT_FOUND => {
                        debug!("received `NO_MESSAGES`. Iterator done");
                        self.terminated = true;
                        Poll::Ready(None)
                    }
                    StatusCode::OK => {
                        debug!("received message");
                        self.pending_messages -= 1;
                        Poll::Ready(Some(Ok(jetstream::Message {
                            context: self.context.clone(),
                            message,
                        })))
                    }
                    status => {
                        debug!("received error");
                        self.terminated = true;
                        Poll::Ready(Some(Err(Box::new(std::io::Error::other(format!(
                            "error while processing messages from the stream: {}, {:?}",
                            status, message.description
                        ))))))
                    }
                },
                None => Poll::Ready(None),
            },
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}
/// An endless stream of [`Batch`]es, each backed by its own pull request.
pub struct Sequence {
    context: Context,
    /// Pull-request subject (`…CONSUMER.MSG.NEXT.<stream>.<consumer>`).
    subject: String,
    /// Pre-serialized pull request payload, reused for every batch.
    request: Bytes,
    /// Batch size each pull request asks for.
    pending_messages: usize,
    /// In-flight future creating the next batch, if one has been started.
    next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
}
impl futures_util::Stream for Sequence {
    type Item = Result<Batch, MessagesError>;

    /// Yields one [`Batch`] per pull request, lazily creating the request
    /// future on first poll and after each batch has been handed out.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Lazily (re)create the next pull-request future. The polling logic
        // below used to be duplicated verbatim for the `None`/`Some` cases;
        // it is now shared.
        if self.next.is_none() {
            let context = self.context.clone();
            let subject = self.subject.clone();
            let request = self.request.clone();
            let pending_messages = self.pending_messages;
            self.next = Some(Box::pin(async move {
                // Subscribe before publishing so replies cannot be missed.
                let inbox = context.client.new_inbox();
                let subscriber = context
                    .client
                    .subscribe(inbox.clone())
                    .await
                    .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
                context
                    .client
                    .publish_with_reply(subject, inbox, request)
                    .await
                    .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
                Ok(Batch {
                    pending_messages,
                    subscriber,
                    context,
                    terminated: false,
                    timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
                })
            }));
        }
        let next = self.next.as_mut().expect("next future was set above");
        match next.as_mut().poll(cx) {
            Poll::Ready(result) => {
                // Drop the finished future so the next poll starts a new batch.
                self.next = None;
                Poll::Ready(Some(result.map_err(|err| {
                    MessagesError::with_source(MessagesErrorKind::Pull, err)
                })))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
impl Consumer<OrderedConfig> {
    /// Consumes the ordered consumer and returns an [`Ordered`] stream that
    /// yields messages in stream order, recreating the underlying ephemeral
    /// consumer whenever a gap or repeated missed heartbeat is detected.
    pub async fn messages(self) -> Result<Ordered, StreamError> {
        // The ordered consumer is layered on top of a regular pull consumer
        // built from an equivalent `Config`.
        let config = Consumer {
            config: self.config.clone().into(),
            context: self.context.clone(),
            info: self.info.clone(),
        };
        let stream = Stream::stream(
            BatchConfig {
                batch: 500,
                expires: Some(Duration::from_secs(30)),
                no_wait: false,
                max_bytes: 0,
                idle_heartbeat: Duration::from_secs(15),
                min_pending: None,
                min_ack_pending: None,
                group: None,
                #[cfg(feature = "server_2_12")]
                priority: None,
            },
            &config,
        )
        .await?;
        Ok(Ordered {
            consumer_sequence: 0,
            stream_sequence: 0,
            missed_heartbeats: false,
            create_stream: None,
            context: self.context.clone(),
            // Use the configured name when present, otherwise generate one so
            // recreation can reuse a stable consumer name.
            consumer_name: self
                .config
                .name
                .clone()
                .unwrap_or_else(|| self.context.client.new_inbox()),
            consumer: self.config,
            stream: Some(stream),
            stream_name: self.info.stream_name.clone(),
        })
    }
}
/// Configuration for an ordered pull consumer. Ordered consumers are
/// ephemeral, ack-less and single-replica by construction (see
/// `From<OrderedConfig> for Config`); only the fields below are tunable.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct OrderedConfig {
    /// Optional consumer name; generated when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Human-readable description.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Only deliver messages matching this subject.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    /// Only deliver messages matching any of these subjects (server 2.10+).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    pub replay_policy: ReplayPolicy,
    /// Delivery rate cap in bits per second.
    #[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// Percentage of acks to sample for observability.
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// Deliver only headers, not payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Where in the stream delivery starts.
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    pub max_batch: i64,
    pub max_bytes: i64,
    pub max_expires: Duration,
}
impl From<OrderedConfig> for Config {
    /// Expands an [`OrderedConfig`] into a full pull-consumer [`Config`],
    /// hard-wiring the invariants an ordered consumer relies on: no acks,
    /// single delivery attempt, in-memory single-replica storage, and a 30s
    /// inactivity threshold so abandoned ephemerals are cleaned up.
    fn from(config: OrderedConfig) -> Self {
        Config {
            durable_name: None,
            name: config.name,
            description: config.description,
            deliver_policy: config.deliver_policy,
            // Ordered consumers never ack; ordering is tracked client-side.
            ack_policy: AckPolicy::None,
            ack_wait: Duration::default(),
            max_deliver: 1,
            filter_subject: config.filter_subject,
            #[cfg(feature = "server_2_10")]
            filter_subjects: config.filter_subjects,
            replay_policy: config.replay_policy,
            rate_limit: config.rate_limit,
            sample_frequency: config.sample_frequency,
            max_waiting: config.max_waiting,
            max_ack_pending: 0,
            headers_only: config.headers_only,
            max_batch: config.max_batch,
            max_bytes: config.max_bytes,
            max_expires: config.max_expires,
            inactive_threshold: Duration::from_secs(30),
            num_replicas: 1,
            memory_storage: true,
            #[cfg(feature = "server_2_10")]
            metadata: config.metadata,
            backoff: Vec::new(),
            #[cfg(feature = "server_2_11")]
            priority_policy: PriorityPolicy::None,
            #[cfg(feature = "server_2_11")]
            priority_groups: Vec::new(),
            #[cfg(feature = "server_2_11")]
            pause_until: None,
        }
    }
}
impl FromConsumer for OrderedConfig {
    /// Builds an [`OrderedConfig`] from a server-side consumer config,
    /// keeping only the fields an ordered consumer exposes. Never fails —
    /// the fixed ordered-consumer fields are simply dropped.
    fn try_from_consumer_config(
        config: crate::jetstream::consumer::Config,
    ) -> Result<Self, crate::Error>
    where
        Self: Sized,
    {
        Ok(OrderedConfig {
            name: config.name,
            description: config.description,
            filter_subject: config.filter_subject,
            #[cfg(feature = "server_2_10")]
            filter_subjects: config.filter_subjects,
            replay_policy: config.replay_policy,
            rate_limit: config.rate_limit,
            sample_frequency: config.sample_frequency,
            headers_only: config.headers_only,
            deliver_policy: config.deliver_policy,
            max_waiting: config.max_waiting,
            #[cfg(feature = "server_2_10")]
            metadata: config.metadata,
            max_batch: config.max_batch,
            max_bytes: config.max_bytes,
            max_expires: config.max_expires,
        })
    }
}
impl IntoConsumerConfig for OrderedConfig {
    /// Converts into the generic consumer config with the same hard-wired
    /// ordered-consumer invariants as `From<OrderedConfig> for Config`,
    /// except that batch limits are zeroed here (pull batching is driven by
    /// the client, not the server-side config).
    fn into_consumer_config(self) -> super::Config {
        jetstream::consumer::Config {
            deliver_subject: None,
            durable_name: None,
            name: self.name,
            description: self.description,
            deliver_group: None,
            deliver_policy: self.deliver_policy,
            ack_policy: AckPolicy::None,
            ack_wait: Duration::default(),
            max_deliver: 1,
            filter_subject: self.filter_subject,
            #[cfg(feature = "server_2_10")]
            filter_subjects: self.filter_subjects,
            replay_policy: self.replay_policy,
            rate_limit: self.rate_limit,
            sample_frequency: self.sample_frequency,
            max_waiting: self.max_waiting,
            max_ack_pending: 0,
            headers_only: self.headers_only,
            flow_control: false,
            idle_heartbeat: Duration::default(),
            max_batch: 0,
            max_bytes: 0,
            max_expires: Duration::default(),
            inactive_threshold: Duration::from_secs(30),
            num_replicas: 1,
            memory_storage: true,
            #[cfg(feature = "server_2_10")]
            metadata: self.metadata,
            backoff: Vec::new(),
            #[cfg(feature = "server_2_11")]
            priority_policy: PriorityPolicy::None,
            #[cfg(feature = "server_2_11")]
            priority_groups: Vec::new(),
            #[cfg(feature = "server_2_11")]
            pause_until: None,
        }
    }
}
/// A gap-free, in-order message stream built on top of [`Stream`], which
/// recreates its ephemeral consumer whenever ordering is broken.
pub struct Ordered {
    context: Context,
    stream_name: String,
    /// Config used to recreate the consumer after a gap.
    consumer: OrderedConfig,
    consumer_name: String,
    /// The currently active underlying stream, if any.
    stream: Option<Stream>,
    /// In-flight consumer recreation, if one is underway.
    create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
    /// Last consumer sequence seen; a non-consecutive value signals a gap.
    consumer_sequence: u64,
    /// Last stream sequence seen; recreation resumes after this sequence.
    stream_sequence: u64,
    /// One missed heartbeat is tolerated; a second triggers recreation.
    missed_heartbeats: bool,
}
impl futures_util::Stream for Ordered {
    type Item = Result<jetstream::Message, OrderedError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut recreate = false;
        // Drain the active stream (if any), watching for sequence gaps and
        // missed heartbeats that force the consumer to be recreated.
        if let Some(stream) = self.stream.as_mut() {
            match stream.poll_next_unpin(cx) {
                Poll::Ready(message) => match message {
                    Some(message) => match message {
                        Ok(message) => {
                            self.missed_heartbeats = false;
                            let info = message.info().map_err(|err| {
                                OrderedError::with_source(OrderedErrorKind::Other, err)
                            })?;
                            trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
                                self.consumer_sequence,
                                self.stream_sequence,
                                info.consumer_sequence,
                                info.stream_sequence);
                            // Any gap in the consumer sequence means a message
                            // was lost: recreate the ephemeral consumer from
                            // the last known stream sequence.
                            if info.consumer_sequence != self.consumer_sequence + 1 {
                                debug!(
                                    "ordered consumer mismatch. current {}, info: {}",
                                    self.consumer_sequence, info.consumer_sequence
                                );
                                recreate = true;
                                self.consumer_sequence = 0;
                            } else {
                                self.stream_sequence = info.stream_sequence;
                                self.consumer_sequence = info.consumer_sequence;
                                return Poll::Ready(Some(Ok(message)));
                            }
                        }
                        Err(err) => match err.kind() {
                            // First missed heartbeat is tolerated; a second
                            // one in a row triggers recreation.
                            MessagesErrorKind::MissingHeartbeat => {
                                if self.missed_heartbeats {
                                    self.consumer_sequence = 0;
                                    recreate = true;
                                } else {
                                    self.missed_heartbeats = true;
                                }
                            }
                            MessagesErrorKind::ConsumerDeleted
                            | MessagesErrorKind::NoResponders => {
                                recreate = true;
                                self.consumer_sequence = 0;
                            }
                            // Unrecoverable for an ordered consumer.
                            MessagesErrorKind::Pull
                            | MessagesErrorKind::PushBasedConsumer
                            | MessagesErrorKind::Other => {
                                return Poll::Ready(Some(Err(err.into())));
                            }
                        },
                    },
                    None => return Poll::Ready(None),
                },
                Poll::Pending => (),
            }
        }
        if recreate {
            self.stream = None;
            self.create_stream = Some(Box::pin({
                let context = self.context.clone();
                let config = self.consumer.clone();
                let stream_name = self.stream_name.clone();
                let consumer_name = self.consumer_name.clone();
                let sequence = self.stream_sequence;
                async move {
                    // Retry effectively forever; `backoff` caps the delay
                    // between attempts.
                    tryhard::retry_fn(|| {
                        recreate_consumer_stream(
                            &context,
                            &config,
                            &stream_name,
                            &consumer_name,
                            sequence,
                        )
                    })
                    .retries(u32::MAX)
                    .custom_backoff(backoff)
                    .await
                }
            }))
        }
        // Drive a pending recreation; once it completes, poll the fresh
        // stream immediately so a ready message is not delayed by a wakeup.
        if let Some(result) = self.create_stream.as_mut() {
            match result.poll_unpin(cx) {
                Poll::Ready(result) => match result {
                    Ok(stream) => {
                        self.create_stream = None;
                        self.stream = Some(stream);
                        return self.poll_next(cx);
                    }
                    Err(err) => {
                        return Poll::Ready(Some(Err(OrderedError::with_source(
                            OrderedErrorKind::Recreate,
                            err,
                        ))))
                    }
                },
                Poll::Pending => (),
            }
        }
        Poll::Pending
    }
}
/// A continuous message stream for a pull consumer, fed by a background task
/// that keeps issuing pull requests (see [`Stream::stream`]).
pub struct Stream {
    /// Messages still expected from outstanding pull requests.
    pending_messages: usize,
    /// Bytes still expected from outstanding pull requests.
    pending_bytes: usize,
    /// Results of pull requests sent by the background task;
    /// `Ok(true)` means the pending counters must be reset, not incremented.
    request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
    /// Signals the background task to send another pull request.
    request_tx: tokio::sync::watch::Sender<()>,
    /// Subscription on the inbox the server delivers messages into.
    subscriber: Subscriber,
    batch_config: BatchConfig,
    context: Context,
    /// True while a request has been asked for but not yet acknowledged.
    pending_request: bool,
    /// Handle to the background pull-request task; aborted on drop.
    task_handle: JoinHandle<()>,
    terminated: bool,
    /// Missed-heartbeat timer (2x idle heartbeat), armed lazily.
    heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
    /// Fired on first poll to release the background task.
    started: Option<tokio::sync::oneshot::Sender<()>>,
}
impl Drop for Stream {
    fn drop(&mut self) {
        // Stop the background pull-request task; it loops forever otherwise.
        self.task_handle.abort();
    }
}
impl Stream {
    /// Spawns a background task that keeps this pull consumer supplied with
    /// pull requests and returns the [`Stream`] fed by them.
    ///
    /// The task waits for the stream's first poll (`started`), then loops:
    /// it sends a new pull request whenever the stream asks for one
    /// (`request_tx`), the previous request expires, or the connection was
    /// re-established, and reports each request's outcome over
    /// `request_result_tx` (`Ok(true)` means the stream must reset its
    /// pending counters instead of incrementing them).
    async fn stream(
        batch_config: BatchConfig,
        consumer: &Consumer<Config>,
    ) -> Result<Stream, StreamError> {
        let inbox = consumer.context.client.new_inbox();
        // Subscribe before the first request so no reply can be lost.
        let subscription = consumer
            .context
            .client
            .subscribe(inbox.clone())
            .await
            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
        let subject = format!(
            "{}.CONSUMER.MSG.NEXT.{}.{}",
            consumer.context.prefix, consumer.info.stream_name, consumer.info.name
        );
        let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
        let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
        let (started_tx, started_rx) = tokio::sync::oneshot::channel();
        let task_handle = tokio::task::spawn({
            // Clone everything the task needs. In particular the task must
            // use the `batch` clone (not `batch_config` itself), because
            // `batch_config` is stored in the returned `Stream` below and
            // must not be moved into the `async move` block.
            let batch = batch_config.clone();
            let consumer = consumer.clone();
            let mut context = consumer.context.clone();
            let inbox = inbox.clone();
            async move {
                // Do not send any pull request before the stream is polled
                // for the first time.
                started_rx.await.ok();
                // The request payload never changes: serialize it once
                // instead of on every iteration. `BatchConfig` serialization
                // cannot fail.
                let request = serde_json::to_vec(&batch)
                    .map(Bytes::from)
                    .expect("batch config serialization failed");
                loop {
                    // Timer for the active pull request: expiry plus a grace
                    // period. A zero expiry means "never expires".
                    let expires = batch
                        .expires
                        .map(|expires| {
                            if expires.is_zero() {
                                Either::Left(future::pending())
                            } else {
                                Either::Right(tokio::time::sleep(
                                    expires.saturating_add(Duration::from_secs(5)),
                                ))
                            }
                        })
                        .unwrap_or_else(|| Either::Left(future::pending()));
                    let prev_state = context.client.state.borrow().to_owned();
                    // Set when the stream must reset (rather than add to) its
                    // pending counters: request expired, or the server lost
                    // our pull requests across a reconnect.
                    let mut pending_reset = false;
                    tokio::select! {
                        _ = context.client.state.changed() => {
                            let state = context.client.state.borrow().to_owned();
                            // Only a !Connected -> Connected transition is
                            // interesting; ignore every other state change.
                            if !(state == crate::connection::State::Connected
                                && prev_state != State::Connected) {
                                continue;
                            }
                            debug!("detected !Connected -> Connected state change");
                            match tryhard::retry_fn(|| consumer.get_info())
                                .retries(5).custom_backoff(backoff).await
                                .map_err(|err| crate::RequestError::with_source(crate::RequestErrorKind::Other, err).into()) {
                                Ok(info) => {
                                    // No waiting pull requests survived the
                                    // reconnect: start counting from scratch.
                                    if info.num_waiting == 0 {
                                        pending_reset = true;
                                    }
                                }
                                Err(err) => {
                                    if let Err(err) = request_result_tx.send(Err(err)).await {
                                        debug!("failed to send request result: {}", err);
                                    }
                                },
                            }
                        },
                        _ = request_rx.changed() => debug!("task received request request"),
                        _ = expires => {
                            pending_reset = true;
                            debug!("expired pull request")},
                    }
                    let result = context
                        .client
                        .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
                        .await;
                    request_result_tx
                        .send(result.map(|_| pending_reset).map_err(|err| {
                            crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
                                .into()
                        }))
                        .await
                        .ok();
                    trace!("result send over tx");
                }
            }
        });
        Ok(Stream {
            task_handle,
            request_result_rx,
            request_tx,
            batch_config,
            pending_messages: 0,
            pending_bytes: 0,
            subscriber: subscription,
            context: consumer.context.clone(),
            pending_request: false,
            terminated: false,
            heartbeat_timeout: None,
            started: Some(started_tx),
        })
    }
}
/// Categories of failures an [`Ordered`] stream can report.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum OrderedErrorKind {
    MissingHeartbeat,
    ConsumerDeleted,
    Pull,
    PushBasedConsumer,
    Recreate,
    NoResponders,
    Other,
}

impl std::fmt::Display for OrderedErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each variant to its fixed human-readable description.
        let description = match self {
            Self::MissingHeartbeat => "missed idle heartbeat",
            Self::ConsumerDeleted => "consumer deleted",
            Self::Pull => "pull request failed",
            Self::PushBasedConsumer => "cannot use with push consumer",
            Self::Recreate => "consumer recreation failed",
            Self::NoResponders => "no responders",
            Self::Other => "error",
        };
        write!(f, "{description}")
    }
}
/// Error type yielded by [`Ordered`] streams.
pub type OrderedError = Error<OrderedErrorKind>;

impl From<MessagesError> for OrderedError {
    // Maps each messages-error kind onto its ordered-consumer counterpart.
    // Only `Pull` and `Other` carry the original source error along; the
    // remaining kinds are self-describing and drop it.
    fn from(err: MessagesError) -> Self {
        match err.kind() {
            MessagesErrorKind::MissingHeartbeat => {
                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
            }
            MessagesErrorKind::ConsumerDeleted => {
                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
            }
            MessagesErrorKind::Pull => OrderedError {
                kind: OrderedErrorKind::Pull,
                source: err.source,
            },
            MessagesErrorKind::PushBasedConsumer => {
                OrderedError::new(OrderedErrorKind::PushBasedConsumer)
            }
            MessagesErrorKind::Other => OrderedError {
                kind: OrderedErrorKind::Other,
                source: err.source,
            },
            MessagesErrorKind::NoResponders => OrderedError::new(OrderedErrorKind::NoResponders),
        }
    }
}
/// Categories of failures a pull [`Stream`] can report.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MessagesErrorKind {
    MissingHeartbeat,
    ConsumerDeleted,
    Pull,
    PushBasedConsumer,
    NoResponders,
    Other,
}

impl std::fmt::Display for MessagesErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each variant to its fixed human-readable description.
        let description = match self {
            Self::MissingHeartbeat => "missed idle heartbeat",
            Self::ConsumerDeleted => "consumer deleted",
            Self::Pull => "pull request failed",
            Self::PushBasedConsumer => "cannot use with push consumer",
            Self::NoResponders => "no responders",
            Self::Other => "error",
        };
        write!(f, "{description}")
    }
}
/// Error type yielded by a pull [`Stream`].
pub type MessagesError = Error<MessagesErrorKind>;

impl futures_util::Stream for Stream {
    type Item = Result<jetstream::Message, MessagesError>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // First poll releases the background pull task (see `Stream::stream`).
        if let Some(started) = self.started.take() {
            trace!("stream started, sending started signal");
            if started.send(()).is_err() {
                debug!("failed to send started signal");
            }
        }
        if self.terminated {
            return Poll::Ready(None);
        }
        // Missing two heartbeat intervals surfaces as an error; the timer is
        // armed lazily and cleared whenever anything arrives from the server.
        if !self.batch_config.idle_heartbeat.is_zero() {
            trace!("checking idle hearbeats");
            let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
            match self
                .heartbeat_timeout
                .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
                .poll_unpin(cx)
            {
                Poll::Ready(_) => {
                    self.heartbeat_timeout = None;
                    return Poll::Ready(Some(Err(MessagesError::new(
                        MessagesErrorKind::MissingHeartbeat,
                    ))));
                }
                Poll::Pending => (),
            }
        }
        loop {
            trace!("pending messages: {}", self.pending_messages);
            // Ask the background task for a new pull request once fewer than
            // half of the requested messages (or bytes) remain outstanding.
            if (self.pending_messages <= self.batch_config.batch / 2
                || (self.batch_config.max_bytes > 0
                    && self.pending_bytes <= self.batch_config.max_bytes / 2))
                && !self.pending_request
            {
                debug!("pending messages reached threshold to send new fetch request");
                self.request_tx.send(()).ok();
                self.pending_request = true;
            }
            match self.request_result_rx.poll_recv(cx) {
                Poll::Ready(resp) => match resp {
                    Some(resp) => match resp {
                        Ok(reset) => {
                            trace!("request response: {:?}", reset);
                            debug!("request sent, setting pending messages");
                            // `reset` means earlier requests are gone (expired
                            // or lost across a reconnect): overwrite the
                            // counters instead of accumulating.
                            if reset {
                                self.pending_messages = self.batch_config.batch;
                                self.pending_bytes = self.batch_config.max_bytes;
                            } else {
                                self.pending_messages += self.batch_config.batch;
                                self.pending_bytes += self.batch_config.max_bytes;
                            }
                            self.pending_request = false;
                            continue;
                        }
                        Err(err) => {
                            return Poll::Ready(Some(Err(MessagesError::with_source(
                                MessagesErrorKind::Pull,
                                err,
                            ))))
                        }
                    },
                    None => return Poll::Ready(None),
                },
                Poll::Pending => {
                    trace!("pending result");
                }
            }
            trace!("polling subscriber");
            match self.subscriber.receiver.poll_recv(cx) {
                Poll::Ready(maybe_message) => {
                    // Anything from the server counts as liveness: re-arm the
                    // heartbeat timer on the next poll.
                    self.heartbeat_timeout = None;
                    match maybe_message {
                        Some(message) => match message.status.unwrap_or(StatusCode::OK) {
                            StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
                                debug!("received status message: {:?}", message);
                                if message.description.as_deref() == Some("Consumer Deleted") {
                                    self.terminated = true;
                                    return Poll::Ready(Some(Err(MessagesError::new(
                                        MessagesErrorKind::ConsumerDeleted,
                                    ))));
                                }
                                if message.description.as_deref() == Some("Consumer is push based")
                                {
                                    self.terminated = true;
                                    return Poll::Ready(Some(Err(MessagesError::new(
                                        MessagesErrorKind::PushBasedConsumer,
                                    ))));
                                }
                                // The server reports how much of the expired
                                // request was left unfulfilled; subtract that
                                // from the outstanding counters. When the
                                // headers are absent, assume the whole batch.
                                let pending_messages = message
                                    .headers
                                    .as_ref()
                                    .and_then(|headers| headers.get("Nats-Pending-Messages"))
                                    .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
                                    .map_err(|err| {
                                        MessagesError::with_source(MessagesErrorKind::Other, err)
                                    })?;
                                let pending_bytes = message
                                    .headers
                                    .as_ref()
                                    .and_then(|headers| headers.get("Nats-Pending-Bytes"))
                                    .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
                                    .map_err(|err| {
                                        MessagesError::with_source(MessagesErrorKind::Other, err)
                                    })?;
                                debug!(
                                    "timeout reached. remaining messages: {}, bytes {}",
                                    pending_messages, pending_bytes
                                );
                                self.pending_messages =
                                    self.pending_messages.saturating_sub(pending_messages);
                                trace!("message bytes len: {}", pending_bytes);
                                self.pending_bytes =
                                    self.pending_bytes.saturating_sub(pending_bytes);
                                continue;
                            }
                            StatusCode::IDLE_HEARTBEAT => {
                                debug!("received idle heartbeat");
                                continue;
                            }
                            StatusCode::OK => {
                                trace!("message received");
                                self.pending_messages = self.pending_messages.saturating_sub(1);
                                self.pending_bytes =
                                    self.pending_bytes.saturating_sub(message.length);
                                return Poll::Ready(Some(Ok(jetstream::Message {
                                    context: self.context.clone(),
                                    message,
                                })));
                            }
                            StatusCode::NO_RESPONDERS => {
                                debug!("received no responders");
                                return Poll::Ready(Some(Err(MessagesError::new(
                                    MessagesErrorKind::NoResponders,
                                ))));
                            }
                            status => {
                                debug!("received unknown message: {:?}", message);
                                return Poll::Ready(Some(Err(MessagesError::with_source(
                                    MessagesErrorKind::Other,
                                    format!(
                                        "error while processing messages from the stream: {}, {:?}",
                                        status, message.description
                                    ),
                                ))));
                            }
                        },
                        None => return Poll::Ready(None),
                    }
                }
                Poll::Pending => {
                    debug!("subscriber still pending");
                    return std::task::Poll::Pending;
                }
            }
        }
    }
}
/// Builder for a continuous message [`Stream`]; see [`Consumer::stream`].
pub struct StreamBuilder<'a> {
    /// Messages requested per pull request.
    batch: usize,
    /// Byte cap per pull request (0 = unlimited).
    max_bytes: usize,
    /// Idle heartbeat interval (zero disables heartbeats).
    heartbeat: Duration,
    /// Server-side expiry of each pull request.
    expires: Duration,
    /// Priority group (server 2.11+ priority policies).
    group: Option<String>,
    min_pending: Option<usize>,
    min_ack_pending: Option<usize>,
    #[cfg(feature = "server_2_12")]
    priority: Option<usize>,
    consumer: &'a Consumer<Config>,
}
impl<'a> StreamBuilder<'a> {
    /// Creates a builder with the default pull parameters: batch of 200,
    /// 30s expiry, no byte cap, heartbeats disabled.
    pub fn new(consumer: &'a Consumer<Config>) -> Self {
        StreamBuilder {
            consumer,
            batch: 200,
            max_bytes: 0,
            expires: Duration::from_secs(30),
            heartbeat: Duration::default(),
            group: None,
            min_pending: None,
            min_ack_pending: None,
            #[cfg(feature = "server_2_12")]
            priority: None,
        }
    }

    /// Caps the total payload size of a single pull request.
    pub fn max_bytes_per_batch(self, max_bytes: usize) -> Self {
        Self { max_bytes, ..self }
    }

    /// Sets how many messages a single pull request asks for.
    pub fn max_messages_per_batch(self, batch: usize) -> Self {
        Self { batch, ..self }
    }

    /// Sets the idle heartbeat interval requested from the server.
    pub fn heartbeat(self, heartbeat: Duration) -> Self {
        Self { heartbeat, ..self }
    }

    /// Sets the server-side expiry of each pull request.
    pub fn expires(self, expires: Duration) -> Self {
        Self { expires, ..self }
    }

    /// Only fulfill requests when at least this many messages are pending.
    pub fn min_pending(self, min_pending: usize) -> Self {
        Self {
            min_pending: Some(min_pending),
            ..self
        }
    }

    /// Sets the request priority within its priority group.
    #[cfg(feature = "server_2_12")]
    pub fn priority(self, priority: usize) -> Self {
        Self {
            priority: Some(priority),
            ..self
        }
    }

    /// Only fulfill requests when at least this many acks are pending.
    pub fn min_ack_pending(self, min_ack_pending: usize) -> Self {
        Self {
            min_ack_pending: Some(min_ack_pending),
            ..self
        }
    }

    /// Sets the priority group for the pull requests.
    pub fn group<T: Into<String>>(self, group: T) -> Self {
        Self {
            group: Some(group.into()),
            ..self
        }
    }

    /// Consumes the builder and starts the message [`Stream`].
    pub async fn messages(self) -> Result<Stream, StreamError> {
        let config = BatchConfig {
            batch: self.batch,
            expires: Some(self.expires),
            no_wait: false,
            max_bytes: self.max_bytes,
            idle_heartbeat: self.heartbeat,
            min_pending: self.min_pending,
            group: self.group,
            min_ack_pending: self.min_ack_pending,
            #[cfg(feature = "server_2_12")]
            priority: self.priority,
        };
        Stream::stream(config, self.consumer).await
    }
}
/// Builder for a one-shot, no-wait fetch ([`Batch`]); see [`Consumer::fetch`].
pub struct FetchBuilder<'a> {
    /// Maximum number of messages to fetch.
    batch: usize,
    /// Byte cap for the fetch (0 = unlimited).
    max_bytes: usize,
    /// Idle heartbeat interval (zero disables heartbeats).
    heartbeat: Duration,
    /// Optional server-side expiry for the request.
    expires: Option<Duration>,
    min_pending: Option<usize>,
    min_ack_pending: Option<usize>,
    /// Priority group (server 2.11+ priority policies).
    group: Option<String>,
    /// Request priority within its priority group.
    #[cfg(feature = "server_2_12")]
    priority: Option<usize>,
    consumer: &'a Consumer<Config>,
}

impl<'a> FetchBuilder<'a> {
    /// Creates a builder with the defaults: up to 200 messages, no byte cap,
    /// no expiry, heartbeats disabled.
    pub fn new(consumer: &'a Consumer<Config>) -> Self {
        FetchBuilder {
            consumer,
            batch: 200,
            max_bytes: 0,
            expires: None,
            min_pending: None,
            min_ack_pending: None,
            group: None,
            #[cfg(feature = "server_2_12")]
            priority: None,
            heartbeat: Duration::default(),
        }
    }

    /// Caps the total payload size of the fetch.
    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
        self.max_bytes = max_bytes;
        self
    }

    /// Sets the maximum number of messages to fetch.
    pub fn max_messages(mut self, batch: usize) -> Self {
        self.batch = batch;
        self
    }

    /// Sets the idle heartbeat interval requested from the server.
    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
        self.heartbeat = heartbeat;
        self
    }

    /// Sets a server-side expiry for the request.
    pub fn expires(mut self, expires: Duration) -> Self {
        self.expires = Some(expires);
        self
    }

    /// Only fulfill the request when at least this many messages are pending.
    pub fn min_pending(mut self, min_pending: usize) -> Self {
        self.min_pending = Some(min_pending);
        self
    }

    /// Sets the request priority within its priority group.
    ///
    /// FIX: this previously assigned to `self.batch`, silently overwriting
    /// the requested message count and never sending any priority. It now
    /// stores the priority and forwards it in the pull request, mirroring
    /// `StreamBuilder::priority`.
    #[cfg(feature = "server_2_12")]
    pub fn priority(mut self, priority: usize) -> Self {
        self.priority = Some(priority);
        self
    }

    /// Only fulfill the request when at least this many acks are pending.
    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
        self.min_ack_pending = Some(min_ack_pending);
        self
    }

    /// Sets the priority group for the request.
    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
        self.group = Some(group.into());
        self
    }

    /// Consumes the builder and performs the fetch. `no_wait` makes the
    /// server answer immediately with whatever is available.
    pub async fn messages(self) -> Result<Batch, BatchError> {
        Batch::batch(
            BatchConfig {
                batch: self.batch,
                expires: self.expires,
                no_wait: true,
                max_bytes: self.max_bytes,
                idle_heartbeat: self.heartbeat,
                min_pending: self.min_pending,
                min_ack_pending: self.min_ack_pending,
                group: self.group,
                #[cfg(feature = "server_2_12")]
                priority: self.priority,
            },
            self.consumer,
        )
        .await
    }
}
/// Builder for a single waiting batch ([`Batch`]); see [`Consumer::batch`].
pub struct BatchBuilder<'a> {
    /// Number of messages to request.
    batch: usize,
    /// Byte cap for the batch (0 = unlimited).
    max_bytes: usize,
    /// Idle heartbeat interval (zero disables heartbeats).
    heartbeat: Duration,
    /// Server-side expiry of the request (zero = server default handling).
    expires: Duration,
    min_pending: Option<usize>,
    min_ack_pending: Option<usize>,
    /// Priority group (server 2.11+ priority policies).
    group: Option<String>,
    consumer: &'a Consumer<Config>,
}
impl<'a> BatchBuilder<'a> {
    /// Creates a builder with the defaults: 200 messages, no byte cap, zero
    /// expiry, heartbeats disabled.
    pub fn new(consumer: &'a Consumer<Config>) -> Self {
        BatchBuilder {
            consumer,
            batch: 200,
            max_bytes: 0,
            expires: Duration::ZERO,
            heartbeat: Duration::default(),
            min_pending: None,
            min_ack_pending: None,
            group: None,
        }
    }

    /// Caps the total payload size of the batch.
    pub fn max_bytes(self, max_bytes: usize) -> Self {
        Self { max_bytes, ..self }
    }

    /// Sets the number of messages to request.
    pub fn max_messages(self, batch: usize) -> Self {
        Self { batch, ..self }
    }

    /// Sets the idle heartbeat interval requested from the server.
    pub fn heartbeat(self, heartbeat: Duration) -> Self {
        Self { heartbeat, ..self }
    }

    /// Only fulfill the request when at least this many messages are pending.
    pub fn min_pending(self, min_pending: usize) -> Self {
        Self {
            min_pending: Some(min_pending),
            ..self
        }
    }

    /// Only fulfill the request when at least this many acks are pending.
    pub fn min_ack_pending(self, min_ack_pending: usize) -> Self {
        Self {
            min_ack_pending: Some(min_ack_pending),
            ..self
        }
    }

    /// Sets the priority group for the request.
    pub fn group<T: Into<String>>(self, group: T) -> Self {
        Self {
            group: Some(group.into()),
            ..self
        }
    }

    /// Sets the server-side expiry of the request.
    pub fn expires(self, expires: Duration) -> Self {
        Self { expires, ..self }
    }

    /// Consumes the builder and issues the batch request, waiting (no
    /// `no_wait`) until the batch fills or expires.
    pub async fn messages(self) -> Result<Batch, BatchError> {
        let config = BatchConfig {
            batch: self.batch,
            expires: Some(self.expires),
            no_wait: false,
            max_bytes: self.max_bytes,
            idle_heartbeat: self.heartbeat,
            min_pending: self.min_pending,
            min_ack_pending: self.min_ack_pending,
            group: self.group,
            #[cfg(feature = "server_2_12")]
            priority: None,
        };
        Batch::batch(config, self.consumer).await
    }
}
/// Wire format of a pull request (`CONSUMER.MSG.NEXT`).
#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Number of messages requested.
    pub batch: usize,
    /// Server-side expiry of the request, in nanoseconds on the wire.
    #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub expires: Option<Duration>,
    /// Answer immediately with whatever is available instead of waiting.
    #[serde(skip_serializing_if = "is_default")]
    pub no_wait: bool,
    /// Byte cap for the request (0 = unlimited).
    pub max_bytes: usize,
    /// Idle heartbeat interval, in nanoseconds on the wire.
    #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
    pub idle_heartbeat: Duration,
    // The three optional fields below now skip serialization when unset,
    // consistent with `expires`, so the server is not sent explicit `null`s.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub min_pending: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub min_ack_pending: Option<usize>,
    /// Priority group (server 2.11+ priority policies).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub group: Option<String>,
    /// Request priority within its priority group.
    #[cfg(feature = "server_2_12")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<usize>,
}
/// True when `t` equals its type's `Default` value; used by serde
/// `skip_serializing_if` attributes throughout this module.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
/// Configuration for a pull consumer.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Setting `durable_name` makes the consumer durable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub durable_name: Option<String>,
    /// Optional consumer name.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Human-readable description.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Where in the stream delivery starts (flattened into the JSON body).
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// How messages must be acknowledged.
    pub ack_policy: AckPolicy,
    /// How long to wait for an ack before redelivery (nanoseconds on the wire).
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub ack_wait: Duration,
    /// Maximum delivery attempts per message.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_deliver: i64,
    /// Only deliver messages matching this subject.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    /// Only deliver messages matching any of these subjects (server 2.10+).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    pub replay_policy: ReplayPolicy,
    /// Delivery rate cap in bits per second.
    #[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// Percentage of acks to sample for observability.
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// Maximum number of waiting pull requests.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    /// Maximum unacknowledged messages in flight.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_ack_pending: i64,
    /// Deliver only headers, not payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Server-enforced cap on pull batch size.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_batch: i64,
    /// Server-enforced cap on pull batch bytes.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_bytes: i64,
    /// Server-enforced cap on pull request expiry (nanoseconds on the wire).
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub max_expires: Duration,
    /// Inactivity period after which the consumer is removed.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub inactive_threshold: Duration,
    /// Number of consumer replicas (0 = inherit from stream).
    #[serde(default, skip_serializing_if = "is_default")]
    pub num_replicas: usize,
    /// Force in-memory consumer state.
    #[serde(default, skip_serializing_if = "is_default")]
    pub memory_storage: bool,
    /// Arbitrary user metadata (server 2.10+).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Redelivery backoff schedule (nanoseconds on the wire).
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub backoff: Vec<Duration>,
    /// Priority policy (server 2.11+).
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub priority_policy: PriorityPolicy,
    /// Priority groups (server 2.11+).
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub priority_groups: Vec<String>,
    /// Pause delivery until this time (server 2.11+, RFC 3339 on the wire).
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,
}
impl IntoConsumerConfig for &Config {
    // Convenience impl: clone and delegate to the owned conversion.
    fn into_consumer_config(self) -> consumer::Config {
        self.clone().into_consumer_config()
    }
}
impl IntoConsumerConfig for Config {
    /// Converts into the generic consumer config. Push-only fields
    /// (`deliver_subject`, `deliver_group`, `flow_control`, `idle_heartbeat`)
    /// are fixed to their "absent" values, since this is a pull consumer.
    fn into_consumer_config(self) -> consumer::Config {
        jetstream::consumer::Config {
            deliver_subject: None,
            name: self.name,
            durable_name: self.durable_name,
            description: self.description,
            deliver_group: None,
            deliver_policy: self.deliver_policy,
            ack_policy: self.ack_policy,
            ack_wait: self.ack_wait,
            max_deliver: self.max_deliver,
            filter_subject: self.filter_subject,
            #[cfg(feature = "server_2_10")]
            filter_subjects: self.filter_subjects,
            replay_policy: self.replay_policy,
            rate_limit: self.rate_limit,
            sample_frequency: self.sample_frequency,
            max_waiting: self.max_waiting,
            max_ack_pending: self.max_ack_pending,
            headers_only: self.headers_only,
            flow_control: false,
            idle_heartbeat: Duration::default(),
            max_batch: self.max_batch,
            max_bytes: self.max_bytes,
            max_expires: self.max_expires,
            inactive_threshold: self.inactive_threshold,
            num_replicas: self.num_replicas,
            memory_storage: self.memory_storage,
            #[cfg(feature = "server_2_10")]
            metadata: self.metadata,
            backoff: self.backoff,
            #[cfg(feature = "server_2_11")]
            priority_policy: self.priority_policy,
            #[cfg(feature = "server_2_11")]
            priority_groups: self.priority_groups,
            #[cfg(feature = "server_2_11")]
            pause_until: self.pause_until,
        }
    }
}
impl FromConsumer for Config {
    /// Builds a pull [`Config`] from a generic [`consumer::Config`],
    /// copying every field the pull variant supports.
    ///
    /// # Errors
    ///
    /// Returns an error when the input carries a `deliver_subject`, which
    /// marks it as a push consumer.
    fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
        if config.deliver_subject.is_some() {
            return Err(Box::new(std::io::Error::other(
                "pull consumer cannot have delivery subject",
            )));
        }
        Ok(Config {
            durable_name: config.durable_name,
            name: config.name,
            description: config.description,
            deliver_policy: config.deliver_policy,
            ack_policy: config.ack_policy,
            ack_wait: config.ack_wait,
            max_deliver: config.max_deliver,
            filter_subject: config.filter_subject,
            #[cfg(feature = "server_2_10")]
            filter_subjects: config.filter_subjects,
            replay_policy: config.replay_policy,
            rate_limit: config.rate_limit,
            sample_frequency: config.sample_frequency,
            max_waiting: config.max_waiting,
            max_ack_pending: config.max_ack_pending,
            headers_only: config.headers_only,
            max_batch: config.max_batch,
            max_bytes: config.max_bytes,
            max_expires: config.max_expires,
            inactive_threshold: config.inactive_threshold,
            num_replicas: config.num_replicas,
            memory_storage: config.memory_storage,
            #[cfg(feature = "server_2_10")]
            metadata: config.metadata,
            backoff: config.backoff,
            #[cfg(feature = "server_2_11")]
            priority_policy: config.priority_policy,
            #[cfg(feature = "server_2_11")]
            priority_groups: config.priority_groups,
            #[cfg(feature = "server_2_11")]
            pause_until: config.pause_until,
        })
    }
}
/// Failure modes of sending a batch pull request
/// (see [`Consumer::request_batch`]).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchRequestErrorKind {
    /// Publishing the pull request to the server failed.
    Publish,
    /// Flushing the outgoing buffer failed.
    Flush,
    /// Serializing the batch request payload failed.
    Serialize,
}
impl std::fmt::Display for BatchRequestErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each kind to its static description, then write it once.
        let description = match self {
            Self::Publish => "publish failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(description)
    }
}
/// Error returned by batch pull requests; see [`BatchRequestErrorKind`].
pub type BatchRequestError = Error<BatchRequestErrorKind>;
/// Failure modes of setting up a batch of messages.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchErrorKind {
    /// Creating the inbox subscription failed.
    Subscribe,
    /// Sending the pull request failed.
    Pull,
    /// Flushing the outgoing buffer failed.
    Flush,
    /// Serializing the request payload failed.
    Serialize,
}
impl std::fmt::Display for BatchErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each kind to its static description, then write it once.
        let description = match self {
            Self::Pull => "pull request failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
            Self::Subscribe => "subscribe failed",
        };
        f.write_str(description)
    }
}
/// Error returned when fetching a batch of messages; see [`BatchErrorKind`].
pub type BatchError = Error<BatchErrorKind>;
impl From<SubscribeError> for BatchError {
    /// Subscription setup failures surface as the `Subscribe` kind,
    /// preserving the original error as the source.
    fn from(err: SubscribeError) -> Self {
        Self::with_source(BatchErrorKind::Subscribe, err)
    }
}
impl From<BatchRequestError> for BatchError {
    /// Pull-request failures surface as the `Pull` kind,
    /// preserving the original error as the source.
    fn from(err: BatchRequestError) -> Self {
        Self::with_source(BatchErrorKind::Pull, err)
    }
}
/// Failure modes of recreating an ordered consumer.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConsumerRecreateErrorKind {
    /// Fetching the stream failed.
    GetStream,
    /// Creating the replacement consumer (or its message stream) failed.
    Recreate,
    /// A recreation step exceeded its timeout.
    TimedOut,
}
impl std::fmt::Display for ConsumerRecreateErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each kind to its static description, then write it once.
        let description = match self {
            Self::GetStream => "error getting stream",
            Self::Recreate => "consumer creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
/// Error returned when recreating an ordered consumer fails;
/// see [`ConsumerRecreateErrorKind`].
pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
/// Drops the current ordered consumer and creates a replacement that resumes
/// delivery just after `sequence` (or from the beginning when `sequence == 0`),
/// then opens a fresh message [`Stream`] on the new consumer.
///
/// Every network step is bounded by a 5-second timeout. Deleting the old
/// consumer is best effort (failures are ignored); creation and stream setup
/// failures map to [`ConsumerRecreateErrorKind::TimedOut`] or
/// [`ConsumerRecreateErrorKind::Recreate`].
async fn recreate_consumer_stream(
    context: &Context,
    config: &OrderedConfig,
    stream_name: &str,
    consumer_name: &str,
    sequence: u64,
) -> Result<Stream, ConsumerRecreateError> {
    let span = tracing::span!(
        tracing::Level::DEBUG,
        "recreate_ordered_consumer",
        stream_name = stream_name,
        consumer_name = consumer_name,
        sequence = sequence
    );
    // Attach the span to the whole async block via `Instrument` instead of
    // holding a `span.enter()` guard: an entered guard held across `.await`
    // points can mis-attribute events from other tasks when the future
    // migrates threads. Fully-qualified call avoids a new `use` item.
    tracing::Instrument::instrument(
        async move {
            let config = config.to_owned();
            trace!("delete old consumer before creating new one");
            // Best effort: the old consumer may already be gone server-side,
            // so both the timeout and any delete error are ignored.
            tokio::time::timeout(
                Duration::from_secs(5),
                context.delete_consumer_from_stream(consumer_name, stream_name),
            )
            .await
            .ok();
            let deliver_policy = if sequence == 0 {
                DeliverPolicy::All
            } else {
                // Resume right after the last observed sequence.
                DeliverPolicy::ByStartSequence {
                    start_sequence: sequence + 1,
                }
            };
            trace!("create the new ordered consumer for sequence {}", sequence);
            let consumer = tokio::time::timeout(
                Duration::from_secs(5),
                context.create_consumer_on_stream(
                    jetstream::consumer::pull::OrderedConfig {
                        deliver_policy,
                        ..config.clone()
                    },
                    stream_name,
                ),
            )
            .await
            .map_err(|err| {
                ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err)
            })?
            .map_err(|err| {
                ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err)
            })?;
            // Client-side handle for the freshly created consumer (this
            // binding was previously named `config`, which was misleading).
            let consumer = Consumer {
                config: config.clone().into(),
                context: context.clone(),
                info: consumer.info,
            };
            trace!("create iterator");
            let stream = tokio::time::timeout(
                Duration::from_secs(5),
                Stream::stream(
                    BatchConfig {
                        batch: 500,
                        expires: Some(Duration::from_secs(30)),
                        no_wait: false,
                        max_bytes: 0,
                        idle_heartbeat: Duration::from_secs(15),
                        min_pending: None,
                        min_ack_pending: None,
                        group: None,
                        #[cfg(feature = "server_2_12")]
                        priority: None,
                    },
                    &consumer,
                ),
            )
            .await
            .map_err(|err| {
                ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err)
            })?
            .map_err(|err| {
                ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err)
            });
            trace!("recreated consumer");
            stream
        },
        span,
    )
    .await
}