1use std::{
17 collections::{self, HashMap},
18 fmt::{self, Debug, Display},
19 future::IntoFuture,
20 io::{self},
21 pin::Pin,
22 str::FromStr,
23 task::Poll,
24 time::Duration,
25};
26
27use crate::{
28 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40 context::{
41 ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42 StreamsErrorKind,
43 },
44 errors::ErrorCode,
45 is_valid_name,
46 message::{StreamMessage, StreamMessageError},
47 response::Response,
48 Context,
49};
50
/// Error returned by the stream-info requests in this module; an alias for [`RequestError`].
pub type InfoError = RequestError;
52
/// Kinds of failure reported by the direct-get operations on a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// Displayed as "message not found".
    NotFound,
    /// The subject handed to a direct-get call failed subject validation.
    InvalidSubject,
    /// The underlying request timed out.
    TimedOut,
    /// Displayed as "request failed".
    Request,
    /// An error response, carrying its status code and a textual description.
    ErrorResponse(StatusCode, String),
    /// Any other failure; also used when wrapping request, JSON and message-conversion errors.
    Other,
}
62
63impl Display for DirectGetErrorKind {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 Self::InvalidSubject => write!(f, "invalid subject"),
67 Self::NotFound => write!(f, "message not found"),
68 Self::ErrorResponse(status, description) => {
69 write!(f, "unable to get message: {status} {description}")
70 }
71 Self::Other => write!(f, "error getting message"),
72 Self::TimedOut => write!(f, "timed out"),
73 Self::Request => write!(f, "request failed"),
74 }
75 }
76}
77
/// Error type of the direct-get operations, parameterised by [`DirectGetErrorKind`].
pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81 fn from(err: crate::RequestError) -> Self {
82 match err.kind() {
83 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84 crate::RequestErrorKind::NoResponders => {
85 DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86 StatusCode::NO_RESPONDERS,
87 "no responders".to_string(),
88 ))
89 }
90 crate::RequestErrorKind::InvalidSubject
91 | crate::RequestErrorKind::MaxPayloadExceeded
92 | crate::RequestErrorKind::Other => {
93 DirectGetError::with_source(DirectGetErrorKind::Other, err)
94 }
95 }
96 }
97}
98
99impl From<serde_json::Error> for DirectGetError {
100 fn from(err: serde_json::Error) -> Self {
101 DirectGetError::with_source(DirectGetErrorKind::Other, err)
102 }
103}
104
105impl From<StreamMessageError> for DirectGetError {
106 fn from(err: StreamMessageError) -> Self {
107 DirectGetError::with_source(DirectGetErrorKind::Other, err)
108 }
109}
110
/// Kinds of failure reported when deleting a message from a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error, carried here.
    JetStream(super::errors::Error),
}
117
118impl Display for DeleteMessageErrorKind {
119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 match self {
121 Self::Request => write!(f, "request failed"),
122 Self::TimedOut => write!(f, "timed out"),
123 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
124 }
125 }
126}
127
/// Error type of [`Stream::delete_message`], parameterised by [`DeleteMessageErrorKind`].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
129
/// Handle to a stream, bundling the context used for requests with the stream name.
///
/// `T` is the cached info payload; it defaults to [`Info`], in which case the
/// `info`/`cached_info` methods are available.
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    // Cached info value; refreshed by `Stream::<Info>::info`.
    pub(crate) info: T,
    // Context through which all requests of this handle are issued.
    pub(crate) context: Context,
    // Stream name, interpolated into the request subjects.
    pub(crate) name: String,
}
139
140impl Stream<Info> {
141 pub async fn info(&mut self) -> Result<&Info, InfoError> {
159 let subject = format!("STREAM.INFO.{}", self.info.config.name);
160
161 match self.context.request(subject, &json!({})).await? {
162 Response::Ok::<Info>(info) => {
163 self.info = info;
164 Ok(&self.info)
165 }
166 Response::Err { error } => Err(error.into()),
167 }
168 }
169
170 pub fn cached_info(&self) -> &Info {
189 &self.info
190 }
191}
192
193impl<I> Stream<I> {
194 pub async fn get_info(&self) -> Result<Info, InfoError> {
197 let subject = format!("STREAM.INFO.{}", self.name);
198
199 match self.context.request(subject, &json!({})).await? {
200 Response::Ok::<Info>(info) => Ok(info),
201 Response::Err { error } => Err(error.into()),
202 }
203 }
204
205 pub async fn info_with_subjects<F: AsRef<str>>(
228 &self,
229 subjects_filter: F,
230 ) -> Result<InfoWithSubjects, InfoError> {
231 let subjects_filter = subjects_filter.as_ref().to_string();
232 let info = stream_info_with_details(
234 self.context.clone(),
235 self.name.clone(),
236 0,
237 false,
238 subjects_filter.clone(),
239 )
240 .await?;
241
242 Ok(InfoWithSubjects::new(
243 self.context.clone(),
244 info,
245 subjects_filter,
246 ))
247 }
248
249 pub fn info_builder(&self) -> StreamInfoBuilder {
275 StreamInfoBuilder::new(self.context.clone(), self.name.clone())
276 }
277
278 pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
298 DirectGetBuilder::new(self.context.clone(), self.name.clone())
299 }
300
301 pub async fn direct_get_next_for_subject<T: Into<String>>(
336 &self,
337 subject: T,
338 sequence: Option<u64>,
339 ) -> Result<StreamMessage, DirectGetError> {
340 let subject_str = subject.into();
341 if !is_valid_subject(&subject_str) {
342 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
343 }
344
345 let mut builder = self.direct_get_builder().next_by_subject(subject_str);
346 if let Some(seq) = sequence {
347 builder = builder.sequence(seq);
348 }
349
350 builder.send().await
351 }
352
353 pub async fn direct_get_first_for_subject<T: Into<String>>(
385 &self,
386 subject: T,
387 ) -> Result<StreamMessage, DirectGetError> {
388 let subject_str = subject.into();
389 if !is_valid_subject(&subject_str) {
390 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
391 }
392
393 self.direct_get_builder()
394 .next_by_subject(subject_str)
395 .send()
396 .await
397 }
398
399 pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
431 self.direct_get_builder().sequence(sequence).send().await
432 }
433
434 pub async fn direct_get_last_for_subject<T: Into<String>>(
466 &self,
467 subject: T,
468 ) -> Result<StreamMessage, DirectGetError> {
469 self.direct_get_builder()
470 .last_by_subject(subject)
471 .send()
472 .await
473 }
474 pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
494 RawMessageBuilder::new(self.context.clone(), self.name.clone())
495 }
496
497 pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
527 self.raw_message_builder().sequence(sequence).send().await
528 }
529
530 pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
554 &self,
555 subject: T,
556 sequence: u64,
557 ) -> Result<StreamMessage, RawMessageError> {
558 self.raw_message_builder()
559 .sequence(sequence)
560 .next_by_subject(subject.as_ref().to_string())
561 .send()
562 .await
563 }
564
565 pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
589 &self,
590 subject: T,
591 ) -> Result<StreamMessage, RawMessageError> {
592 self.raw_message_builder()
593 .next_by_subject(subject.as_ref().to_string())
594 .send()
595 .await
596 }
597
598 pub async fn get_last_raw_message_by_subject(
622 &self,
623 stream_subject: &str,
624 ) -> Result<StreamMessage, LastRawMessageError> {
625 self.raw_message_builder()
626 .last_by_subject(stream_subject.to_string())
627 .send()
628 .await
629 }
630
631 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
655 let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
656 let payload = json!({
657 "seq": sequence,
658 });
659
660 let response: Response<DeleteStatus> = self
661 .context
662 .request(subject, &payload)
663 .map_err(|err| match err.kind() {
664 RequestErrorKind::TimedOut => {
665 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
666 }
667 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
668 })
669 .await?;
670
671 match response {
672 Response::Err { error } => Err(DeleteMessageError::new(
673 DeleteMessageErrorKind::JetStream(error),
674 )),
675 Response::Ok(value) => Ok(value.success),
676 }
677 }
678
    /// Starts a purge builder for this stream.
    ///
    /// `purge_subject` awaits the value after `.filter(..)`, so the builder is
    /// driven by awaiting it.
    pub fn purge(&self) -> Purge<No, No> {
        Purge::build(self)
    }
