1use std::{
17 collections::{self, HashMap},
18 fmt::{self, Debug, Display},
19 future::IntoFuture,
20 io::{self},
21 pin::Pin,
22 str::FromStr,
23 task::Poll,
24 time::Duration,
25};
26
27use crate::{
28 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40 context::{
41 ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42 StreamsErrorKind,
43 },
44 errors::ErrorCode,
45 is_valid_name,
46 message::{StreamMessage, StreamMessageError},
47 response::Response,
48 Context,
49};
50
/// Error type returned by the stream info requests in this module; an alias of the
/// context's [`RequestError`].
pub type InfoError = RequestError;
52
/// Kinds of failure reported by the direct-get operations of a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// The message was not found (displayed as "message not found").
    NotFound,
    /// The supplied subject failed validation.
    InvalidSubject,
    /// The request timed out.
    TimedOut,
    /// The request failed.
    Request,
    /// An error response, carrying its status code and description.
    ErrorResponse(StatusCode, String),
    /// Any other failure while getting a message.
    Other,
}
62
63impl Display for DirectGetErrorKind {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 Self::InvalidSubject => write!(f, "invalid subject"),
67 Self::NotFound => write!(f, "message not found"),
68 Self::ErrorResponse(status, description) => {
69 write!(f, "unable to get message: {status} {description}")
70 }
71 Self::Other => write!(f, "error getting message"),
72 Self::TimedOut => write!(f, "timed out"),
73 Self::Request => write!(f, "request failed"),
74 }
75 }
76}
77
/// Error returned by the direct-get operations, classified by [`DirectGetErrorKind`].
pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81 fn from(err: crate::RequestError) -> Self {
82 match err.kind() {
83 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84 crate::RequestErrorKind::NoResponders => {
85 DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86 StatusCode::NO_RESPONDERS,
87 "no responders".to_string(),
88 ))
89 }
90 crate::RequestErrorKind::InvalidSubject | crate::RequestErrorKind::Other => {
91 DirectGetError::with_source(DirectGetErrorKind::Other, err)
92 }
93 }
94 }
95}
96
97impl From<serde_json::Error> for DirectGetError {
98 fn from(err: serde_json::Error) -> Self {
99 DirectGetError::with_source(DirectGetErrorKind::Other, err)
100 }
101}
102
103impl From<StreamMessageError> for DirectGetError {
104 fn from(err: StreamMessageError) -> Self {
105 DirectGetError::with_source(DirectGetErrorKind::Other, err)
106 }
107}
108
/// Kinds of failure reported when deleting a message from a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed.
    Request,
    /// The request timed out.
    TimedOut,
    /// The response carried a JetStream error.
    JetStream(super::errors::Error),
}
115
116impl Display for DeleteMessageErrorKind {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 match self {
119 Self::Request => write!(f, "request failed"),
120 Self::TimedOut => write!(f, "timed out"),
121 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
122 }
123 }
124}
125
/// Error returned when deleting a message, classified by [`DeleteMessageErrorKind`].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
127
/// Handle to a stream: a [`Context`] paired with the stream's name and a cached info value.
///
/// `T` is the type of the cached info and defaults to [`Info`].
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    /// Cached info value of type `T`.
    pub(crate) info: T,
    /// Context used to send the requests issued through this handle.
    pub(crate) context: Context,
    /// Stream name, used when building request subjects.
    pub(crate) name: String,
}
137
138impl Stream<Info> {
139 pub async fn info(&mut self) -> Result<&Info, InfoError> {
157 let subject = format!("STREAM.INFO.{}", self.info.config.name);
158
159 match self.context.request(subject, &json!({})).await? {
160 Response::Ok::<Info>(info) => {
161 self.info = info;
162 Ok(&self.info)
163 }
164 Response::Err { error } => Err(error.into()),
165 }
166 }
167
168 pub fn cached_info(&self) -> &Info {
187 &self.info
188 }
189}
190
191impl<I> Stream<I> {
192 pub async fn get_info(&self) -> Result<Info, InfoError> {
195 let subject = format!("STREAM.INFO.{}", self.name);
196
197 match self.context.request(subject, &json!({})).await? {
198 Response::Ok::<Info>(info) => Ok(info),
199 Response::Err { error } => Err(error.into()),
200 }
201 }
202
203 pub async fn info_with_subjects<F: AsRef<str>>(
226 &self,
227 subjects_filter: F,
228 ) -> Result<InfoWithSubjects, InfoError> {
229 let subjects_filter = subjects_filter.as_ref().to_string();
230 let info = stream_info_with_details(
232 self.context.clone(),
233 self.name.clone(),
234 0,
235 false,
236 subjects_filter.clone(),
237 )
238 .await?;
239
240 Ok(InfoWithSubjects::new(
241 self.context.clone(),
242 info,
243 subjects_filter,
244 ))
245 }
246
    /// Creates a [`StreamInfoBuilder`] bound to a clone of this stream's context and name.
    pub fn info_builder(&self) -> StreamInfoBuilder {
        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
    }
275
    /// Creates a [`DirectGetBuilder`] bound to a clone of this stream's context and name.
    pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
        DirectGetBuilder::new(self.context.clone(), self.name.clone())
    }
298
299 pub async fn direct_get_next_for_subject<T: Into<String>>(
334 &self,
335 subject: T,
336 sequence: Option<u64>,
337 ) -> Result<StreamMessage, DirectGetError> {
338 let subject_str = subject.into();
339 if !is_valid_subject(&subject_str) {
340 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
341 }
342
343 let mut builder = self.direct_get_builder().next_by_subject(subject_str);
344 if let Some(seq) = sequence {
345 builder = builder.sequence(seq);
346 }
347
348 builder.send().await
349 }
350
351 pub async fn direct_get_first_for_subject<T: Into<String>>(
383 &self,
384 subject: T,
385 ) -> Result<StreamMessage, DirectGetError> {
386 let subject_str = subject.into();
387 if !is_valid_subject(&subject_str) {
388 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
389 }
390
391 self.direct_get_builder()
392 .next_by_subject(subject_str)
393 .send()
394 .await
395 }
396
    /// Sends a direct-get request for the message at `sequence`.
    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
        self.direct_get_builder().sequence(sequence).send().await
    }
431
432 pub async fn direct_get_last_for_subject<T: Into<String>>(
464 &self,
465 subject: T,
466 ) -> Result<StreamMessage, DirectGetError> {
467 self.direct_get_builder()
468 .last_by_subject(subject)
469 .send()
470 .await
471 }
    /// Creates a [`RawMessageBuilder`] bound to a clone of this stream's context and name.
    pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
        RawMessageBuilder::new(self.context.clone(), self.name.clone())
    }
494
    /// Sends a raw-message request for the message at `sequence`.
    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
        self.raw_message_builder().sequence(sequence).send().await
    }
527
528 pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
552 &self,
553 subject: T,
554 sequence: u64,
555 ) -> Result<StreamMessage, RawMessageError> {
556 self.raw_message_builder()
557 .sequence(sequence)
558 .next_by_subject(subject.as_ref().to_string())
559 .send()
560 .await
561 }
562
563 pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
587 &self,
588 subject: T,
589 ) -> Result<StreamMessage, RawMessageError> {
590 self.raw_message_builder()
591 .next_by_subject(subject.as_ref().to_string())
592 .send()
593 .await
594 }
595
596 pub async fn get_last_raw_message_by_subject(
620 &self,
621 stream_subject: &str,
622 ) -> Result<StreamMessage, LastRawMessageError> {
623 self.raw_message_builder()
624 .last_by_subject(stream_subject.to_string())
625 .send()
626 .await
627 }
628
629 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
653 let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
654 let payload = json!({
655 "seq": sequence,
656 });
657
658 let response: Response<DeleteStatus> = self
659 .context
660 .request(subject, &payload)
661 .map_err(|err| match err.kind() {
662 RequestErrorKind::TimedOut => {
663 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
664 }
665 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
666 })
667 .await?;
668
669 match response {
670 Response::Err { error } => Err(DeleteMessageError::new(
671 DeleteMessageErrorKind::JetStream(error),
672 )),
673 Response::Ok(value) => Ok(value.success),
674 }
675 }
676
    /// Starts a [`Purge`] builder for this stream.
    pub fn purge(&self) -> Purge<No, No> {
        Purge::build(self)
    }
