1use std::{
17 collections::{self, HashMap},
18 fmt::{self, Debug, Display},
19 future::IntoFuture,
20 io::{self},
21 pin::Pin,
22 str::FromStr,
23 task::Poll,
24 time::Duration,
25};
26
27use crate::{
28 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40 context::{
41 ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42 StreamsErrorKind,
43 },
44 errors::ErrorCode,
45 is_valid_name,
46 message::{StreamMessage, StreamMessageError},
47 response::Response,
48 Context,
49};
50
/// Error type returned by the stream-info requests in this module; an alias of [`RequestError`].
pub type InfoError = RequestError;
52
/// Kinds of failure reported by the direct-get operations.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// Displayed as "message not found".
    NotFound,
    /// The subject failed validation before the request was sent.
    InvalidSubject,
    /// The underlying request timed out.
    TimedOut,
    /// Displayed as "request failed".
    Request,
    /// An error response carrying a status code and a textual description.
    ErrorResponse(StatusCode, String),
    /// Any other failure (also used when wrapping serialization and message conversion errors).
    Other,
}
62
63impl Display for DirectGetErrorKind {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 Self::InvalidSubject => write!(f, "invalid subject"),
67 Self::NotFound => write!(f, "message not found"),
68 Self::ErrorResponse(status, description) => {
69 write!(f, "unable to get message: {status} {description}")
70 }
71 Self::Other => write!(f, "error getting message"),
72 Self::TimedOut => write!(f, "timed out"),
73 Self::Request => write!(f, "request failed"),
74 }
75 }
76}
77
/// Error returned by direct-get operations; its kind is a [`DirectGetErrorKind`].
pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81 fn from(err: crate::RequestError) -> Self {
82 match err.kind() {
83 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84 crate::RequestErrorKind::NoResponders => {
85 DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86 StatusCode::NO_RESPONDERS,
87 "no responders".to_string(),
88 ))
89 }
90 crate::RequestErrorKind::Other => {
91 DirectGetError::with_source(DirectGetErrorKind::Other, err)
92 }
93 }
94 }
95}
96
97impl From<serde_json::Error> for DirectGetError {
98 fn from(err: serde_json::Error) -> Self {
99 DirectGetError::with_source(DirectGetErrorKind::Other, err)
100 }
101}
102
103impl From<StreamMessageError> for DirectGetError {
104 fn from(err: StreamMessageError) -> Self {
105 DirectGetError::with_source(DirectGetErrorKind::Other, err)
106 }
107}
108
/// Kinds of failure reported when deleting a message from a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
115
116impl Display for DeleteMessageErrorKind {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 match self {
119 Self::Request => write!(f, "request failed"),
120 Self::TimedOut => write!(f, "timed out"),
121 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
122 }
123 }
124}
125
/// Error returned by [`Stream::delete_message`]; its kind is a [`DeleteMessageErrorKind`].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
127
/// Handle for operations on a single stream.
///
/// Generic over the type of the `info` field (`Info` by default). Methods that do not read
/// the cached info are implemented for every `T`.
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    /// Information held by this handle.
    pub(crate) info: T,
    /// Context used to send requests.
    pub(crate) context: Context,
    /// Stream name, used when building request subjects.
    pub(crate) name: String,
}
137
138impl Stream<Info> {
139 pub async fn info(&mut self) -> Result<&Info, InfoError> {
157 let subject = format!("STREAM.INFO.{}", self.info.config.name);
158
159 match self.context.request(subject, &json!({})).await? {
160 Response::Ok::<Info>(info) => {
161 self.info = info;
162 Ok(&self.info)
163 }
164 Response::Err { error } => Err(error.into()),
165 }
166 }
167
168 pub fn cached_info(&self) -> &Info {
187 &self.info
188 }
189}
190
191impl<I> Stream<I> {
192 pub async fn get_info(&self) -> Result<Info, InfoError> {
195 let subject = format!("STREAM.INFO.{}", self.name);
196
197 match self.context.request(subject, &json!({})).await? {
198 Response::Ok::<Info>(info) => Ok(info),
199 Response::Err { error } => Err(error.into()),
200 }
201 }
202
203 pub async fn info_with_subjects<F: AsRef<str>>(
226 &self,
227 subjects_filter: F,
228 ) -> Result<InfoWithSubjects, InfoError> {
229 let subjects_filter = subjects_filter.as_ref().to_string();
230 let info = stream_info_with_details(
232 self.context.clone(),
233 self.name.clone(),
234 0,
235 false,
236 subjects_filter.clone(),
237 )
238 .await?;
239
240 Ok(InfoWithSubjects::new(
241 self.context.clone(),
242 info,
243 subjects_filter,
244 ))
245 }
246
    /// Creates a [`StreamInfoBuilder`] for this stream, using clones of the handle's context
    /// and name.
    pub fn info_builder(&self) -> StreamInfoBuilder {
        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
    }
275
    /// Creates a [`DirectGetBuilder`] for this stream, using clones of the handle's context
    /// and name.
    pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
        DirectGetBuilder::new(self.context.clone(), self.name.clone())
    }
298
299 pub async fn direct_get_next_for_subject<T: Into<String>>(
334 &self,
335 subject: T,
336 sequence: Option<u64>,
337 ) -> Result<StreamMessage, DirectGetError> {
338 let subject_str = subject.into();
339 if !is_valid_subject(&subject_str) {
340 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
341 }
342
343 let mut builder = self.direct_get_builder().next_by_subject(subject_str);
344 if let Some(seq) = sequence {
345 builder = builder.sequence(seq);
346 }
347
348 builder.send().await
349 }
350
351 pub async fn direct_get_first_for_subject<T: Into<String>>(
383 &self,
384 subject: T,
385 ) -> Result<StreamMessage, DirectGetError> {
386 let subject_str = subject.into();
387 if !is_valid_subject(&subject_str) {
388 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
389 }
390
391 self.direct_get_builder()
392 .next_by_subject(subject_str)
393 .send()
394 .await
395 }
396
    /// Direct-gets the message stored at `sequence`.
    ///
    /// # Errors
    ///
    /// Returns a [`DirectGetError`] produced by the underlying builder's `send`.
    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
        self.direct_get_builder().sequence(sequence).send().await
    }
431
432 pub async fn direct_get_last_for_subject<T: Into<String>>(
464 &self,
465 subject: T,
466 ) -> Result<StreamMessage, DirectGetError> {
467 self.direct_get_builder()
468 .last_by_subject(subject)
469 .send()
470 .await
471 }
    /// Creates a [`RawMessageBuilder`] for this stream, using clones of the handle's context
    /// and name.
    pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
        RawMessageBuilder::new(self.context.clone(), self.name.clone())
    }
494
    /// Gets the raw message stored at `sequence`.
    ///
    /// # Errors
    ///
    /// Returns a [`RawMessageError`] produced by the underlying builder's `send`.
    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
        self.raw_message_builder().sequence(sequence).send().await
    }
527
    /// Gets a raw message for `subject`, starting the search at `sequence`.
    ///
    /// Built as a "next by subject" request with the given start sequence.
    ///
    /// # Errors
    ///
    /// Returns a [`RawMessageError`] produced by the underlying builder's `send`.
    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
        &self,
        subject: T,
        sequence: u64,
    ) -> Result<StreamMessage, RawMessageError> {
        self.raw_message_builder()
            .sequence(sequence)
            .next_by_subject(subject.as_ref().to_string())
            .send()
            .await
    }