697
698 #[deprecated(
715 since = "0.25.0",
716 note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
717 )]
718 pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
719 where
720 T: Into<String>,
721 {
722 self.purge().filter(subject).await
723 }
724
725 pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
749 &self,
750 config: C,
751 ) -> Result<Consumer<C>, ConsumerError> {
752 self.context
753 .create_consumer_on_stream(config, self.name.clone())
754 .await
755 }
756
757 #[cfg(feature = "server_2_10")]
781 pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
782 &self,
783 config: C,
784 ) -> Result<Consumer<C>, ConsumerUpdateError> {
785 self.context
786 .update_consumer_on_stream(config, self.name.clone())
787 .await
788 }
789
790 #[cfg(feature = "server_2_10")]
815 pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
816 &self,
817 config: C,
818 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
819 self.context
820 .create_consumer_strict_on_stream(config, self.name.clone())
821 .await
822 }
823
824 pub async fn consumer_info<T: AsRef<str>>(
841 &self,
842 name: T,
843 ) -> Result<consumer::Info, ConsumerInfoError> {
844 let name = name.as_ref();
845
846 if !is_valid_name(name) {
847 return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
848 }
849
850 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
851
852 match self.context.request(subject, &json!({})).await? {
853 Response::Ok(info) => Ok(info),
854 Response::Err { error } => Err(error.into()),
855 }
856 }
857
858 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
877 &self,
878 name: &str,
879 ) -> Result<Consumer<T>, crate::Error> {
880 let info = self.consumer_info(name).await?;
881
882 Ok(Consumer::new(
883 T::try_from_consumer_config(info.config.clone())?,
884 info,
885 self.context.clone(),
886 ))
887 }
888
889 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
917 &self,
918 name: &str,
919 config: T,
920 ) -> Result<Consumer<T>, ConsumerError> {
921 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
922
923 match self.context.request(subject, &json!({})).await? {
924 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
925 Response::Err { error } => Err(error.into()),
926 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
927 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
928 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
929 })?,
930 info,
931 self.context.clone(),
932 )),
933 }
934 }
935
936 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
957 let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
958
959 match self.context.request(subject, &json!({})).await? {
960 Response::Ok(delete_status) => Ok(delete_status),
961 Response::Err { error } => Err(error.into()),
962 }
963 }
964
965 #[cfg(feature = "server_2_11")]
989 pub async fn pause_consumer(
990 &self,
991 name: &str,
992 pause_until: OffsetDateTime,
993 ) -> Result<PauseResponse, ConsumerError> {
994 self.request_pause_consumer(name, Some(pause_until)).await
995 }
996
997 #[cfg(feature = "server_2_11")]
1018 pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
1019 self.request_pause_consumer(name, None).await
1020 }
1021
1022 #[cfg(feature = "server_2_11")]
1023 async fn request_pause_consumer(
1024 &self,
1025 name: &str,
1026 pause_until: Option<OffsetDateTime>,
1027 ) -> Result<PauseResponse, ConsumerError> {
1028 let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1029 let payload = &PauseResumeConsumerRequest { pause_until };
1030
1031 match self.context.request(subject, payload).await? {
1032 Response::Ok::<PauseResponse>(resp) => Ok(resp),
1033 Response::Err { error } => Err(error.into()),
1034 }
1035 }
1036
1037 #[cfg(feature = "server_2_14")]
1069 #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
1070 pub async fn reset_consumer(
1071 &self,
1072 name: &str,
1073 seq: Option<u64>,
1074 ) -> Result<ConsumerResetResponse, ConsumerResetError> {
1075 let subject = format!("CONSUMER.RESET.{}.{}", self.name, name);
1076 let payload = ConsumerResetRequest {
1077 seq: seq.unwrap_or(0),
1078 };
1079
1080 match self.context.request(subject, &payload).await? {
1081 Response::Ok::<ConsumerResetResponse>(resp) => Ok(resp),
1082 Response::Err { error } => Err(error.into()),
1083 }
1084 }
1085
1086 pub fn consumer_names(&self) -> ConsumerNames {
1105 ConsumerNames {
1106 context: self.context.clone(),
1107 stream: self.name.clone(),
1108 offset: 0,
1109 page_request: None,
1110 consumers: Vec::new(),
1111 done: false,
1112 }
1113 }
1114
1115 pub fn consumers(&self) -> Consumers {
1134 Consumers {
1135 context: self.context.clone(),
1136 stream: self.name.clone(),
1137 offset: 0,
1138 page_request: None,
1139 consumers: Vec::new(),
1140 done: false,
1141 }
1142 }
1143}
1144
/// Builder for a stream-info request with optional deleted details and a subjects filter.
pub struct StreamInfoBuilder {
    // Context used to issue the request.
    pub(crate) context: Context,
    // Name of the stream the info is requested for.
    pub(crate) name: String,
    // Forwarded as the `deleted_details` flag of the request; defaults to false.
    pub(crate) deleted: bool,
    // Forwarded as the subjects filter of the request; defaults to an empty string.
    pub(crate) subject: String,
}
1151
1152impl StreamInfoBuilder {
1153 fn new(context: Context, name: String) -> Self {
1154 Self {
1155 context,
1156 name,
1157 deleted: false,
1158 subject: "".to_string(),
1159 }
1160 }
1161
1162 pub fn with_deleted(mut self, deleted: bool) -> Self {
1163 self.deleted = deleted;
1164 self
1165 }
1166
1167 pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1168 self.subject = subject.into();
1169 self
1170 }
1171
1172 pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1173 let info = stream_info_with_details(
1174 self.context.clone(),
1175 self.name.clone(),
1176 0,
1177 self.deleted,
1178 self.subject.clone(),
1179 )
1180 .await?;
1181
1182 Ok(InfoWithSubjects::new(self.context, info, self.subject))
1183 }
1184}
1185
/// Stream configuration as exchanged with the server in JSON form.
///
/// Most fields fall back to their `Default` value when missing from the input,
/// and many are left out of the serialized output while they hold that default
/// (see the `serde` attributes on each field).
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Name of the stream.
    pub name: String,
    /// Byte limit; defaults to 0 when absent.
    #[serde(default)]
    pub max_bytes: i64,
    /// Message-count limit; wire name `max_msgs`.
    #[serde(default, rename = "max_msgs")]
    pub max_messages: i64,
    /// Per-subject message-count limit; wire name `max_msgs_per_subject`.
    #[serde(default, rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// Discard policy; see [`DiscardPolicy`].
    pub discard: DiscardPolicy,
    /// Omitted from the output while `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Subjects of the stream; omitted from the output while empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// Retention policy; see [`RetentionPolicy`].
    pub retention: RetentionPolicy,
    /// Consumer-count limit; defaults to 0 when absent.
    #[serde(default)]
    pub max_consumers: i32,
    /// Maximum age, (de)serialized through `serde_nanos`.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// Message-size limit; wire name `max_msg_size`, omitted while 0.
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// Storage backend; see [`StorageType`].
    pub storage: StorageType,
    /// Number of replicas.
    pub num_replicas: usize,
    /// Omitted from the output while `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// Duplicate window, (de)serialized through `serde_nanos`; omitted while zero.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// Omitted from the output while empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// Omitted from the output while `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// Optional description; omitted from the output while `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    /// Wire name `allow_rollup_hdrs`; omitted from the output while `false`.
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    pub allow_rollup: bool,
    /// Omitted from the output while `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_delete: bool,
    /// Omitted from the output while `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Optional republish settings; see [`Republish`].
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// Omitted from the output while `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// Omitted from the output while `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Optional mirror source; omitted from the output while `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Optional list of sources; omitted from the output while `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    /// String key/value metadata; omitted from the output while empty.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    /// Optional subject transform; see [`SubjectTransform`].
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    /// Optional compression setting; see [`Compression`].
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    /// Optional consumer limits; an all-default value is read back as `None`.
    #[cfg(feature = "server_2_10")]
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    /// Wire name `first_seq`; omitted from the output while `None`.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Optional placement; see [`Placement`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,

    /// Optional persistence mode; see [`PersistenceMode`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub persist_mode: Option<PersistenceMode>,

    /// Optional RFC 3339 timestamp; omitted from the output while `None`.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,

    /// Wire name `allow_msg_ttl`; omitted from the output while `false`.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
    pub allow_message_ttl: bool,

    /// Optional duration, (de)serialized through `serde_nanos`; omitted while `None`.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub subject_delete_marker_ttl: Option<Duration>,

    /// Wire name `allow_atomic`; omitted from the output while `false`.
    #[cfg(feature = "server_2_12")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
    pub allow_atomic_publish: bool,

    /// Wire name `allow_msg_schedules`; omitted from the output while `false`.
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_schedules"
    )]
    pub allow_message_schedules: bool,

    /// Wire name `allow_msg_counter`; omitted from the output while `false`.
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_counter"
    )]
    pub allow_message_counter: bool,

    /// Wire name `allow_batched`; omitted from the output while `false`.
    #[cfg(feature = "server_2_14")]
    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_batched")]
    pub allow_batch_publish: bool,
}
1361
1362impl From<&Config> for Config {
1363 fn from(sc: &Config) -> Config {
1364 sc.clone()
1365 }
1366}
1367
1368impl From<&str> for Config {
1369 fn from(s: &str) -> Config {
1370 Config {
1371 name: s.to_string(),
1372 ..Default::default()
1373 }
1374 }
1375}
1376
/// Deserializes optional consumer limits, mapping a value equal to
/// `ConsumerLimits::default()` to `None`.
///
/// # Errors
///
/// Propagates any error of the underlying deserializer.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
    // Keep only limits that differ from the all-default value.
    Ok(limits.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Consumer limits carried in a stream [`Config`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// Inactive threshold, (de)serialized through `serde_nanos`; defaults to zero.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Defaults to 0 when absent.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1404
/// Compression setting of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// Wire value `s2`.
    #[serde(rename = "s2")]
    S2,
    /// Wire value `none`.
    #[serde(rename = "none")]
    None,
}
1412
/// A subject transform described by a source and a destination.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// Wire name `src`.
    #[serde(rename = "src")]
    pub source: String,

    /// Wire name `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
}
1422
/// Republish settings of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// Wire name `src`.
    #[serde(rename = "src")]
    pub source: String,
    /// Wire name `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub headers_only: bool,
}
1436
/// Placement settings of a stream; both fields are omitted from the output while unset.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Optional cluster name.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Placement tags.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1447
/// Discard policy of a stream.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Wire value `old`; the default.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Wire value `new`.
    #[serde(rename = "new")]
    New = 1,
}
1461
/// Retention policy of a stream.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// Wire value `limits`; the default.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// Wire value `interest`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// Wire value `workqueue`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1478
/// Storage backend of a stream.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Wire value `file`; the default.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Wire value `memory`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1491
/// Persistence mode of a stream.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PersistenceMode {
    /// Wire value `default`; the default.
    #[default]
    #[serde(rename = "default")]
    Default = 0,
    /// Wire value `async`.
    #[serde(rename = "async")]
    Async = 1,
}
1504
1505async fn stream_info_with_details(
1506 context: Context,
1507 stream: String,
1508 offset: usize,
1509 deleted_details: bool,
1510 subjects_filter: String,
1511) -> Result<Info, InfoError> {
1512 let subject = format!("STREAM.INFO.{stream}");
1513
1514 let payload = StreamInfoRequest {
1515 offset,
1516 deleted_details,
1517 subjects_filter,
1518 };
1519
1520 let response: Response<Info> = context.request(subject, &payload).await?;
1521
1522 match response {
1523 Response::Ok(info) => Ok(info),
1524 Response::Err { error } => Err(error.into()),
1525 }
1526}
1527
// Boxed, `'static` future of an in-flight stream-info request.
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1529
/// Payload of a stream-info request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    // Paging offset of the request.
    offset: usize,
    // Whether deleted details are requested.
    deleted_details: bool,
    // Subjects filter of the request.
    subjects_filter: String,
}
1536
/// Stream info together with a paged stream of `(subject, count)` entries.
///
/// Entries of the first page are yielded directly; further pages are requested
/// as the buffered entries run out.
pub struct InfoWithSubjects {
    // Name of the stream, taken from the info's config.
    stream: String,
    // Context used to request further pages.
    context: Context,
    /// Info from the first response; its subjects map has been moved into this stream.
    pub info: Info,
    // Number of subjects received so far; used as the offset of the next page.
    offset: usize,
    // Buffered entries of the current page.
    subjects: collections::hash_map::IntoIter<String, usize>,
    // In-flight request for the next page, if any.
    info_request: Option<InfoRequest>,
    // Subjects filter sent with every page request.
    subjects_filter: String,
    // Set once no further pages need to be requested.
    pages_done: bool,
}
1547
1548impl InfoWithSubjects {
1549 pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1550 let subjects = info.state.subjects.take().unwrap_or_default();
1551 let name = info.config.name.clone();
1552 InfoWithSubjects {
1553 context,
1554 info,
1555 pages_done: subjects.is_empty(),
1556 offset: subjects.len(),
1557 subjects: subjects.into_iter(),
1558 subjects_filter: subject,
1559 stream: name,
1560 info_request: None,
1561 }
1562 }
1563}
1564
1565impl futures_util::Stream for InfoWithSubjects {
1566 type Item = Result<(String, usize), InfoError>;
1567
1568 fn poll_next(
1569 mut self: Pin<&mut Self>,
1570 cx: &mut std::task::Context<'_>,
1571 ) -> Poll<Option<Self::Item>> {
1572 match self.subjects.next() {
1573 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1574 None => {
1575 if self.pages_done {
1577 return Poll::Ready(None);
1578 }
1579 let stream = self.stream.clone();
1580 let context = self.context.clone();
1581 let subjects_filter = self.subjects_filter.clone();
1582 let offset = self.offset;
1583 match self
1584 .info_request
1585 .get_or_insert_with(|| {
1586 Box::pin(stream_info_with_details(
1587 context,
1588 stream,
1589 offset,
1590 false,
1591 subjects_filter,
1592 ))
1593 })
1594 .poll_unpin(cx)
1595 {
1596 Poll::Ready(resp) => match resp {
1597 Ok(info) => {
1598 let subjects = info.state.subjects.clone();
1599 self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1600 self.info_request = None;
1601 let subjects = subjects.unwrap_or_default();
1602 self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1603 let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1604 if total <= self.offset || subjects.is_empty() {
1605 self.pages_done = true;
1606 }
1607 match self.subjects.next() {
1608 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1609 None => Poll::Ready(None),
1610 }
1611 }
1612 Err(err) => Poll::Ready(Some(Err(err))),
1613 },
1614 Poll::Pending => Poll::Pending,
1615 }
1616 }
1617 }
1618 }
1619}
1620
/// Stream info as returned by the server.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// Configuration of the stream.
    pub config: Config,
    /// Creation time, parsed from RFC 3339.
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// State of the stream; see [`State`].
    pub state: State,
    /// Optional cluster information; see [`ClusterInfo`].
    pub cluster: Option<ClusterInfo>,
    /// Optional mirror information; `None` when absent.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Source information; empty when absent.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    // Paging fields, flattened into the same JSON object when present.
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1642
/// Paging fields of a paged response.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    // Offset of the page.
    offset: usize,
    // Total count; compared against the received count to detect the last page.
    total: usize,
    // Page size limit.
    limit: usize,
}
1649
/// Response of a delete request.
#[derive(Deserialize)]
pub struct DeleteStatus {
    /// The `success` flag reported by the server.
    pub success: bool,
}
1654
/// Response of a consumer pause/resume request.
#[cfg(feature = "server_2_11")]
#[derive(Deserialize)]
pub struct PauseResponse {
    /// The `paused` flag reported by the server.
    pub paused: bool,
    /// Parsed from RFC 3339.
    #[serde(with = "rfc3339")]
    pub pause_until: OffsetDateTime,
    /// Optional duration, deserialized through `serde_nanos`; `None` when absent.
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}
1664
// Payload of a consumer pause/resume request.
#[cfg(feature = "server_2_11")]
#[derive(Serialize, Debug)]
struct PauseResumeConsumerRequest {
    // Serialized as RFC 3339; left out of the payload when `None`.
    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
    pause_until: Option<OffsetDateTime>,
}
1671
// Payload of a consumer reset request.
#[cfg(feature = "server_2_14")]
#[derive(Serialize, Debug)]
pub(crate) struct ConsumerResetRequest {
    // Left out of the payload while 0.
    #[serde(default, skip_serializing_if = "is_default")]
    pub(crate) seq: u64,
}
1678
/// Response of a consumer reset request.
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Debug, Deserialize, Clone)]
pub struct ConsumerResetResponse {
    /// Consumer info, flattened into the same JSON object as `reset_seq`.
    #[serde(flatten)]
    pub info: super::consumer::Info,
    /// The `reset_seq` value reported by the server.
    pub reset_seq: u64,
}
1693
/// State section of a stream [`Info`].
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
    /// The `messages` value reported by the server.
    pub messages: u64,
    /// The `bytes` value reported by the server.
    pub bytes: u64,
    /// Wire name `first_seq`.
    #[serde(rename = "first_seq")]
    pub first_sequence: u64,
    /// Wire name `first_ts`; parsed from RFC 3339.
    #[serde(with = "rfc3339", rename = "first_ts")]
    pub first_timestamp: time::OffsetDateTime,
    /// Wire name `last_seq`.
    #[serde(rename = "last_seq")]
    pub last_sequence: u64,
    /// Wire name `last_ts`; parsed from RFC 3339.
    #[serde(with = "rfc3339", rename = "last_ts")]
    pub last_timestamp: time::OffsetDateTime,
    /// The `consumer_count` value reported by the server.
    pub consumer_count: usize,
    /// Wire name `num_subjects`; 0 when absent.
    #[serde(default, rename = "num_subjects")]
    pub subjects_count: u64,
    /// Wire name `num_deleted`; `None` when absent.
    #[serde(default, rename = "num_deleted")]
    pub deleted_count: Option<u64>,
    /// Optional list of sequence numbers; `None` when absent.
    #[serde(default)]
    pub deleted: Option<Vec<u64>>,

    // Per-subject counts; moved out by `InfoWithSubjects::new`.
    pub(crate) subjects: Option<HashMap<String, usize>>,
}
1728
/// A message in its JSON wire form, with base64-encoded payload and headers.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
    /// Subject of the message.
    #[serde(rename = "subject")]
    pub subject: String,

    /// Wire name `seq`.
    #[serde(rename = "seq")]
    pub sequence: u64,

    /// Wire name `data`; base64-encoded payload, empty when absent.
    #[serde(default, rename = "data")]
    pub payload: String,

    /// Wire name `hdrs`; base64-encoded header block, `None` when absent.
    #[serde(default, rename = "hdrs")]
    pub headers: Option<String>,

    /// RFC 3339 timestamp of the message.
    #[serde(rename = "time", with = "rfc3339")]
    pub time: time::OffsetDateTime,
}
1752
1753impl TryFrom<RawMessage> for StreamMessage {
1754 type Error = crate::Error;
1755
1756 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1757 let decoded_payload = STANDARD
1758 .decode(value.payload)
1759 .map_err(|err| Box::new(std::io::Error::other(err)))?;
1760 let decoded_headers = value
1761 .headers
1762 .map(|header| STANDARD.decode(header))
1763 .map_or(Ok(None), |v| v.map(Some))?;
1764
1765 let (headers, _, _) = decoded_headers
1766 .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1767
1768 Ok(StreamMessage {
1769 subject: value.subject.into(),
1770 payload: decoded_payload.into(),
1771 headers,
1772 sequence: value.sequence,
1773 time: value.time,
1774 })
1775 }
1776}
1777
/// Returns `true` for a space or a tab, the characters that start a header
/// continuation line.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
// Prefix that the first line of a header block must start with.
const HEADER_LINE: &str = "NATS/1.0";
1782
1783#[allow(clippy::type_complexity)]
1784fn parse_headers(
1785 buf: &[u8],
1786) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1787 let mut headers = HeaderMap::new();
1788 let mut maybe_status: Option<StatusCode> = None;
1789 let mut maybe_description: Option<String> = None;
1790 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1791 line.lines().peekable()
1792 } else {
1793 return Err(Box::new(std::io::Error::other("invalid header")));
1794 };
1795
1796 if let Some(line) = lines.next() {
1797 let line = line
1798 .strip_prefix(HEADER_LINE)
1799 .ok_or_else(|| {
1800 Box::new(std::io::Error::other(
1801 "version line does not start with NATS/1.0",
1802 ))
1803 })?
1804 .trim();
1805
1806 match line.split_once(' ') {
1807 Some((status, description)) => {
1808 if !status.is_empty() {
1809 maybe_status = Some(status.parse()?);
1810 }
1811
1812 if !description.is_empty() {
1813 maybe_description = Some(description.trim().to_string());
1814 }
1815 }
1816 None => {
1817 if !line.is_empty() {
1818 maybe_status = Some(line.parse()?);
1819 }
1820 }
1821 }
1822 } else {
1823 return Err(Box::new(std::io::Error::other(
1824 "expected header information not found",
1825 )));
1826 };
1827
1828 while let Some(line) = lines.next() {
1829 if line.is_empty() {
1830 continue;
1831 }
1832
1833 if let Some((k, v)) = line.split_once(':').to_owned() {
1834 let mut s = String::from(v.trim());
1835 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1836 s.push(' ');
1837 s.push_str(v.trim());
1838 }
1839
1840 headers.insert(
1841 HeaderName::from_str(k)?,
1842 HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1843 );
1844 } else {
1845 return Err(Box::new(std::io::Error::other("malformed header line")));
1846 }
1847 }
1848
1849 if headers.is_empty() {
1850 Ok((HeaderMap::new(), maybe_status, maybe_description))
1851 } else {
1852 Ok((headers, maybe_status, maybe_description))
1853 }
1854}
1855
// JSON envelope holding a single raw message under the `message` key.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
    pub(crate) message: RawMessage,
}
1860
/// Returns `true` when `t` equals its type's `Default` value.
///
/// Used as a `skip_serializing_if` predicate so that default-valued fields are
/// left out of the serialized output.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let default_value = T::default();
    *t == default_value
}
/// Cluster section of a stream [`Info`]; every field falls back to its default when absent.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
    /// Optional cluster name.
    #[serde(default)]
    pub name: Option<String>,
    /// Optional raft group.
    #[serde(default)]
    pub raft_group: Option<String>,
    /// Optional leader.
    #[serde(default)]
    pub leader: Option<String>,
    /// Optional RFC 3339 timestamp.
    #[serde(default, with = "rfc3339::option")]
    pub leader_since: Option<OffsetDateTime>,
    /// Defaults to `false` when absent.
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub system_account: bool,
    /// Optional traffic account.
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub traffic_account: Option<String>,
    /// Replica peers; empty when absent.
    #[serde(default)]
    pub replicas: Vec<PeerInfo>,
}
1892
/// A replica peer listed in [`ClusterInfo`].
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// Name of the peer.
    pub name: String,
    /// The `current` flag reported by the server.
    pub current: bool,
    /// Duration deserialized through `serde_nanos`.
    #[serde(with = "serde_nanos")]
    pub active: Duration,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub offline: bool,
    /// Optional lag value.
    pub lag: Option<u64>,
}
1909
/// Information about a mirror or a source of a stream.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
    /// Name of the source.
    pub name: String,
    /// The `lag` value reported by the server.
    pub lag: u64,
    /// Read from a signed nanosecond count; a negative value becomes `None`.
    #[serde(deserialize_with = "negative_duration_as_none")]
    pub active: Option<std::time::Duration>,
    /// Optional filter subject; `None` when absent.
    #[serde(default)]
    pub filter_subject: Option<String>,
    /// Optional subject transform destination; `None` when absent.
    #[serde(default)]
    pub subject_transform_dest: Option<String>,
    /// Subject transforms; empty when absent.
    #[serde(default)]
    pub subject_transforms: Vec<SubjectTransform>,
}
1929
1930fn negative_duration_as_none<'de, D>(
1931 deserializer: D,
1932) -> Result<Option<std::time::Duration>, D::Error>
1933where
1934 D: Deserializer<'de>,
1935{
1936 let n = i64::deserialize(deserializer)?;
1937 if n.is_negative() {
1938 Ok(None)
1939 } else {
1940 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1941 }
1942}
1943
1944#[derive(Debug, Deserialize, Clone, Copy)]
1946pub struct PurgeResponse {
1947 pub success: bool,
1949 pub purged: u64,
1951}
1952#[derive(Default, Debug, Serialize, Clone)]
1954pub struct PurgeRequest {
1955 #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1957 pub sequence: Option<u64>,
1958
1959 #[serde(default, skip_serializing_if = "is_default")]
1961 pub filter: Option<String>,
1962
1963 #[serde(default, skip_serializing_if = "is_default")]
1965 pub keep: Option<u64>,
1966}
1967
1968#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1969pub struct Source {
1970 pub name: String,
1972 #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1974 pub start_sequence: Option<u64>,
1975 #[serde(
1976 default,
1977 rename = "opt_start_time",
1978 skip_serializing_if = "is_default",
1979 with = "rfc3339::option"
1980 )]
1981 pub start_time: Option<OffsetDateTime>,
1983 #[serde(default, skip_serializing_if = "is_default")]
1985 pub filter_subject: Option<String>,
1986 #[serde(default, skip_serializing_if = "Option::is_none")]
1988 pub external: Option<External>,
1989 #[serde(default, skip_serializing_if = "is_default")]
1991 pub domain: Option<String>,
1992 #[cfg(feature = "server_2_10")]
1994 #[serde(default, skip_serializing_if = "is_default")]
1995 pub subject_transforms: Vec<SubjectTransform>,
1996
1997 #[cfg(feature = "server_2_14")]
2004 #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
2005 #[serde(default, skip_serializing_if = "Option::is_none")]
2006 pub consumer: Option<StreamConsumerSource>,
2007}
2008
/// Consumer settings attached to a [`Source`] through its `consumer` field.
///
/// Empty strings are omitted from the serialized form.
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamConsumerSource {
    /// Serialized as `name`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub name: String,
    /// Serialized as `deliver_subject`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deliver_subject: String,
}
2022
#[cfg(feature = "server_2_14")]
impl StreamConsumerSource {
    /// Builds a [`StreamConsumerSource`] from a name and a deliver subject, converting
    /// both arguments into owned strings.
    pub fn new(name: impl Into<String>, deliver_subject: impl Into<String>) -> Self {
        let name = name.into();
        let deliver_subject = deliver_subject.into();
        Self {
            name,
            deliver_subject,
        }
    }
}
2034
2035#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
2036pub struct External {
2037 #[serde(rename = "api")]
2039 pub api_prefix: String,
2040 #[serde(rename = "deliver", skip_serializing_if = "is_default")]
2042 pub delivery_prefix: Option<String>,
2043}
2044
2045use std::marker::PhantomData;
2046
/// Type-level marker: the corresponding builder option has been assigned.
#[derive(Default, Debug)]
pub struct Yes;