695
    /// Purges using `subject` as the filter.
    ///
    /// Deprecated: equivalent to `self.purge().filter(subject).await`, which should be
    /// used instead.
    #[deprecated(
        since = "0.25.0",
        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
    )]
    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
    where
        T: Into<String>,
    {
        self.purge().filter(subject).await
    }
722
    /// Creates a consumer on this stream by delegating to
    /// `Context::create_consumer_on_stream` with this stream's name.
    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerError> {
        self.context
            .create_consumer_on_stream(config, self.name.clone())
            .await
    }
754
    /// Updates a consumer on this stream by delegating to
    /// `Context::update_consumer_on_stream` with this stream's name.
    ///
    /// Only available with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerUpdateError> {
        self.context
            .update_consumer_on_stream(config, self.name.clone())
            .await
    }
787
    /// Creates a consumer on this stream by delegating to
    /// `Context::create_consumer_strict_on_stream` with this stream's name.
    ///
    /// Only available with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
        self.context
            .create_consumer_strict_on_stream(config, self.name.clone())
            .await
    }
821
822 pub async fn consumer_info<T: AsRef<str>>(
839 &self,
840 name: T,
841 ) -> Result<consumer::Info, ConsumerInfoError> {
842 let name = name.as_ref();
843
844 if !is_valid_name(name) {
845 return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
846 }
847
848 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
849
850 match self.context.request(subject, &json!({})).await? {
851 Response::Ok(info) => Ok(info),
852 Response::Err { error } => Err(error.into()),
853 }
854 }
855
856 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
875 &self,
876 name: &str,
877 ) -> Result<Consumer<T>, crate::Error> {
878 let info = self.consumer_info(name).await?;
879
880 Ok(Consumer::new(
881 T::try_from_consumer_config(info.config.clone())?,
882 info,
883 self.context.clone(),
884 ))
885 }
886
887 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
915 &self,
916 name: &str,
917 config: T,
918 ) -> Result<Consumer<T>, ConsumerError> {
919 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
920
921 match self.context.request(subject, &json!({})).await? {
922 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
923 Response::Err { error } => Err(error.into()),
924 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
925 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
926 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
927 })?,
928 info,
929 self.context.clone(),
930 )),
931 }
932 }
933
934 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
955 let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
956
957 match self.context.request(subject, &json!({})).await? {
958 Response::Ok(delete_status) => Ok(delete_status),
959 Response::Err { error } => Err(error.into()),
960 }
961 }
962
    /// Sends a pause request for the consumer called `name` with `pause_until` set.
    ///
    /// Only available with the `server_2_11` feature.
    #[cfg(feature = "server_2_11")]
    pub async fn pause_consumer(
        &self,
        name: &str,
        pause_until: OffsetDateTime,
    ) -> Result<PauseResponse, ConsumerError> {
        self.request_pause_consumer(name, Some(pause_until)).await
    }
994
    /// Sends a pause request for the consumer called `name` without a `pause_until`
    /// value.
    ///
    /// Only available with the `server_2_11` feature.
    #[cfg(feature = "server_2_11")]
    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
        self.request_pause_consumer(name, None).await
    }
