use std::{
collections::VecDeque,
convert::TryFrom,
error, fmt,
fmt::Debug,
io::{self, ErrorKind},
time::Duration,
};
use parking_lot::Mutex;
use portable_atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
// Idle heartbeat interval (5 seconds) applied to ordered push consumers when
// the caller did not configure an idle heartbeat of their own.
const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_nanos(5_000_000_000);
pub mod pull_subscription;
pub mod push_subscription;
mod types;
pub use crate::jetstream::pull_subscription::PullSubscription;
pub use crate::jetstream::push_subscription::PushSubscription;
pub use types::*;
// Misspelled alias of `PullSubscribeOptions`, kept (hidden and deprecated) so
// code written against the old name keeps compiling.
#[deprecated(note = "Use PullSubscribeOptions instead")]
#[doc(hidden)]
pub type PullSubscibeOptions = PullSubscribeOptions;
use crate::{
header::{self, HeaderMap},
Connection, Message,
};
/// Options controlling how a JetStream context addresses the JetStream API.
#[derive(Clone)]
pub struct JetStreamOptions {
    /// Subject prefix prepended to every API request. The builder methods
    /// below always leave it terminated by a `.`.
    pub(crate) api_prefix: String,
    /// Set once a non-empty domain has been configured through
    /// [`JetStreamOptions::domain`].
    pub(crate) has_domain: bool,
}

impl Debug for JetStreamOptions {
    /// Renders the options as a map holding only the `api_prefix` entry.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_map()
            .entry(&"api_prefix", &self.api_prefix)
            .finish()
    }
}

impl Default for JetStreamOptions {
    /// Default options: the `$JS.API.` prefix and no domain.
    fn default() -> JetStreamOptions {
        JetStreamOptions {
            api_prefix: "$JS.API.".to_string(),
            has_domain: false,
        }
    }
}

impl JetStreamOptions {
    /// Creates options with the default API prefix.
    pub fn new() -> JetStreamOptions {
        JetStreamOptions::default()
    }

    /// Replaces the API prefix, appending a trailing `.` when it is missing.
    ///
    /// An empty prefix is ignored and the current prefix is kept: storing it
    /// would produce the prefix `"."`, and every API subject built from it
    /// would start with an empty token.
    pub fn api_prefix(mut self, mut api_prefix: String) -> Self {
        if api_prefix.is_empty() {
            return self;
        }
        if !api_prefix.ends_with('.') {
            api_prefix.push('.');
        }
        self.api_prefix = api_prefix;
        self
    }