562
    /// Gets a raw message for `subject` using a "next by subject" request without a start
    /// sequence.
    ///
    /// # Errors
    ///
    /// Returns a [`RawMessageError`] produced by the underlying builder's `send`.
    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
        &self,
        subject: T,
    ) -> Result<StreamMessage, RawMessageError> {
        self.raw_message_builder()
            .next_by_subject(subject.as_ref().to_string())
            .send()
            .await
    }
595
    /// Gets the last raw message for `stream_subject`.
    ///
    /// # Errors
    ///
    /// Returns a [`LastRawMessageError`] produced by the underlying builder's `send`.
    pub async fn get_last_raw_message_by_subject(
        &self,
        stream_subject: &str,
    ) -> Result<StreamMessage, LastRawMessageError> {
        self.raw_message_builder()
            .last_by_subject(stream_subject.to_string())
            .send()
            .await
    }
628
629 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
653 let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
654 let payload = json!({
655 "seq": sequence,
656 });
657
658 let response: Response<DeleteStatus> = self
659 .context
660 .request(subject, &payload)
661 .map_err(|err| match err.kind() {
662 RequestErrorKind::TimedOut => {
663 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
664 }
665 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
666 })
667 .await?;
668
669 match response {
670 Response::Err { error } => Err(DeleteMessageError::new(
671 DeleteMessageErrorKind::JetStream(error),
672 )),
673 Response::Ok(value) => Ok(value.success),
674 }
675 }
676
    /// Starts building a purge request for this stream, with neither sequence nor keep set.
    pub fn purge(&self) -> Purge<No, No> {
        Purge::build(self)
    }
695
    /// Purges the messages matching `subject`.
    ///
    /// Deprecated shorthand for `self.purge().filter(subject)`.
    #[deprecated(
        since = "0.25.0",
        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
    )]
    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
    where
        T: Into<String>,
    {
        self.purge().filter(subject).await
    }
722
    /// Creates a consumer on this stream from `config`.
    ///
    /// Delegates to the context's `create_consumer_on_stream` with this stream's name.
    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerError> {
        self.context
            .create_consumer_on_stream(config, self.name.clone())
            .await
    }
754
    /// Updates a consumer on this stream from `config`.
    ///
    /// Delegates to the context's `update_consumer_on_stream` with this stream's name.
    /// Only available with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerUpdateError> {
        self.context
            .update_consumer_on_stream(config, self.name.clone())
            .await
    }
787
    /// Creates a consumer on this stream from `config` using the strict creation path.
    ///
    /// Delegates to the context's `create_consumer_strict_on_stream` with this stream's name.
    /// Only available with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
        self.context
            .create_consumer_strict_on_stream(config, self.name.clone())
            .await
    }
821
822 pub async fn consumer_info<T: AsRef<str>>(
839 &self,
840 name: T,
841 ) -> Result<consumer::Info, ConsumerInfoError> {
842 let name = name.as_ref();
843
844 if !is_valid_name(name) {
845 return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
846 }
847
848 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
849
850 match self.context.request(subject, &json!({})).await? {
851 Response::Ok(info) => Ok(info),
852 Response::Err { error } => Err(error.into()),
853 }
854 }
855
856 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
875 &self,
876 name: &str,
877 ) -> Result<Consumer<T>, crate::Error> {
878 let info = self.consumer_info(name).await?;
879
880 Ok(Consumer::new(
881 T::try_from_consumer_config(info.config.clone())?,
882 info,
883 self.context.clone(),
884 ))
885 }
886
887 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
915 &self,
916 name: &str,
917 config: T,
918 ) -> Result<Consumer<T>, ConsumerError> {
919 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
920
921 match self.context.request(subject, &json!({})).await? {
922 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
923 Response::Err { error } => Err(error.into()),
924 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
925 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
926 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
927 })?,
928 info,
929 self.context.clone(),
930 )),
931 }
932 }
933
934 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
955 let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
956
957 match self.context.request(subject, &json!({})).await? {
958 Response::Ok(delete_status) => Ok(delete_status),
959 Response::Err { error } => Err(error.into()),
960 }
961 }
962
    /// Pauses the consumer called `name` until `pause_until`.
    ///
    /// Only available with the `server_2_11` feature.
    #[cfg(feature = "server_2_11")]
    pub async fn pause_consumer(
        &self,
        name: &str,
        pause_until: OffsetDateTime,
    ) -> Result<PauseResponse, ConsumerError> {
        self.request_pause_consumer(name, Some(pause_until)).await
    }
994
    /// Resumes the consumer called `name` by sending a pause request without a deadline.
    ///
    /// Only available with the `server_2_11` feature.
    #[cfg(feature = "server_2_11")]
    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
        self.request_pause_consumer(name, None).await
    }
1019
1020 #[cfg(feature = "server_2_11")]
1021 async fn request_pause_consumer(
1022 &self,
1023 name: &str,
1024 pause_until: Option<OffsetDateTime>,
1025 ) -> Result<PauseResponse, ConsumerError> {
1026 let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1027 let payload = &PauseResumeConsumerRequest { pause_until };
1028
1029 match self.context.request(subject, payload).await? {
1030 Response::Ok::<PauseResponse>(resp) => Ok(resp),
1031 Response::Err { error } => Err(error.into()),
1032 }
1033 }
1034
    /// Creates a [`ConsumerNames`] value for this stream.
    ///
    /// It starts at offset 0 with no request in flight and an empty buffer.
    pub fn consumer_names(&self) -> ConsumerNames {
        ConsumerNames {
            context: self.context.clone(),
            stream: self.name.clone(),
            offset: 0,
            page_request: None,
            consumers: Vec::new(),
            done: false,
        }
    }
1063
    /// Creates a [`Consumers`] value for this stream.
    ///
    /// It starts at offset 0 with no request in flight and an empty buffer.
    pub fn consumers(&self) -> Consumers {
        Consumers {
            context: self.context.clone(),
            stream: self.name.clone(),
            offset: 0,
            page_request: None,
            consumers: Vec::new(),
            done: false,
        }
    }