/// Type-level marker: the corresponding builder option has not been assigned.
#[derive(Default, Debug)]
pub struct No;

/// Bound shared by the [`Yes`] and [`No`] markers that parameterize [`Purge`].
pub trait ToAssign: Debug {}

impl ToAssign for No {}
impl ToAssign for Yes {}
2056
2057#[derive(Debug)]
2058pub struct Purge<SEQUENCE, KEEP>
2059where
2060 SEQUENCE: ToAssign,
2061 KEEP: ToAssign,
2062{
2063 inner: PurgeRequest,
2064 sequence_set: PhantomData<SEQUENCE>,
2065 keep_set: PhantomData<KEEP>,
2066 context: Context,
2067 stream_name: String,
2068}
2069
2070impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
2071where
2072 SEQUENCE: ToAssign,
2073 KEEP: ToAssign,
2074{
2075 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
2077 self.inner.filter = Some(filter.into());
2078 self
2079 }
2080}
2081
2082impl Purge<No, No> {
2083 pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
2084 Purge {
2085 context: stream.context.clone(),
2086 stream_name: stream.name.clone(),
2087 inner: Default::default(),
2088 sequence_set: PhantomData {},
2089 keep_set: PhantomData {},
2090 }
2091 }
2092}
2093
2094impl<KEEP> Purge<No, KEEP>
2095where
2096 KEEP: ToAssign,
2097{
2098 pub fn keep(self, keep: u64) -> Purge<No, Yes> {
2101 Purge {
2102 context: self.context.clone(),
2103 stream_name: self.stream_name.clone(),
2104 sequence_set: PhantomData {},
2105 keep_set: PhantomData {},
2106 inner: PurgeRequest {
2107 keep: Some(keep),
2108 ..self.inner
2109 },
2110 }
2111 }
2112}
2113impl<SEQUENCE> Purge<SEQUENCE, No>
2114where
2115 SEQUENCE: ToAssign,
2116{
2117 pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2120 Purge {
2121 context: self.context.clone(),
2122 stream_name: self.stream_name.clone(),
2123 sequence_set: PhantomData {},
2124 keep_set: PhantomData {},
2125 inner: PurgeRequest {
2126 sequence: Some(sequence),
2127 ..self.inner
2128 },
2129 }
2130 }
2131}
2132
2133#[derive(Clone, Debug, PartialEq)]
2134pub enum PurgeErrorKind {
2135 Request,
2136 TimedOut,
2137 JetStream(super::errors::Error),
2138}
2139
2140impl Display for PurgeErrorKind {
2141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2142 match self {
2143 Self::Request => write!(f, "request failed"),
2144 Self::TimedOut => write!(f, "timed out"),
2145 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2146 }
2147 }
2148}
2149
2150pub type PurgeError = Error<PurgeErrorKind>;
2151
2152impl<S, K> IntoFuture for Purge<S, K>
2153where
2154 S: ToAssign + std::marker::Send,
2155 K: ToAssign + std::marker::Send,
2156{
2157 type Output = Result<PurgeResponse, PurgeError>;
2158
2159 type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2160
2161 fn into_future(self) -> Self::IntoFuture {
2162 Box::pin(std::future::IntoFuture::into_future(async move {
2163 let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2164 let response: Response<PurgeResponse> = self
2165 .context
2166 .request(request_subject, &self.inner)
2167 .map_err(|err| match err.kind() {
2168 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2169 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2170 })
2171 .await?;
2172
2173 match response {
2174 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2175 Response::Ok(response) => Ok(response),
2176 }
2177 }))
2178 }
2179}
2180
2181#[derive(Deserialize, Debug)]
2182struct ConsumerPage {
2183 total: usize,
2184 consumers: Option<Vec<String>>,
2185}
2186
2187#[derive(Deserialize, Debug)]
2188struct ConsumerInfoPage {
2189 total: usize,
2190 consumers: Option<Vec<super::consumer::Info>>,
2191}
2192
// Error kind used by the `ConsumerNames` stream (alias of `StreamsErrorKind`).
type ConsumerNamesErrorKind = StreamsErrorKind;
// Error type yielded by the `ConsumerNames` stream (alias of `StreamsError`).
type ConsumerNamesError = StreamsError;
// In-flight request for one page of consumer names.
type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;