    /// Selects a JetStream domain by switching the prefix to
    /// `$JS.<domain>.API.` and recording that a domain is in use.
    ///
    /// An empty domain leaves the options untouched.
    pub fn domain(mut self, domain: &str) -> Self {
        if domain.is_empty() {
            return self;
        }
        self.has_domain = true;
        self.api_prefix(format!("$JS.{domain}.API"))
    }
}
/// Envelope for a JetStream API reply: either an object carrying an `error`
/// field or the expected payload `T`.
///
/// The enum is untagged, so the variant is chosen purely by the shape of the
/// JSON document.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum ApiResponse<T> {
// Reply of the form `{ "error": { ... } }`.
Err { error: Error },
// Any reply that deserializes as `T`.
Ok(T),
}
/// Numeric JetStream error codes, as carried in the `err_code` field of an
/// API [`Error`].
///
/// The enum is `#[repr(u64)]` and (de)serialized through `serde_repr`, i.e. as
/// the bare integer discriminant listed next to each variant.
#[derive(Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr, Clone, Copy)]
#[repr(u64)]
pub enum ErrorCode {
ClusterPeerNotMember = 10040,
ConsumerEphemeralWithDurableInSubject = 10019,
StreamExternalDelPrefixOverlaps = 10022,
AccountResourcesExceeded = 10002,
ClusterNotAvail = 10008,
StreamSubjectOverlap = 10065,
StreamWrongLastSequence = 10071,
TemplateNameNotMatchSubject = 10073,
ClusterNoPeers = 10005,
ConsumerEphemeralWithDurableName = 10020,
InsufficientResources = 10023,
MirrorMaxMessageSizeTooBig = 10030,
StreamTemplateDelete = 10067,
BadRequest = 10003,
ClusterUnSupportFeature = 10036,
ConsumerNotFound = 10014,
SourceMaxMessageSizeTooBig = 10046,
StreamAssignment = 10048,
StreamMessageExceedsMaximum = 10054,
StreamTemplateCreate = 10066,
InvalidJSON = 10025,
StreamInvalidExternalDeliverySubject = 10024,
StreamRestore = 10062,
ClusterIncomplete = 10004,
NoAccount = 10035,
RaftGeneral = 10041,
RestoreSubscribeFailed = 10042,
StreamDelete = 10050,
StreamExternalApiOverlap = 10021,
MirrorWithSubjects = 10034,
NotEnabled = 10076,
NotEnabledForAccount = 10039,
SequenceNotFound = 10043,
StreamMirrorNotUpdatable = 10055,
StreamSequenceNotMatch = 10063,
StreamWrongLastMsgId = 10070,
TempStorageFailed = 10072,
StorageResourcesExceeded = 10047,
StreamMismatch = 10056,
StreamNotMatch = 10060,
MirrorConsumerSetupFailed = 10029,
NotEmptyRequest = 10038,
StreamNameExist = 10058,
ClusterTags = 10011,
MaximumConsumersLimit = 10026,
SourceConsumerSetupFailed = 10045,
ConsumerCreate = 10012,
ConsumerDurableNameNotInSubject = 10016,
StreamLimits = 10053,
StreamReplicasNotUpdatable = 10061,
StreamTemplateNotFound = 10068,
ClusterNotAssigned = 10007,
ClusterNotLeader = 10009,
ConsumerNameExist = 10013,
MirrorWithSources = 10031,
StreamNotFound = 10059,
ClusterRequired = 10010,
ConsumerDurableNameNotSet = 10018,
MaximumStreamsLimit = 10027,
MirrorWithStartSeqAndTime = 10032,
StreamSnapshot = 10064,
StreamUpdate = 10069,
ClusterNotActive = 10006,
ConsumerDurableNameNotMatchSubject = 10017,
MemoryResourcesExceeded = 10028,
MirrorWithSubjectFilters = 10033,
StreamCreate = 10049,
ClusterServerNotMember = 10044,
NoMessageFound = 10037,
SnapshotDeliverSubjectInvalid = 10015,
StreamGeneralErrorF = 10051,
StreamInvalidConfigF = 10052,
StreamReplicasNotSupported = 10074,
StreamMsgDeleteFailedF = 10057,
PeerRemap = 10075,
StreamStoreFailedF = 10077,
ConsumerConfigRequired = 10078,
ConsumerDeliverToWildcards = 10079,
ConsumerPushMaxWaiting = 10080,
ConsumerDeliverCycle = 10081,
ConsumerMaxPendingAckPolicyRequired = 10082,
JSConsumerMaxRequestBatchNegative = 10114,
JSConsumerMaxRequestExpiresToSmall = 10115,
ConsumerSmallHeartbeat = 10083,
ConsumerPullRequiresAck = 10084,
ConsumerPullNotDurable = 10085,
ConsumerPullWithRateLimit = 10086,
ConsumerMaxWaitingNegative = 10087,
ConsumerHBRequiresPush = 10088,
ConsumerFCRequiresPush = 10089,
ConsumerDirectRequiresPush = 10090,
ConsumerDirectRequiresEphemeral = 10091,
ConsumerOnMapped = 10092,
ConsumerFilterNotSubset = 10093,
ConsumerInvalidPolicy = 10094,
ConsumerInvalidSampling = 10095,
StreamInvalid = 10096,
ConsumerWQRequiresExplicitAck = 10098,
ConsumerWQMultipleUnfiltered = 10099,
ConsumerWQConsumerNotUnique = 10100,
ConsumerWQConsumerNotDeliverAll = 10101,
ConsumerNameTooLong = 10102,
ConsumerBadDurableName = 10103,
ConsumerStoreFailed = 10104,
ConsumerExistingActive = 10105,
ConsumerReplacementWithDifferentName = 10106,
ConsumerDescriptionTooLong = 10107,
StreamHeaderExceedsMaximum = 10097,
}
/// Error object found under the `error` key of a JetStream API reply.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Error {
// General numeric code reported with the error.
// NOTE(review): presumably an HTTP-like status code - confirm against the server docs.
code: usize,
// JetStream-specific error code.
err_code: ErrorCode,
// Optional human-readable description of the failure.
description: Option<String>,
}
impl Error {
/// Returns the general numeric code reported with the error.
pub fn code(&self) -> usize {
self.code
}
/// Returns the JetStream-specific error code.
pub fn error_code(&self) -> ErrorCode {
self.err_code
}
}
impl fmt::Display for Error {
    /// Formats the error as `<description> (code <code>, error code <err_code>)`.
    ///
    /// A missing description is rendered as `unknown`; the error code is
    /// printed as its numeric value.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            fmt,
            "{} (code {}, error code {})",
            // The description comes first so the placeholders line up with
            // the labels in the format string.
            self.description.as_deref().unwrap_or("unknown"),
            self.code,
            self.err_code as u64,
        )
    }
}
// Marks the API error as a standard error type so it can be wrapped as the
// payload of an `io::Error`.
impl error::Error for Error {}
/// Request body for paged listing endpoints.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
struct PagedRequest {
// Number of entries already received; the next page starts here.
offset: i64,
}
/// One page of results returned by a paged listing endpoint.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
struct PagedResponse<T> {
// Value of the reply's `type` field.
pub r#type: String,
// Entries of this page; also accepted under the keys `streams` and
// `consumers`. Absent when the reply carries no entries.
#[serde(alias = "streams", alias = "consumers")]
pub items: Option<VecDeque<T>>,
// Paging counters as reported in the reply.
pub total: usize,
pub offset: usize,
pub limit: usize,
}
/// Iterator over a paged JetStream listing; fetches further pages on demand.
#[derive(Debug)]
pub struct PagedIterator<'a, T> {
// Context used to issue the page requests.
manager: &'a JetStream,
// API subject the page requests are sent to.
subject: String,
// Number of entries fetched so far, sent as the offset of the next request.
offset: i64,
// Entries of the current page that have not been yielded yet.
items: VecDeque<T>,
// Set after an error or an empty page; the iterator then only yields `None`.
done: bool,
}
// Once `done` is set, `next` keeps returning `None`, so the iterator is fused.
impl<'a, T> std::iter::FusedIterator for PagedIterator<'a, T> where T: DeserializeOwned + Debug {}
impl<'a, T> Iterator for PagedIterator<'a, T>
where
T: DeserializeOwned + Debug,
{
type Item = io::Result<T>;
fn next(&mut self) -> Option<io::Result<T>> {
if self.done {
return None;
}
if !self.items.is_empty() {
return Some(Ok(self.items.pop_front().unwrap()));
}
let req = serde_json::ser::to_vec(&PagedRequest {
offset: self.offset,
})
.unwrap();
let res: io::Result<PagedResponse<T>> = self.manager.js_request(&self.subject, &req);
let mut page = match res {
Err(e) => {
self.done = true;
return Some(Err(e));
}
Ok(page) => page,
};
if page.items.is_none() {
self.done = true;
return None;
}
let items = page.items.take().unwrap();
self.offset += i64::try_from(items.len()).unwrap();
self.items = items;
if self.items.is_empty() {
self.done = true;
None
} else {
Some(Ok(self.items.pop_front().unwrap()))
}
}
}
/// JetStream context: a connection paired with the options used to address
/// the JetStream API.
#[derive(Clone, Debug)]
pub struct JetStream {
// Connection all requests and subscriptions go through.
pub(crate) connection: Connection,
// API addressing options (prefix / domain).
pub(crate) options: JetStreamOptions,
}
impl JetStream {
/// Creates a JetStream context over `connection` using the supplied `options`.
pub fn new(connection: Connection, options: JetStreamOptions) -> Self {
Self {
connection,
options,
}
}
/// Publishes `data` to `subject` without extra options or headers and returns
/// the acknowledgement parsed from the reply.
pub fn publish(&self, subject: &str, data: impl AsRef<[u8]>) -> io::Result<PublishAck> {
self.publish_with_options_or_headers(subject, None, None, data)
}
/// Publishes `data` to `subject`, applying the given publish `options`, and
/// returns the acknowledgement parsed from the reply.
pub fn publish_with_options(
&self,
subject: &str,
data: impl AsRef<[u8]>,
options: &PublishOptions,
) -> io::Result<PublishAck> {
self.publish_with_options_or_headers(subject, Some(options), None, data)
}
/// Publishes an existing `Message`, forwarding its subject, headers and data,
/// and returns the acknowledgement parsed from the reply.
pub fn publish_message(&self, message: &Message) -> io::Result<PublishAck> {
self.publish_with_options_or_headers(
&message.subject,
None,
message.headers.as_ref(),
&message.data,
)
}
/// Publishes an existing `Message` with the given publish `options`; headers
/// derived from the options are added to the message's own headers.
pub fn publish_message_with_options(
&self,
message: &Message,
options: &PublishOptions,
) -> io::Result<PublishAck> {
self.publish_with_options_or_headers(
&message.subject,
Some(options),
message.headers.as_ref(),
&message.data,
)
}
pub(crate) fn publish_with_options_or_headers(
&self,
subject: &str,
maybe_options: Option<&PublishOptions>,
maybe_headers: Option<&HeaderMap>,
msg: impl AsRef<[u8]>,
) -> io::Result<PublishAck> {
let maybe_headers = if let Some(options) = maybe_options {
let mut headers = maybe_headers.map_or_else(HeaderMap::default, HeaderMap::clone);
if let Some(v) = options.id.as_ref() {
headers.insert(header::NATS_MSG_ID, v.to_string());
}
if let Some(v) = options.expected_last_msg_id.as_ref() {
headers.insert(header::NATS_EXPECTED_LAST_MSG_ID, v.to_string());
}
if let Some(v) = options.expected_stream.as_ref() {
headers.insert(header::NATS_EXPECTED_STREAM, v.to_string());
}
if let Some(v) = options.expected_last_sequence.as_ref() {
headers.insert(header::NATS_EXPECTED_LAST_SEQUENCE, v.to_string());
}
if let Some(v) = options.expected_last_subject_sequence.as_ref() {
headers.insert(header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, v.to_string());
}
Some(headers)
} else {
maybe_headers.cloned()
};
let maybe_timeout = maybe_options.and_then(|options| options.timeout);
let res_msg = self.connection.request_with_headers_or_timeout(
subject,
maybe_headers.as_ref(),
maybe_timeout,
msg,
)?;
let res: ApiResponse<PublishAck> = serde_json::de::from_slice(&res_msg.data)?;
match res {
ApiResponse::Ok(pub_ack) => Ok(pub_ack),
ApiResponse::Err { error, .. } => {
log::debug!(
"failed to parse API response: {:?}",
std::str::from_utf8(&res_msg.data)
);
Err(io::Error::new(ErrorKind::Other, error))
}
}
}
/// Creates a push subscription on `subject` with no queue group and default
/// options.
pub fn subscribe(&self, subject: &str) -> io::Result<PushSubscription> {
self.do_push_subscribe(subject, None, None)
}
/// Creates a pull subscription on `subject` with default options.
pub fn pull_subscribe(&self, subject: &str) -> io::Result<PullSubscription> {
self.do_pull_subscribe(subject, None)
}
/// Creates a pull subscription on `subject` configured by `options`.
pub fn pull_subscribe_with_options(
&self,
subject: &str,
options: &PullSubscribeOptions,
) -> io::Result<PullSubscription> {
self.do_pull_subscribe(subject, Some(options))
}
/// Shared implementation of the pull-subscribe entry points.
///
/// Resolves the stream (from the options, or by looking it up from `subject`),
/// then either binds to the durable consumer named in the options or creates
/// a new consumer, validates the resulting consumer for pull usage and
/// subscribes to a fresh inbox.
///
/// # Errors
///
/// * `ErrorKind::NotFound` when the requested durable consumer cannot be
///   fetched.
/// * `ErrorKind::Other` when the consumer's filter subject differs from
///   `subject`.
/// * Any error from stream lookup, validation, consumer creation or the
///   inbox subscription.
pub(crate) fn do_pull_subscribe(
    &self,
    subject: &str,
    maybe_options: Option<&PullSubscribeOptions>,
) -> io::Result<PullSubscription> {
    // Use the stream named in the options, else find it from the subject.
    let stream_name = match maybe_options.and_then(|options| options.stream_name.to_owned()) {
        Some(name) => name,
        None => self.stream_name_by_subject(subject)?,
    };

    let requested_durable = maybe_options.and_then(|options| options.durable_name.to_owned());

    let (consumer_info, consumer_ownership) = match requested_durable {
        // A durable name means "bind to an existing consumer".
        Some(durable_name) => {
            let info = self
                .consumer_info(stream_name, durable_name)
                .map_err(|err| {
                    io::Error::new(
                        ErrorKind::NotFound,
                        format!("provided durable consumer doesn't exist: {err}"),
                    )
                })?;
            (info, ConsumerOwnership::No)
        }
        // Otherwise create a consumer owned by this subscription.
        None => {
            let config = match maybe_options.and_then(|options| options.consumer_config.clone()) {
                Some(config) => config,
                None => ConsumerConfig {
                    deliver_policy: DeliverPolicy::All,
                    ack_policy: AckPolicy::Explicit,
                    filter_subject: subject.to_string(),
                    replay_policy: ReplayPolicy::Instant,
                    ..Default::default()
                },
            };
            config.validate_for(&ConsumerKind::Pull)?;
            (
                self.add_consumer(stream_name, config)?,
                ConsumerOwnership::Yes,
            )
        }
    };

    // Validate whatever consumer we ended up with against this request.
    consumer_info.config.validate_for(&ConsumerKind::Pull)?;
    if !consumer_info.config.filter_subject.is_empty()
        && subject != consumer_info.config.filter_subject
    {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "subjects do not match",
        ));
    }

    let inbox = self.connection.new_inbox();
    let (pid, messages) = self.connection.0.client.subscribe(inbox.as_str(), None)?;

    Ok(PullSubscription::new(
        pid,
        consumer_info,
        consumer_ownership,
        inbox,
        messages,
        self.clone(),
    ))
}
/// Creates a push subscription on `subject` configured by `options`, without
/// a queue group.
pub fn subscribe_with_options(
&self,
subject: &str,
options: &SubscribeOptions,
) -> io::Result<PushSubscription> {
self.do_push_subscribe(subject, None, Some(options))
}
/// Creates a push subscription on `subject` as a member of queue group
/// `queue`, with default options.
pub fn queue_subscribe(&self, subject: &str, queue: &str) -> io::Result<PushSubscription> {
self.do_push_subscribe(subject, Some(queue), None)
}
/// Creates a push subscription on `subject` as a member of queue group
/// `queue`, configured by `options`.
pub fn queue_subscribe_with_options(
&self,
subject: &str,
queue: &str,
options: &SubscribeOptions,
) -> io::Result<PushSubscription> {
self.do_push_subscribe(subject, Some(queue), Some(options))
}
/// Shared implementation of all push-subscribe entry points.
///
/// The steps are, in order:
///
/// 1. Validate the argument combination (subject / stream name, queue group
///    versus heartbeat and flow control, restrictions of ordered consumers).
/// 2. Resolve the stream and, when a consumer or durable name (or queue) is
///    known, look the consumer up and check that it is compatible with the
///    request.
/// 3. Build the consumer configuration that is used if a consumer has to be
///    created.
/// 4. Install a message preprocessor that answers stalled-consumer
///    heartbeats and, for ordered consumers, detects sequence gaps and
///    re-creates the consumer from the last seen stream sequence.
/// 5. Subscribe to the deliver subject and create the consumer if none was
///    found, falling back to binding when another subscriber created it
///    first.
///
/// # Errors
///
/// * `ErrorKind::InvalidInput` for an empty subject without a stream name, or
///   a queue subscription requesting idle heartbeat or flow control.
/// * `ErrorKind::Other` for invalid ordered-consumer options, a mismatch
///   between the request and an existing consumer, or an API error.
/// * Any error from the stream lookup or the underlying subscription.
fn do_push_subscribe(
    &self,
    subject: &str,
    maybe_queue: Option<&str>,
    maybe_options: Option<&SubscribeOptions>,
) -> io::Result<PushSubscription> {
    // Without an explicit stream name the stream is looked up by subject, so
    // the subject must not be empty in that case. `and_then` (rather than
    // `map`) makes sure options *without* a stream name are treated the same
    // way as no options at all.
    if subject.is_empty()
        && maybe_options
            .and_then(|options| options.stream_name.as_ref())
            .is_none()
    {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Subject required",
        ));
    }

    let wants_idle_heartbeat =
        maybe_options.map_or(false, |options| options.idle_heartbeat.is_some());
    let wants_flow_control =
        maybe_options.map_or(false, |options| options.flow_control.is_some());

    // Queue subscriptions support neither idle heartbeats nor flow control.
    if maybe_queue.is_some() {
        if wants_idle_heartbeat {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "queue subscription doesn't support idle heartbeat",
            ));
        }
        if wants_flow_control {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "queue subscription doesn't support flow control",
            ));
        }
    };

    // Ordered consumers manage most settings themselves; reject options that
    // would conflict with that.
    if let Some(options) = maybe_options.filter(|options| options.ordered) {
        if maybe_queue.is_some() {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "queues not be set for an ordered consumer",
            ));
        }
        if options.durable_name.is_some() {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "durable name can not be set for an ordered consumer",
            ));
        }
        if options.consumer_name.is_some() {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "can not bind existing consumer for an ordered consumer",
            ));
        }
        if options.ack_policy.is_some() {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "ack policy can not be set for an ordered consumer",
            ));
        }
        if options.max_deliver.is_some() {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "max deliver can not be set for an ordered consumer",
            ));
        }
        if options.deliver_subject.is_some() {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "deliver subject can not be set for an ordered consumer",
            ));
        }
    }

    // Checks that an existing consumer is compatible with this request and
    // hands the info back unchanged when it is.
    let process_consumer_info = |info: ConsumerInfo| {
        if !info.config.filter_subject.is_empty() && subject != info.config.filter_subject {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "subject does not match consumer",
            ));
        }

        if let Some(deliver_group) = info.config.deliver_group.as_ref() {
            // The consumer has a deliver group: only a queue subscription
            // using the very same group may attach.
            if let Some(queue) = maybe_queue {
                if deliver_group != queue {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!(
                            "cannot create a queue subscription {queue} for a consumer with a deliver group {deliver_group}"
                        ),
                    ));
                }
            } else {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "cannot create a subscription for a consumer with a deliver group {deliver_group}"
                    ),
                ));
            }
        } else {
            // The consumer has no deliver group: a queue subscription cannot
            // attach to it.
            if maybe_queue.is_some() {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "cannot create a queue subscription for a consumer without a deliver group",
                ));
            }
            // A plain (non-queue) subscription must not attach to a consumer
            // that is already bound to another subscription.
            if maybe_queue.is_none() && info.push_bound {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "consumer is already bound to a subscription",
                ));
            }
        }

        // Every option the caller set explicitly has to agree with the
        // existing consumer's configuration.
        if let Some(options) = maybe_options {
            if options.durable_name.is_some()
                && options.durable_name != info.config.durable_name
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests durable name to be {:?}, but consumer's value is {:?}",
                        options.durable_name, info.config.durable_name
                    ),
                ));
            }
            if options.description.is_some() && options.description != info.config.description {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests description to be {:?}, but consumer's value is {:?}",
                        options.description, info.config.description
                    ),
                ));
            }
            if options.deliver_policy.is_some()
                && options.deliver_policy.unwrap() != info.config.deliver_policy
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests deliver policy to be {:?}, but consumer's value is {:?}",
                        options.deliver_policy, info.config.deliver_policy
                    ),
                ));
            }
            if options.opt_start_seq.is_some()
                && options.opt_start_seq != info.config.opt_start_seq
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests optional start sequence to be {:?}, but consumer's value is {:?}",
                        options.opt_start_seq, info.config.opt_start_seq
                    ),
                ));
            }
            if options.opt_start_time.is_some()
                && options.opt_start_time != info.config.opt_start_time
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests optional start time to be {:?}, but consumer's value is {:?}",
                        options.opt_start_time, info.config.opt_start_time
                    ),
                ));
            }
            if options.ack_policy.is_some()
                && options.ack_policy.unwrap() != info.config.ack_policy
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests ack policy to be {:?}, but consumer's value is {:?}",
                        options.ack_policy, info.config.ack_policy
                    ),
                ));
            }
            if options.ack_wait.is_some() && options.ack_wait.unwrap() != info.config.ack_wait {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests ack wait to be {:?}, but consumer's value is {:?}",
                        options.ack_wait, info.config.ack_wait
                    ),
                ));
            }
            if options.max_deliver.is_some()
                && options.max_deliver.unwrap() != info.config.max_deliver
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests max deliver to be {:?}, but consumer's value is {:?}",
                        options.max_deliver, info.config.max_deliver
                    ),
                ));
            }
            if options.replay_policy.is_some()
                && options.replay_policy.unwrap() != info.config.replay_policy
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests replay policy to be {:?}, but consumer's value is {:?}",
                        options.replay_policy, info.config.replay_policy
                    ),
                ));
            }
            if options.rate_limit.is_some()
                && options.rate_limit.unwrap() != info.config.rate_limit
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests rate limit to be {:?}, but consumer's value is {:?}",
                        options.rate_limit, info.config.rate_limit
                    ),
                ));
            }
            if options.sample_frequency.is_some()
                && options.sample_frequency.unwrap() != info.config.sample_frequency
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests sample frequency to be {:?}, but consumer's value is {:?}",
                        options.sample_frequency, info.config.sample_frequency
                    ),
                ));
            }
            if options.max_waiting.is_some()
                && options.max_waiting.unwrap() != info.config.max_waiting
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests max waiting to be {:?}, but consumer's value is {:?}",
                        options.max_waiting, info.config.max_waiting
                    ),
                ));
            }
            if options.max_ack_pending.is_some()
                && options.max_ack_pending.unwrap() != info.config.max_ack_pending
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests max ack pending to be {:?}, but consumer's value is {:?}",
                        options.max_ack_pending, info.config.max_ack_pending
                    ),
                ));
            }
            // Compare the requested value like every other option does, so an
            // explicit `false` is only rejected when the consumer has flow
            // control enabled (and vice versa).
            if options.flow_control.is_some()
                && options.flow_control.unwrap() != info.config.flow_control
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests flow control to be {:?}, but consumer's value is {:?}",
                        options.flow_control, info.config.flow_control
                    ),
                ));
            }
            if options.idle_heartbeat.is_some()
                && options.idle_heartbeat.unwrap() != info.config.idle_heartbeat
            {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "configuration requests heartbeat to be {:?}, but consumer's value is {:?}",
                        options.idle_heartbeat, info.config.idle_heartbeat
                    ),
                ));
            }
        }

        Ok(info)
    };

    // Use the stream named in the options, else find it from the subject.
    let stream_name = maybe_options
        .and_then(|options| options.stream_name.to_owned())
        .map_or_else(|| self.stream_name_by_subject(subject), Ok)?;

    // Name resolution order: explicit consumer name, then durable name, then
    // the queue group name.
    let maybe_durable_name = maybe_options
        .and_then(|options| options.durable_name.as_deref())
        .or(maybe_queue);
    let maybe_consumer_name = maybe_options
        .and_then(|options| options.consumer_name.as_deref())
        .or(maybe_durable_name);

    let bind_only = maybe_options.map_or(false, |options| options.bind_only);

    // With a name at hand, try to find an existing, compatible consumer.
    let maybe_consumer_info = if let Some(consumer_name) = maybe_consumer_name {
        let consumer_info_result = self
            .consumer_info(&stream_name, consumer_name)
            .and_then(process_consumer_info);

        match consumer_info_result {
            Ok(info) => Some(info),
            Err(err) => {
                // Only `Other` errors are tolerated here; anything else is
                // reported to the caller right away.
                if err.kind() != io::ErrorKind::Other {
                    return Err(err);
                }

                // In bind-only mode a missing consumer is an error instead of
                // a reason to create one.
                if bind_only {
                    if let Some(inner) = err.into_inner() {
                        if let Ok(err) = inner.downcast::<Error>() {
                            if err.error_code() == ErrorCode::ConsumerNotFound {
                                return Err(io::Error::new(io::ErrorKind::Other, err));
                            }
                        }
                    }
                }

                None
            }
        }
    } else {
        None
    };

    // Configuration used when a consumer has to be created.
    let is_ordered = maybe_options.map_or(false, |options| options.ordered);
    let consumer_config = {
        let mut config = if let Some(options) = maybe_options {
            ConsumerConfig {
                ack_policy: options.ack_policy.unwrap_or_default(),
                ack_wait: options.ack_wait.unwrap_or_default(),
                deliver_policy: options.deliver_policy.unwrap_or_default(),
                deliver_subject: options.deliver_subject.clone(),
                description: options.description.clone(),
                durable_name: options.durable_name.clone(),
                flow_control: options.flow_control.unwrap_or_default(),
                headers_only: options.headers_only.unwrap_or_default(),
                idle_heartbeat: options.idle_heartbeat.unwrap_or_default(),
                max_ack_pending: options.max_ack_pending.unwrap_or_default(),
                max_deliver: options.max_deliver.unwrap_or_default(),
                max_waiting: options.max_waiting.unwrap_or_default(),
                opt_start_seq: options.opt_start_seq,
                opt_start_time: options.opt_start_time,
                rate_limit: options.rate_limit.unwrap_or_default(),
                replay_policy: options.replay_policy.unwrap_or_default(),
                sample_frequency: options.sample_frequency.unwrap_or_default(),
                ..Default::default()
            }
        } else {
            ConsumerConfig::default()
        };

        config.filter_subject = subject.to_string();

        // A queue group doubles as the durable name unless one was given.
        if let Some(queue) = maybe_queue {
            if config.durable_name.is_none() {
                config.durable_name = Some(queue.to_owned());
            }
            config.deliver_group = Some(queue.to_owned());
        }

        // Deliver to a fresh inbox unless the caller chose a subject.
        if config.deliver_subject.is_none() {
            config.deliver_subject = Some(self.connection.new_inbox());
        }

        // Fixed settings of an ordered consumer.
        if is_ordered {
            config.flow_control = true;
            config.ack_policy = AckPolicy::None;
            config.max_deliver = 1;
            // NOTE(review): 1ms ack wait; presumably irrelevant because the
            // ack policy is `None` - confirm against the server behaviour.
            config.ack_wait = Duration::from_nanos(1_000_000);
            if config.idle_heartbeat.is_zero() {
                config.idle_heartbeat = ORDERED_IDLE_HEARTBEAT;
            }
        }

        config
    };

    // Subscription id shared with the returned subscription; it is replaced
    // when an ordered consumer is re-created on a new deliver subject.
    let shared_sid = Arc::new(AtomicU64::new(0));

    // The preprocessor sees every incoming message. Its boolean result is
    // passed back to the client.
    // NOTE(review): `true` appears to mean "message consumed, do not deliver";
    // confirm against `subscribe_with_preprocessor`.
    let preprocessor = {
        let sequence_pair = Arc::new(Mutex::new(SequencePair {
            consumer_seq: 0,
            stream_seq: 0,
        }));

        // Recovers from a detected gap: mutes the current subscription and, on
        // a background thread, resubscribes on a new inbox and re-creates the
        // consumer starting at `start_seq`.
        let handle_sequence_mismatch = {
            let context = self.clone();
            let consumer_config = consumer_config.clone();
            let sequence_pair = sequence_pair.clone();
            let stream_name = stream_name.clone();
            let shared_sid = shared_sid.clone();

            move |sid: u64, start_seq: u64| {
                let stream_name = stream_name.clone();
                let context = context.clone();
                let consumer_config = consumer_config.clone();
                let sequence_pair = sequence_pair.clone();
                let shared_sid = shared_sid.clone();

                // If muting reports `false`, skip starting another recovery.
                if !context.connection.0.client.mute(sid).unwrap() {
                    return true;
                }

                thread::spawn(move || {
                    let new_deliver_subject = context.connection.new_inbox();
                    let result = context
                        .connection
                        .0
                        .client
                        .resubscribe(sid, &new_deliver_subject);

                    if let Ok(new_sid) = result {
                        shared_sid.store(new_sid, Ordering::Relaxed);

                        let mut consumer_config = consumer_config.clone();
                        consumer_config.deliver_subject = Some(new_deliver_subject);
                        consumer_config.deliver_policy = DeliverPolicy::ByStartSeq;
                        consumer_config.opt_start_seq = Some(start_seq);
                        // Best effort: a failure here is not reported.
                        context.add_consumer(stream_name, consumer_config).ok();
                    }

                    // The new consumer starts counting deliveries from scratch.
                    let mut sequence_info = sequence_pair.lock();
                    sequence_info.consumer_seq = 0;
                });

                true
            }
        };

        let context = self.clone();
        move |sid: u64, message: &Message| {
            if message.is_flow_control() {
                return false;
            }

            if message.is_idle_heartbeat() {
                // Answer the stalled-consumer subject if the heartbeat
                // carries one.
                let maybe_consumer_stalled = message
                    .headers
                    .as_ref()
                    .and_then(|headers| headers.get(header::NATS_CONSUMER_STALLED));
                if let Some(consumer_stalled) = maybe_consumer_stalled {
                    context.connection.try_publish_with_reply_or_headers(
                        consumer_stalled,
                        None,
                        None,
                        b"",
                    );
                }

                // Sequence tracking only exists for ordered consumers; for
                // any other consumer the counters below are never advanced,
                // so comparing against them would be meaningless.
                if !is_ordered {
                    return false;
                }

                // The heartbeat reports the last consumer sequence that was
                // sent; a difference to what we received means messages were
                // missed.
                let maybe_consumer_seq = message
                    .headers
                    .as_ref()
                    .and_then(|headers| headers.get(header::NATS_LAST_CONSUMER));
                if let Some(consumer_seq) = maybe_consumer_seq {
                    let consumer_seq = consumer_seq.parse::<u64>().unwrap();
                    let sequence_info = sequence_pair.lock();
                    if consumer_seq != sequence_info.consumer_seq {
                        return handle_sequence_mismatch(sid, sequence_info.stream_seq + 1);
                    }
                }

                return false;
            }

            // Regular message: gap detection applies to ordered consumers only.
            if !is_ordered {
                return false;
            }

            if let Some(message_info) = message.jetstream_message_info() {
                let mut sequence_info = sequence_pair.lock();
                if message_info.consumer_seq != sequence_info.consumer_seq + 1 {
                    return handle_sequence_mismatch(sid, sequence_info.stream_seq + 1);
                }
                sequence_info.stream_seq = message_info.stream_seq;
                sequence_info.consumer_seq = message_info.consumer_seq;
            }

            false
        }
    };

    // Prefer the deliver subject of the existing consumer, else the one from
    // the configuration built above.
    let deliver_subject = maybe_consumer_info
        .as_ref()
        .and_then(|consumer_info| consumer_info.config.deliver_subject.clone())
        .or_else(|| consumer_config.deliver_subject.clone())
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::Other,
                "must use pull subscribe to bind to pull based consumer",
            )
        })?;

    let (mut sid, mut receiver) = self.connection.0.client.subscribe_with_preprocessor(
        &deliver_subject,
        maybe_queue,
        Box::new(preprocessor.clone()),
    )?;

    // Create the consumer if none was found; if creation fails because it
    // already exists, bind to the existing one instead.
    let (consumer_info, consumer_ownership) = match maybe_consumer_info {
        Some(consumer_info) => (consumer_info, ConsumerOwnership::No),
        None => match self.add_consumer(&stream_name, &consumer_config) {
            Ok(consumer_info) => (consumer_info, ConsumerOwnership::Yes),
            Err(err) => {
                // The subscription made above will not be used.
                self.connection.0.client.unsubscribe(sid)?;

                if err.kind() != io::ErrorKind::Other {
                    return Err(err);
                }

                // Any API error other than "consumer name exists" is final.
                if let Some(inner) = err.into_inner() {
                    if let Ok(err) = inner.downcast::<Error>() {
                        if err.error_code() != ErrorCode::ConsumerNameExist {
                            return Err(io::Error::new(io::ErrorKind::Other, err));
                        }
                    }
                }

                let consumer_name = maybe_consumer_name
                    .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "stream not found"))?;

                let consumer_info = self
                    .consumer_info(&stream_name, consumer_name)
                    .and_then(process_consumer_info)?;

                let deliver_subject = consumer_info
                    .config
                    .deliver_subject
                    .as_ref()
                    .ok_or_else(|| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            "must use pull subscribe to bind to pull based consumer",
                        )
                    })?;

                let (new_sid, new_receiver) =
                    self.connection.0.client.subscribe_with_preprocessor(
                        deliver_subject,
                        maybe_queue,
                        Box::new(preprocessor),
                    )?;

                sid = new_sid;
                receiver = new_receiver;

                (consumer_info, ConsumerOwnership::No)
            }
        },
    };

    shared_sid.store(sid, Ordering::Relaxed);

    Ok(PushSubscription::new(
        shared_sid,
        consumer_info,
        consumer_ownership,
        receiver,
        self.clone(),
    ))
}
pub fn add_stream<S>(&self, stream_config: S) -> io::Result<StreamInfo>
where
StreamConfig: From<S>,
{
let config: StreamConfig = stream_config.into();
if config.name.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject: String = format!("{}STREAM.CREATE.{}", self.api_prefix(), config.name);
let req = serde_json::ser::to_vec(&config)?;
self.js_request(&subject, &req)
}
/// Updates the stream named in `config` to match `config`.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// serialization, request or API error.
pub fn update_stream(&self, config: &StreamConfig) -> io::Result<StreamInfo> {
    let stream_name = &config.name;
    if stream_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let endpoint = format!("{}STREAM.UPDATE.{}", self.api_prefix(), stream_name);
    let payload = serde_json::ser::to_vec(config)?;
    self.js_request(&endpoint, &payload)
}
/// Returns a paged iterator over the stream names, backed by the
/// `STREAM.NAMES` endpoint. No request is sent until the iterator is advanced.
pub fn stream_names(&self) -> PagedIterator<'_, String> {
PagedIterator {
subject: format!("{}STREAM.NAMES", self.api_prefix()),
manager: self,
offset: 0,
items: Default::default(),
done: false,
}
}
/// Looks up the name of a stream for `subject` through the `STREAM.NAMES`
/// endpoint and returns the first name reported.
///
/// # Errors
///
/// `ErrorKind::NotFound` when the reply lists no stream (missing or empty
/// list); otherwise any serialization, request or API error.
fn stream_name_by_subject(&self, subject: &str) -> io::Result<String> {
    let req = serde_json::ser::to_vec(&StreamNamesRequest {
        subject: subject.to_string(),
    })?;
    let request_subject = format!("{}STREAM.NAMES", self.api_prefix());

    let response = self.js_request::<StreamNamesResponse>(&request_subject, &req)?;

    // An empty list is treated like a missing one instead of panicking on
    // the first element.
    response
        .streams
        .and_then(|names| names.first().map(|name| name.to_string()))
        .ok_or_else(|| {
            io::Error::new(
                ErrorKind::NotFound,
                "could not find stream for given subject",
            )
        })
}
/// Returns a paged iterator over the stream infos, backed by the
/// `STREAM.LIST` endpoint. No request is sent until the iterator is advanced.
pub fn list_streams(&self) -> PagedIterator<'_, StreamInfo> {
PagedIterator {
subject: format!("{}STREAM.LIST", self.api_prefix()),
manager: self,
offset: 0,
items: Default::default(),
done: false,
}
}
pub fn list_consumers<S>(&self, stream: S) -> io::Result<PagedIterator<'_, ConsumerInfo>>
where
S: AsRef<str>,
{
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject: String = format!("{}CONSUMER.LIST.{}", self.api_prefix(), stream);
Ok(PagedIterator {
subject,
manager: self,
offset: 0,
items: Default::default(),
done: false,
})
}
/// Fetches the info of `stream` from the `STREAM.INFO` endpoint.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// request or API error.
pub fn stream_info<S: AsRef<str>>(&self, stream: S) -> io::Result<StreamInfo> {
    let stream_name = stream.as_ref();
    if stream_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let endpoint = format!("{}STREAM.INFO.{}", self.api_prefix(), stream_name);
    self.js_request(&endpoint, b"")
}
/// Purges `stream` through the `STREAM.PURGE` endpoint, sending an empty
/// request body.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// request or API error.
pub fn purge_stream<S: AsRef<str>>(&self, stream: S) -> io::Result<PurgeResponse> {
    let stream_name = stream.as_ref();
    if stream_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let endpoint = format!("{}STREAM.PURGE.{}", self.api_prefix(), stream_name);
    self.js_request(&endpoint, b"")
}
/// Purges `stream` through the `STREAM.PURGE` endpoint with `filter_subject`
/// set as the request's filter.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// serialization, request or API error.
pub fn purge_stream_subject<S: AsRef<str>>(
    &self,
    stream: S,
    filter_subject: &str,
) -> io::Result<PurgeResponse> {
    let stream_name = stream.as_ref();
    if stream_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let purge = PurgeRequest {
        filter: Some(filter_subject.to_string()),
        ..Default::default()
    };
    let payload = serde_json::to_vec(&purge)?;
    let endpoint = format!("{}STREAM.PURGE.{}", self.api_prefix(), stream_name);
    self.js_request(&endpoint, &payload)
}
/// Fetches the message stored at sequence `seq` of `stream` through the
/// `STREAM.MSG.GET` endpoint.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// serialization, request, API or message conversion error.
pub fn get_message<S: AsRef<str>>(&self, stream: S, seq: u64) -> io::Result<StreamMessage> {
    let stream_name = stream.as_ref();
    if stream_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let endpoint = format!("{}STREAM.MSG.GET.{}", self.api_prefix(), stream_name);
    let payload = serde_json::ser::to_vec(&StreamMessageGetRequest {
        seq: Some(seq),
        last_by_subject: None,
    })?;

    let response: StreamMessageGetResponse = self.js_request(&endpoint, &payload)?;
    Ok(StreamMessage::try_from(response.message)?)
}
/// Fetches the last message stored for `stream_subject` in `stream_name`
/// through the `STREAM.MSG.GET` endpoint.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// serialization, request, API or message conversion error.
pub fn get_last_message<S: AsRef<str>>(
    &self,
    stream_name: S,
    stream_subject: &str,
) -> io::Result<StreamMessage> {
    let name = stream_name.as_ref();
    if name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let endpoint = format!("{}STREAM.MSG.GET.{}", self.api_prefix(), name);
    let payload = serde_json::ser::to_vec(&StreamMessageGetRequest {
        seq: None,
        last_by_subject: Some(stream_subject.to_string()),
    })?;

    let response: StreamMessageGetResponse = self.js_request(&endpoint, &payload)?;
    Ok(StreamMessage::try_from(response.message)?)
}
/// Deletes the message at `sequence_number` from `stream` through the
/// `STREAM.MSG.DELETE` endpoint and returns the reply's `success` flag.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// serialization, request or API error.
pub fn delete_message<S: AsRef<str>>(
    &self,
    stream: S,
    sequence_number: u64,
) -> io::Result<bool> {
    let stream: &str = stream.as_ref();
    if stream.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    // Propagate a serialization failure like the sibling methods do instead
    // of panicking.
    let req = serde_json::ser::to_vec(&DeleteRequest {
        seq: sequence_number,
    })?;
    let subject = format!("{}STREAM.MSG.DELETE.{}", self.api_prefix(), stream);

    self.js_request::<DeleteResponse>(&subject, &req)
        .map(|dr| dr.success)
}
/// Deletes `stream` through the `STREAM.DELETE` endpoint and returns the
/// reply's `success` flag.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// request or API error.
pub fn delete_stream<S: AsRef<str>>(&self, stream: S) -> io::Result<bool> {
    let stream_name = stream.as_ref();
    if stream_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let endpoint = format!("{}STREAM.DELETE.{}", self.api_prefix(), stream_name);
    let reply: DeleteResponse = self.js_request(&endpoint, b"")?;
    Ok(reply.success)
}
pub fn add_consumer<S, C>(&self, stream: S, config: C) -> io::Result<ConsumerInfo>
where
S: AsRef<str>,
ConsumerConfig: From<C>,
{
let config = ConsumerConfig::from(config);
let stream = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let subject = if let Some(ref durable_name) = config.durable_name {
format!(
"{}CONSUMER.DURABLE.CREATE.{}.{}",
self.api_prefix(),
stream,
durable_name
)
} else {
format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream)
};
let req = CreateConsumerRequest {
stream_name: stream.into(),
config,
};
let ser_req = serde_json::ser::to_vec(&req)?;
self.js_request(&subject, &ser_req)
}
/// Deletes `consumer` of `stream` through the `CONSUMER.DELETE` endpoint and
/// returns the reply's `success` flag.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream or consumer name is empty;
/// otherwise any request or API error.
pub fn delete_consumer<S, C>(&self, stream: S, consumer: C) -> io::Result<bool>
where
    S: AsRef<str>,
    C: AsRef<str>,
{
    let stream_name = stream.as_ref();
    if stream_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let consumer_name = consumer.as_ref();
    if consumer_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the consumer name must not be empty",
        ));
    }

    let endpoint = format!(
        "{}CONSUMER.DELETE.{}.{}",
        self.api_prefix(),
        stream_name,
        consumer_name
    );
    let reply: DeleteResponse = self.js_request(&endpoint, b"")?;
    Ok(reply.success)
}
/// Fetches the info of `consumer` on `stream` from the `CONSUMER.INFO`
/// endpoint.
///
/// # Errors
///
/// `ErrorKind::InvalidInput` when the stream name is empty; otherwise any
/// request or API error.
pub fn consumer_info<S, C>(&self, stream: S, consumer: C) -> io::Result<ConsumerInfo>
where
    S: AsRef<str>,
    C: AsRef<str>,
{
    let stream_name = stream.as_ref();
    if stream_name.is_empty() {
        return Err(io::Error::new(
            ErrorKind::InvalidInput,
            "the stream name must not be empty",
        ));
    }

    let endpoint = format!(
        "{}CONSUMER.INFO.{}.{}",
        self.api_prefix(),
        stream_name,
        consumer.as_ref()
    );
    self.js_request(&endpoint, b"")
}
/// Fetches the account information from the `INFO` endpoint.
pub fn account_info(&self) -> io::Result<AccountInfo> {
    let endpoint = format!("{}INFO", self.api_prefix());
    self.js_request(&endpoint, b"")
}
fn js_request<Res>(&self, subject: &str, req: &[u8]) -> io::Result<Res>
where
Res: DeserializeOwned,
{
let res_msg = self
.connection
.request_timeout(subject, req, Duration::from_secs(5))?;
let res: ApiResponse<Res> = serde_json::de::from_slice(&res_msg.data)?;
match res {
ApiResponse::Ok(stream_info) => Ok(stream_info),
ApiResponse::Err { error, .. } => {
log::error!(
"failed to parse API response: {:?}",
std::str::from_utf8(&res_msg.data)
);
Err(io::Error::new(io::ErrorKind::Other, error))
}
}
}
/// Returns the API subject prefix configured in the options.
fn api_prefix(&self) -> &str {
&self.options.api_prefix
}
}
/// Creates a JetStream context over `nc` with the default options.
pub fn new(nc: Connection) -> JetStream {
JetStream::new(nc, JetStreamOptions::default())
}