1019
1020 #[cfg(feature = "server_2_11")]
1021 async fn request_pause_consumer(
1022 &self,
1023 name: &str,
1024 pause_until: Option<OffsetDateTime>,
1025 ) -> Result<PauseResponse, ConsumerError> {
1026 let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1027 let payload = &PauseResumeConsumerRequest { pause_until };
1028
1029 match self.context.request(subject, payload).await? {
1030 Response::Ok::<PauseResponse>(resp) => Ok(resp),
1031 Response::Err { error } => Err(error.into()),
1032 }
1033 }
1034
1035 #[cfg(feature = "server_2_14")]
1067 #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
1068 pub async fn reset_consumer(
1069 &self,
1070 name: &str,
1071 seq: Option<u64>,
1072 ) -> Result<ConsumerResetResponse, ConsumerResetError> {
1073 let subject = format!("CONSUMER.RESET.{}.{}", self.name, name);
1074 let payload = ConsumerResetRequest {
1075 seq: seq.unwrap_or(0),
1076 };
1077
1078 match self.context.request(subject, &payload).await? {
1079 Response::Ok::<ConsumerResetResponse>(resp) => Ok(resp),
1080 Response::Err { error } => Err(error.into()),
1081 }
1082 }
1083
1084 pub fn consumer_names(&self) -> ConsumerNames {
1103 ConsumerNames {
1104 context: self.context.clone(),
1105 stream: self.name.clone(),
1106 offset: 0,
1107 page_request: None,
1108 consumers: Vec::new(),
1109 done: false,
1110 }
1111 }
1112
1113 pub fn consumers(&self) -> Consumers {
1132 Consumers {
1133 context: self.context.clone(),
1134 stream: self.name.clone(),
1135 offset: 0,
1136 page_request: None,
1137 consumers: Vec::new(),
1138 done: false,
1139 }
1140 }
1141}
1142
1143pub struct StreamInfoBuilder {
1144 pub(crate) context: Context,
1145 pub(crate) name: String,
1146 pub(crate) deleted: bool,
1147 pub(crate) subject: String,
1148}
1149
1150impl StreamInfoBuilder {
1151 fn new(context: Context, name: String) -> Self {
1152 Self {
1153 context,
1154 name,
1155 deleted: false,
1156 subject: "".to_string(),
1157 }
1158 }
1159
1160 pub fn with_deleted(mut self, deleted: bool) -> Self {
1161 self.deleted = deleted;
1162 self
1163 }
1164
1165 pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1166 self.subject = subject.into();
1167 self
1168 }
1169
1170 pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1171 let info = stream_info_with_details(
1172 self.context.clone(),
1173 self.name.clone(),
1174 0,
1175 self.deleted,
1176 self.subject.clone(),
1177 )
1178 .await?;
1179
1180 Ok(InfoWithSubjects::new(self.context, info, self.subject))
1181 }
1182}
1183
/// Stream configuration, as serialized to and deserialized from the JetStream API.
///
/// NOTE(review): the field descriptions below are summarized from the field names and
/// serde attributes only; confirm exact semantics (units, sentinel values, server
/// defaults) against the NATS JetStream documentation.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Name of the stream.
    pub name: String,
    /// Byte limit of the stream.
    #[serde(default)]
    pub max_bytes: i64,
    /// Message-count limit of the stream (wire name `max_msgs`).
    #[serde(default, rename = "max_msgs")]
    pub max_messages: i64,
    /// Per-subject message-count limit (wire name `max_msgs_per_subject`).
    #[serde(default, rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// Discard policy of the stream.
    pub discard: DiscardPolicy,
    /// Per-subject variant of the "new" discard behaviour; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Subjects of the stream; omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// Retention policy of the stream.
    pub retention: RetentionPolicy,
    /// Limit on the number of consumers.
    #[serde(default)]
    pub max_consumers: i32,
    /// Maximum message age, encoded as nanoseconds.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// Maximum message size (wire name `max_msg_size`); omitted when `0`.
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// Storage backend of the stream.
    pub storage: StorageType,
    /// Number of replicas.
    pub num_replicas: usize,
    /// Disables acknowledgements when `true`; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// Duplicate-tracking window, encoded as nanoseconds; omitted when zero.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// Owning template name; omitted when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// Whether the stream is sealed; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// Optional description; omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    /// Allows rollup headers (wire name `allow_rollup_hdrs`); omitted when `false`.
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    pub allow_rollup: bool,
    /// Denies message deletion when `true`; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_delete: bool,
    /// Denies purging when `true`; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Optional republish configuration; omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// Enables direct access to the stream; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// Enables direct access through a mirror; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Optional mirror source; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Optional list of sources; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    /// Free-form key/value metadata; omitted when empty (`server_2_10`).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    /// Optional subject transform; omitted when `None` (`server_2_10`).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    /// Optional compression setting; omitted when `None` (`server_2_10`).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    /// Optional consumer limits; all-default limits are deserialized as `None`
    /// (`server_2_10`).
    #[cfg(feature = "server_2_10")]
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    /// Optional first sequence (wire name `first_seq`); omitted when `None`
    /// (`server_2_10`).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Optional placement; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,

    /// Optional persistence mode; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub persist_mode: Option<PersistenceMode>,

    /// Optional RFC 3339 timestamp; omitted when `None` (`server_2_11`).
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,

    /// Allows per-message TTLs (wire name `allow_msg_ttl`); omitted when `false`
    /// (`server_2_11`).
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
    pub allow_message_ttl: bool,

    /// Optional TTL of subject delete markers, encoded as nanoseconds; omitted when
    /// `None` (`server_2_11`).
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub subject_delete_marker_ttl: Option<Duration>,

    /// Allows atomic publish (wire name `allow_atomic`); omitted when `false`
    /// (`server_2_12`).
    #[cfg(feature = "server_2_12")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
    pub allow_atomic_publish: bool,

    /// Allows message schedules (wire name `allow_msg_schedules`); omitted when `false`
    /// (`server_2_12`).
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_schedules"
    )]
    pub allow_message_schedules: bool,

    /// Allows message counters (wire name `allow_msg_counter`); omitted when `false`
    /// (`server_2_12`).
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_counter"
    )]
    pub allow_message_counter: bool,

    /// Allows batched publish (wire name `allow_batched`); omitted when `false`
    /// (`server_2_14`).
    #[cfg(feature = "server_2_14")]
    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_batched")]
    pub allow_batch_publish: bool,
}
1359
1360impl From<&Config> for Config {
1361 fn from(sc: &Config) -> Config {
1362 sc.clone()
1363 }
1364}
1365
1366impl From<&str> for Config {
1367 fn from(s: &str) -> Config {
1368 Config {
1369 name: s.to_string(),
1370 ..Default::default()
1371 }
1372 }
1373}
1374
/// Deserializes optional consumer limits, collapsing limits that equal
/// `ConsumerLimits::default()` into `None`.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
    Ok(limits.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Consumer limits attached to a stream configuration.
///
/// NOTE(review): field meanings are inferred from their names; confirm against the
/// NATS JetStream documentation.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// Inactive threshold, encoded as nanoseconds.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Limit on pending acknowledgements.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1402
/// Compression setting of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// Serialized as `"s2"`.
    #[serde(rename = "s2")]
    S2,
    /// Serialized as `"none"`.
    #[serde(rename = "none")]
    None,
}
1410
/// A subject transform, described by a source and a destination.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// Source of the transform (wire name `src`).
    #[serde(rename = "src")]
    pub source: String,

    /// Destination of the transform (wire name `dest`).
    #[serde(rename = "dest")]
    pub destination: String,
}
1420
/// Republish configuration of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// Source of the republish (wire name `src`).
    #[serde(rename = "src")]
    pub source: String,
    /// Destination of the republish (wire name `dest`).
    #[serde(rename = "dest")]
    pub destination: String,
    /// Headers-only flag; defaults to `false` when absent.
    #[serde(default)]
    pub headers_only: bool,
}
1434
/// Placement of a stream, expressed as an optional cluster and a list of tags.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Optional cluster; omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Tags; omitted when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1445
/// Discard policy of a stream; defaults to [`DiscardPolicy::Old`].
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Serialized as `"old"`; the default.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Serialized as `"new"`.
    #[serde(rename = "new")]
    New = 1,
}
1459
/// Retention policy of a stream; defaults to [`RetentionPolicy::Limits`].
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// Serialized as `"limits"`; the default.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// Serialized as `"interest"`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// Serialized as `"workqueue"`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1476
/// Storage backend of a stream; defaults to [`StorageType::File`].
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Serialized as `"file"`; the default.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Serialized as `"memory"`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1489
/// Persistence mode of a stream; defaults to [`PersistenceMode::Default`].
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PersistenceMode {
    /// Serialized as `"default"`; the default.
    #[default]
    #[serde(rename = "default")]
    Default = 0,
    /// Serialized as `"async"`.
    #[serde(rename = "async")]
    Async = 1,
}
1502
1503async fn stream_info_with_details(
1504 context: Context,
1505 stream: String,
1506 offset: usize,
1507 deleted_details: bool,
1508 subjects_filter: String,
1509) -> Result<Info, InfoError> {
1510 let subject = format!("STREAM.INFO.{stream}");
1511
1512 let payload = StreamInfoRequest {
1513 offset,
1514 deleted_details,
1515 subjects_filter,
1516 };
1517
1518 let response: Response<Info> = context.request(subject, &payload).await?;
1519
1520 match response {
1521 Response::Ok(info) => Ok(info),
1522 Response::Err { error } => Err(error.into()),
1523 }
1524}
1525
/// Boxed future of an in-flight stream info request.
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1527
/// Payload of a stream info request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    /// Paging offset.
    offset: usize,
    /// Whether deleted details are requested.
    deleted_details: bool,
    /// Subjects filter.
    subjects_filter: String,
}
1534
/// Stream info together with a paged iterator over `(subject, count)` entries.
///
/// Implements `futures_util::Stream`; further pages are requested as the buffered
/// entries run out.
pub struct InfoWithSubjects {
    /// Name of the stream, used for follow-up page requests.
    stream: String,
    /// Context used for follow-up page requests.
    context: Context,
    /// Info from the initial request (its subjects map is moved into `subjects`).
    pub info: Info,
    /// Number of subject entries received so far; sent as the next page offset.
    offset: usize,
    /// Buffered entries of the current page.
    subjects: collections::hash_map::IntoIter<String, usize>,
    /// In-flight page request, if any.
    info_request: Option<InfoRequest>,
    /// Subjects filter sent with every page request.
    subjects_filter: String,
    /// Set once no further page needs to be requested.
    pages_done: bool,
}
1545
1546impl InfoWithSubjects {
1547 pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1548 let subjects = info.state.subjects.take().unwrap_or_default();
1549 let name = info.config.name.clone();
1550 InfoWithSubjects {
1551 context,
1552 info,
1553 pages_done: subjects.is_empty(),
1554 offset: subjects.len(),
1555 subjects: subjects.into_iter(),
1556 subjects_filter: subject,
1557 stream: name,
1558 info_request: None,
1559 }
1560 }
1561}
1562
1563impl futures_util::Stream for InfoWithSubjects {
1564 type Item = Result<(String, usize), InfoError>;
1565
1566 fn poll_next(
1567 mut self: Pin<&mut Self>,
1568 cx: &mut std::task::Context<'_>,
1569 ) -> Poll<Option<Self::Item>> {
1570 match self.subjects.next() {
1571 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1572 None => {
1573 if self.pages_done {
1575 return Poll::Ready(None);
1576 }
1577 let stream = self.stream.clone();
1578 let context = self.context.clone();
1579 let subjects_filter = self.subjects_filter.clone();
1580 let offset = self.offset;
1581 match self
1582 .info_request
1583 .get_or_insert_with(|| {
1584 Box::pin(stream_info_with_details(
1585 context,
1586 stream,
1587 offset,
1588 false,
1589 subjects_filter,
1590 ))
1591 })
1592 .poll_unpin(cx)
1593 {
1594 Poll::Ready(resp) => match resp {
1595 Ok(info) => {
1596 let subjects = info.state.subjects.clone();
1597 self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1598 self.info_request = None;
1599 let subjects = subjects.unwrap_or_default();
1600 self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1601 let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1602 if total <= self.offset || subjects.is_empty() {
1603 self.pages_done = true;
1604 }
1605 match self.subjects.next() {
1606 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1607 None => Poll::Ready(None),
1608 }
1609 }
1610 Err(err) => Poll::Ready(Some(Err(err))),
1611 },
1612 Poll::Pending => Poll::Pending,
1613 }
1614 }
1615 }
1616 }
1617}
1618
/// Stream info as returned by a stream info request.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// Configuration of the stream.
    pub config: Config,
    /// Creation time, parsed from an RFC 3339 timestamp.
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// State of the stream.
    pub state: State,
    /// Optional cluster information.
    pub cluster: Option<ClusterInfo>,
    /// Optional mirror information; `None` when absent.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Source information; empty when absent.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    /// Paging fields flattened into the same object, when present.
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1640
/// Paging fields of a paged response.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    /// Offset of the page.
    offset: usize,
    /// Total number of entries.
    total: usize,
    /// Page size limit.
    limit: usize,
}
1647
1648#[derive(Deserialize)]
1649pub struct DeleteStatus {
1650 pub success: bool,
1651}
1652
/// Response of a consumer pause/resume request (`server_2_11`).
///
/// Derives `Debug` and `Clone` so the value can be logged and so `Result`s carrying it
/// support `unwrap_err`/`expect_err`.
#[cfg(feature = "server_2_11")]
#[derive(Debug, Clone, Deserialize)]
pub struct PauseResponse {
    /// Paused flag reported in the response.
    pub paused: bool,
    /// Pause deadline, parsed from an RFC 3339 timestamp.
    #[serde(with = "rfc3339")]
    pub pause_until: OffsetDateTime,
    /// Remaining pause time, decoded from nanoseconds; `None` when absent.
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}
1662
/// Payload of a consumer pause/resume request (`server_2_11`).
#[cfg(feature = "server_2_11")]
#[derive(Serialize, Debug)]
struct PauseResumeConsumerRequest {
    /// Optional pause deadline as an RFC 3339 timestamp; omitted when `None`.
    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
    pause_until: Option<OffsetDateTime>,
}
1669
/// Payload of a consumer reset request (`server_2_14`).
#[cfg(feature = "server_2_14")]
#[derive(Serialize, Debug)]
pub(crate) struct ConsumerResetRequest {
    /// Sequence sent with the request; omitted from the payload when `0`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub(crate) seq: u64,
}
1676
/// Response of a consumer reset request (`server_2_14`).
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Debug, Deserialize, Clone)]
pub struct ConsumerResetResponse {
    /// Consumer info, flattened into the same object as `reset_seq`.
    #[serde(flatten)]
    pub info: super::consumer::Info,
    /// Reset sequence reported in the response.
    pub reset_seq: u64,
}
1691
/// State of a stream as reported in its info.
///
/// NOTE(review): field meanings are summarized from names and serde attributes;
/// confirm details against the NATS JetStream documentation.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
    /// Number of messages.
    pub messages: u64,
    /// Number of bytes.
    pub bytes: u64,
    /// First sequence (wire name `first_seq`).
    #[serde(rename = "first_seq")]
    pub first_sequence: u64,
    /// First timestamp (wire name `first_ts`), parsed from RFC 3339.
    #[serde(with = "rfc3339", rename = "first_ts")]
    pub first_timestamp: time::OffsetDateTime,
    /// Last sequence (wire name `last_seq`).
    #[serde(rename = "last_seq")]
    pub last_sequence: u64,
    /// Last timestamp (wire name `last_ts`), parsed from RFC 3339.
    #[serde(with = "rfc3339", rename = "last_ts")]
    pub last_timestamp: time::OffsetDateTime,
    /// Number of consumers.
    pub consumer_count: usize,
    /// Number of subjects (wire name `num_subjects`); `0` when absent.
    #[serde(default, rename = "num_subjects")]
    pub subjects_count: u64,
    /// Number of deleted messages (wire name `num_deleted`); `None` when absent.
    #[serde(default, rename = "num_deleted")]
    pub deleted_count: Option<u64>,
    /// Deleted sequences; `None` when absent.
    #[serde(default)]
    pub deleted: Option<Vec<u64>>,

    /// Per-subject counts, consumed by `InfoWithSubjects`.
    pub(crate) subjects: Option<HashMap<String, usize>>,
}
1726
/// A stored message in its wire form, with base64-encoded payload and headers.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
    /// Subject of the message.
    #[serde(rename = "subject")]
    pub subject: String,

    /// Sequence of the message (wire name `seq`).
    #[serde(rename = "seq")]
    pub sequence: u64,

    /// Base64-encoded payload (wire name `data`); empty when absent.
    #[serde(default, rename = "data")]
    pub payload: String,

    /// Base64-encoded headers (wire name `hdrs`); `None` when absent.
    #[serde(default, rename = "hdrs")]
    pub headers: Option<String>,

    /// Timestamp of the message, parsed from RFC 3339.
    #[serde(rename = "time", with = "rfc3339")]
    pub time: time::OffsetDateTime,
}
1750
1751impl TryFrom<RawMessage> for StreamMessage {
1752 type Error = crate::Error;
1753
1754 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1755 let decoded_payload = STANDARD
1756 .decode(value.payload)
1757 .map_err(|err| Box::new(std::io::Error::other(err)))?;
1758 let decoded_headers = value
1759 .headers
1760 .map(|header| STANDARD.decode(header))
1761 .map_or(Ok(None), |v| v.map(Some))?;
1762
1763 let (headers, _, _) = decoded_headers
1764 .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1765
1766 Ok(StreamMessage {
1767 subject: value.subject.into(),
1768 payload: decoded_payload.into(),
1769 headers,
1770 sequence: value.sequence,
1771 time: value.time,
1772 })
1773 }
1774}
1775
/// Returns `true` for a space or a tab, the characters that start a continuation line.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
/// Prefix that the first line of a header block must start with.
const HEADER_LINE: &str = "NATS/1.0";
1780
1781#[allow(clippy::type_complexity)]
1782fn parse_headers(
1783 buf: &[u8],
1784) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1785 let mut headers = HeaderMap::new();
1786 let mut maybe_status: Option<StatusCode> = None;
1787 let mut maybe_description: Option<String> = None;
1788 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1789 line.lines().peekable()
1790 } else {
1791 return Err(Box::new(std::io::Error::other("invalid header")));
1792 };
1793
1794 if let Some(line) = lines.next() {
1795 let line = line
1796 .strip_prefix(HEADER_LINE)
1797 .ok_or_else(|| {
1798 Box::new(std::io::Error::other(
1799 "version line does not start with NATS/1.0",
1800 ))
1801 })?
1802 .trim();
1803
1804 match line.split_once(' ') {
1805 Some((status, description)) => {
1806 if !status.is_empty() {
1807 maybe_status = Some(status.parse()?);
1808 }
1809
1810 if !description.is_empty() {
1811 maybe_description = Some(description.trim().to_string());
1812 }
1813 }
1814 None => {
1815 if !line.is_empty() {
1816 maybe_status = Some(line.parse()?);
1817 }
1818 }
1819 }
1820 } else {
1821 return Err(Box::new(std::io::Error::other(
1822 "expected header information not found",
1823 )));
1824 };
1825
1826 while let Some(line) = lines.next() {
1827 if line.is_empty() {
1828 continue;
1829 }
1830
1831 if let Some((k, v)) = line.split_once(':').to_owned() {
1832 let mut s = String::from(v.trim());
1833 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1834 s.push(' ');
1835 s.push_str(v.trim());
1836 }
1837
1838 headers.insert(
1839 HeaderName::from_str(k)?,
1840 HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1841 );
1842 } else {
1843 return Err(Box::new(std::io::Error::other("malformed header line")));
1844 }
1845 }
1846
1847 if headers.is_empty() {
1848 Ok((HeaderMap::new(), maybe_status, maybe_description))
1849 } else {
1850 Ok((headers, maybe_status, maybe_description))
1851 }
1852}
1853
/// Envelope with a single `message` field holding a [`RawMessage`].
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
    /// The wrapped raw message.
    pub(crate) message: RawMessage,
}
1858
/// Returns `true` when `t` equals its type's default value.
///
/// Used as a `skip_serializing_if` predicate by the serde attributes in this file.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
/// Cluster information reported for a stream.
///
/// NOTE(review): field meanings are summarized from their names; confirm against the
/// NATS JetStream documentation.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
    /// Cluster name, when present.
    #[serde(default)]
    pub name: Option<String>,
    /// Raft group, when present.
    #[serde(default)]
    pub raft_group: Option<String>,
    /// Leader, when present.
    #[serde(default)]
    pub leader: Option<String>,
    /// Optional RFC 3339 timestamp associated with the leader.
    #[serde(default, with = "rfc3339::option")]
    pub leader_since: Option<OffsetDateTime>,
    /// System-account flag; `false` when absent (`server_2_12`).
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub system_account: bool,
    /// Traffic account, when present (`server_2_12`).
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub traffic_account: Option<String>,
    /// Replica peers; empty when absent.
    #[serde(default)]
    pub replicas: Vec<PeerInfo>,
}
1890
/// Information about one peer of a cluster.
///
/// NOTE(review): field meanings are summarized from their names; confirm against the
/// NATS JetStream documentation.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// Peer name.
    pub name: String,
    /// Whether the peer is current.
    pub current: bool,
    /// Activity duration, decoded from nanoseconds.
    #[serde(with = "serde_nanos")]
    pub active: Duration,
    /// Offline flag; `false` when absent.
    #[serde(default)]
    pub offline: bool,
    /// Optional lag of the peer.
    pub lag: Option<u64>,
}
1907
/// Information about a mirror or source of a stream.
///
/// NOTE(review): field meanings are summarized from their names; confirm against the
/// NATS JetStream documentation.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
    /// Name of the source.
    pub name: String,
    /// Lag of the source.
    pub lag: u64,
    /// Activity duration decoded from nanoseconds; negative wire values become `None`.
    #[serde(deserialize_with = "negative_duration_as_none")]
    pub active: Option<std::time::Duration>,
    /// Optional filter subject.
    #[serde(default)]
    pub filter_subject: Option<String>,
    /// Optional subject transform destination.
    #[serde(default)]
    pub subject_transform_dest: Option<String>,
    /// Subject transforms; empty when absent.
    #[serde(default)]
    pub subject_transforms: Vec<SubjectTransform>,
}
1927
1928fn negative_duration_as_none<'de, D>(
1929 deserializer: D,
1930) -> Result<Option<std::time::Duration>, D::Error>
1931where
1932 D: Deserializer<'de>,
1933{
1934 let n = i64::deserialize(deserializer)?;
1935 if n.is_negative() {
1936 Ok(None)
1937 } else {
1938 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1939 }
1940}
1941
1942#[derive(Debug, Deserialize, Clone, Copy)]
1944pub struct PurgeResponse {
1945 pub success: bool,
1947 pub purged: u64,
1949}
1950#[derive(Default, Debug, Serialize, Clone)]
1952pub struct PurgeRequest {
1953 #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1955 pub sequence: Option<u64>,
1956
1957 #[serde(default, skip_serializing_if = "is_default")]
1959 pub filter: Option<String>,
1960
1961 #[serde(default, skip_serializing_if = "is_default")]
1963 pub keep: Option<u64>,
1964}
1965
1966#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1967pub struct Source {
1968 pub name: String,
1970 #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1972 pub start_sequence: Option<u64>,
1973 #[serde(
1974 default,
1975 rename = "opt_start_time",
1976 skip_serializing_if = "is_default",
1977 with = "rfc3339::option"
1978 )]
1979 pub start_time: Option<OffsetDateTime>,
1981 #[serde(default, skip_serializing_if = "is_default")]
1983 pub filter_subject: Option<String>,
1984 #[serde(default, skip_serializing_if = "Option::is_none")]
1986 pub external: Option<External>,
1987 #[serde(default, skip_serializing_if = "is_default")]
1989 pub domain: Option<String>,
1990 #[cfg(feature = "server_2_10")]
1992 #[serde(default, skip_serializing_if = "is_default")]
1993 pub subject_transforms: Vec<SubjectTransform>,
1994
1995 #[cfg(feature = "server_2_14")]
2002 #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
2003 #[serde(default, skip_serializing_if = "Option::is_none")]
2004 pub consumer: Option<StreamConsumerSource>,
2005}
2006
/// Consumer details attached to a [`Source`] through its `consumer` field.
///
/// Empty strings are left out when serializing and restored as empty strings
/// when the key is absent.
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct StreamConsumerSource {
    /// Consumer name.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub name: String,
    /// Deliver subject.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub deliver_subject: String,
}
2020
#[cfg(feature = "server_2_14")]
impl StreamConsumerSource {
    /// Creates a consumer source from a consumer `name` and a `deliver_subject`.
    pub fn new(name: impl Into<String>, deliver_subject: impl Into<String>) -> Self {
        let name = name.into();
        let deliver_subject = deliver_subject.into();
        Self {
            name,
            deliver_subject,
        }
    }
}
2032
2033#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
2034pub struct External {
2035 #[serde(rename = "api")]
2037 pub api_prefix: String,
2038 #[serde(rename = "deliver", skip_serializing_if = "is_default")]
2040 pub delivery_prefix: Option<String>,
2041}
2042
2043use std::marker::PhantomData;
2044
/// Marker type used as a [`Purge`] type parameter once the corresponding
/// option has been set.
#[derive(Default, Debug)]
pub struct Yes;