/// Paginated stream of consumer names, fetched page by page from
/// `CONSUMER.NAMES.<stream>`.
pub struct ConsumerNames {
    /// JetStream context used to request pages.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of names received so far; sent as the `offset` of the next page request.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Names of the current page that have not been yielded yet.
    consumers: Vec<String>,
    /// Set once the last page has been received.
    done: bool,
}
2205
2206impl futures_util::Stream for ConsumerNames {
2207 type Item = Result<String, ConsumerNamesError>;
2208
2209 fn poll_next(
2210 mut self: Pin<&mut Self>,
2211 cx: &mut std::task::Context<'_>,
2212 ) -> std::task::Poll<Option<Self::Item>> {
2213 match self.page_request.as_mut() {
2214 Some(page) => match page.try_poll_unpin(cx) {
2215 std::task::Poll::Ready(page) => {
2216 self.page_request = None;
2217 let page = page.map_err(|err| {
2218 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2219 })?;
2220
2221 if let Some(consumers) = page.consumers {
2222 self.offset += consumers.len();
2223 self.consumers = consumers;
2224 if self.offset >= page.total {
2225 self.done = true;
2226 }
2227 match self.consumers.pop() {
2228 Some(stream) => Poll::Ready(Some(Ok(stream))),
2229 None => Poll::Ready(None),
2230 }
2231 } else {
2232 Poll::Ready(None)
2233 }
2234 }
2235 std::task::Poll::Pending => std::task::Poll::Pending,
2236 },
2237 None => {
2238 if let Some(stream) = self.consumers.pop() {
2239 Poll::Ready(Some(Ok(stream)))
2240 } else {
2241 if self.done {
2242 return Poll::Ready(None);
2243 }
2244 let context = self.context.clone();
2245 let offset = self.offset;
2246 let stream = self.stream.clone();
2247 self.page_request = Some(Box::pin(async move {
2248 match context
2249 .request(
2250 format!("CONSUMER.NAMES.{stream}"),
2251 &json!({
2252 "offset": offset,
2253 }),
2254 )
2255 .await?
2256 {
2257 Response::Err { error } => Err(RequestError::with_source(
2258 super::context::RequestErrorKind::Other,
2259 error,
2260 )),
2261 Response::Ok(page) => Ok(page),
2262 }
2263 }));
2264 self.poll_next(cx)
2265 }
2266 }
2267 }
2268 }
2269}
2270
/// Error kind used by the [`Consumers`] stream (alias of `StreamsErrorKind`).
pub type ConsumersErrorKind = StreamsErrorKind;
/// Error type yielded by the [`Consumers`] stream (alias of `StreamsError`).
pub type ConsumersError = StreamsError;
// In-flight request for one page of consumer infos.
type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;