1092}
1093
/// Builder for a stream info request with optional deleted details and a subjects filter.
pub struct StreamInfoBuilder {
    /// Context used to send the request.
    pub(crate) context: Context,
    /// Name of the stream to query.
    pub(crate) name: String,
    /// Whether deleted details are requested.
    pub(crate) deleted: bool,
    /// Subjects filter sent with the request; empty by default.
    pub(crate) subject: String,
}
1100
1101impl StreamInfoBuilder {
1102 fn new(context: Context, name: String) -> Self {
1103 Self {
1104 context,
1105 name,
1106 deleted: false,
1107 subject: "".to_string(),
1108 }
1109 }
1110
1111 pub fn with_deleted(mut self, deleted: bool) -> Self {
1112 self.deleted = deleted;
1113 self
1114 }
1115
1116 pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1117 self.subject = subject.into();
1118 self
1119 }
1120
1121 pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1122 let info = stream_info_with_details(
1123 self.context.clone(),
1124 self.name.clone(),
1125 0,
1126 self.deleted,
1127 self.subject.clone(),
1128 )
1129 .await?;
1130
1131 Ok(InfoWithSubjects::new(self.context, info, self.subject))
1132 }
1133}
1134
/// Stream configuration as exchanged with the server.
///
/// Most fields fall back to their default when absent on the wire, and many are omitted
/// from serialization while they hold their default value (see the serde attributes).
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Name of the stream.
    pub name: String,
    /// Byte limit; defaults to 0 when absent.
    #[serde(default)]
    pub max_bytes: i64,
    /// Message limit; `max_msgs` on the wire.
    #[serde(default, rename = "max_msgs")]
    pub max_messages: i64,
    /// Per-subject message limit; `max_msgs_per_subject` on the wire.
    #[serde(default, rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// Discard policy of the stream.
    pub discard: DiscardPolicy,
    /// Per-subject flavour of the "new" discard policy; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Subjects bound to the stream; omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// Retention policy of the stream.
    pub retention: RetentionPolicy,
    /// Consumer limit; defaults to 0 when absent.
    #[serde(default)]
    pub max_consumers: i32,
    /// Maximum message age, (de)serialized through `serde_nanos`.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// Message size limit; `max_msg_size` on the wire, omitted when 0.
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// Storage backend of the stream.
    pub storage: StorageType,
    /// Number of replicas.
    pub num_replicas: usize,
    /// `no_ack` flag; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// Duplicate tracking window, (de)serialized through `serde_nanos`; omitted when zero.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// Template owner; omitted when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// `sealed` flag; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// Optional free-form description; omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    /// Rollup flag; `allow_rollup_hdrs` on the wire, omitted when `false`.
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    pub allow_rollup: bool,
    /// `deny_delete` flag; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_delete: bool,
    /// `deny_purge` flag; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Optional republish configuration; omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// `allow_direct` flag; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// `mirror_direct` flag; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Optional mirror source; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Optional list of sources; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    /// Key/value metadata; omitted when empty (`server_2_10` feature).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    /// Optional subject transform; omitted when `None` (`server_2_10` feature).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    /// Optional compression setting; omitted when `None` (`server_2_10` feature).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    /// Optional consumer limits; limits equal to the default deserialize as `None`
    /// (`server_2_10` feature).
    #[cfg(feature = "server_2_10")]
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    /// Optional first sequence; `first_seq` on the wire, omitted when `None`
    /// (`server_2_10` feature).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Optional placement directives; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,

    /// Optional persistence mode; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub persist_mode: Option<PersistenceMode>,

    /// Optional RFC 3339 timestamp; omitted when `None` (`server_2_11` feature).
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,

    /// Message TTL flag; `allow_msg_ttl` on the wire, omitted when `false`
    /// (`server_2_11` feature).
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
    pub allow_message_ttl: bool,

    /// Optional TTL for subject delete markers, (de)serialized through `serde_nanos`;
    /// omitted when `None` (`server_2_11` feature).
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub subject_delete_marker_ttl: Option<Duration>,

    /// Atomic publish flag; `allow_atomic` on the wire, omitted when `false`
    /// (`server_2_12` feature).
    #[cfg(feature = "server_2_12")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
    pub allow_atomic_publish: bool,

    /// Message schedules flag; `allow_msg_schedules` on the wire, omitted when `false`
    /// (`server_2_12` feature).
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_schedules"
    )]
    pub allow_message_schedules: bool,

    /// Message counter flag; `allow_msg_counter` on the wire, omitted when `false`
    /// (`server_2_12` feature).
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_counter"
    )]
    pub allow_message_counter: bool,
}
1304
1305impl From<&Config> for Config {
1306 fn from(sc: &Config) -> Config {
1307 sc.clone()
1308 }
1309}
1310
1311impl From<&str> for Config {
1312 fn from(s: &str) -> Config {
1313 Config {
1314 name: s.to_string(),
1315 ..Default::default()
1316 }
1317 }
1318}
1319
/// Deserializes optional consumer limits, mapping limits equal to
/// `ConsumerLimits::default()` to `None`.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let parsed = Option::<ConsumerLimits>::deserialize(deserializer)?;
    // Keep only limits that actually differ from the defaults.
    Ok(parsed.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Consumer limits configured on a stream.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// Inactivity threshold, (de)serialized through `serde_nanos`; defaults to zero.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Limit on pending acknowledgements; defaults to 0.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1347
/// Compression setting of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// Serialized as `"s2"`.
    #[serde(rename = "s2")]
    S2,
    /// Serialized as `"none"`.
    #[serde(rename = "none")]
    None,
}
1355
/// A subject transform, described by a source and a destination.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// Source of the transform; `src` on the wire.
    #[serde(rename = "src")]
    pub source: String,

    /// Destination of the transform; `dest` on the wire.
    #[serde(rename = "dest")]
    pub destination: String,
}
1365
/// Republish configuration of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// Source subject; `src` on the wire.
    #[serde(rename = "src")]
    pub source: String,
    /// Destination subject; `dest` on the wire.
    #[serde(rename = "dest")]
    pub destination: String,
    /// `headers_only` flag; defaults to `false` when absent.
    #[serde(default)]
    pub headers_only: bool,
}
1379
/// Placement directives for a stream.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Optional cluster name; omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Tags; omitted when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1390
/// Discard policy of a stream; `Old` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Serialized as `"old"`.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Serialized as `"new"`.
    #[serde(rename = "new")]
    New = 1,
}
1404
/// Retention policy of a stream; `Limits` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// Serialized as `"limits"`.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// Serialized as `"interest"`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// Serialized as `"workqueue"`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1421
/// Storage backend of a stream; `File` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Serialized as `"file"`.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Serialized as `"memory"`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1434
/// Persistence mode of a stream; `Default` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PersistenceMode {
    /// Serialized as `"default"`.
    #[default]
    #[serde(rename = "default")]
    Default = 0,
    /// Serialized as `"async"`.
    #[serde(rename = "async")]
    Async = 1,
}
1447
1448async fn stream_info_with_details(
1449 context: Context,
1450 stream: String,
1451 offset: usize,
1452 deleted_details: bool,
1453 subjects_filter: String,
1454) -> Result<Info, InfoError> {
1455 let subject = format!("STREAM.INFO.{stream}");
1456
1457 let payload = StreamInfoRequest {
1458 offset,
1459 deleted_details,
1460 subjects_filter,
1461 };
1462
1463 let response: Response<Info> = context.request(subject, &payload).await?;
1464
1465 match response {
1466 Response::Ok(info) => Ok(info),
1467 Response::Err { error } => Err(error.into()),
1468 }
1469}
1470
/// Boxed, `'static` future resolving to a stream [`Info`] or an [`InfoError`].
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1472
/// Payload of a detailed stream info request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    /// Paging offset of the request.
    offset: usize,
    /// Whether deleted details are requested.
    deleted_details: bool,
    /// Subjects filter of the request.
    subjects_filter: String,
}
1479
/// Stream info plus a paged stream of `(subject, count)` pairs.
///
/// Implements `futures_util::Stream`; further pages are requested while it is polled.
pub struct InfoWithSubjects {
    /// Name of the stream, taken from the config of the first info reply.
    stream: String,
    /// Context used to request further pages.
    context: Context,
    /// Info from the first reply, with its subjects map moved into `subjects`.
    pub info: Info,
    /// Number of subjects received so far; used as the offset of the next page request.
    offset: usize,
    /// Subjects of the current page that have not been yielded yet.
    subjects: collections::hash_map::IntoIter<String, usize>,
    /// Page request currently in flight, if any.
    info_request: Option<InfoRequest>,
    /// Subjects filter repeated on every page request.
    subjects_filter: String,
    /// Set once no further page needs to be requested.
    pages_done: bool,
}
1490
1491impl InfoWithSubjects {
1492 pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1493 let subjects = info.state.subjects.take().unwrap_or_default();
1494 let name = info.config.name.clone();
1495 InfoWithSubjects {
1496 context,
1497 info,
1498 pages_done: subjects.is_empty(),
1499 offset: subjects.len(),
1500 subjects: subjects.into_iter(),
1501 subjects_filter: subject,
1502 stream: name,
1503 info_request: None,
1504 }
1505 }
1506}
1507
1508impl futures_util::Stream for InfoWithSubjects {
1509 type Item = Result<(String, usize), InfoError>;
1510
1511 fn poll_next(
1512 mut self: Pin<&mut Self>,
1513 cx: &mut std::task::Context<'_>,
1514 ) -> Poll<Option<Self::Item>> {
1515 match self.subjects.next() {
1516 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1517 None => {
1518 if self.pages_done {
1520 return Poll::Ready(None);
1521 }
1522 let stream = self.stream.clone();
1523 let context = self.context.clone();
1524 let subjects_filter = self.subjects_filter.clone();
1525 let offset = self.offset;
1526 match self
1527 .info_request
1528 .get_or_insert_with(|| {
1529 Box::pin(stream_info_with_details(
1530 context,
1531 stream,
1532 offset,
1533 false,
1534 subjects_filter,
1535 ))
1536 })
1537 .poll_unpin(cx)
1538 {
1539 Poll::Ready(resp) => match resp {
1540 Ok(info) => {
1541 let subjects = info.state.subjects.clone();
1542 self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1543 self.info_request = None;
1544 let subjects = subjects.unwrap_or_default();
1545 self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1546 let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1547 if total <= self.offset || subjects.is_empty() {
1548 self.pages_done = true;
1549 }
1550 match self.subjects.next() {
1551 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1552 None => Poll::Ready(None),
1553 }
1554 }
1555 Err(err) => Poll::Ready(Some(Err(err))),
1556 },
1557 Poll::Pending => Poll::Pending,
1558 }
1559 }
1560 }
1561 }
1562}
1563
/// Information about a stream as returned by the server.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// Configuration of the stream.
    pub config: Config,
    /// Creation time, parsed as RFC 3339.
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// State of the stream.
    pub state: State,
    /// Cluster information, if present.
    pub cluster: Option<ClusterInfo>,
    /// Mirror information, if present.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Source information; empty when absent.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    /// Paging fields (`offset`, `total`, `limit`), flattened into the same JSON object.
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1585
/// Paging fields of a paged reply.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    /// Offset of the page.
    offset: usize,
    /// Total count; compared against the number of received subjects to detect the last page.
    total: usize,
    /// Page size limit.
    limit: usize,
}
1592
/// Reply to a delete request.
#[derive(Deserialize)]
pub struct DeleteStatus {
    /// `success` flag reported by the server.
    pub success: bool,
}
1597
/// Reply to a consumer pause/resume request (`server_2_11` feature).
#[cfg(feature = "server_2_11")]
#[derive(Deserialize)]
pub struct PauseResponse {
    /// `paused` flag reported by the server.
    pub paused: bool,
    /// Pause deadline, parsed as RFC 3339.
    #[serde(with = "rfc3339")]
    pub pause_until: OffsetDateTime,
    /// Remaining pause time, deserialized through `serde_nanos`; `None` when absent.
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}
1607
/// Payload of a consumer pause/resume request (`server_2_11` feature).
#[cfg(feature = "server_2_11")]
#[derive(Serialize, Debug)]
struct PauseResumeConsumerRequest {
    /// Pause deadline as RFC 3339; the field is omitted entirely when `None`.
    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
    pause_until: Option<OffsetDateTime>,
}
1614
/// State of a stream as reported in [`Info`].
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
    /// Number of messages.
    pub messages: u64,
    /// Number of bytes.
    pub bytes: u64,
    /// First sequence; `first_seq` on the wire.
    #[serde(rename = "first_seq")]
    pub first_sequence: u64,
    /// First timestamp as RFC 3339; `first_ts` on the wire.
    #[serde(with = "rfc3339", rename = "first_ts")]
    pub first_timestamp: time::OffsetDateTime,
    /// Last sequence; `last_seq` on the wire.
    #[serde(rename = "last_seq")]
    pub last_sequence: u64,
    /// Last timestamp as RFC 3339; `last_ts` on the wire.
    #[serde(with = "rfc3339", rename = "last_ts")]
    pub last_timestamp: time::OffsetDateTime,
    /// Number of consumers.
    pub consumer_count: usize,
    /// Number of subjects; `num_subjects` on the wire, 0 when absent.
    #[serde(default, rename = "num_subjects")]
    pub subjects_count: u64,
    /// Number of deleted messages; `num_deleted` on the wire, `None` when absent.
    #[serde(default, rename = "num_deleted")]
    pub deleted_count: Option<u64>,
    /// Deleted entries, `None` when absent.
    #[serde(default)]
    pub deleted: Option<Vec<u64>>,

    /// Per-subject counts; consumed by [`InfoWithSubjects`].
    pub(crate) subjects: Option<HashMap<String, usize>>,
}
1649
/// A stored message in its wire form, with base64-encoded payload and headers.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
    /// Subject of the message.
    #[serde(rename = "subject")]
    pub subject: String,

    /// Sequence of the message; `seq` on the wire.
    #[serde(rename = "seq")]
    pub sequence: u64,

    /// Base64-encoded payload; `data` on the wire, empty when absent.
    #[serde(default, rename = "data")]
    pub payload: String,

    /// Base64-encoded header block; `hdrs` on the wire, `None` when absent.
    #[serde(default, rename = "hdrs")]
    pub headers: Option<String>,

    /// Timestamp of the message as RFC 3339.
    #[serde(rename = "time", with = "rfc3339")]
    pub time: time::OffsetDateTime,
}
1673
1674impl TryFrom<RawMessage> for StreamMessage {
1675 type Error = crate::Error;
1676
1677 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1678 let decoded_payload = STANDARD
1679 .decode(value.payload)
1680 .map_err(|err| Box::new(std::io::Error::other(err)))?;
1681 let decoded_headers = value
1682 .headers
1683 .map(|header| STANDARD.decode(header))
1684 .map_or(Ok(None), |v| v.map(Some))?;
1685
1686 let (headers, _, _) = decoded_headers
1687 .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1688
1689 Ok(StreamMessage {
1690 subject: value.subject.into(),
1691 payload: decoded_payload.into(),
1692 headers,
1693 sequence: value.sequence,
1694 time: value.time,
1695 })
1696 }
1697}
1698
/// Returns `true` for a space or a horizontal tab, the characters that start a header
/// continuation line.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
/// Prefix that the first line of a header block must start with.
const HEADER_LINE: &str = "NATS/1.0";
1703
1704#[allow(clippy::type_complexity)]
1705fn parse_headers(
1706 buf: &[u8],
1707) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1708 let mut headers = HeaderMap::new();
1709 let mut maybe_status: Option<StatusCode> = None;
1710 let mut maybe_description: Option<String> = None;
1711 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1712 line.lines().peekable()
1713 } else {
1714 return Err(Box::new(std::io::Error::other("invalid header")));
1715 };
1716
1717 if let Some(line) = lines.next() {
1718 let line = line
1719 .strip_prefix(HEADER_LINE)
1720 .ok_or_else(|| {
1721 Box::new(std::io::Error::other(
1722 "version line does not start with NATS/1.0",
1723 ))
1724 })?
1725 .trim();
1726
1727 match line.split_once(' ') {
1728 Some((status, description)) => {
1729 if !status.is_empty() {
1730 maybe_status = Some(status.parse()?);
1731 }
1732
1733 if !description.is_empty() {
1734 maybe_description = Some(description.trim().to_string());
1735 }
1736 }
1737 None => {
1738 if !line.is_empty() {
1739 maybe_status = Some(line.parse()?);
1740 }
1741 }
1742 }
1743 } else {
1744 return Err(Box::new(std::io::Error::other(
1745 "expected header information not found",
1746 )));
1747 };
1748
1749 while let Some(line) = lines.next() {
1750 if line.is_empty() {
1751 continue;
1752 }
1753
1754 if let Some((k, v)) = line.split_once(':').to_owned() {
1755 let mut s = String::from(v.trim());
1756 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1757 s.push(' ');
1758 s.push_str(v.trim());
1759 }
1760
1761 headers.insert(
1762 HeaderName::from_str(k)?,
1763 HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1764 );
1765 } else {
1766 return Err(Box::new(std::io::Error::other("malformed header line")));
1767 }
1768 }
1769
1770 if headers.is_empty() {
1771 Ok((HeaderMap::new(), maybe_status, maybe_description))
1772 } else {
1773 Ok((headers, maybe_status, maybe_description))
1774 }
1775}
1776
/// Envelope carrying a [`RawMessage`] under the `message` key.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
    /// The wrapped raw message.
    pub(crate) message: RawMessage,
}
1781
/// Returns `true` when `t` equals its type's `Default` value.
///
/// Used as a `skip_serializing_if` predicate by the serde attributes in this file.
fn is_default<T>(t: &T) -> bool
where
    T: Default + Eq,
{
    T::default() == *t
}
/// Cluster information attached to a stream [`Info`]; every field is optional on the wire.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
    /// Cluster name, if reported.
    #[serde(default)]
    pub name: Option<String>,
    /// Raft group, if reported.
    #[serde(default)]
    pub raft_group: Option<String>,
    /// Leader, if reported.
    #[serde(default)]
    pub leader: Option<String>,
    /// Time since which the leader holds leadership, parsed as RFC 3339 when present.
    #[serde(default, with = "rfc3339::option")]
    pub leader_since: Option<OffsetDateTime>,
    /// `system_account` flag; `false` when absent (`server_2_12` feature).
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub system_account: bool,
    /// Traffic account, if reported (`server_2_12` feature).
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub traffic_account: Option<String>,
    /// Replica peers; empty when absent.
    #[serde(default)]
    pub replicas: Vec<PeerInfo>,
}
1813
/// Information about a single replica peer.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// Name of the peer.
    pub name: String,
    /// `current` flag reported for the peer.
    pub current: bool,
    /// `active` duration, deserialized through `serde_nanos`.
    #[serde(with = "serde_nanos")]
    pub active: Duration,
    /// `offline` flag; `false` when absent.
    #[serde(default)]
    pub offline: bool,
    /// Lag of the peer, if reported.
    pub lag: Option<u64>,
}
1830
/// Information about a mirror or source of a stream.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
    /// Name of the source.
    pub name: String,
    /// Lag of the source.
    pub lag: u64,
    /// `active` duration; a negative value on the wire is read as `None`.
    #[serde(deserialize_with = "negative_duration_as_none")]
    pub active: Option<std::time::Duration>,
    /// Filter subject, if reported.
    #[serde(default)]
    pub filter_subject: Option<String>,
    /// Subject transform destination, if reported.
    #[serde(default)]
    pub subject_transform_dest: Option<String>,
    /// Subject transforms; empty when absent.
    #[serde(default)]
    pub subject_transforms: Vec<SubjectTransform>,
}
1850
1851fn negative_duration_as_none<'de, D>(
1852 deserializer: D,
1853) -> Result<Option<std::time::Duration>, D::Error>
1854where
1855 D: Deserializer<'de>,
1856{
1857 let n = i64::deserialize(deserializer)?;
1858 if n.is_negative() {
1859 Ok(None)
1860 } else {
1861 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1862 }
1863}
1864
/// Reply to a purge request.
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
    /// `success` flag reported by the server.
    pub success: bool,
    /// Number of purged messages reported by the server.
    pub purged: u64,
}
/// Payload of a purge request; every field is omitted from serialization while it is `None`.
#[derive(Default, Debug, Serialize, Clone)]
pub struct PurgeRequest {
    /// Optional sequence; `seq` on the wire.
    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
    pub sequence: Option<u64>,