/// Marker type used as a [`Purge`] type parameter while the corresponding
/// option is still unset.
#[derive(Default, Debug)]
pub struct No;

/// Bound shared by the [`Yes`] and [`No`] marker types.
pub trait ToAssign: Debug {}

impl ToAssign for No {}
impl ToAssign for Yes {}
2054
2055#[derive(Debug)]
2056pub struct Purge<SEQUENCE, KEEP>
2057where
2058 SEQUENCE: ToAssign,
2059 KEEP: ToAssign,
2060{
2061 inner: PurgeRequest,
2062 sequence_set: PhantomData<SEQUENCE>,
2063 keep_set: PhantomData<KEEP>,
2064 context: Context,
2065 stream_name: String,
2066}
2067
2068impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
2069where
2070 SEQUENCE: ToAssign,
2071 KEEP: ToAssign,
2072{
2073 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
2075 self.inner.filter = Some(filter.into());
2076 self
2077 }
2078}
2079
2080impl Purge<No, No> {
2081 pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
2082 Purge {
2083 context: stream.context.clone(),
2084 stream_name: stream.name.clone(),
2085 inner: Default::default(),
2086 sequence_set: PhantomData {},
2087 keep_set: PhantomData {},
2088 }
2089 }
2090}
2091
2092impl<KEEP> Purge<No, KEEP>
2093where
2094 KEEP: ToAssign,
2095{
2096 pub fn keep(self, keep: u64) -> Purge<No, Yes> {
2099 Purge {
2100 context: self.context.clone(),
2101 stream_name: self.stream_name.clone(),
2102 sequence_set: PhantomData {},
2103 keep_set: PhantomData {},
2104 inner: PurgeRequest {
2105 keep: Some(keep),
2106 ..self.inner
2107 },
2108 }
2109 }
2110}
2111impl<SEQUENCE> Purge<SEQUENCE, No>
2112where
2113 SEQUENCE: ToAssign,
2114{
2115 pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2118 Purge {
2119 context: self.context.clone(),
2120 stream_name: self.stream_name.clone(),
2121 sequence_set: PhantomData {},
2122 keep_set: PhantomData {},
2123 inner: PurgeRequest {
2124 sequence: Some(sequence),
2125 ..self.inner
2126 },
2127 }
2128 }
2129}
2130
2131#[derive(Clone, Debug, PartialEq)]
2132pub enum PurgeErrorKind {
2133 Request,
2134 TimedOut,
2135 JetStream(super::errors::Error),
2136}
2137
2138impl Display for PurgeErrorKind {
2139 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2140 match self {
2141 Self::Request => write!(f, "request failed"),
2142 Self::TimedOut => write!(f, "timed out"),
2143 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2144 }
2145 }
2146}
2147
2148pub type PurgeError = Error<PurgeErrorKind>;
2149
2150impl<S, K> IntoFuture for Purge<S, K>
2151where
2152 S: ToAssign + std::marker::Send,
2153 K: ToAssign + std::marker::Send,
2154{
2155 type Output = Result<PurgeResponse, PurgeError>;
2156
2157 type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2158
2159 fn into_future(self) -> Self::IntoFuture {
2160 Box::pin(std::future::IntoFuture::into_future(async move {
2161 let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2162 let response: Response<PurgeResponse> = self
2163 .context
2164 .request(request_subject, &self.inner)
2165 .map_err(|err| match err.kind() {
2166 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2167 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2168 })
2169 .await?;
2170
2171 match response {
2172 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2173 Response::Ok(response) => Ok(response),
2174 }
2175 }))
2176 }
2177}
2178
2179#[derive(Deserialize, Debug)]
2180struct ConsumerPage {
2181 total: usize,
2182 consumers: Option<Vec<String>>,
2183}
2184
2185#[derive(Deserialize, Debug)]
2186struct ConsumerInfoPage {
2187 total: usize,
2188 consumers: Option<Vec<super::consumer::Info>>,
2189}
2190
2191type ConsumerNamesErrorKind = StreamsErrorKind;
2192type ConsumerNamesError = StreamsError;
2193type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2194
2195pub struct ConsumerNames {
2196 context: Context,
2197 stream: String,
2198 offset: usize,
2199 page_request: Option<PageRequest>,
2200 consumers: Vec<String>,
2201 done: bool,
2202}
2203
2204impl futures_util::Stream for ConsumerNames {
2205 type Item = Result<String, ConsumerNamesError>;
2206
2207 fn poll_next(
2208 mut self: Pin<&mut Self>,
2209 cx: &mut std::task::Context<'_>,
2210 ) -> std::task::Poll<Option<Self::Item>> {
2211 match self.page_request.as_mut() {
2212 Some(page) => match page.try_poll_unpin(cx) {
2213 std::task::Poll::Ready(page) => {
2214 self.page_request = None;
2215 let page = page.map_err(|err| {
2216 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2217 })?;
2218
2219 if let Some(consumers) = page.consumers {
2220 self.offset += consumers.len();
2221 self.consumers = consumers;
2222 if self.offset >= page.total {
2223 self.done = true;
2224 }
2225 match self.consumers.pop() {
2226 Some(stream) => Poll::Ready(Some(Ok(stream))),
2227 None => Poll::Ready(None),
2228 }
2229 } else {
2230 Poll::Ready(None)
2231 }
2232 }
2233 std::task::Poll::Pending => std::task::Poll::Pending,
2234 },
2235 None => {
2236 if let Some(stream) = self.consumers.pop() {
2237 Poll::Ready(Some(Ok(stream)))
2238 } else {
2239 if self.done {
2240 return Poll::Ready(None);
2241 }
2242 let context = self.context.clone();
2243 let offset = self.offset;
2244 let stream = self.stream.clone();
2245 self.page_request = Some(Box::pin(async move {
2246 match context
2247 .request(
2248 format!("CONSUMER.NAMES.{stream}"),
2249 &json!({
2250 "offset": offset,
2251 }),
2252 )
2253 .await?
2254 {
2255 Response::Err { error } => Err(RequestError::with_source(
2256 super::context::RequestErrorKind::Other,
2257 error,
2258 )),
2259 Response::Ok(page) => Ok(page),
2260 }
2261 }));
2262 self.poll_next(cx)
2263 }
2264 }
2265 }
2266 }
2267}
2268
2269pub type ConsumersErrorKind = StreamsErrorKind;
2270pub type ConsumersError = StreamsError;
2271type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2272
2273pub struct Consumers {
2274 context: Context,
2275 stream: String,
2276 offset: usize,
2277 page_request: Option<PageInfoRequest>,
2278 consumers: Vec<super::consumer::Info>,
2279 done: bool,
2280}
2281
2282impl futures_util::Stream for Consumers {
2283 type Item = Result<super::consumer::Info, ConsumersError>;
2284
2285 fn poll_next(
2286 mut self: Pin<&mut Self>,
2287 cx: &mut std::task::Context<'_>,
2288 ) -> std::task::Poll<Option<Self::Item>> {
2289 match self.page_request.as_mut() {
2290 Some(page) => match page.try_poll_unpin(cx) {
2291 std::task::Poll::Ready(page) => {
2292 self.page_request = None;
2293 let page = page.map_err(|err| {
2294 ConsumersError::with_source(ConsumersErrorKind::Other, err)
2295 })?;
2296 if let Some(consumers) = page.consumers {
2297 self.offset += consumers.len();
2298 self.consumers = consumers;
2299 if self.offset >= page.total {
2300 self.done = true;
2301 }
2302 match self.consumers.pop() {
2303 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2304 None => Poll::Ready(None),
2305 }
2306 } else {
2307 Poll::Ready(None)
2308 }
2309 }
2310 std::task::Poll::Pending => std::task::Poll::Pending,
2311 },
2312 None => {
2313 if let Some(stream) = self.consumers.pop() {
2314 Poll::Ready(Some(Ok(stream)))
2315 } else {
2316 if self.done {
2317 return Poll::Ready(None);
2318 }
2319 let context = self.context.clone();
2320 let offset = self.offset;
2321 let stream = self.stream.clone();
2322 self.page_request = Some(Box::pin(async move {
2323 match context
2324 .request(
2325 format!("CONSUMER.LIST.{stream}"),
2326 &json!({
2327 "offset": offset,
2328 }),
2329 )
2330 .await?
2331 {
2332 Response::Err { error } => Err(RequestError::with_source(
2333 super::context::RequestErrorKind::Other,
2334 error,
2335 )),
2336 Response::Ok(page) => Ok(page),
2337 }
2338 }));
2339 self.poll_next(cx)
2340 }
2341 }
2342 }
2343 }
2344}
2345
2346#[derive(Clone, Debug, PartialEq)]
2347pub enum LastRawMessageErrorKind {
2348 NoMessageFound,
2349 InvalidSubject,
2350 JetStream(super::errors::Error),
2351 Other,
2352}
2353
2354impl Display for LastRawMessageErrorKind {
2355 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2356 match self {
2357 Self::NoMessageFound => write!(f, "no message found"),
2358 Self::InvalidSubject => write!(f, "invalid subject"),
2359 Self::Other => write!(f, "failed to get last raw message"),
2360 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2361 }
2362 }
2363}
2364
2365pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2366pub type RawMessageErrorKind = LastRawMessageErrorKind;
2367pub type RawMessageError = LastRawMessageError;
2368
2369#[derive(Clone, Debug, PartialEq)]
2370pub enum ConsumerErrorKind {
2371 TimedOut,
2373 Request,
2374 InvalidConsumerType,
2375 InvalidName,
2376 JetStream(super::errors::Error),
2377 Other,
2378}
2379
2380impl Display for ConsumerErrorKind {
2381 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2382 match self {
2383 Self::TimedOut => write!(f, "timed out"),
2384 Self::Request => write!(f, "request failed"),
2385 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2386 Self::Other => write!(f, "consumer error"),
2387 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2388 Self::InvalidName => write!(f, "invalid consumer name"),
2389 }
2390 }
2391}
2392
2393pub type ConsumerError = Error<ConsumerErrorKind>;
2394
/// Kinds of failure reported when resetting a consumer.
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerResetErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for another reason.
    Request,
    /// The stream or the consumer was not found.
    NotFound,
    /// The server rejected the reset as invalid.
    InvalidReset,
    /// The server replied with another JetStream error.
    JetStream(super::errors::Error),
}