/// Paginated stream of consumer infos, fetched page by page from
/// `CONSUMER.LIST.<stream>`.
pub struct Consumers {
    /// JetStream context used to request pages.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of infos received so far; sent as the `offset` of the next page request.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    consumers: Vec<super::consumer::Info>,
    /// Set once the last page has been received.
    done: bool,
}
2283
2284impl futures_util::Stream for Consumers {
2285 type Item = Result<super::consumer::Info, ConsumersError>;
2286
2287 fn poll_next(
2288 mut self: Pin<&mut Self>,
2289 cx: &mut std::task::Context<'_>,
2290 ) -> std::task::Poll<Option<Self::Item>> {
2291 match self.page_request.as_mut() {
2292 Some(page) => match page.try_poll_unpin(cx) {
2293 std::task::Poll::Ready(page) => {
2294 self.page_request = None;
2295 let page = page.map_err(|err| {
2296 ConsumersError::with_source(ConsumersErrorKind::Other, err)
2297 })?;
2298 if let Some(consumers) = page.consumers {
2299 self.offset += consumers.len();
2300 self.consumers = consumers;
2301 if self.offset >= page.total {
2302 self.done = true;
2303 }
2304 match self.consumers.pop() {
2305 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2306 None => Poll::Ready(None),
2307 }
2308 } else {
2309 Poll::Ready(None)
2310 }
2311 }
2312 std::task::Poll::Pending => std::task::Poll::Pending,
2313 },
2314 None => {
2315 if let Some(stream) = self.consumers.pop() {
2316 Poll::Ready(Some(Ok(stream)))
2317 } else {
2318 if self.done {
2319 return Poll::Ready(None);
2320 }
2321 let context = self.context.clone();
2322 let offset = self.offset;
2323 let stream = self.stream.clone();
2324 self.page_request = Some(Box::pin(async move {
2325 match context
2326 .request(
2327 format!("CONSUMER.LIST.{stream}"),
2328 &json!({
2329 "offset": offset,
2330 }),
2331 )
2332 .await?
2333 {
2334 Response::Err { error } => Err(RequestError::with_source(
2335 super::context::RequestErrorKind::Other,
2336 error,
2337 )),
2338 Response::Ok(page) => Ok(page),
2339 }
2340 }));
2341 self.poll_next(cx)
2342 }
2343 }
2344 }
2345 }
2346}
2347
2348#[derive(Clone, Debug, PartialEq)]
2349pub enum LastRawMessageErrorKind {
2350 NoMessageFound,
2351 InvalidSubject,
2352 JetStream(super::errors::Error),
2353 Other,
2354}
2355
2356impl Display for LastRawMessageErrorKind {
2357 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2358 match self {
2359 Self::NoMessageFound => write!(f, "no message found"),
2360 Self::InvalidSubject => write!(f, "invalid subject"),
2361 Self::Other => write!(f, "failed to get last raw message"),
2362 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2363 }
2364 }
2365}
2366
2367pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2368pub type RawMessageErrorKind = LastRawMessageErrorKind;
2369pub type RawMessageError = LastRawMessageError;
2370
2371#[derive(Clone, Debug, PartialEq)]
2372pub enum ConsumerErrorKind {
2373 TimedOut,
2375 Request,
2376 InvalidConsumerType,
2377 InvalidName,
2378 JetStream(super::errors::Error),
2379 Other,
2380}
2381
2382impl Display for ConsumerErrorKind {
2383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2384 match self {
2385 Self::TimedOut => write!(f, "timed out"),
2386 Self::Request => write!(f, "request failed"),
2387 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2388 Self::Other => write!(f, "consumer error"),
2389 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2390 Self::InvalidName => write!(f, "invalid consumer name"),
2391 }
2392 }
2393}
2394
2395pub type ConsumerError = Error<ConsumerErrorKind>;
2396
/// Kinds of failure a consumer reset can end with.
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Debug, Clone, PartialEq)]
pub enum ConsumerResetErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The stream or consumer was not found.
    NotFound,
    /// The server rejected the reset as invalid.
    InvalidReset,
    /// The server answered with another JetStream error.
    JetStream(super::errors::Error),
}