    /// Optional subject filter.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter: Option<String>,

    /// Optional `keep` value.
    #[serde(default, skip_serializing_if = "is_default")]
    pub keep: Option<u64>,
}
1888
/// Configuration of a mirror or source for a stream.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
    /// Name of the source stream.
    pub name: String,
    /// Optional start sequence; `opt_start_seq` on the wire, omitted when `None`.
    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
    pub start_sequence: Option<u64>,
    /// Optional start time as RFC 3339; `opt_start_time` on the wire, omitted when `None`.
    #[serde(
        default,
        rename = "opt_start_time",
        skip_serializing_if = "is_default",
        with = "rfc3339::option"
    )]
    pub start_time: Option<OffsetDateTime>,
    /// Optional filter subject; omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: Option<String>,
    /// Optional external configuration; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external: Option<External>,
    /// Optional domain; omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub domain: Option<String>,
    /// Subject transforms; omitted when empty (`server_2_10` feature).
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub subject_transforms: Vec<SubjectTransform>,
}
1918
/// External configuration used by [`Source`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
    /// API prefix; serialized as `api`.
    #[serde(rename = "api")]
    pub api_prefix: String,
    /// Optional delivery prefix; serialized as `deliver` and omitted when `is_default` accepts it.
    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
    pub delivery_prefix: Option<String>,
}
1928
1929use std::marker::PhantomData;
1930
/// Type-level marker: the corresponding [`Purge`] option has been set.
#[derive(Debug, Default)]
pub struct Yes;
/// Type-level marker: the corresponding [`Purge`] option has not been set.
#[derive(Debug, Default)]
pub struct No;