#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
impl Display for ConsumerResetErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::JetStream(err) => return write!(f, "JetStream error: {err}"),
            Self::TimedOut => "timed out",
            Self::Request => "request failed",
            Self::NotFound => "stream or consumer not found",
            Self::InvalidReset => "invalid reset",
        };
        f.write_str(message)
    }
}

/// Error carrying a [`ConsumerResetErrorKind`].
#[cfg(feature = "server_2_14")]
pub type ConsumerResetError = Error<ConsumerResetErrorKind>;
2430
#[cfg(feature = "server_2_14")]
impl From<super::errors::Error> for ConsumerResetError {
    /// Maps a JetStream error reply onto a reset error kind.
    ///
    /// `CONSUMER_INVALID_RESET` becomes `InvalidReset`; `CONSUMER_NOT_FOUND`
    /// and `STREAM_NOT_FOUND` become `NotFound`; every other code is wrapped
    /// in `JetStream`.
    fn from(err: super::errors::Error) -> Self {
        let kind = match err.error_code() {
            ErrorCode::CONSUMER_INVALID_RESET => ConsumerResetErrorKind::InvalidReset,
            ErrorCode::CONSUMER_NOT_FOUND | ErrorCode::STREAM_NOT_FOUND => {
                ConsumerResetErrorKind::NotFound
            }
            _ => ConsumerResetErrorKind::JetStream(err),
        };
        ConsumerResetError::new(kind)
    }
}
2446
#[cfg(feature = "server_2_14")]
impl From<RequestError> for ConsumerResetError {
    /// Maps a request failure onto a reset error kind.
    ///
    /// A timeout becomes `TimedOut`, missing responders become `NotFound`, and
    /// anything else becomes `Request` with the original error as its source.
    fn from(err: RequestError) -> Self {
        match err.kind() {
            RequestErrorKind::TimedOut => Self::new(ConsumerResetErrorKind::TimedOut),
            RequestErrorKind::NoResponders => Self::new(ConsumerResetErrorKind::NotFound),
            _ => Self::with_source(ConsumerResetErrorKind::Request, err),
        }
    }
}
2459
2460#[derive(Clone, Debug, PartialEq)]
2461pub enum ConsumerCreateStrictErrorKind {
2462 TimedOut,
2464 Request,
2465 InvalidConsumerType,
2466 InvalidName,
2467 AlreadyExists,
2468 JetStream(super::errors::Error),
2469 Other,
2470}
2471
2472impl Display for ConsumerCreateStrictErrorKind {
2473 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2474 match self {
2475 Self::TimedOut => write!(f, "timed out"),
2476 Self::Request => write!(f, "request failed"),
2477 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2478 Self::Other => write!(f, "consumer error"),
2479 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2480 Self::InvalidName => write!(f, "invalid consumer name"),
2481 Self::AlreadyExists => write!(f, "consumer already exists"),
2482 }
2483 }
2484}
2485
2486pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2487
2488#[derive(Clone, Debug, PartialEq)]
2489pub enum ConsumerUpdateErrorKind {
2490 TimedOut,
2492 Request,
2493 InvalidConsumerType,
2494 InvalidName,
2495 DoesNotExist,
2496 JetStream(super::errors::Error),
2497 Other,
2498}
2499
2500impl Display for ConsumerUpdateErrorKind {
2501 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2502 match self {
2503 Self::TimedOut => write!(f, "timed out"),
2504 Self::Request => write!(f, "request failed"),
2505 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2506 Self::Other => write!(f, "consumer error"),
2507 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2508 Self::InvalidName => write!(f, "invalid consumer name"),
2509 Self::DoesNotExist => write!(f, "consumer does not exist"),
2510 }
2511 }
2512}
2513
2514pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2515
2516impl From<super::errors::Error> for ConsumerError {
2517 fn from(err: super::errors::Error) -> Self {
2518 ConsumerError::new(ConsumerErrorKind::JetStream(err))
2519 }
2520}
2521impl From<super::errors::Error> for ConsumerCreateStrictError {
2522 fn from(err: super::errors::Error) -> Self {
2523 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2524 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2525 } else {
2526 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2527 }
2528 }
2529}
2530impl From<super::errors::Error> for ConsumerUpdateError {
2531 fn from(err: super::errors::Error) -> Self {
2532 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2533 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2534 } else {
2535 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2536 }
2537 }
2538}
2539impl From<ConsumerError> for ConsumerUpdateError {
2540 fn from(err: ConsumerError) -> Self {
2541 match err.kind() {
2542 ConsumerErrorKind::JetStream(err) => {
2543 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2544 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2545 } else {
2546 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2547 }
2548 }
2549 ConsumerErrorKind::Request => {
2550 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2551 }
2552 ConsumerErrorKind::TimedOut => {
2553 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2554 }
2555 ConsumerErrorKind::InvalidConsumerType => {
2556 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2557 }
2558 ConsumerErrorKind::InvalidName => {
2559 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2560 }
2561 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2562 }
2563 }
2564}
2565
2566impl From<ConsumerError> for ConsumerCreateStrictError {
2567 fn from(err: ConsumerError) -> Self {
2568 match err.kind() {
2569 ConsumerErrorKind::JetStream(err) => {
2570 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2571 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2572 } else {
2573 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2574 }
2575 }
2576 ConsumerErrorKind::Request => {
2577 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2578 }
2579 ConsumerErrorKind::TimedOut => {
2580 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2581 }
2582 ConsumerErrorKind::InvalidConsumerType => {
2583 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2584 }
2585 ConsumerErrorKind::InvalidName => {
2586 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2587 }
2588 ConsumerErrorKind::Other => {
2589 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2590 }
2591 }
2592 }
2593}
2594
2595impl From<super::context::RequestError> for ConsumerError {
2596 fn from(err: super::context::RequestError) -> Self {
2597 match err.kind() {
2598 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2599 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2600 }
2601 }
2602}
2603impl From<super::context::RequestError> for ConsumerUpdateError {
2604 fn from(err: super::context::RequestError) -> Self {
2605 match err.kind() {
2606 RequestErrorKind::TimedOut => {
2607 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2608 }
2609 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2610 }
2611 }
2612}
2613impl From<super::context::RequestError> for ConsumerCreateStrictError {
2614 fn from(err: super::context::RequestError) -> Self {
2615 match err.kind() {
2616 RequestErrorKind::TimedOut => {
2617 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2618 }
2619 _ => {
2620 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2621 }
2622 }
2623 }
2624}
2625
2626#[derive(Debug, Serialize, Default)]
2627pub struct DirectGetRequest {
2628 #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2629 sequence: Option<u64>,
2630 #[serde(rename = "last_by_subj", skip_serializing)]
2631 last_by_subject: Option<String>,
2632 #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2633 next_by_subject: Option<String>,
2634}
2635
/// Marker type selecting the builder variant whose `send` returns a full
/// `StreamMessage`.
#[derive(Clone, Copy, Debug, Default)]
pub struct WithHeaders;