#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
impl Display for ConsumerResetErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::JetStream(err) => return write!(f, "JetStream error: {err}"),
            Self::TimedOut => "timed out",
            Self::Request => "request failed",
            Self::NotFound => "stream or consumer not found",
            Self::InvalidReset => "invalid reset",
        };
        f.write_str(text)
    }
}

/// Error type built on [`ConsumerResetErrorKind`].
#[cfg(feature = "server_2_14")]
pub type ConsumerResetError = Error<ConsumerResetErrorKind>;
2432
#[cfg(feature = "server_2_14")]
impl From<super::errors::Error> for ConsumerResetError {
    /// Classifies a JetStream error reply by its error code: an invalid reset and a
    /// missing stream or consumer get dedicated kinds, everything else is wrapped
    /// as [`ConsumerResetErrorKind::JetStream`].
    fn from(err: super::errors::Error) -> Self {
        let kind = match err.error_code() {
            super::errors::ErrorCode::CONSUMER_INVALID_RESET => {
                ConsumerResetErrorKind::InvalidReset
            }
            super::errors::ErrorCode::CONSUMER_NOT_FOUND
            | super::errors::ErrorCode::STREAM_NOT_FOUND => ConsumerResetErrorKind::NotFound,
            _ => ConsumerResetErrorKind::JetStream(err),
        };
        ConsumerResetError::new(kind)
    }
}
2448
#[cfg(feature = "server_2_14")]
impl From<super::context::RequestError> for ConsumerResetError {
    /// Maps a request failure: a timeout becomes `TimedOut`, a missing responder
    /// becomes `NotFound`, and anything else becomes `Request` with the original
    /// error kept as its source.
    fn from(err: super::context::RequestError) -> Self {
        let kind = match err.kind() {
            RequestErrorKind::TimedOut => ConsumerResetErrorKind::TimedOut,
            RequestErrorKind::NoResponders => ConsumerResetErrorKind::NotFound,
            _ => {
                return ConsumerResetError::with_source(ConsumerResetErrorKind::Request, err);
            }
        };
        ConsumerResetError::new(kind)
    }
}
2461
2462#[derive(Clone, Debug, PartialEq)]
2463pub enum ConsumerCreateStrictErrorKind {
2464 TimedOut,
2466 Request,
2467 InvalidConsumerType,
2468 InvalidName,
2469 AlreadyExists,
2470 JetStream(super::errors::Error),
2471 Other,
2472}
2473
2474impl Display for ConsumerCreateStrictErrorKind {
2475 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2476 match self {
2477 Self::TimedOut => write!(f, "timed out"),
2478 Self::Request => write!(f, "request failed"),
2479 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2480 Self::Other => write!(f, "consumer error"),
2481 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2482 Self::InvalidName => write!(f, "invalid consumer name"),
2483 Self::AlreadyExists => write!(f, "consumer already exists"),
2484 }
2485 }
2486}
2487
2488pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2489
2490#[derive(Clone, Debug, PartialEq)]
2491pub enum ConsumerUpdateErrorKind {
2492 TimedOut,
2494 Request,
2495 InvalidConsumerType,
2496 InvalidName,
2497 DoesNotExist,
2498 JetStream(super::errors::Error),
2499 Other,
2500}
2501
2502impl Display for ConsumerUpdateErrorKind {
2503 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2504 match self {
2505 Self::TimedOut => write!(f, "timed out"),
2506 Self::Request => write!(f, "request failed"),
2507 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2508 Self::Other => write!(f, "consumer error"),
2509 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2510 Self::InvalidName => write!(f, "invalid consumer name"),
2511 Self::DoesNotExist => write!(f, "consumer does not exist"),
2512 }
2513 }
2514}
2515
2516pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2517
2518impl From<super::errors::Error> for ConsumerError {
2519 fn from(err: super::errors::Error) -> Self {
2520 ConsumerError::new(ConsumerErrorKind::JetStream(err))
2521 }
2522}
2523impl From<super::errors::Error> for ConsumerCreateStrictError {
2524 fn from(err: super::errors::Error) -> Self {
2525 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2526 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2527 } else {
2528 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2529 }
2530 }
2531}
2532impl From<super::errors::Error> for ConsumerUpdateError {
2533 fn from(err: super::errors::Error) -> Self {
2534 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2535 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2536 } else {
2537 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2538 }
2539 }
2540}
2541impl From<ConsumerError> for ConsumerUpdateError {
2542 fn from(err: ConsumerError) -> Self {
2543 match err.kind() {
2544 ConsumerErrorKind::JetStream(err) => {
2545 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2546 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2547 } else {
2548 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2549 }
2550 }
2551 ConsumerErrorKind::Request => {
2552 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2553 }
2554 ConsumerErrorKind::TimedOut => {
2555 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2556 }
2557 ConsumerErrorKind::InvalidConsumerType => {
2558 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2559 }
2560 ConsumerErrorKind::InvalidName => {
2561 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2562 }
2563 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2564 }
2565 }
2566}
2567
2568impl From<ConsumerError> for ConsumerCreateStrictError {
2569 fn from(err: ConsumerError) -> Self {
2570 match err.kind() {
2571 ConsumerErrorKind::JetStream(err) => {
2572 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2573 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2574 } else {
2575 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2576 }
2577 }
2578 ConsumerErrorKind::Request => {
2579 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2580 }
2581 ConsumerErrorKind::TimedOut => {
2582 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2583 }
2584 ConsumerErrorKind::InvalidConsumerType => {
2585 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2586 }
2587 ConsumerErrorKind::InvalidName => {
2588 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2589 }
2590 ConsumerErrorKind::Other => {
2591 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2592 }
2593 }
2594 }
2595}
2596
2597impl From<super::context::RequestError> for ConsumerError {
2598 fn from(err: super::context::RequestError) -> Self {
2599 match err.kind() {
2600 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2601 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2602 }
2603 }
2604}
2605impl From<super::context::RequestError> for ConsumerUpdateError {
2606 fn from(err: super::context::RequestError) -> Self {
2607 match err.kind() {
2608 RequestErrorKind::TimedOut => {
2609 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2610 }
2611 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2612 }
2613 }
2614}
2615impl From<super::context::RequestError> for ConsumerCreateStrictError {
2616 fn from(err: super::context::RequestError) -> Self {
2617 match err.kind() {
2618 RequestErrorKind::TimedOut => {
2619 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2620 }
2621 _ => {
2622 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2623 }
2624 }
2625 }
2626}
2627
2628#[derive(Debug, Serialize, Default)]
2629pub struct DirectGetRequest {
2630 #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2631 sequence: Option<u64>,
2632 #[serde(rename = "last_by_subj", skip_serializing)]
2633 last_by_subject: Option<String>,
2634 #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2635 next_by_subject: Option<String>,
2636}
2637
/// Builder marker: `send` resolves to a full [`StreamMessage`].
pub struct WithHeaders;

