use crate::{BoxFuture, Stream};
#[cfg(not(feature = "otel"))]
use log::{debug, error};
use pin_project::pin_project;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::{
collections::{HashSet, VecDeque},
convert::TryFrom,
error, fmt,
fmt::Debug,
io::{self, ErrorKind},
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::Mutex;
#[cfg(feature = "otel")]
use tracing::{debug, error};
const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_nanos(5_000_000_000);
mod push_subscription;
mod types;
pub use push_subscription::PushSubscription;
pub use types::*;
use crate::{
header::{self, HeaderMap},
Connection, Message,
};
#[derive(Clone)]
pub struct JetStreamOptions {
pub(crate) api_prefix: String,
}
impl Debug for JetStreamOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_map()
.entry(&"api_prefix", &self.api_prefix)
.finish()
}
}
impl Default for JetStreamOptions {
fn default() -> JetStreamOptions {
JetStreamOptions {
api_prefix: "$JS.API.".to_string(),
}
}
}
impl JetStreamOptions {
pub fn new() -> JetStreamOptions {
JetStreamOptions::default()
}
#[must_use]
pub fn api_prefix(mut self, mut api_prefix: String) -> Self {
if !api_prefix.ends_with('.') {
api_prefix.push('.');
}
self.api_prefix = api_prefix;
self
}
#[must_use]
pub fn domain(self, domain: &str) -> Self {
if domain.is_empty() {
self.api_prefix("".to_string())
} else {
self.api_prefix(format!("$JS.{}.API", domain))
}
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum ApiResponse<T> {
Err { error: Error },
Ok(T),
}
#[derive(Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr, Clone, Copy)]
#[repr(u64)]
pub enum ErrorCode {
ClusterPeerNotMember = 10040,
ConsumerEphemeralWithDurableInSubject = 10019,
StreamExternalDelPrefixOverlaps = 10022,
AccountResourcesExceeded = 10002,
ClusterNotAvail = 10008,
StreamSubjectOverlap = 10065,
StreamWrongLastSequence = 10071,
TemplateNameNotMatchSubject = 10073,
ClusterNoPeers = 10005,
ConsumerEphemeralWithDurableName = 10020,
InsufficientResources = 10023,
MirrorMaxMessageSizeTooBig = 10030,
StreamTemplateDelete = 10067,
BadRequest = 10003,
ClusterUnSupportFeature = 10036,
ConsumerNotFound = 10014,
SourceMaxMessageSizeTooBig = 10046,
StreamAssignment = 10048,
StreamMessageExceedsMaximum = 10054,
StreamTemplateCreate = 10066,
InvalidJSON = 10025,
StreamInvalidExternalDeliverySubject = 10024,
StreamRestore = 10062,
ClusterIncomplete = 10004,
NoAccount = 10035,
RaftGeneral = 10041,
RestoreSubscribeFailed = 10042,
StreamDelete = 10050,
StreamExternalApiOverlap = 10021,
MirrorWithSubjects = 10034,
NotEnabled = 10076,
NotEnabledForAccount = 10039,
SequenceNotFound = 10043,
StreamMirrorNotUpdatable = 10055,
StreamSequenceNotMatch = 10063,
StreamWrongLastMsgId = 10070,
TempStorageFailed = 10072,
StorageResourcesExceeded = 10047,
StreamMismatch = 10056,
StreamNotMatch = 10060,
MirrorConsumerSetupFailed = 10029,
NotEmptyRequest = 10038,
StreamNameExist = 10058,
ClusterTags = 10011,
MaximumConsumersLimit = 10026,
SourceConsumerSetupFailed = 10045,
ConsumerCreate = 10012,
ConsumerDurableNameNotInSubject = 10016,
StreamLimits = 10053,
StreamReplicasNotUpdatable = 10061,
StreamTemplateNotFound = 10068,
ClusterNotAssigned = 10007,
ClusterNotLeader = 10009,
ConsumerNameExist = 10013,
MirrorWithSources = 10031,
StreamNotFound = 10059,
ClusterRequired = 10010,
ConsumerDurableNameNotSet = 10018,
MaximumStreamsLimit = 10027,
MirrorWithStartSeqAndTime = 10032,
StreamSnapshot = 10064,
StreamUpdate = 10069,
ClusterNotActive = 10006,
ConsumerDurableNameNotMatchSubject = 10017,
MemoryResourcesExceeded = 10028,
MirrorWithSubjectFilters = 10033,
StreamCreate = 10049,
ClusterServerNotMember = 10044,
NoMessageFound = 10037,
SnapshotDeliverSubjectInvalid = 10015,
StreamGeneralErrorF = 10051,
StreamInvalidConfigF = 10052,
StreamReplicasNotSupported = 10074,
StreamMsgDeleteFailedF = 10057,
PeerRemap = 10075,
StreamStoreFailedF = 10077,
ConsumerConfigRequired = 10078,
ConsumerDeliverToWildcards = 10079,
ConsumerPushMaxWaiting = 10080,
ConsumerDeliverCycle = 10081,
ConsumerMaxPendingAckPolicyRequired = 10082,
ConsumerSmallHeartbeat = 10083,
ConsumerPullRequiresAck = 10084,
ConsumerPullNotDurable = 10085,
ConsumerPullWithRateLimit = 10086,
ConsumerMaxWaitingNegative = 10087,
ConsumerHBRequiresPush = 10088,
ConsumerFCRequiresPush = 10089,
ConsumerDirectRequiresPush = 10090,
ConsumerDirectRequiresEphemeral = 10091,
ConsumerOnMapped = 10092,
ConsumerFilterNotSubset = 10093,
ConsumerInvalidPolicy = 10094,
ConsumerInvalidSampling = 10095,
StreamInvalid = 10096,
ConsumerWQRequiresExplicitAck = 10098,
ConsumerWQMultipleUnfiltered = 10099,
ConsumerWQConsumerNotUnique = 10100,
ConsumerWQConsumerNotDeliverAll = 10101,
ConsumerNameTooLong = 10102,
ConsumerBadDurableName = 10103,
ConsumerStoreFailed = 10104,
ConsumerExistingActive = 10105,
ConsumerReplacementWithDifferentName = 10106,
ConsumerDescriptionTooLong = 10107,
StreamHeaderExceedsMaximum = 10097,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Error {
code: usize,
err_code: ErrorCode,
description: Option<String>,
}
impl Error {
pub fn code(&self) -> usize {
self.code
}
pub fn error_code(&self) -> ErrorCode {
self.err_code
}
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"{} (code {}, error code {})",
self.code,
self.description.as_ref().unwrap_or(&"unknown".to_string()),
self.err_code as u64,
)
}
}
impl error::Error for Error {}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
struct PagedRequest {
offset: i64,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
struct PagedResponse<T> {
pub r#type: String,
#[serde(alias = "streams", alias = "consumers")]
pub items: Option<VecDeque<T>>,
pub total: usize,
pub offset: usize,
pub limit: usize,
}
#[derive(Debug)]
#[pin_project]
pub struct PagedIterator<'a, T> {
#[pin]
manager: &'a JetStream,
subject: String,
offset: i64,
items: VecDeque<T>,
done: bool,
}
impl<'x, 'y, 'z: 'x, T: 'x> PagedIterator<'z, T>
where
T: DeserializeOwned + Debug,
{
fn to_stream(mut self: Pin<Box<Self>>) -> impl Stream<Item = io::Result<T>> + 'x {
async_stream::try_stream! {
if !self.done {
if !self.items.is_empty() {
yield self.items.pop_front().unwrap();
}
let req = serde_json::ser::to_vec(&PagedRequest {
offset: self.offset,
})
.unwrap();
let mut page: PagedResponse<T> =
match self.manager.js_request(&self.subject, &req).await {
Ok(page) => page,
Err(e) => {
(*self).done = true;
Err(e)?
}
};
if page.items.is_none() {
(*self).done = true;
} else {
let items = page.items.take().unwrap();
(*self).offset += i64::try_from(items.len()).unwrap();
(*self).items = items;
if self.items.is_empty() {
(*self).done = true;
} else {
yield self.items.pop_front().unwrap()
}
}
}
}
}
}
#[derive(Debug, Clone)]
struct SubscriptionPreprocessor {
sequence_pair: Arc<Mutex<SequencePair>>,
context: JetStream,
consumer_config: ConsumerConfig,
stream_name: String,
shared_sid: Arc<AtomicU64>,
}
impl crate::client::Preprocessor for SubscriptionPreprocessor {
fn process<'proc>(&'proc self, sid: u64, message: &'proc Message) -> BoxFuture<'proc, bool> {
Box::pin(async move {
if message.is_flow_control() {
return false;
}
if message.is_idle_heartbeat() {
let maybe_consumer_stalled = message
.headers
.as_ref()
.and_then(|headers| {
headers
.get(header::NATS_CONSUMER_STALLED)
.map(|set| set.iter().next().cloned())
})
.flatten();
if let Some(consumer_stalled) = maybe_consumer_stalled {
debug!("publish on stalled idle heartbeat");
if let Some(Err(e)) = self
.context
.connection
.try_publish_with_reply_or_headers(&consumer_stalled, None, None, b"")
.await
{
error!("try_publish errored: {}", e);
}
}
let maybe_consumer_seq = message
.headers
.as_ref()
.and_then(|headers| {
headers
.get(header::NATS_LAST_CONSUMER)
.map(|set| set.iter().next().cloned())
})
.flatten();
if let Some(consumer_seq) = maybe_consumer_seq {
let consumer_seq = consumer_seq.parse::<u64>().unwrap();
let sequence_info = self.sequence_pair.lock().await;
if consumer_seq != sequence_info.consumer_seq {
return self
.context
.handle_sequence_mismatch(
self.consumer_config.clone(),
self.sequence_pair.clone(),
self.stream_name.clone(),
self.shared_sid.clone(),
sid,
sequence_info.stream_seq + 1,
)
.await;
}
}
return false;
}
if let Some(message_info) = message.jetstream_message_info() {
let mut sequence_info = self.sequence_pair.lock().await;
if message_info.consumer_seq != sequence_info.consumer_seq + 1 {
return self
.context
.handle_sequence_mismatch(
self.consumer_config.clone(),
self.sequence_pair.clone(),
self.stream_name.clone(),
self.shared_sid.clone(),
sid,
sequence_info.stream_seq + 1,
)
.await;
}
sequence_info.stream_seq = message_info.stream_seq;
sequence_info.consumer_seq = message_info.consumer_seq;
}
false
})
}
}
#[derive(Clone, Debug)]
pub struct JetStream {
pub(crate) connection: Connection,
pub(crate) options: JetStreamOptions,
}
impl JetStream {
pub fn new(connection: Connection, options: JetStreamOptions) -> Self {
Self {
connection,
options,
}
}
pub async fn publish(&self, subject: &str, data: impl AsRef<[u8]>) -> io::Result<PublishAck> {
self.publish_with_options_or_headers(subject, None, None, data)
.await
}
pub async fn publish_with_options(
&self,
subject: &str,
data: impl AsRef<[u8]>,
options: &PublishOptions,
) -> io::Result<PublishAck> {
self.publish_with_options_or_headers(subject, Some(options), None, data)
.await
}
pub async fn publish_message(&self, message: &Message) -> io::Result<PublishAck> {
self.publish_with_options_or_headers(
&message.subject,
None,
message.headers.as_ref(),
&message.data,
)
.await
}
pub async fn publish_message_with_options(
&self,
message: &Message,
options: &PublishOptions,
) -> io::Result<PublishAck> {
self.publish_with_options_or_headers(
&message.subject,
Some(options),
message.headers.as_ref(),
&message.data,
)
.await
}
pub(crate) async fn publish_with_options_or_headers(
&self,
subject: &str,
maybe_options: Option<&PublishOptions>,
maybe_headers: Option<&HeaderMap>,
msg: impl AsRef<[u8]>,
) -> io::Result<PublishAck> {
let maybe_headers = if let Some(options) = maybe_options {
let mut headers = maybe_headers.map_or_else(HeaderMap::default, HeaderMap::clone);
if let Some(v) = options.id.as_ref() {
let entry = headers
.inner
.entry(header::NATS_MSG_ID.to_string())
.or_insert_with(HashSet::default);
entry.insert(v.to_string());
}
if let Some(v) = options.expected_last_msg_id.as_ref() {
let entry = headers
.inner
.entry(header::NATS_EXPECTED_LAST_MSG_ID.to_string())
.or_insert_with(HashSet::default);
entry.insert(v.to_string());
}
if let Some(v) = options.expected_stream.as_ref() {
let entry = headers
.inner
.entry(header::NATS_EXPECTED_STREAM.to_string())
.or_insert_with(HashSet::default);
entry.insert(v.to_string());
}
if let Some(v) = options.expected_last_sequence.as_ref() {
let entry = headers
.inner
.entry(header::NATS_EXPECTED_LAST_SEQUENCE.to_string())
.or_insert_with(HashSet::default);
entry.insert(v.to_string());
}
if let Some(v) = options.expected_last_subject_sequence.as_ref() {
let entry = headers
.inner
.entry(header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE.to_string())
.or_insert_with(HashSet::default);
entry.insert(v.to_string());
}
Some(headers)
} else {
maybe_headers.cloned()
};
let maybe_timeout = maybe_options.and_then(|options| options.timeout);
let res_msg = self
.connection
.request_with_headers_or_timeout(subject, maybe_headers.as_ref(), maybe_timeout, msg)
.await?;
let res: ApiResponse<PublishAck> = serde_json::de::from_slice(&res_msg.data)?;
match res {
ApiResponse::Ok(pub_ack) => Ok(pub_ack),
ApiResponse::Err { error, .. } => {
log::error!(
"failed to parse API response: {:?}",
std::str::from_utf8(&res_msg.data)
);
Err(io::Error::new(ErrorKind::Other, error))
}
}
}
pub async fn subscribe(&self, subject: &str) -> io::Result<PushSubscription> {
self.do_push_subscribe(subject, None, None).await
}
pub async fn subscribe_with_options(
&self,
subject: &str,
options: &SubscribeOptions,
) -> io::Result<PushSubscription> {
self.do_push_subscribe(subject, None, Some(options)).await
}
pub async fn queue_subscribe(
&self,
subject: &str,
queue: &str,
) -> io::Result<PushSubscription> {
self.do_push_subscribe(subject, Some(queue.to_string()), None)
.await
}
pub async fn queue_subscribe_with_options(
&self,
subject: &str,
queue: &str,
options: &SubscribeOptions,
) -> io::Result<PushSubscription> {
self.do_push_subscribe(subject, Some(queue.to_string()), Some(options))
.await
}
async fn handle_sequence_mismatch(
&self,
consumer_config: ConsumerConfig,
sequence_pair: Arc<Mutex<SequencePair>>,
stream_name: String,
shared_sid: Arc<AtomicU64>,
sid: u64,
start_seq: u64,
) -> bool {
if !self.connection.0.client.mute(sid).await.unwrap() {
return true;
}
let context = self.clone();
tokio::spawn(async move {
let new_deliver_subject = context.connection.new_inbox();
let result = context
.connection
.0
.client
.resubscribe(sid, &new_deliver_subject)
.await;
if let Ok(new_sid) = result {
shared_sid.store(new_sid, Ordering::Relaxed);
let mut consumer_config = consumer_config.clone();
consumer_config.deliver_subject = Some(new_deliver_subject);
consumer_config.deliver_policy = DeliverPolicy::ByStartSeq;
consumer_config.opt_start_seq = Some(start_seq);
context
.add_consumer(stream_name, consumer_config)
.await
.ok();
}
let mut sequence_info = sequence_pair.lock().await;
sequence_info.consumer_seq = 0;
});
true
}
async fn do_push_subscribe(
&self,
subject: &str,
maybe_queue: Option<String>,
maybe_options: Option<&SubscribeOptions>,
) -> io::Result<PushSubscription> {
if subject.is_empty()
&& maybe_options
.map(|options| options.stream_name.as_ref())
.is_none()
{
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Subject required",
));
}
let wants_idle_heartbeat =
maybe_options.map_or(false, |options| options.idle_heartbeat.is_some());
let wants_flow_control =
maybe_options.map_or(false, |options| options.flow_control.is_some());
if maybe_queue.is_some() {
if wants_idle_heartbeat {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"queue subscription doesn't support idle heartbeat",
));
}
if wants_flow_control {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"queue subscription doesn't support flow control",
));
}
};
if let Some(options) = maybe_options.filter(|options| options.ordered) {
if maybe_queue.is_some() {
return Err(io::Error::new(
io::ErrorKind::Other,
"queues not be set for an ordered consumer",
));
}
if options.durable_name.is_some() {
return Err(io::Error::new(
io::ErrorKind::Other,
"durable name can not be set for an ordered consumer",
));
}
if options.consumer_name.is_some() {
return Err(io::Error::new(
io::ErrorKind::Other,
"can not bind existing consumer for an ordered consumer",
));
}
if options.ack_policy.is_some() {
return Err(io::Error::new(
io::ErrorKind::Other,
"ack policy can not be set for an ordered consumer",
));
}
if options.max_deliver.is_some() {
return Err(io::Error::new(
io::ErrorKind::Other,
"max deliver can not be set for an ordered consumer",
));
}
if options.deliver_subject.is_some() {
return Err(io::Error::new(
io::ErrorKind::Other,
"deliver subject can not be set for an ordered consumer",
));
}
}
let process_consumer_info = |info: ConsumerInfo| {
if !info.config.filter_subject.is_empty() && subject != info.config.filter_subject {
return Err(io::Error::new(
io::ErrorKind::Other,
"subject does not match consumer",
));
}
if let Some(deliver_group) = info.config.deliver_group.as_ref() {
if let Some(queue) = &maybe_queue {
if deliver_group != queue {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"cannot create a queue subscription {} for a consumer with a deliver group {}",
queue, deliver_group
),
));
}
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"cannot create a subscription for a consumer with a deliver group {}",
deliver_group
),
));
}
} else {
if maybe_queue.is_some() {
return Err(io::Error::new(
io::ErrorKind::Other,
"cannot create a queue subscription for a consumer without a deliver group",
));
}
if maybe_queue.is_some() && info.push_bound {
return Err(io::Error::new(
io::ErrorKind::Other,
"consumer is already bound to a subscription",
));
}
}
if let Some(options) = maybe_options {
if options.durable_name.is_some()
&& options.durable_name != info.config.durable_name
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests durable name to be {:?}, but consumer's value is {:?}", options.durable_name, info.config.durable_name
)));
}
if options.description.is_some() && options.description != info.config.description {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests description to be {:?}, but consumer's value is {:?}", options.description, info.config.description
)));
}
if options.deliver_policy.is_some()
&& options.deliver_policy.unwrap() != info.config.deliver_policy
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests deliver policy to be {:?}, but consumer's value is {:?}", options.deliver_policy, info.config.deliver_policy
)));
}
if options.opt_start_seq.is_some()
&& options.opt_start_seq != info.config.opt_start_seq
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests optional start sequence to be {:?}, but consumer's value is {:?}", options.opt_start_seq, info.config.opt_start_seq
)));
}
if options.opt_start_time.is_some()
&& options.opt_start_time != info.config.opt_start_time
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests optional start time to be {:?}, but consumer's value is {:?}", options.opt_start_time, info.config.opt_start_time
)));
}
if options.ack_policy.is_some()
&& options.ack_policy.unwrap() != info.config.ack_policy
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests ack policy to be {:?}, but consumer's value is {:?}", options.ack_policy, info.config.ack_policy
)));
}
if options.ack_wait.is_some() && options.ack_wait.unwrap() != info.config.ack_wait {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"configuration requests ack wait to be {:?}, but consumer's value is {:?}",
options.ack_wait, info.config.ack_wait
),
));
}
if options.max_deliver.is_some()
&& options.max_deliver.unwrap() != info.config.max_deliver
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests max deliver to be {:?}, but consumer's value is {:?}", options.max_deliver, info.config.max_deliver
)));
}
if options.replay_policy.is_some()
&& options.replay_policy.unwrap() != info.config.replay_policy
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests replay policy to be {:?}, but consumer's value is {:?}", options.replay_policy, info.config.replay_policy
)));
}
if options.rate_limit.is_some()
&& options.rate_limit.unwrap() != info.config.rate_limit
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests rate limit to be {:?}, but consumer's value is {:?}", options.rate_limit, info.config.rate_limit
)));
}
if options.sample_frequency.is_some()
&& options.sample_frequency.unwrap() != info.config.sample_frequency
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests sample frequency to be {:?}, but consumer's value is {:?}", options.sample_frequency, info.config.sample_frequency
)));
}
if options.max_waiting.is_some()
&& options.max_waiting.unwrap() != info.config.max_waiting
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests max waiting to be {:?}, but consumer's value is {:?}", options.max_waiting, info.config.max_waiting
)));
}
if options.max_ack_pending.is_some()
&& options.max_ack_pending.unwrap() != info.config.max_ack_pending
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests max ack pending to be {:?}, but consumer's value is {:?}", options.max_ack_pending, info.config.max_ack_pending
)));
}
if options.flow_control.is_some() && !info.config.flow_control {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("configuration requests flow control to be {:?}, but consumer's value is {:?}", options.flow_control, info.config.flow_control
)));
}
if options.idle_heartbeat.is_some()
&& options.idle_heartbeat.unwrap() != info.config.idle_heartbeat
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"configuration requests heartbeat to be {:?}, but consumer's value is {:?}",
options.idle_heartbeat, info.config.idle_heartbeat
),
));
}
}
Ok(info)
};
let stream_name = match maybe_options.and_then(|options| options.stream_name.to_owned()) {
Some(sub) => Ok(sub),
None => self.stream_name_by_subject(subject).await,
}?;
#[allow(clippy::or_fun_call)]
let maybe_durable_name = maybe_options
.and_then(|options| options.durable_name.as_deref())
.or(maybe_queue.as_deref());
let maybe_consumer_name = maybe_options
.and_then(|options| options.consumer_name.as_deref())
.or(maybe_durable_name);
let bind_only = maybe_options.map_or(false, |options| options.bind_only);
let maybe_consumer_info = if let Some(consumer_name) = maybe_consumer_name {
let consumer_info_result = self
.consumer_info(&stream_name, &consumer_name)
.await
.and_then(process_consumer_info);
match consumer_info_result {
Ok(info) => Some(info),
Err(err) => {
if err.kind() != io::ErrorKind::Other {
return Err(err);
}
if bind_only {
if let Some(inner) = err.into_inner() {
if let Ok(err) = inner.downcast::<Error>() {
if err.error_code() == ErrorCode::ConsumerNotFound {
return Err(io::Error::new(io::ErrorKind::Other, err));
}
}
}
}
None
}
}
} else {
None
};
let is_ordered = maybe_options.map_or(false, |options| options.ordered);
let consumer_config = {
let mut config = if let Some(options) = maybe_options {
ConsumerConfig {
ack_policy: options.ack_policy.unwrap_or_default(),
ack_wait: options.ack_wait.unwrap_or_default(),
deliver_policy: options.deliver_policy.unwrap_or_default(),
deliver_subject: options.deliver_subject.clone(),
description: options.description.clone(),
durable_name: options.durable_name.clone(),
flow_control: options.flow_control.unwrap_or_default(),
headers_only: options.headers_only.unwrap_or_default(),
idle_heartbeat: options.idle_heartbeat.unwrap_or_default(),
max_ack_pending: options.max_ack_pending.unwrap_or_default(),
max_deliver: options.max_deliver.unwrap_or_default(),
max_waiting: options.max_waiting.unwrap_or_default(),
opt_start_seq: options.opt_start_seq,
opt_start_time: options.opt_start_time,
rate_limit: options.rate_limit.unwrap_or_default(),
replay_policy: options.replay_policy.unwrap_or_default(),
sample_frequency: options.sample_frequency.unwrap_or_default(),
..Default::default()
}
} else {
ConsumerConfig::default()
};
config.filter_subject = subject.to_string();
if let Some(queue) = &maybe_queue {
if config.durable_name.is_none() {
config.durable_name = Some(queue.to_owned());
}
config.deliver_group = Some(queue.to_owned());
}
if config.deliver_subject.is_none() {
config.deliver_subject = Some(self.connection.new_inbox());
}
if is_ordered {
config.flow_control = true;
config.ack_policy = AckPolicy::None;
config.max_deliver = 1;
config.ack_wait = Duration::from_nanos(1_000_000);
if config.idle_heartbeat.is_zero() {
config.idle_heartbeat = ORDERED_IDLE_HEARTBEAT;
}
}
config
};
let shared_sid = Arc::new(AtomicU64::new(0));
let sequence_pair = Arc::new(Mutex::new(SequencePair {
consumer_seq: 0,
stream_seq: 0,
}));
let deliver_subject = maybe_consumer_info
.as_ref()
.and_then(|consumer_info| consumer_info.config.deliver_subject.clone())
.or_else(|| consumer_config.deliver_subject.clone())
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"must use pull subscribe to bind to pull based consumer",
)
})?;
let preprocessor = SubscriptionPreprocessor {
sequence_pair: sequence_pair.clone(),
context: self.clone(),
consumer_config: consumer_config.clone(),
stream_name: stream_name.clone(),
shared_sid: shared_sid.clone(),
};
let (mut sid, mut receiver) = self
.connection
.0
.client
.subscribe_with_preprocessor(
deliver_subject,
maybe_queue.clone(),
Box::pin(preprocessor.clone()),
)
.await?;
let (consumer_info, consumer_ownership) = match maybe_consumer_info {
Some(consumer_info) => (consumer_info, ConsumerOwnership::No),
None => match self.add_consumer(&stream_name, &consumer_config).await {
Ok(consumer_info) => (consumer_info, ConsumerOwnership::Yes),
Err(err) => {
self.connection.0.client.unsubscribe(sid).await?;
if err.kind() != io::ErrorKind::Other {
return Err(err);
}
if let Some(inner) = err.into_inner() {
if let Ok(err) = inner.downcast::<Error>() {
if err.error_code() != ErrorCode::ConsumerNameExist {
return Err(io::Error::new(io::ErrorKind::Other, err));
}
}
}
let consumer_name = maybe_consumer_name
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "stream not found"))?;
let consumer_info = self
.consumer_info(&stream_name, &consumer_name)
.await
.and_then(process_consumer_info)?;
let deliver_subject = consumer_info
.config
.deliver_subject
.as_ref()
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"must use pull subscribe to bind to pull based consumer",
)
})?;
let (new_sid, new_receiver) = self
.connection
.0
.client
.subscribe_with_preprocessor(
deliver_subject.to_string(),
maybe_queue,
Box::pin(preprocessor),
)
.await?;
sid = new_sid;
receiver = new_receiver;
(consumer_info, ConsumerOwnership::No)
}
},
};
shared_sid.store(sid, Ordering::Relaxed);
Ok(PushSubscription::new(
shared_sid,
consumer_info,
consumer_ownership,
receiver,
self.clone(),
))
}
pub async fn add_stream<S>(&self, stream_config: S) -> io::Result<StreamInfo>
where
StreamConfig: From<S>,
{
let config: StreamConfig = stream_config.into();
if config.name.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject: String = format!("{}STREAM.CREATE.{}", self.api_prefix(), config.name);
let req = serde_json::ser::to_vec(&config)?;
self.js_request(&subject, &req).await
}
pub async fn update_stream(&self, config: &StreamConfig) -> io::Result<StreamInfo> {
if config.name.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject: String = format!("{}STREAM.UPDATE.{}", self.api_prefix(), config.name);
let req = serde_json::ser::to_vec(&config)?;
self.js_request(&subject, &req).await
}
pub fn stream_names(&self) -> impl Stream<Item = io::Result<String>> + '_ {
Box::pin(PagedIterator {
subject: format!("{}STREAM.NAMES", self.api_prefix()),
manager: self,
offset: 0,
items: Default::default(),
done: false,
})
.to_stream()
}
async fn stream_name_by_subject(&self, subject: &str) -> io::Result<String> {
let req = serde_json::ser::to_vec(&StreamNamesRequest {
subject: subject.to_string(),
})?;
let request_subject = format!("{}STREAM.NAMES", self.api_prefix());
self.js_request::<StreamNamesResponse>(&request_subject, &req)
.await
.map(|res| res.streams.first().unwrap().to_string())
}
pub fn list_streams(&self) -> impl Stream<Item = io::Result<StreamInfo>> + '_ {
Box::pin(PagedIterator {
subject: format!("{}STREAM.LIST", self.api_prefix()),
manager: self,
offset: 0,
items: Default::default(),
done: false,
})
.to_stream()
}
pub fn list_consumers<S>(
&self,
stream: S,
) -> io::Result<impl Stream<Item = io::Result<ConsumerInfo>> + '_>
where
S: AsRef<str>,
{
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject: String = format!("{}CONSUMER.LIST.{}", self.api_prefix(), stream);
Ok(Box::pin(PagedIterator {
subject,
manager: self,
offset: 0,
items: Default::default(),
done: false,
})
.to_stream())
}
pub async fn stream_info<S: AsRef<str>>(&self, stream: S) -> io::Result<StreamInfo> {
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject: String = format!("{}STREAM.INFO.{}", self.api_prefix(), stream);
self.js_request(&subject, b"").await
}
pub async fn purge_stream<S: AsRef<str>>(&self, stream: S) -> io::Result<PurgeResponse> {
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject = format!("{}STREAM.PURGE.{}", self.api_prefix(), stream);
self.js_request(&subject, b"").await
}
pub async fn purge_stream_subject<S: AsRef<str>>(
&self,
stream: S,
filter_subject: &str,
) -> io::Result<PurgeResponse> {
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject = format!("{}STREAM.PURGE.{}", self.api_prefix(), stream);
let request = serde_json::to_vec(&PurgeRequest {
filter: Some(filter_subject.to_string()),
..Default::default()
})?;
self.js_request(&subject, &request).await
}
pub async fn get_message<S: AsRef<str>>(
&self,
stream: S,
seq: u64,
) -> io::Result<StreamMessage> {
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject = format!("{}STREAM.MSG.GET.{}", self.api_prefix(), stream);
let request = serde_json::ser::to_vec(&StreamMessageGetRequest {
seq: Some(seq),
last_by_subject: None,
})?;
let raw_message = self
.js_request::<StreamMessageGetResponse>(&subject, &request)
.await
.map(|response| response.message)?;
let message = StreamMessage::try_from(raw_message)?;
Ok(message)
}
pub async fn get_last_message<S: AsRef<str>>(
&self,
stream_name: S,
stream_subject: &str,
) -> io::Result<StreamMessage> {
let stream_name: &str = stream_name.as_ref();
if stream_name.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject = format!("{}STREAM.MSG.GET.{}", self.api_prefix(), stream_name);
let request = serde_json::ser::to_vec(&StreamMessageGetRequest {
seq: None,
last_by_subject: Some(stream_subject.to_string()),
})?;
let raw_message = self
.js_request::<StreamMessageGetResponse>(&subject, &request)
.await
.map(|response| response.message)?;
let message = StreamMessage::try_from(raw_message)?;
Ok(message)
}
pub async fn delete_message<S: AsRef<str>>(
&self,
stream: S,
sequence_number: u64,
) -> io::Result<bool> {
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let req = serde_json::ser::to_vec(&DeleteRequest {
seq: sequence_number,
})
.unwrap();
let subject = format!("{}STREAM.MSG.DELETE.{}", self.api_prefix(), stream);
self.js_request::<DeleteResponse>(&subject, &req)
.await
.map(|dr| dr.success)
}
pub async fn delete_stream<S: AsRef<str>>(&self, stream: S) -> io::Result<bool> {
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject = format!("{}STREAM.DELETE.{}", self.api_prefix(), stream);
self.js_request::<DeleteResponse>(&subject, b"")
.await
.map(|dr| dr.success)
}
pub async fn add_consumer<S, C>(&self, stream: S, config: C) -> io::Result<ConsumerInfo>
where
S: AsRef<str>,
ConsumerConfig: From<C>,
{
let config = ConsumerConfig::from(config);
let stream = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject = if let Some(ref durable_name) = config.durable_name {
format!(
"{}CONSUMER.DURABLE.CREATE.{}.{}",
self.api_prefix(),
stream,
durable_name
)
} else {
format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream)
};
let req = CreateConsumerRequest {
stream_name: stream.into(),
config,
};
let ser_req = serde_json::ser::to_vec(&req)?;
self.js_request(&subject, &ser_req).await
}
pub async fn delete_consumer<S, C>(&self, stream: S, consumer: C) -> io::Result<bool>
where
S: AsRef<str>,
C: AsRef<str>,
{
let stream = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let consumer = consumer.as_ref();
if consumer.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the consumer name must not be empty",
));
}
let subject = format!(
"{}CONSUMER.DELETE.{}.{}",
self.api_prefix(),
stream,
consumer
);
self.js_request::<DeleteResponse>(&subject, b"")
.await
.map(|dr| dr.success)
}
pub async fn consumer_info<S, C>(&self, stream: S, consumer: C) -> io::Result<ConsumerInfo>
where
S: AsRef<str>,
C: AsRef<str>,
{
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let consumer: &str = consumer.as_ref();
let subject: String = format!("{}CONSUMER.INFO.{}.{}", self.api_prefix(), stream, consumer);
self.js_request(&subject, b"").await
}
pub async fn account_info(&self) -> io::Result<AccountInfo> {
self.js_request(&format!("{}INFO", self.api_prefix()), b"")
.await
}
async fn js_request<Res>(&self, subject: &str, req: &[u8]) -> io::Result<Res>
where
Res: DeserializeOwned,
{
let res_msg = self.connection.request(subject, req).await?;
assert!(
!res_msg.data.is_empty(),
"js_request received empty response"
);
let res: ApiResponse<Res> = serde_json::de::from_slice(&res_msg.data)?;
match res {
ApiResponse::Ok(stream_info) => Ok(stream_info),
ApiResponse::Err { error, .. } => {
log::error!(
"failed to parse API response: {:?}",
std::str::from_utf8(&res_msg.data)
);
Err(io::Error::new(io::ErrorKind::Other, error))
}
}
}
fn api_prefix(&self) -> &str {
&self.options.api_prefix
}
}
pub fn new(nc: Connection) -> JetStream {
JetStream::new(nc, JetStreamOptions::default())
}