/// Bound shared by the [`Yes`] and [`No`] markers used as [`Purge`] type parameters.
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
1940
/// Builder for a stream purge request.
///
/// `SEQUENCE` and `KEEP` record at the type level whether `sequence` or `keep`
/// has been set: `keep` is only offered while `SEQUENCE` is [`No`], and
/// `sequence` only while `KEEP` is [`No`], so the two cannot be combined.
#[derive(Debug)]
pub struct Purge<SEQUENCE, KEEP>
where
    SEQUENCE: ToAssign,
    KEEP: ToAssign,
{
    /// Request body sent when the builder is awaited.
    inner: PurgeRequest,
    /// Marker for whether `sequence` has been set.
    sequence_set: PhantomData<SEQUENCE>,
    /// Marker for whether `keep` has been set.
    keep_set: PhantomData<KEEP>,
    /// Context used to send the request.
    context: Context,
    /// Name of the stream to purge.
    stream_name: String,
}
1953
1954impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
1955where
1956 SEQUENCE: ToAssign,
1957 KEEP: ToAssign,
1958{
1959 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
1961 self.inner.filter = Some(filter.into());
1962 self
1963 }
1964}
1965
1966impl Purge<No, No> {
1967 pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
1968 Purge {
1969 context: stream.context.clone(),
1970 stream_name: stream.name.clone(),
1971 inner: Default::default(),
1972 sequence_set: PhantomData {},
1973 keep_set: PhantomData {},
1974 }
1975 }
1976}
1977
1978impl<KEEP> Purge<No, KEEP>
1979where
1980 KEEP: ToAssign,
1981{
1982 pub fn keep(self, keep: u64) -> Purge<No, Yes> {
1985 Purge {
1986 context: self.context.clone(),
1987 stream_name: self.stream_name.clone(),
1988 sequence_set: PhantomData {},
1989 keep_set: PhantomData {},
1990 inner: PurgeRequest {
1991 keep: Some(keep),
1992 ..self.inner
1993 },
1994 }
1995 }
1996}
1997impl<SEQUENCE> Purge<SEQUENCE, No>
1998where
1999 SEQUENCE: ToAssign,
2000{
2001 pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2004 Purge {
2005 context: self.context.clone(),
2006 stream_name: self.stream_name.clone(),
2007 sequence_set: PhantomData {},
2008 keep_set: PhantomData {},
2009 inner: PurgeRequest {
2010 sequence: Some(sequence),
2011 ..self.inner
2012 },
2013 }
2014 }
2015}
2016
/// Kinds of errors produced when awaiting a [`Purge`] request.
#[derive(Clone, Debug, PartialEq)]
pub enum PurgeErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
2023
2024impl Display for PurgeErrorKind {
2025 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2026 match self {
2027 Self::Request => write!(f, "request failed"),
2028 Self::TimedOut => write!(f, "timed out"),
2029 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2030 }
2031 }
2032}
2033
/// Error returned when awaiting a [`Purge`] request.
pub type PurgeError = Error<PurgeErrorKind>;
2035
2036impl<S, K> IntoFuture for Purge<S, K>
2037where
2038 S: ToAssign + std::marker::Send,
2039 K: ToAssign + std::marker::Send,
2040{
2041 type Output = Result<PurgeResponse, PurgeError>;
2042
2043 type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2044
2045 fn into_future(self) -> Self::IntoFuture {
2046 Box::pin(std::future::IntoFuture::into_future(async move {
2047 let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2048 let response: Response<PurgeResponse> = self
2049 .context
2050 .request(request_subject, &self.inner)
2051 .map_err(|err| match err.kind() {
2052 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2053 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2054 })
2055 .await?;
2056
2057 match response {
2058 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2059 Response::Ok(response) => Ok(response),
2060 }
2061 }))
2062 }
2063}
2064
/// One page of a `CONSUMER.NAMES` response.
#[derive(Deserialize, Debug)]
struct ConsumerPage {
    /// Total reported by the server; paging stops once the running offset reaches it.
    total: usize,
    /// Consumer names on this page, if any were returned.
    consumers: Option<Vec<String>>,
}
2070
/// One page of a `CONSUMER.LIST` response.
#[derive(Deserialize, Debug)]
struct ConsumerInfoPage {
    /// Total reported by the server; paging stops once the running offset reaches it.
    total: usize,
    /// Consumer infos on this page, if any were returned.
    consumers: Option<Vec<super::consumer::Info>>,
}
2076
/// Error kind yielded by [`ConsumerNames`]; shared with the streams listing errors.
type ConsumerNamesErrorKind = StreamsErrorKind;
/// Error yielded by [`ConsumerNames`]; shared with the streams listing errors.
type ConsumerNamesError = StreamsError;
/// In-flight request for one page of consumer names.
type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2080
/// Paged stream over the consumer names of a JetStream stream.
pub struct ConsumerNames {
    /// Context used to request pages.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of names received so far; sent as the `offset` of the next page request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Names of the current page that have not been yielded yet (taken from the back).
    consumers: Vec<String>,
    /// Set once the offset has reached the total reported by the server.
    done: bool,
}
2089
2090impl futures_util::Stream for ConsumerNames {
2091 type Item = Result<String, ConsumerNamesError>;
2092
2093 fn poll_next(
2094 mut self: Pin<&mut Self>,
2095 cx: &mut std::task::Context<'_>,
2096 ) -> std::task::Poll<Option<Self::Item>> {
2097 match self.page_request.as_mut() {
2098 Some(page) => match page.try_poll_unpin(cx) {
2099 std::task::Poll::Ready(page) => {
2100 self.page_request = None;
2101 let page = page.map_err(|err| {
2102 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2103 })?;
2104
2105 if let Some(consumers) = page.consumers {
2106 self.offset += consumers.len();
2107 self.consumers = consumers;
2108 if self.offset >= page.total {
2109 self.done = true;
2110 }
2111 match self.consumers.pop() {
2112 Some(stream) => Poll::Ready(Some(Ok(stream))),
2113 None => Poll::Ready(None),
2114 }
2115 } else {
2116 Poll::Ready(None)
2117 }
2118 }
2119 std::task::Poll::Pending => std::task::Poll::Pending,
2120 },
2121 None => {
2122 if let Some(stream) = self.consumers.pop() {
2123 Poll::Ready(Some(Ok(stream)))
2124 } else {
2125 if self.done {
2126 return Poll::Ready(None);
2127 }
2128 let context = self.context.clone();
2129 let offset = self.offset;
2130 let stream = self.stream.clone();
2131 self.page_request = Some(Box::pin(async move {
2132 match context
2133 .request(
2134 format!("CONSUMER.NAMES.{stream}"),
2135 &json!({
2136 "offset": offset,
2137 }),
2138 )
2139 .await?
2140 {
2141 Response::Err { error } => Err(RequestError::with_source(
2142 super::context::RequestErrorKind::Other,
2143 error,
2144 )),
2145 Response::Ok(page) => Ok(page),
2146 }
2147 }));
2148 self.poll_next(cx)
2149 }
2150 }
2151 }
2152 }
2153}
2154
/// Error kind yielded by [`Consumers`]; shared with the streams listing errors.
pub type ConsumersErrorKind = StreamsErrorKind;
/// Error yielded by [`Consumers`]; shared with the streams listing errors.
pub type ConsumersError = StreamsError;
/// In-flight request for one page of consumer infos.
type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2158
/// Paged stream over the consumer infos of a JetStream stream.
pub struct Consumers {
    /// Context used to request pages.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of infos received so far; sent as the `offset` of the next page request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet (taken from the back).
    consumers: Vec<super::consumer::Info>,
    /// Set once the offset has reached the total reported by the server.
    done: bool,
}
2167
2168impl futures_util::Stream for Consumers {
2169 type Item = Result<super::consumer::Info, ConsumersError>;
2170
2171 fn poll_next(
2172 mut self: Pin<&mut Self>,
2173 cx: &mut std::task::Context<'_>,
2174 ) -> std::task::Poll<Option<Self::Item>> {
2175 match self.page_request.as_mut() {
2176 Some(page) => match page.try_poll_unpin(cx) {
2177 std::task::Poll::Ready(page) => {
2178 self.page_request = None;
2179 let page = page.map_err(|err| {
2180 ConsumersError::with_source(ConsumersErrorKind::Other, err)
2181 })?;
2182 if let Some(consumers) = page.consumers {
2183 self.offset += consumers.len();
2184 self.consumers = consumers;
2185 if self.offset >= page.total {
2186 self.done = true;
2187 }
2188 match self.consumers.pop() {
2189 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2190 None => Poll::Ready(None),
2191 }
2192 } else {
2193 Poll::Ready(None)
2194 }
2195 }
2196 std::task::Poll::Pending => std::task::Poll::Pending,
2197 },
2198 None => {
2199 if let Some(stream) = self.consumers.pop() {
2200 Poll::Ready(Some(Ok(stream)))
2201 } else {
2202 if self.done {
2203 return Poll::Ready(None);
2204 }
2205 let context = self.context.clone();
2206 let offset = self.offset;
2207 let stream = self.stream.clone();
2208 self.page_request = Some(Box::pin(async move {
2209 match context
2210 .request(
2211 format!("CONSUMER.LIST.{stream}"),
2212 &json!({
2213 "offset": offset,
2214 }),
2215 )
2216 .await?
2217 {
2218 Response::Err { error } => Err(RequestError::with_source(
2219 super::context::RequestErrorKind::Other,
2220 error,
2221 )),
2222 Response::Ok(page) => Ok(page),
2223 }
2224 }));
2225 self.poll_next(cx)
2226 }
2227 }
2228 }
2229 }
2230}
2231
/// Kinds of errors produced when fetching a raw message from a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum LastRawMessageErrorKind {
    /// The server reported that no matching message exists.
    NoMessageFound,
    /// A subject supplied in the request is not a valid subject.
    InvalidSubject,
    /// The server answered with another JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure, such as a failed request or an undecodable response.
    Other,
}
2239
2240impl Display for LastRawMessageErrorKind {
2241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2242 match self {
2243 Self::NoMessageFound => write!(f, "no message found"),
2244 Self::InvalidSubject => write!(f, "invalid subject"),
2245 Self::Other => write!(f, "failed to get last raw message"),
2246 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2247 }
2248 }
2249}
2250
/// Error returned when fetching the last raw message.
pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
/// Error kind for raw message requests; identical to [`LastRawMessageErrorKind`].
pub type RawMessageErrorKind = LastRawMessageErrorKind;
/// Error for raw message requests; identical to [`LastRawMessageError`].
pub type RawMessageError = LastRawMessageError;
2254
/// Kinds of errors produced by consumer operations on a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2265
2266impl Display for ConsumerErrorKind {
2267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2268 match self {
2269 Self::TimedOut => write!(f, "timed out"),
2270 Self::Request => write!(f, "request failed"),
2271 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2272 Self::Other => write!(f, "consumer error"),
2273 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2274 Self::InvalidName => write!(f, "invalid consumer name"),
2275 }
2276 }
2277}
2278
/// Error returned by consumer operations on a stream.
pub type ConsumerError = Error<ConsumerErrorKind>;
2280
/// Kinds of errors produced by strict consumer creation.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerCreateStrictErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The server reported that the consumer already exists.
    AlreadyExists,
    /// The server answered with another JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2292