/// Builder marker: `send` resolves to a [`StreamValue`] carrying only the payload.
pub struct WithoutHeaders;
2643
2644trait DirectGetResponse: Sized {
2646 fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
2647}
2648
2649impl DirectGetResponse for StreamMessage {
2650 fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2651 StreamMessage::try_from(message).map_err(Into::into)
2652 }
2653}
2654
2655impl DirectGetResponse for StreamValue {
2656 fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2657 Ok(StreamValue {
2658 data: message.payload,
2659 })
2660 }
2661}
2662
2663pub struct DirectGetBuilder<T = WithHeaders> {
2664 context: Context,
2665 stream_name: String,
2666 request: DirectGetRequest,
2667 _phantom: std::marker::PhantomData<T>,
2668}
2669
2670impl DirectGetBuilder<WithHeaders> {
2671 fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
2672 DirectGetBuilder {
2673 context,
2674 stream_name,
2675 request: DirectGetRequest::default(),
2676 _phantom: std::marker::PhantomData,
2677 }
2678 }
2679}
2680
2681impl<T> DirectGetBuilder<T> {
2682 async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
2684 let payload = if self.request.last_by_subject.is_some() {
2687 Bytes::new()
2688 } else {
2689 serde_json::to_vec(&self.request).map(Bytes::from)?
2690 };
2691
2692 let request_subject = if let Some(ref subject) = self.request.last_by_subject {
2693 format!(
2694 "{}.DIRECT.GET.{}.{}",
2695 &self.context.prefix, &self.stream_name, subject
2696 )
2697 } else {
2698 format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
2699 };
2700
2701 let response = self
2702 .context
2703 .client
2704 .request(request_subject, payload)
2705 .await?;
2706
2707 if let Some(status) = response.status {
2709 if let Some(ref description) = response.description {
2710 match status {
2711 StatusCode::NOT_FOUND => {
2712 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
2713 }
2714 StatusCode::TIMEOUT => {
2716 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
2717 }
2718 _ => {
2719 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
2720 status,
2721 description.to_string(),
2722 )));
2723 }
2724 }
2725 }
2726 }
2727
2728 R::from_message(response)
2729 }
2730
2731 pub fn sequence(mut self, seq: u64) -> Self {
2733 self.request.sequence = Some(seq);
2734 self
2735 }
2736
2737 pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2739 self.request.last_by_subject = Some(subject.into());
2740 self
2741 }
2742
2743 pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2745 self.request.next_by_subject = Some(subject.into());
2746 self
2747 }
2748}
2749
2750impl DirectGetBuilder<WithHeaders> {
2751 pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
2753 self.send_internal::<StreamMessage>().await
2754 }
2755}
2756
2757impl DirectGetBuilder<WithoutHeaders> {
2758 pub async fn send(self) -> Result<StreamValue, DirectGetError> {
2760 self.send_internal::<StreamValue>().await
2761 }
2762}
2763
/// Payload-only result of a message lookup.
pub struct StreamValue {
    /// The message payload.
    pub data: Bytes,
}
2767
2768#[derive(Debug, Serialize, Default)]
2769pub struct RawMessageRequest {
2770 #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2771 sequence: Option<u64>,
2772 #[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
2773 last_by_subject: Option<String>,
2774 #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2775 next_by_subject: Option<String>,
2776}
2777
2778trait RawMessageResponse: Sized {
2780 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
2781}
2782
2783impl RawMessageResponse for StreamMessage {
2784 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2785 StreamMessage::try_from(message)
2786 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2787 }
2788}
2789
2790impl RawMessageResponse for StreamValue {
2791 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2792 use base64::engine::general_purpose::STANDARD;
2793 use base64::Engine;
2794
2795 let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
2796 RawMessageError::with_source(
2797 RawMessageErrorKind::Other,
2798 Box::new(std::io::Error::other(err)),
2799 )
2800 })?;
2801
2802 Ok(StreamValue {
2803 data: decoded_payload.into(),
2804 })
2805 }
2806}
2807
2808pub struct RawMessageBuilder<T = WithHeaders> {
2809 context: Context,
2810 stream_name: String,
2811 request: RawMessageRequest,
2812 _phantom: std::marker::PhantomData<T>,
2813}
2814
2815impl RawMessageBuilder<WithHeaders> {
2816 fn new(context: Context, stream_name: String) -> Self {
2817 RawMessageBuilder {
2818 context,
2819 stream_name,
2820 request: RawMessageRequest::default(),
2821 _phantom: std::marker::PhantomData,
2822 }
2823 }
2824}
2825
2826impl<T> RawMessageBuilder<T> {
2827 async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
2829 for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
2831 .into_iter()
2832 .flatten()
2833 {
2834 if !is_valid_subject(subject) {
2835 return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
2836 }
2837 }
2838
2839 let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
2840
2841 let response: Response<GetRawMessage> = self
2842 .context
2843 .request(subject, &self.request)
2844 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2845 .await?;
2846
2847 match response {
2848 Response::Err { error } => {
2849 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
2850 Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
2851 } else {
2852 Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
2853 }
2854 }
2855 Response::Ok(value) => R::from_raw_message(value.message),
2856 }
2857 }
2858
2859 pub fn sequence(mut self, seq: u64) -> Self {
2861 self.request.sequence = Some(seq);
2862 self
2863 }
2864
2865 pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2867 self.request.last_by_subject = Some(subject.into());
2868 self
2869 }
2870
2871 pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2873 self.request.next_by_subject = Some(subject.into());
2874 self
2875 }
2876}
2877
2878impl RawMessageBuilder<WithHeaders> {
2879 pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
2881 self.send_internal::<StreamMessage>().await
2882 }
2883}
2884
2885impl RawMessageBuilder<WithoutHeaders> {
2886 pub async fn send(self) -> Result<StreamValue, RawMessageError> {
2888 self.send_internal::<StreamValue>().await
2889 }
2890}
2891
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must survive a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn consumer_limits_de() {
        let config = Config::default();

        let encoded = serde_json::to_string(&config).unwrap();
        let roundtrip: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(config, roundtrip);
    }
}