/// Marker type selecting the builder variant whose `send` returns only the
/// payload, as a `StreamValue`.
#[derive(Clone, Copy, Debug, Default)]
pub struct WithoutHeaders;
2641
2642trait DirectGetResponse: Sized {
2644 fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
2645}
2646
2647impl DirectGetResponse for StreamMessage {
2648 fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2649 StreamMessage::try_from(message).map_err(Into::into)
2650 }
2651}
2652
2653impl DirectGetResponse for StreamValue {
2654 fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2655 Ok(StreamValue {
2656 data: message.payload,
2657 })
2658 }
2659}
2660
2661pub struct DirectGetBuilder<T = WithHeaders> {
2662 context: Context,
2663 stream_name: String,
2664 request: DirectGetRequest,
2665 _phantom: std::marker::PhantomData<T>,
2666}
2667
2668impl DirectGetBuilder<WithHeaders> {
2669 fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
2670 DirectGetBuilder {
2671 context,
2672 stream_name,
2673 request: DirectGetRequest::default(),
2674 _phantom: std::marker::PhantomData,
2675 }
2676 }
2677}
2678
2679impl<T> DirectGetBuilder<T> {
2680 async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
2682 let payload = if self.request.last_by_subject.is_some() {
2685 Bytes::new()
2686 } else {
2687 serde_json::to_vec(&self.request).map(Bytes::from)?
2688 };
2689
2690 let request_subject = if let Some(ref subject) = self.request.last_by_subject {
2691 format!(
2692 "{}.DIRECT.GET.{}.{}",
2693 &self.context.prefix, &self.stream_name, subject
2694 )
2695 } else {
2696 format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
2697 };
2698
2699 let response = self
2700 .context
2701 .client
2702 .request(request_subject, payload)
2703 .await?;
2704
2705 if let Some(status) = response.status {
2707 if let Some(ref description) = response.description {
2708 match status {
2709 StatusCode::NOT_FOUND => {
2710 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
2711 }
2712 StatusCode::TIMEOUT => {
2714 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
2715 }
2716 _ => {
2717 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
2718 status,
2719 description.to_string(),
2720 )));
2721 }
2722 }
2723 }
2724 }
2725
2726 R::from_message(response)
2727 }
2728
2729 pub fn sequence(mut self, seq: u64) -> Self {
2731 self.request.sequence = Some(seq);
2732 self
2733 }
2734
2735 pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2737 self.request.last_by_subject = Some(subject.into());
2738 self
2739 }
2740
2741 pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2743 self.request.next_by_subject = Some(subject.into());
2744 self
2745 }
2746}
2747
2748impl DirectGetBuilder<WithHeaders> {
2749 pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
2751 self.send_internal::<StreamMessage>().await
2752 }
2753}
2754
2755impl DirectGetBuilder<WithoutHeaders> {
2756 pub async fn send(self) -> Result<StreamValue, DirectGetError> {
2758 self.send_internal::<StreamValue>().await
2759 }
2760}
2761
2762pub struct StreamValue {
2763 pub data: Bytes,
2764}
2765
2766#[derive(Debug, Serialize, Default)]
2767pub struct RawMessageRequest {
2768 #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2769 sequence: Option<u64>,
2770 #[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
2771 last_by_subject: Option<String>,
2772 #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2773 next_by_subject: Option<String>,
2774}
2775
2776trait RawMessageResponse: Sized {
2778 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
2779}
2780
2781impl RawMessageResponse for StreamMessage {
2782 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2783 StreamMessage::try_from(message)
2784 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2785 }
2786}
2787
2788impl RawMessageResponse for StreamValue {
2789 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2790 use base64::engine::general_purpose::STANDARD;
2791 use base64::Engine;
2792
2793 let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
2794 RawMessageError::with_source(
2795 RawMessageErrorKind::Other,
2796 Box::new(std::io::Error::other(err)),
2797 )
2798 })?;
2799
2800 Ok(StreamValue {
2801 data: decoded_payload.into(),
2802 })
2803 }
2804}
2805
2806pub struct RawMessageBuilder<T = WithHeaders> {
2807 context: Context,
2808 stream_name: String,
2809 request: RawMessageRequest,
2810 _phantom: std::marker::PhantomData<T>,
2811}
2812
2813impl RawMessageBuilder<WithHeaders> {
2814 fn new(context: Context, stream_name: String) -> Self {
2815 RawMessageBuilder {
2816 context,
2817 stream_name,
2818 request: RawMessageRequest::default(),
2819 _phantom: std::marker::PhantomData,
2820 }
2821 }
2822}
2823
2824impl<T> RawMessageBuilder<T> {
2825 async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
2827 for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
2829 .into_iter()
2830 .flatten()
2831 {
2832 if !is_valid_subject(subject) {
2833 return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
2834 }
2835 }
2836
2837 let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
2838
2839 let response: Response<GetRawMessage> = self
2840 .context
2841 .request(subject, &self.request)
2842 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2843 .await?;
2844
2845 match response {
2846 Response::Err { error } => {
2847 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
2848 Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
2849 } else {
2850 Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
2851 }
2852 }
2853 Response::Ok(value) => R::from_raw_message(value.message),
2854 }
2855 }
2856
2857 pub fn sequence(mut self, seq: u64) -> Self {
2859 self.request.sequence = Some(seq);
2860 self
2861 }
2862
2863 pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2865 self.request.last_by_subject = Some(subject.into());
2866 self
2867 }
2868
2869 pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2871 self.request.next_by_subject = Some(subject.into());
2872 self
2873 }
2874}
2875
2876impl RawMessageBuilder<WithHeaders> {
2877 pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
2879 self.send_internal::<StreamMessage>().await
2880 }
2881}
2882
2883impl RawMessageBuilder<WithoutHeaders> {
2884 pub async fn send(self) -> Result<StreamValue, RawMessageError> {
2886 self.send_internal::<StreamValue>().await
2887 }
2888}
2889
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must survive a JSON serialize/deserialize round
    /// trip unchanged.
    #[test]
    fn consumer_limits_de() {
        let config = Config::default();

        let encoded = serde_json::to_string(&config).unwrap();
        let roundtrip: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(config, roundtrip);
    }
}