2293impl Display for ConsumerCreateStrictErrorKind {
2294 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2295 match self {
2296 Self::TimedOut => write!(f, "timed out"),
2297 Self::Request => write!(f, "request failed"),
2298 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2299 Self::Other => write!(f, "consumer error"),
2300 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2301 Self::InvalidName => write!(f, "invalid consumer name"),
2302 Self::AlreadyExists => write!(f, "consumer already exists"),
2303 }
2304 }
2305}
2306
/// Error returned by strict consumer creation.
pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2308
/// Kinds of errors produced by consumer updates.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerUpdateErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The server reported that the consumer does not exist.
    DoesNotExist,
    /// The server answered with another JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2320
2321impl Display for ConsumerUpdateErrorKind {
2322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2323 match self {
2324 Self::TimedOut => write!(f, "timed out"),
2325 Self::Request => write!(f, "request failed"),
2326 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2327 Self::Other => write!(f, "consumer error"),
2328 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2329 Self::InvalidName => write!(f, "invalid consumer name"),
2330 Self::DoesNotExist => write!(f, "consumer does not exist"),
2331 }
2332 }
2333}
2334
/// Error returned by consumer updates.
pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2336
2337impl From<super::errors::Error> for ConsumerError {
2338 fn from(err: super::errors::Error) -> Self {
2339 ConsumerError::new(ConsumerErrorKind::JetStream(err))
2340 }
2341}
2342impl From<super::errors::Error> for ConsumerCreateStrictError {
2343 fn from(err: super::errors::Error) -> Self {
2344 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2345 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2346 } else {
2347 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2348 }
2349 }
2350}
2351impl From<super::errors::Error> for ConsumerUpdateError {
2352 fn from(err: super::errors::Error) -> Self {
2353 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2354 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2355 } else {
2356 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2357 }
2358 }
2359}
2360impl From<ConsumerError> for ConsumerUpdateError {
2361 fn from(err: ConsumerError) -> Self {
2362 match err.kind() {
2363 ConsumerErrorKind::JetStream(err) => {
2364 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2365 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2366 } else {
2367 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2368 }
2369 }
2370 ConsumerErrorKind::Request => {
2371 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2372 }
2373 ConsumerErrorKind::TimedOut => {
2374 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2375 }
2376 ConsumerErrorKind::InvalidConsumerType => {
2377 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2378 }
2379 ConsumerErrorKind::InvalidName => {
2380 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2381 }
2382 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2383 }
2384 }
2385}
2386
2387impl From<ConsumerError> for ConsumerCreateStrictError {
2388 fn from(err: ConsumerError) -> Self {
2389 match err.kind() {
2390 ConsumerErrorKind::JetStream(err) => {
2391 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2392 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2393 } else {
2394 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2395 }
2396 }
2397 ConsumerErrorKind::Request => {
2398 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2399 }
2400 ConsumerErrorKind::TimedOut => {
2401 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2402 }
2403 ConsumerErrorKind::InvalidConsumerType => {
2404 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2405 }
2406 ConsumerErrorKind::InvalidName => {
2407 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2408 }
2409 ConsumerErrorKind::Other => {
2410 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2411 }
2412 }
2413 }
2414}
2415
2416impl From<super::context::RequestError> for ConsumerError {
2417 fn from(err: super::context::RequestError) -> Self {
2418 match err.kind() {
2419 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2420 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2421 }
2422 }
2423}
2424impl From<super::context::RequestError> for ConsumerUpdateError {
2425 fn from(err: super::context::RequestError) -> Self {
2426 match err.kind() {
2427 RequestErrorKind::TimedOut => {
2428 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2429 }
2430 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2431 }
2432 }
2433}
2434impl From<super::context::RequestError> for ConsumerCreateStrictError {
2435 fn from(err: super::context::RequestError) -> Self {
2436 match err.kind() {
2437 RequestErrorKind::TimedOut => {
2438 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2439 }
2440 _ => {
2441 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2442 }
2443 }
2444 }
2445}
2446
/// Body of a direct-get request.
#[derive(Debug, Serialize, Default)]
pub struct DirectGetRequest {
    /// Sequence to fetch; serialized as `seq` when set.
    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
    sequence: Option<u64>,
    /// Subject whose last message is requested. Never serialized:
    /// `DirectGetBuilder::send_internal` appends it to the request subject instead.
    #[serde(rename = "last_by_subj", skip_serializing)]
    last_by_subject: Option<String>,
    /// Subject for a "next by subject" lookup; serialized as `next_by_subj` when set.
    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
    next_by_subject: Option<String>,
}
2456
/// Builder marker: `send` resolves to a full [`StreamMessage`].
pub struct WithHeaders;

/// Builder marker: `send` resolves to a payload-only [`StreamValue`].
pub struct WithoutHeaders;
2462
/// Conversion from a raw direct-get reply into the result type of a
/// [`DirectGetBuilder`].
trait DirectGetResponse: Sized {
    /// Builds `Self` from the reply message, or fails with a [`DirectGetError`].
    fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
}
2467
2468impl DirectGetResponse for StreamMessage {
2469 fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2470 StreamMessage::try_from(message).map_err(Into::into)
2471 }
2472}
2473
2474impl DirectGetResponse for StreamValue {
2475 fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2476 Ok(StreamValue {
2477 data: message.payload,
2478 })
2479 }
2480}
2481
/// Builder for a direct-get request against a stream.
///
/// The marker `T` ([`WithHeaders`] by default) selects which `send` method is
/// available and therefore the type the request resolves to.
pub struct DirectGetBuilder<T = WithHeaders> {
    /// Context providing the client and the API prefix.
    context: Context,
    /// Name of the stream to read from.
    stream_name: String,
    /// Request parameters collected so far.
    request: DirectGetRequest,
    /// Carries the marker type without storing a value.
    _phantom: std::marker::PhantomData<T>,
}
2488
2489impl DirectGetBuilder<WithHeaders> {
2490 fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
2491 DirectGetBuilder {
2492 context,
2493 stream_name,
2494 request: DirectGetRequest::default(),
2495 _phantom: std::marker::PhantomData,
2496 }
2497 }
2498}
2499
2500impl<T> DirectGetBuilder<T> {
2501 async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
2503 let payload = if self.request.last_by_subject.is_some() {
2506 Bytes::new()
2507 } else {
2508 serde_json::to_vec(&self.request).map(Bytes::from)?
2509 };
2510
2511 let request_subject = if let Some(ref subject) = self.request.last_by_subject {
2512 format!(
2513 "{}.DIRECT.GET.{}.{}",
2514 &self.context.prefix, &self.stream_name, subject
2515 )
2516 } else {
2517 format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
2518 };
2519
2520 let response = self
2521 .context
2522 .client
2523 .request(request_subject, payload)
2524 .await?;
2525
2526 if let Some(status) = response.status {
2528 if let Some(ref description) = response.description {
2529 match status {
2530 StatusCode::NOT_FOUND => {
2531 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
2532 }
2533 StatusCode::TIMEOUT => {
2535 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
2536 }
2537 _ => {
2538 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
2539 status,
2540 description.to_string(),
2541 )));
2542 }
2543 }
2544 }
2545 }
2546
2547 R::from_message(response)
2548 }
2549
2550 pub fn sequence(mut self, seq: u64) -> Self {
2552 self.request.sequence = Some(seq);
2553 self
2554 }
2555
2556 pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2558 self.request.last_by_subject = Some(subject.into());
2559 self
2560 }
2561
2562 pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2564 self.request.next_by_subject = Some(subject.into());
2565 self
2566 }
2567}
2568
2569impl DirectGetBuilder<WithHeaders> {
2570 pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
2572 self.send_internal::<StreamMessage>().await
2573 }
2574}
2575
2576impl DirectGetBuilder<WithoutHeaders> {
2577 pub async fn send(self) -> Result<StreamValue, DirectGetError> {
2579 self.send_internal::<StreamValue>().await
2580 }
2581}
2582
/// Payload-only result of a message lookup.
pub struct StreamValue {
    /// Raw message payload.
    pub data: Bytes,
}
2586
/// Body of a `STREAM.MSG.GET` request; unset fields are omitted from the JSON.
#[derive(Debug, Serialize, Default)]
pub struct RawMessageRequest {
    /// Sequence to fetch; serialized as `seq`.
    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
    sequence: Option<u64>,
    /// Subject whose last message is requested; serialized as `last_by_subj`.
    #[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
    last_by_subject: Option<String>,
    /// Subject for a "next by subject" lookup; serialized as `next_by_subj`.
    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
    next_by_subject: Option<String>,
}
2596
/// Conversion from a [`RawMessage`] into the result type of a
/// [`RawMessageBuilder`].
trait RawMessageResponse: Sized {
    /// Builds `Self` from the raw message, or fails with a [`RawMessageError`].
    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
}
2601
2602impl RawMessageResponse for StreamMessage {
2603 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2604 StreamMessage::try_from(message)
2605 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2606 }
2607}
2608
2609impl RawMessageResponse for StreamValue {
2610 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2611 use base64::engine::general_purpose::STANDARD;
2612 use base64::Engine;
2613
2614 let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
2615 RawMessageError::with_source(
2616 RawMessageErrorKind::Other,
2617 Box::new(std::io::Error::other(err)),
2618 )
2619 })?;
2620
2621 Ok(StreamValue {
2622 data: decoded_payload.into(),
2623 })
2624 }
2625}
2626
/// Builder for a `STREAM.MSG.GET` request against a stream.
///
/// The marker `T` ([`WithHeaders`] by default) selects which `send` method is
/// available and therefore the type the request resolves to.
pub struct RawMessageBuilder<T = WithHeaders> {
    /// Context used to send the request.
    context: Context,
    /// Name of the stream to read from.
    stream_name: String,
    /// Request parameters collected so far.
    request: RawMessageRequest,
    /// Carries the marker type without storing a value.
    _phantom: std::marker::PhantomData<T>,
}
2633
2634impl RawMessageBuilder<WithHeaders> {
2635 fn new(context: Context, stream_name: String) -> Self {
2636 RawMessageBuilder {
2637 context,
2638 stream_name,
2639 request: RawMessageRequest::default(),
2640 _phantom: std::marker::PhantomData,
2641 }
2642 }
2643}
2644
2645impl<T> RawMessageBuilder<T> {
2646 async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
2648 for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
2650 .into_iter()
2651 .flatten()
2652 {
2653 if !is_valid_subject(subject) {
2654 return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
2655 }
2656 }
2657
2658 let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
2659
2660 let response: Response<GetRawMessage> = self
2661 .context
2662 .request(subject, &self.request)
2663 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2664 .await?;
2665
2666 match response {
2667 Response::Err { error } => {
2668 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
2669 Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
2670 } else {
2671 Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
2672 }
2673 }
2674 Response::Ok(value) => R::from_raw_message(value.message),
2675 }
2676 }
2677
2678 pub fn sequence(mut self, seq: u64) -> Self {
2680 self.request.sequence = Some(seq);
2681 self
2682 }
2683
2684 pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2686 self.request.last_by_subject = Some(subject.into());
2687 self
2688 }
2689
2690 pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2692 self.request.next_by_subject = Some(subject.into());
2693 self
2694 }
2695}
2696
2697impl RawMessageBuilder<WithHeaders> {
2698 pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
2700 self.send_internal::<StreamMessage>().await
2701 }
2702}
2703
2704impl RawMessageBuilder<WithoutHeaders> {
2705 pub async fn send(self) -> Result<StreamValue, RawMessageError> {
2707 self.send_internal::<StreamValue>().await
2708 }
2709}
2710
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must survive a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn consumer_limits_de() {
        let config = Config::default();

        let encoded = serde_json::to_string(&config).unwrap();
        let roundtrip: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(config, roundtrip);
    }
}