1use std::{
17 collections::{self, HashMap},
18 fmt::{self, Debug, Display},
19 future::IntoFuture,
20 io::{self},
21 pin::Pin,
22 str::FromStr,
23 task::Poll,
24 time::Duration,
25};
26
27use crate::{
28 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40 context::{
41 ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42 StreamsErrorKind,
43 },
44 errors::ErrorCode,
45 is_valid_name,
46 message::{StreamMessage, StreamMessageError},
47 response::Response,
48 Context,
49};
50
51pub type InfoError = RequestError;
52
53#[derive(Clone, Debug, PartialEq)]
54pub enum DirectGetErrorKind {
55 NotFound,
56 InvalidSubject,
57 TimedOut,
58 Request,
59 ErrorResponse(StatusCode, String),
60 Other,
61}
62
63impl Display for DirectGetErrorKind {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 Self::InvalidSubject => write!(f, "invalid subject"),
67 Self::NotFound => write!(f, "message not found"),
68 Self::ErrorResponse(status, description) => {
69 write!(f, "unable to get message: {status} {description}")
70 }
71 Self::Other => write!(f, "error getting message"),
72 Self::TimedOut => write!(f, "timed out"),
73 Self::Request => write!(f, "request failed"),
74 }
75 }
76}
77
78pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81 fn from(err: crate::RequestError) -> Self {
82 match err.kind() {
83 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84 crate::RequestErrorKind::NoResponders => {
85 DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86 StatusCode::NO_RESPONDERS,
87 "no responders".to_string(),
88 ))
89 }
90 crate::RequestErrorKind::Other => {
91 DirectGetError::with_source(DirectGetErrorKind::Other, err)
92 }
93 }
94 }
95}
96
97impl From<serde_json::Error> for DirectGetError {
98 fn from(err: serde_json::Error) -> Self {
99 DirectGetError::with_source(DirectGetErrorKind::Other, err)
100 }
101}
102
103impl From<StreamMessageError> for DirectGetError {
104 fn from(err: StreamMessageError) -> Self {
105 DirectGetError::with_source(DirectGetErrorKind::Other, err)
106 }
107}
108
109#[derive(Clone, Debug, PartialEq)]
110pub enum DeleteMessageErrorKind {
111 Request,
112 TimedOut,
113 JetStream(super::errors::Error),
114}
115
116impl Display for DeleteMessageErrorKind {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 match self {
119 Self::Request => write!(f, "request failed"),
120 Self::TimedOut => write!(f, "timed out"),
121 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
122 }
123 }
124}
125
126pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
127
128#[derive(Debug, Clone)]
132pub struct Stream<T = Info> {
133 pub(crate) info: T,
134 pub(crate) context: Context,
135 pub(crate) name: String,
136}
137
138impl Stream<Info> {
139 pub async fn info(&mut self) -> Result<&Info, InfoError> {
157 let subject = format!("STREAM.INFO.{}", self.info.config.name);
158
159 match self.context.request(subject, &json!({})).await? {
160 Response::Ok::<Info>(info) => {
161 self.info = info;
162 Ok(&self.info)
163 }
164 Response::Err { error } => Err(error.into()),
165 }
166 }
167
168 pub fn cached_info(&self) -> &Info {
187 &self.info
188 }
189}
190
191impl<I> Stream<I> {
192 pub async fn get_info(&self) -> Result<Info, InfoError> {
195 let subject = format!("STREAM.INFO.{}", self.name);
196
197 match self.context.request(subject, &json!({})).await? {
198 Response::Ok::<Info>(info) => Ok(info),
199 Response::Err { error } => Err(error.into()),
200 }
201 }
202
203 pub async fn info_with_subjects<F: AsRef<str>>(
226 &self,
227 subjects_filter: F,
228 ) -> Result<InfoWithSubjects, InfoError> {
229 let subjects_filter = subjects_filter.as_ref().to_string();
230 let info = stream_info_with_details(
232 self.context.clone(),
233 self.name.clone(),
234 0,
235 false,
236 subjects_filter.clone(),
237 )
238 .await?;
239
240 Ok(InfoWithSubjects::new(
241 self.context.clone(),
242 info,
243 subjects_filter,
244 ))
245 }
246
247 pub fn info_builder(&self) -> StreamInfoBuilder {
273 StreamInfoBuilder::new(self.context.clone(), self.name.clone())
274 }
275
276 pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
296 DirectGetBuilder::new(self.context.clone(), self.name.clone())
297 }
298
299 pub async fn direct_get_next_for_subject<T: Into<String>>(
334 &self,
335 subject: T,
336 sequence: Option<u64>,
337 ) -> Result<StreamMessage, DirectGetError> {
338 let subject_str = subject.into();
339 if !is_valid_subject(&subject_str) {
340 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
341 }
342
343 let mut builder = self.direct_get_builder().next_by_subject(subject_str);
344 if let Some(seq) = sequence {
345 builder = builder.sequence(seq);
346 }
347
348 builder.send().await
349 }
350
351 pub async fn direct_get_first_for_subject<T: Into<String>>(
383 &self,
384 subject: T,
385 ) -> Result<StreamMessage, DirectGetError> {
386 let subject_str = subject.into();
387 if !is_valid_subject(&subject_str) {
388 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
389 }
390
391 self.direct_get_builder()
392 .next_by_subject(subject_str)
393 .send()
394 .await
395 }
396
397 pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
429 self.direct_get_builder().sequence(sequence).send().await
430 }
431
432 pub async fn direct_get_last_for_subject<T: Into<String>>(
464 &self,
465 subject: T,
466 ) -> Result<StreamMessage, DirectGetError> {
467 self.direct_get_builder()
468 .last_by_subject(subject)
469 .send()
470 .await
471 }
472 pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
492 RawMessageBuilder::new(self.context.clone(), self.name.clone())
493 }
494
495 pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
525 self.raw_message_builder().sequence(sequence).send().await
526 }
527
528 pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
552 &self,
553 subject: T,
554 sequence: u64,
555 ) -> Result<StreamMessage, RawMessageError> {
556 self.raw_message_builder()
557 .sequence(sequence)
558 .next_by_subject(subject.as_ref().to_string())
559 .send()
560 .await
561 }
562
563 pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
587 &self,
588 subject: T,
589 ) -> Result<StreamMessage, RawMessageError> {
590 self.raw_message_builder()
591 .next_by_subject(subject.as_ref().to_string())
592 .send()
593 .await
594 }
595
596 pub async fn get_last_raw_message_by_subject(
620 &self,
621 stream_subject: &str,
622 ) -> Result<StreamMessage, LastRawMessageError> {
623 self.raw_message_builder()
624 .last_by_subject(stream_subject.to_string())
625 .send()
626 .await
627 }
628
629 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
653 let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
654 let payload = json!({
655 "seq": sequence,
656 });
657
658 let response: Response<DeleteStatus> = self
659 .context
660 .request(subject, &payload)
661 .map_err(|err| match err.kind() {
662 RequestErrorKind::TimedOut => {
663 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
664 }
665 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
666 })
667 .await?;
668
669 match response {
670 Response::Err { error } => Err(DeleteMessageError::new(
671 DeleteMessageErrorKind::JetStream(error),
672 )),
673 Response::Ok(value) => Ok(value.success),
674 }
675 }
676
677 pub fn purge(&self) -> Purge<No, No> {
693 Purge::build(self)
694 }
695
696 #[deprecated(
713 since = "0.25.0",
714 note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
715 )]
716 pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
717 where
718 T: Into<String>,
719 {
720 self.purge().filter(subject).await
721 }
722
723 pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
747 &self,
748 config: C,
749 ) -> Result<Consumer<C>, ConsumerError> {
750 self.context
751 .create_consumer_on_stream(config, self.name.clone())
752 .await
753 }
754
755 #[cfg(feature = "server_2_10")]
779 pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
780 &self,
781 config: C,
782 ) -> Result<Consumer<C>, ConsumerUpdateError> {
783 self.context
784 .update_consumer_on_stream(config, self.name.clone())
785 .await
786 }
787
788 #[cfg(feature = "server_2_10")]
813 pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
814 &self,
815 config: C,
816 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
817 self.context
818 .create_consumer_strict_on_stream(config, self.name.clone())
819 .await
820 }
821
822 pub async fn consumer_info<T: AsRef<str>>(
839 &self,
840 name: T,
841 ) -> Result<consumer::Info, ConsumerInfoError> {
842 let name = name.as_ref();
843
844 if !is_valid_name(name) {
845 return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
846 }
847
848 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
849
850 match self.context.request(subject, &json!({})).await? {
851 Response::Ok(info) => Ok(info),
852 Response::Err { error } => Err(error.into()),
853 }
854 }
855
856 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
875 &self,
876 name: &str,
877 ) -> Result<Consumer<T>, crate::Error> {
878 let info = self.consumer_info(name).await?;
879
880 Ok(Consumer::new(
881 T::try_from_consumer_config(info.config.clone())?,
882 info,
883 self.context.clone(),
884 ))
885 }
886
887 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
915 &self,
916 name: &str,
917 config: T,
918 ) -> Result<Consumer<T>, ConsumerError> {
919 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
920
921 match self.context.request(subject, &json!({})).await? {
922 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
923 Response::Err { error } => Err(error.into()),
924 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
925 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
926 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
927 })?,
928 info,
929 self.context.clone(),
930 )),
931 }
932 }
933
934 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
955 let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
956
957 match self.context.request(subject, &json!({})).await? {
958 Response::Ok(delete_status) => Ok(delete_status),
959 Response::Err { error } => Err(error.into()),
960 }
961 }
962
963 #[cfg(feature = "server_2_11")]
987 pub async fn pause_consumer(
988 &self,
989 name: &str,
990 pause_until: OffsetDateTime,
991 ) -> Result<PauseResponse, ConsumerError> {
992 self.request_pause_consumer(name, Some(pause_until)).await
993 }
994
995 #[cfg(feature = "server_2_11")]
1016 pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
1017 self.request_pause_consumer(name, None).await
1018 }
1019
1020 #[cfg(feature = "server_2_11")]
1021 async fn request_pause_consumer(
1022 &self,
1023 name: &str,
1024 pause_until: Option<OffsetDateTime>,
1025 ) -> Result<PauseResponse, ConsumerError> {
1026 let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1027 let payload = &PauseResumeConsumerRequest { pause_until };
1028
1029 match self.context.request(subject, payload).await? {
1030 Response::Ok::<PauseResponse>(resp) => Ok(resp),
1031 Response::Err { error } => Err(error.into()),
1032 }
1033 }
1034
1035 pub fn consumer_names(&self) -> ConsumerNames {
1054 ConsumerNames {
1055 context: self.context.clone(),
1056 stream: self.name.clone(),
1057 offset: 0,
1058 page_request: None,
1059 consumers: Vec::new(),
1060 done: false,
1061 }
1062 }
1063
1064 pub fn consumers(&self) -> Consumers {
1083 Consumers {
1084 context: self.context.clone(),
1085 stream: self.name.clone(),
1086 offset: 0,
1087 page_request: None,
1088 consumers: Vec::new(),
1089 done: false,
1090 }
1091 }
1092}
1093
1094pub struct StreamInfoBuilder {
1095 pub(crate) context: Context,
1096 pub(crate) name: String,
1097 pub(crate) deleted: bool,
1098 pub(crate) subject: String,
1099}
1100
1101impl StreamInfoBuilder {
1102 fn new(context: Context, name: String) -> Self {
1103 Self {
1104 context,
1105 name,
1106 deleted: false,
1107 subject: "".to_string(),
1108 }
1109 }
1110
1111 pub fn with_deleted(mut self, deleted: bool) -> Self {
1112 self.deleted = deleted;
1113 self
1114 }
1115
1116 pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1117 self.subject = subject.into();
1118 self
1119 }
1120
1121 pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1122 let info = stream_info_with_details(
1123 self.context.clone(),
1124 self.name.clone(),
1125 0,
1126 self.deleted,
1127 self.subject.clone(),
1128 )
1129 .await?;
1130
1131 Ok(InfoWithSubjects::new(self.context, info, self.subject))
1132 }
1133}
1134
1135#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1139pub struct Config {
1140 pub name: String,
1142 pub max_bytes: i64,
1144 #[serde(rename = "max_msgs")]
1146 pub max_messages: i64,
1147 #[serde(rename = "max_msgs_per_subject")]
1149 pub max_messages_per_subject: i64,
1150 pub discard: DiscardPolicy,
1153 #[serde(default, skip_serializing_if = "is_default")]
1155 pub discard_new_per_subject: bool,
1156 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1159 pub subjects: Vec<String>,
1160 pub retention: RetentionPolicy,
1162 pub max_consumers: i32,
1164 #[serde(with = "serde_nanos")]
1166 pub max_age: Duration,
1167 #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1169 pub max_message_size: i32,
1170 pub storage: StorageType,
1172 pub num_replicas: usize,
1174 #[serde(default, skip_serializing_if = "is_default")]
1176 pub no_ack: bool,
1177 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1179 pub duplicate_window: Duration,
1180 #[serde(default, skip_serializing_if = "is_default")]
1182 pub template_owner: String,
1183 #[serde(default, skip_serializing_if = "is_default")]
1185 pub sealed: bool,
1186 #[serde(default, skip_serializing_if = "is_default")]
1188 pub description: Option<String>,
1189 #[serde(
1190 default,
1191 rename = "allow_rollup_hdrs",
1192 skip_serializing_if = "is_default"
1193 )]
1194 pub allow_rollup: bool,
1196 #[serde(default, skip_serializing_if = "is_default")]
1197 pub deny_delete: bool,
1199 #[serde(default, skip_serializing_if = "is_default")]
1201 pub deny_purge: bool,
1202
1203 #[serde(default, skip_serializing_if = "is_default")]
1205 pub republish: Option<Republish>,
1206
1207 #[serde(default, skip_serializing_if = "is_default")]
1210 pub allow_direct: bool,
1211
1212 #[serde(default, skip_serializing_if = "is_default")]
1214 pub mirror_direct: bool,
1215
1216 #[serde(default, skip_serializing_if = "Option::is_none")]
1218 pub mirror: Option<Source>,
1219
1220 #[serde(default, skip_serializing_if = "Option::is_none")]
1222 pub sources: Option<Vec<Source>>,
1223
1224 #[cfg(feature = "server_2_10")]
1225 #[serde(default, skip_serializing_if = "is_default")]
1227 pub metadata: HashMap<String, String>,
1228
1229 #[cfg(feature = "server_2_10")]
1230 #[serde(default, skip_serializing_if = "Option::is_none")]
1232 pub subject_transform: Option<SubjectTransform>,
1233
1234 #[cfg(feature = "server_2_10")]
1235 #[serde(default, skip_serializing_if = "Option::is_none")]
1240 pub compression: Option<Compression>,
1241 #[cfg(feature = "server_2_10")]
1242 #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1244 pub consumer_limits: Option<ConsumerLimits>,
1245
1246 #[cfg(feature = "server_2_10")]
1247 #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1249 pub first_sequence: Option<u64>,
1250
1251 #[serde(default, skip_serializing_if = "Option::is_none")]
1253 pub placement: Option<Placement>,
1254
1255 #[serde(default, skip_serializing_if = "Option::is_none")]
1257 pub persist_mode: Option<PersistenceMode>,
1258
1259 #[cfg(feature = "server_2_11")]
1261 #[serde(
1262 default,
1263 with = "rfc3339::option",
1264 skip_serializing_if = "Option::is_none"
1265 )]
1266 pub pause_until: Option<OffsetDateTime>,
1267
1268 #[cfg(feature = "server_2_11")]
1270 #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
1271 pub allow_message_ttl: bool,
1272
1273 #[cfg(feature = "server_2_11")]
1276 #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
1277 pub subject_delete_marker_ttl: Option<Duration>,
1278
1279 #[cfg(feature = "server_2_12")]
1281 #[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
1282 pub allow_atomic_publish: bool,
1283
1284 #[cfg(feature = "server_2_12")]
1286 #[serde(
1287 default,
1288 skip_serializing_if = "is_default",
1289 rename = "allow_msg_schedules"
1290 )]
1291 pub allow_message_schedules: bool,
1292
1293 #[cfg(feature = "server_2_12")]
1295 #[serde(
1296 default,
1297 skip_serializing_if = "is_default",
1298 rename = "allow_msg_counter"
1299 )]
1300 pub allow_message_counter: bool,
1301}
1302
1303impl From<&Config> for Config {
1304 fn from(sc: &Config) -> Config {
1305 sc.clone()
1306 }
1307}
1308
1309impl From<&str> for Config {
1310 fn from(s: &str) -> Config {
1311 Config {
1312 name: s.to_string(),
1313 ..Default::default()
1314 }
1315 }
1316}
1317
1318#[cfg(feature = "server_2_10")]
1319fn default_consumer_limits_as_none<'de, D>(
1320 deserializer: D,
1321) -> Result<Option<ConsumerLimits>, D::Error>
1322where
1323 D: Deserializer<'de>,
1324{
1325 let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
1326 if let Some(cl) = consumer_limits {
1327 if cl == ConsumerLimits::default() {
1328 Ok(None)
1329 } else {
1330 Ok(Some(cl))
1331 }
1332 } else {
1333 Ok(None)
1334 }
1335}
1336#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1337pub struct ConsumerLimits {
1338 #[serde(default, with = "serde_nanos")]
1340 pub inactive_threshold: std::time::Duration,
1341 #[serde(default)]
1343 pub max_ack_pending: i64,
1344}
1345
1346#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1347pub enum Compression {
1348 #[serde(rename = "s2")]
1349 S2,
1350 #[serde(rename = "none")]
1351 None,
1352}
1353
1354#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1356pub struct SubjectTransform {
1357 #[serde(rename = "src")]
1358 pub source: String,
1359
1360 #[serde(rename = "dest")]
1361 pub destination: String,
1362}
1363
1364#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1366pub struct Republish {
1367 #[serde(rename = "src")]
1369 pub source: String,
1370 #[serde(rename = "dest")]
1372 pub destination: String,
1373 #[serde(default)]
1375 pub headers_only: bool,
1376}
1377
1378#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1380pub struct Placement {
1381 #[serde(default, skip_serializing_if = "is_default")]
1383 pub cluster: Option<String>,
1384 #[serde(default, skip_serializing_if = "is_default")]
1386 pub tags: Vec<String>,
1387}
1388
1389#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1392#[repr(u8)]
1393pub enum DiscardPolicy {
1394 #[default]
1396 #[serde(rename = "old")]
1397 Old = 0,
1398 #[serde(rename = "new")]
1400 New = 1,
1401}
1402
1403#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1405#[repr(u8)]
1406pub enum RetentionPolicy {
1407 #[default]
1410 #[serde(rename = "limits")]
1411 Limits = 0,
1412 #[serde(rename = "interest")]
1414 Interest = 1,
1415 #[serde(rename = "workqueue")]
1417 WorkQueue = 2,
1418}
1419
1420#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1422#[repr(u8)]
1423pub enum StorageType {
1424 #[default]
1426 #[serde(rename = "file")]
1427 File = 0,
1428 #[serde(rename = "memory")]
1430 Memory = 1,
1431}
1432
1433#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1435#[repr(u8)]
1436pub enum PersistenceMode {
1437 #[default]
1439 #[serde(rename = "default")]
1440 Default = 0,
1441 #[serde(rename = "async")]
1443 Async = 1,
1444}
1445
1446async fn stream_info_with_details(
1447 context: Context,
1448 stream: String,
1449 offset: usize,
1450 deleted_details: bool,
1451 subjects_filter: String,
1452) -> Result<Info, InfoError> {
1453 let subject = format!("STREAM.INFO.{stream}");
1454
1455 let payload = StreamInfoRequest {
1456 offset,
1457 deleted_details,
1458 subjects_filter,
1459 };
1460
1461 let response: Response<Info> = context.request(subject, &payload).await?;
1462
1463 match response {
1464 Response::Ok(info) => Ok(info),
1465 Response::Err { error } => Err(error.into()),
1466 }
1467}
1468
1469type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1470
1471#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1472pub struct StreamInfoRequest {
1473 offset: usize,
1474 deleted_details: bool,
1475 subjects_filter: String,
1476}
1477
1478pub struct InfoWithSubjects {
1479 stream: String,
1480 context: Context,
1481 pub info: Info,
1482 offset: usize,
1483 subjects: collections::hash_map::IntoIter<String, usize>,
1484 info_request: Option<InfoRequest>,
1485 subjects_filter: String,
1486 pages_done: bool,
1487}
1488
1489impl InfoWithSubjects {
1490 pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1491 let subjects = info.state.subjects.take().unwrap_or_default();
1492 let name = info.config.name.clone();
1493 InfoWithSubjects {
1494 context,
1495 info,
1496 pages_done: subjects.is_empty(),
1497 offset: subjects.len(),
1498 subjects: subjects.into_iter(),
1499 subjects_filter: subject,
1500 stream: name,
1501 info_request: None,
1502 }
1503 }
1504}
1505
1506impl futures_util::Stream for InfoWithSubjects {
1507 type Item = Result<(String, usize), InfoError>;
1508
1509 fn poll_next(
1510 mut self: Pin<&mut Self>,
1511 cx: &mut std::task::Context<'_>,
1512 ) -> Poll<Option<Self::Item>> {
1513 match self.subjects.next() {
1514 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1515 None => {
1516 if self.pages_done {
1518 return Poll::Ready(None);
1519 }
1520 let stream = self.stream.clone();
1521 let context = self.context.clone();
1522 let subjects_filter = self.subjects_filter.clone();
1523 let offset = self.offset;
1524 match self
1525 .info_request
1526 .get_or_insert_with(|| {
1527 Box::pin(stream_info_with_details(
1528 context,
1529 stream,
1530 offset,
1531 false,
1532 subjects_filter,
1533 ))
1534 })
1535 .poll_unpin(cx)
1536 {
1537 Poll::Ready(resp) => match resp {
1538 Ok(info) => {
1539 let subjects = info.state.subjects.clone();
1540 self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1541 self.info_request = None;
1542 let subjects = subjects.unwrap_or_default();
1543 self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1544 let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1545 if total <= self.offset || subjects.is_empty() {
1546 self.pages_done = true;
1547 }
1548 match self.subjects.next() {
1549 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1550 None => Poll::Ready(None),
1551 }
1552 }
1553 Err(err) => Poll::Ready(Some(Err(err))),
1554 },
1555 Poll::Pending => Poll::Pending,
1556 }
1557 }
1558 }
1559 }
1560}
1561
1562#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1564pub struct Info {
1565 pub config: Config,
1567 #[serde(with = "rfc3339")]
1569 pub created: time::OffsetDateTime,
1570 pub state: State,
1572 pub cluster: Option<ClusterInfo>,
1574 #[serde(default)]
1576 pub mirror: Option<SourceInfo>,
1577 #[serde(default)]
1579 pub sources: Vec<SourceInfo>,
1580 #[serde(flatten)]
1581 paged_info: Option<PagedInfo>,
1582}
1583
1584#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1585pub struct PagedInfo {
1586 offset: usize,
1587 total: usize,
1588 limit: usize,
1589}
1590
1591#[derive(Deserialize)]
1592pub struct DeleteStatus {
1593 pub success: bool,
1594}
1595
1596#[cfg(feature = "server_2_11")]
1597#[derive(Deserialize)]
1598pub struct PauseResponse {
1599 pub paused: bool,
1600 #[serde(with = "rfc3339")]
1601 pub pause_until: OffsetDateTime,
1602 #[serde(default, with = "serde_nanos")]
1603 pub pause_remaining: Option<Duration>,
1604}
1605
1606#[cfg(feature = "server_2_11")]
1607#[derive(Serialize, Debug)]
1608struct PauseResumeConsumerRequest {
1609 #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
1610 pause_until: Option<OffsetDateTime>,
1611}
1612
1613#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1615pub struct State {
1616 pub messages: u64,
1618 pub bytes: u64,
1620 #[serde(rename = "first_seq")]
1622 pub first_sequence: u64,
1623 #[serde(with = "rfc3339", rename = "first_ts")]
1625 pub first_timestamp: time::OffsetDateTime,
1626 #[serde(rename = "last_seq")]
1628 pub last_sequence: u64,
1629 #[serde(with = "rfc3339", rename = "last_ts")]
1631 pub last_timestamp: time::OffsetDateTime,
1632 pub consumer_count: usize,
1634 #[serde(default, rename = "num_subjects")]
1636 pub subjects_count: u64,
1637 #[serde(default, rename = "num_deleted")]
1639 pub deleted_count: Option<u64>,
1640 #[serde(default)]
1643 pub deleted: Option<Vec<u64>>,
1644
1645 pub(crate) subjects: Option<HashMap<String, usize>>,
1646}
1647
1648#[derive(Debug, Serialize, Deserialize, Clone)]
1650pub struct RawMessage {
1651 #[serde(rename = "subject")]
1653 pub subject: String,
1654
1655 #[serde(rename = "seq")]
1657 pub sequence: u64,
1658
1659 #[serde(default, rename = "data")]
1661 pub payload: String,
1662
1663 #[serde(default, rename = "hdrs")]
1665 pub headers: Option<String>,
1666
1667 #[serde(rename = "time", with = "rfc3339")]
1669 pub time: time::OffsetDateTime,
1670}
1671
1672impl TryFrom<RawMessage> for StreamMessage {
1673 type Error = crate::Error;
1674
1675 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1676 let decoded_payload = STANDARD
1677 .decode(value.payload)
1678 .map_err(|err| Box::new(std::io::Error::other(err)))?;
1679 let decoded_headers = value
1680 .headers
1681 .map(|header| STANDARD.decode(header))
1682 .map_or(Ok(None), |v| v.map(Some))?;
1683
1684 let (headers, _, _) = decoded_headers
1685 .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1686
1687 Ok(StreamMessage {
1688 subject: value.subject.into(),
1689 payload: decoded_payload.into(),
1690 headers,
1691 sequence: value.sequence,
1692 time: value.time,
1693 })
1694 }
1695}
1696
1697fn is_continuation(c: char) -> bool {
1698 c == ' ' || c == '\t'
1699}
1700const HEADER_LINE: &str = "NATS/1.0";
1701
1702#[allow(clippy::type_complexity)]
1703fn parse_headers(
1704 buf: &[u8],
1705) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1706 let mut headers = HeaderMap::new();
1707 let mut maybe_status: Option<StatusCode> = None;
1708 let mut maybe_description: Option<String> = None;
1709 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1710 line.lines().peekable()
1711 } else {
1712 return Err(Box::new(std::io::Error::other("invalid header")));
1713 };
1714
1715 if let Some(line) = lines.next() {
1716 let line = line
1717 .strip_prefix(HEADER_LINE)
1718 .ok_or_else(|| {
1719 Box::new(std::io::Error::other(
1720 "version line does not start with NATS/1.0",
1721 ))
1722 })?
1723 .trim();
1724
1725 match line.split_once(' ') {
1726 Some((status, description)) => {
1727 if !status.is_empty() {
1728 maybe_status = Some(status.parse()?);
1729 }
1730
1731 if !description.is_empty() {
1732 maybe_description = Some(description.trim().to_string());
1733 }
1734 }
1735 None => {
1736 if !line.is_empty() {
1737 maybe_status = Some(line.parse()?);
1738 }
1739 }
1740 }
1741 } else {
1742 return Err(Box::new(std::io::Error::other(
1743 "expected header information not found",
1744 )));
1745 };
1746
1747 while let Some(line) = lines.next() {
1748 if line.is_empty() {
1749 continue;
1750 }
1751
1752 if let Some((k, v)) = line.split_once(':').to_owned() {
1753 let mut s = String::from(v.trim());
1754 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1755 s.push(' ');
1756 s.push_str(v.trim());
1757 }
1758
1759 headers.insert(
1760 HeaderName::from_str(k)?,
1761 HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1762 );
1763 } else {
1764 return Err(Box::new(std::io::Error::other("malformed header line")));
1765 }
1766 }
1767
1768 if headers.is_empty() {
1769 Ok((HeaderMap::new(), maybe_status, maybe_description))
1770 } else {
1771 Ok((headers, maybe_status, maybe_description))
1772 }
1773}
1774
1775#[derive(Debug, Serialize, Deserialize, Clone)]
1776struct GetRawMessage {
1777 pub(crate) message: RawMessage,
1778}
1779
1780fn is_default<T: Default + Eq>(t: &T) -> bool {
1781 t == &T::default()
1782}
1783#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1785pub struct ClusterInfo {
1786 #[serde(default)]
1788 pub name: Option<String>,
1789 #[serde(default)]
1791 pub raft_group: Option<String>,
1792 #[serde(default)]
1794 pub leader: Option<String>,
1795 #[serde(default, with = "rfc3339::option")]
1797 pub leader_since: Option<OffsetDateTime>,
1798 #[cfg(feature = "server_2_12")]
1800 #[serde(default)]
1801 pub system_account: bool,
1803 #[cfg(feature = "server_2_12")]
1804 #[serde(default)]
1806 pub traffic_account: Option<String>,
1807 #[serde(default)]
1809 pub replicas: Vec<PeerInfo>,
1810}
1811
1812#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1814pub struct PeerInfo {
1815 pub name: String,
1817 pub current: bool,
1819 #[serde(with = "serde_nanos")]
1821 pub active: Duration,
1822 #[serde(default)]
1824 pub offline: bool,
1825 pub lag: Option<u64>,
1827}
1828
1829#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1830pub struct SourceInfo {
1831 pub name: String,
1833 pub lag: u64,
1835 #[serde(deserialize_with = "negative_duration_as_none")]
1837 pub active: Option<std::time::Duration>,
1838 #[serde(default)]
1840 pub filter_subject: Option<String>,
1841 #[serde(default)]
1843 pub subject_transform_dest: Option<String>,
1844 #[serde(default)]
1846 pub subject_transforms: Vec<SubjectTransform>,
1847}
1848
1849fn negative_duration_as_none<'de, D>(
1850 deserializer: D,
1851) -> Result<Option<std::time::Duration>, D::Error>
1852where
1853 D: Deserializer<'de>,
1854{
1855 let n = i64::deserialize(deserializer)?;
1856 if n.is_negative() {
1857 Ok(None)
1858 } else {
1859 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1860 }
1861}
1862
1863#[derive(Debug, Deserialize, Clone, Copy)]
1865pub struct PurgeResponse {
1866 pub success: bool,
1868 pub purged: u64,
1870}
1871#[derive(Default, Debug, Serialize, Clone)]
1873pub struct PurgeRequest {
1874 #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1876 pub sequence: Option<u64>,
1877
1878 #[serde(default, skip_serializing_if = "is_default")]
1880 pub filter: Option<String>,
1881
1882 #[serde(default, skip_serializing_if = "is_default")]
1884 pub keep: Option<u64>,
1885}
1886
1887#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1888pub struct Source {
1889 pub name: String,
1891 #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1893 pub start_sequence: Option<u64>,
1894 #[serde(
1895 default,
1896 rename = "opt_start_time",
1897 skip_serializing_if = "is_default",
1898 with = "rfc3339::option"
1899 )]
1900 pub start_time: Option<OffsetDateTime>,
1902 #[serde(default, skip_serializing_if = "is_default")]
1904 pub filter_subject: Option<String>,
1905 #[serde(default, skip_serializing_if = "Option::is_none")]
1907 pub external: Option<External>,
1908 #[serde(default, skip_serializing_if = "is_default")]
1910 pub domain: Option<String>,
1911 #[cfg(feature = "server_2_10")]
1913 #[serde(default, skip_serializing_if = "is_default")]
1914 pub subject_transforms: Vec<SubjectTransform>,
1915}
1916
1917#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1918pub struct External {
1919 #[serde(rename = "api")]
1921 pub api_prefix: String,
1922 #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1924 pub delivery_prefix: Option<String>,
1925}
1926
1927use std::marker::PhantomData;
1928
1929#[derive(Debug, Default)]
1930pub struct Yes;
1931#[derive(Debug, Default)]
1932pub struct No;
1933
1934pub trait ToAssign: Debug {}
1935
1936impl ToAssign for Yes {}
1937impl ToAssign for No {}
1938
1939#[derive(Debug)]
1940pub struct Purge<SEQUENCE, KEEP>
1941where
1942 SEQUENCE: ToAssign,
1943 KEEP: ToAssign,
1944{
1945 inner: PurgeRequest,
1946 sequence_set: PhantomData<SEQUENCE>,
1947 keep_set: PhantomData<KEEP>,
1948 context: Context,
1949 stream_name: String,
1950}
1951
1952impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
1953where
1954 SEQUENCE: ToAssign,
1955 KEEP: ToAssign,
1956{
1957 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
1959 self.inner.filter = Some(filter.into());
1960 self
1961 }
1962}
1963
1964impl Purge<No, No> {
1965 pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
1966 Purge {
1967 context: stream.context.clone(),
1968 stream_name: stream.name.clone(),
1969 inner: Default::default(),
1970 sequence_set: PhantomData {},
1971 keep_set: PhantomData {},
1972 }
1973 }
1974}
1975
1976impl<KEEP> Purge<No, KEEP>
1977where
1978 KEEP: ToAssign,
1979{
1980 pub fn keep(self, keep: u64) -> Purge<No, Yes> {
1983 Purge {
1984 context: self.context.clone(),
1985 stream_name: self.stream_name.clone(),
1986 sequence_set: PhantomData {},
1987 keep_set: PhantomData {},
1988 inner: PurgeRequest {
1989 keep: Some(keep),
1990 ..self.inner
1991 },
1992 }
1993 }
1994}
1995impl<SEQUENCE> Purge<SEQUENCE, No>
1996where
1997 SEQUENCE: ToAssign,
1998{
1999 pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2002 Purge {
2003 context: self.context.clone(),
2004 stream_name: self.stream_name.clone(),
2005 sequence_set: PhantomData {},
2006 keep_set: PhantomData {},
2007 inner: PurgeRequest {
2008 sequence: Some(sequence),
2009 ..self.inner
2010 },
2011 }
2012 }
2013}
2014
2015#[derive(Clone, Debug, PartialEq)]
2016pub enum PurgeErrorKind {
2017 Request,
2018 TimedOut,
2019 JetStream(super::errors::Error),
2020}
2021
2022impl Display for PurgeErrorKind {
2023 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2024 match self {
2025 Self::Request => write!(f, "request failed"),
2026 Self::TimedOut => write!(f, "timed out"),
2027 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2028 }
2029 }
2030}
2031
2032pub type PurgeError = Error<PurgeErrorKind>;
2033
2034impl<S, K> IntoFuture for Purge<S, K>
2035where
2036 S: ToAssign + std::marker::Send,
2037 K: ToAssign + std::marker::Send,
2038{
2039 type Output = Result<PurgeResponse, PurgeError>;
2040
2041 type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2042
2043 fn into_future(self) -> Self::IntoFuture {
2044 Box::pin(std::future::IntoFuture::into_future(async move {
2045 let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2046 let response: Response<PurgeResponse> = self
2047 .context
2048 .request(request_subject, &self.inner)
2049 .map_err(|err| match err.kind() {
2050 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2051 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2052 })
2053 .await?;
2054
2055 match response {
2056 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2057 Response::Ok(response) => Ok(response),
2058 }
2059 }))
2060 }
2061}
2062
2063#[derive(Deserialize, Debug)]
2064struct ConsumerPage {
2065 total: usize,
2066 consumers: Option<Vec<String>>,
2067}
2068
2069#[derive(Deserialize, Debug)]
2070struct ConsumerInfoPage {
2071 total: usize,
2072 consumers: Option<Vec<super::consumer::Info>>,
2073}
2074
2075type ConsumerNamesErrorKind = StreamsErrorKind;
2076type ConsumerNamesError = StreamsError;
2077type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2078
2079pub struct ConsumerNames {
2080 context: Context,
2081 stream: String,
2082 offset: usize,
2083 page_request: Option<PageRequest>,
2084 consumers: Vec<String>,
2085 done: bool,
2086}
2087
2088impl futures_util::Stream for ConsumerNames {
2089 type Item = Result<String, ConsumerNamesError>;
2090
2091 fn poll_next(
2092 mut self: Pin<&mut Self>,
2093 cx: &mut std::task::Context<'_>,
2094 ) -> std::task::Poll<Option<Self::Item>> {
2095 match self.page_request.as_mut() {
2096 Some(page) => match page.try_poll_unpin(cx) {
2097 std::task::Poll::Ready(page) => {
2098 self.page_request = None;
2099 let page = page.map_err(|err| {
2100 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2101 })?;
2102
2103 if let Some(consumers) = page.consumers {
2104 self.offset += consumers.len();
2105 self.consumers = consumers;
2106 if self.offset >= page.total {
2107 self.done = true;
2108 }
2109 match self.consumers.pop() {
2110 Some(stream) => Poll::Ready(Some(Ok(stream))),
2111 None => Poll::Ready(None),
2112 }
2113 } else {
2114 Poll::Ready(None)
2115 }
2116 }
2117 std::task::Poll::Pending => std::task::Poll::Pending,
2118 },
2119 None => {
2120 if let Some(stream) = self.consumers.pop() {
2121 Poll::Ready(Some(Ok(stream)))
2122 } else {
2123 if self.done {
2124 return Poll::Ready(None);
2125 }
2126 let context = self.context.clone();
2127 let offset = self.offset;
2128 let stream = self.stream.clone();
2129 self.page_request = Some(Box::pin(async move {
2130 match context
2131 .request(
2132 format!("CONSUMER.NAMES.{stream}"),
2133 &json!({
2134 "offset": offset,
2135 }),
2136 )
2137 .await?
2138 {
2139 Response::Err { error } => Err(RequestError::with_source(
2140 super::context::RequestErrorKind::Other,
2141 error,
2142 )),
2143 Response::Ok(page) => Ok(page),
2144 }
2145 }));
2146 self.poll_next(cx)
2147 }
2148 }
2149 }
2150 }
2151}
2152
2153pub type ConsumersErrorKind = StreamsErrorKind;
2154pub type ConsumersError = StreamsError;
2155type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2156
2157pub struct Consumers {
2158 context: Context,
2159 stream: String,
2160 offset: usize,
2161 page_request: Option<PageInfoRequest>,
2162 consumers: Vec<super::consumer::Info>,
2163 done: bool,
2164}
2165
2166impl futures_util::Stream for Consumers {
2167 type Item = Result<super::consumer::Info, ConsumersError>;
2168
2169 fn poll_next(
2170 mut self: Pin<&mut Self>,
2171 cx: &mut std::task::Context<'_>,
2172 ) -> std::task::Poll<Option<Self::Item>> {
2173 match self.page_request.as_mut() {
2174 Some(page) => match page.try_poll_unpin(cx) {
2175 std::task::Poll::Ready(page) => {
2176 self.page_request = None;
2177 let page = page.map_err(|err| {
2178 ConsumersError::with_source(ConsumersErrorKind::Other, err)
2179 })?;
2180 if let Some(consumers) = page.consumers {
2181 self.offset += consumers.len();
2182 self.consumers = consumers;
2183 if self.offset >= page.total {
2184 self.done = true;
2185 }
2186 match self.consumers.pop() {
2187 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2188 None => Poll::Ready(None),
2189 }
2190 } else {
2191 Poll::Ready(None)
2192 }
2193 }
2194 std::task::Poll::Pending => std::task::Poll::Pending,
2195 },
2196 None => {
2197 if let Some(stream) = self.consumers.pop() {
2198 Poll::Ready(Some(Ok(stream)))
2199 } else {
2200 if self.done {
2201 return Poll::Ready(None);
2202 }
2203 let context = self.context.clone();
2204 let offset = self.offset;
2205 let stream = self.stream.clone();
2206 self.page_request = Some(Box::pin(async move {
2207 match context
2208 .request(
2209 format!("CONSUMER.LIST.{stream}"),
2210 &json!({
2211 "offset": offset,
2212 }),
2213 )
2214 .await?
2215 {
2216 Response::Err { error } => Err(RequestError::with_source(
2217 super::context::RequestErrorKind::Other,
2218 error,
2219 )),
2220 Response::Ok(page) => Ok(page),
2221 }
2222 }));
2223 self.poll_next(cx)
2224 }
2225 }
2226 }
2227 }
2228}
2229
2230#[derive(Clone, Debug, PartialEq)]
2231pub enum LastRawMessageErrorKind {
2232 NoMessageFound,
2233 InvalidSubject,
2234 JetStream(super::errors::Error),
2235 Other,
2236}
2237
2238impl Display for LastRawMessageErrorKind {
2239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2240 match self {
2241 Self::NoMessageFound => write!(f, "no message found"),
2242 Self::InvalidSubject => write!(f, "invalid subject"),
2243 Self::Other => write!(f, "failed to get last raw message"),
2244 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2245 }
2246 }
2247}
2248
2249pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2250pub type RawMessageErrorKind = LastRawMessageErrorKind;
2251pub type RawMessageError = LastRawMessageError;
2252
2253#[derive(Clone, Debug, PartialEq)]
2254pub enum ConsumerErrorKind {
2255 TimedOut,
2257 Request,
2258 InvalidConsumerType,
2259 InvalidName,
2260 JetStream(super::errors::Error),
2261 Other,
2262}
2263
2264impl Display for ConsumerErrorKind {
2265 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2266 match self {
2267 Self::TimedOut => write!(f, "timed out"),
2268 Self::Request => write!(f, "request failed"),
2269 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2270 Self::Other => write!(f, "consumer error"),
2271 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2272 Self::InvalidName => write!(f, "invalid consumer name"),
2273 }
2274 }
2275}
2276
2277pub type ConsumerError = Error<ConsumerErrorKind>;
2278
2279#[derive(Clone, Debug, PartialEq)]
2280pub enum ConsumerCreateStrictErrorKind {
2281 TimedOut,
2283 Request,
2284 InvalidConsumerType,
2285 InvalidName,
2286 AlreadyExists,
2287 JetStream(super::errors::Error),
2288 Other,
2289}
2290
2291impl Display for ConsumerCreateStrictErrorKind {
2292 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2293 match self {
2294 Self::TimedOut => write!(f, "timed out"),
2295 Self::Request => write!(f, "request failed"),
2296 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2297 Self::Other => write!(f, "consumer error"),
2298 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2299 Self::InvalidName => write!(f, "invalid consumer name"),
2300 Self::AlreadyExists => write!(f, "consumer already exists"),
2301 }
2302 }
2303}
2304
2305pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2306
2307#[derive(Clone, Debug, PartialEq)]
2308pub enum ConsumerUpdateErrorKind {
2309 TimedOut,
2311 Request,
2312 InvalidConsumerType,
2313 InvalidName,
2314 DoesNotExist,
2315 JetStream(super::errors::Error),
2316 Other,
2317}
2318
2319impl Display for ConsumerUpdateErrorKind {
2320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2321 match self {
2322 Self::TimedOut => write!(f, "timed out"),
2323 Self::Request => write!(f, "request failed"),
2324 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2325 Self::Other => write!(f, "consumer error"),
2326 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2327 Self::InvalidName => write!(f, "invalid consumer name"),
2328 Self::DoesNotExist => write!(f, "consumer does not exist"),
2329 }
2330 }
2331}
2332
2333pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2334
2335impl From<super::errors::Error> for ConsumerError {
2336 fn from(err: super::errors::Error) -> Self {
2337 ConsumerError::new(ConsumerErrorKind::JetStream(err))
2338 }
2339}
2340impl From<super::errors::Error> for ConsumerCreateStrictError {
2341 fn from(err: super::errors::Error) -> Self {
2342 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2343 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2344 } else {
2345 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2346 }
2347 }
2348}
2349impl From<super::errors::Error> for ConsumerUpdateError {
2350 fn from(err: super::errors::Error) -> Self {
2351 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2352 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2353 } else {
2354 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2355 }
2356 }
2357}
2358impl From<ConsumerError> for ConsumerUpdateError {
2359 fn from(err: ConsumerError) -> Self {
2360 match err.kind() {
2361 ConsumerErrorKind::JetStream(err) => {
2362 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2363 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2364 } else {
2365 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2366 }
2367 }
2368 ConsumerErrorKind::Request => {
2369 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2370 }
2371 ConsumerErrorKind::TimedOut => {
2372 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2373 }
2374 ConsumerErrorKind::InvalidConsumerType => {
2375 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2376 }
2377 ConsumerErrorKind::InvalidName => {
2378 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2379 }
2380 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2381 }
2382 }
2383}
2384
2385impl From<ConsumerError> for ConsumerCreateStrictError {
2386 fn from(err: ConsumerError) -> Self {
2387 match err.kind() {
2388 ConsumerErrorKind::JetStream(err) => {
2389 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2390 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2391 } else {
2392 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2393 }
2394 }
2395 ConsumerErrorKind::Request => {
2396 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2397 }
2398 ConsumerErrorKind::TimedOut => {
2399 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2400 }
2401 ConsumerErrorKind::InvalidConsumerType => {
2402 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2403 }
2404 ConsumerErrorKind::InvalidName => {
2405 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2406 }
2407 ConsumerErrorKind::Other => {
2408 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2409 }
2410 }
2411 }
2412}
2413
2414impl From<super::context::RequestError> for ConsumerError {
2415 fn from(err: super::context::RequestError) -> Self {
2416 match err.kind() {
2417 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2418 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2419 }
2420 }
2421}
2422impl From<super::context::RequestError> for ConsumerUpdateError {
2423 fn from(err: super::context::RequestError) -> Self {
2424 match err.kind() {
2425 RequestErrorKind::TimedOut => {
2426 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2427 }
2428 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2429 }
2430 }
2431}
2432impl From<super::context::RequestError> for ConsumerCreateStrictError {
2433 fn from(err: super::context::RequestError) -> Self {
2434 match err.kind() {
2435 RequestErrorKind::TimedOut => {
2436 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2437 }
2438 _ => {
2439 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2440 }
2441 }
2442 }
2443}
2444
2445#[derive(Debug, Serialize, Default)]
2446pub struct DirectGetRequest {
2447 #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2448 sequence: Option<u64>,
2449 #[serde(rename = "last_by_subj", skip_serializing)]
2450 last_by_subject: Option<String>,
2451 #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2452 next_by_subject: Option<String>,
2453}
2454
2455pub struct WithHeaders;
2457
2458pub struct WithoutHeaders;
2460
2461trait DirectGetResponse: Sized {
2463 fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
2464}
2465
2466impl DirectGetResponse for StreamMessage {
2467 fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2468 StreamMessage::try_from(message).map_err(Into::into)
2469 }
2470}
2471
2472impl DirectGetResponse for StreamValue {
2473 fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2474 Ok(StreamValue {
2475 data: message.payload,
2476 })
2477 }
2478}
2479
2480pub struct DirectGetBuilder<T = WithHeaders> {
2481 context: Context,
2482 stream_name: String,
2483 request: DirectGetRequest,
2484 _phantom: std::marker::PhantomData<T>,
2485}
2486
2487impl DirectGetBuilder<WithHeaders> {
2488 fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
2489 DirectGetBuilder {
2490 context,
2491 stream_name,
2492 request: DirectGetRequest::default(),
2493 _phantom: std::marker::PhantomData,
2494 }
2495 }
2496}
2497
2498impl<T> DirectGetBuilder<T> {
2499 async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
2501 let payload = if self.request.last_by_subject.is_some() {
2504 Bytes::new()
2505 } else {
2506 serde_json::to_vec(&self.request).map(Bytes::from)?
2507 };
2508
2509 let request_subject = if let Some(ref subject) = self.request.last_by_subject {
2510 format!(
2511 "{}.DIRECT.GET.{}.{}",
2512 &self.context.prefix, &self.stream_name, subject
2513 )
2514 } else {
2515 format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
2516 };
2517
2518 let response = self
2519 .context
2520 .client
2521 .request(request_subject, payload)
2522 .await?;
2523
2524 if let Some(status) = response.status {
2526 if let Some(ref description) = response.description {
2527 match status {
2528 StatusCode::NOT_FOUND => {
2529 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
2530 }
2531 StatusCode::TIMEOUT => {
2533 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
2534 }
2535 _ => {
2536 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
2537 status,
2538 description.to_string(),
2539 )));
2540 }
2541 }
2542 }
2543 }
2544
2545 R::from_message(response)
2546 }
2547
2548 pub fn sequence(mut self, seq: u64) -> Self {
2550 self.request.sequence = Some(seq);
2551 self
2552 }
2553
2554 pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2556 self.request.last_by_subject = Some(subject.into());
2557 self
2558 }
2559
2560 pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2562 self.request.next_by_subject = Some(subject.into());
2563 self
2564 }
2565}
2566
2567impl DirectGetBuilder<WithHeaders> {
2568 pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
2570 self.send_internal::<StreamMessage>().await
2571 }
2572}
2573
2574impl DirectGetBuilder<WithoutHeaders> {
2575 pub async fn send(self) -> Result<StreamValue, DirectGetError> {
2577 self.send_internal::<StreamValue>().await
2578 }
2579}
2580
2581pub struct StreamValue {
2582 pub data: Bytes,
2583}
2584
2585#[derive(Debug, Serialize, Default)]
2586pub struct RawMessageRequest {
2587 #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2588 sequence: Option<u64>,
2589 #[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
2590 last_by_subject: Option<String>,
2591 #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2592 next_by_subject: Option<String>,
2593}
2594
2595trait RawMessageResponse: Sized {
2597 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
2598}
2599
2600impl RawMessageResponse for StreamMessage {
2601 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2602 StreamMessage::try_from(message)
2603 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2604 }
2605}
2606
2607impl RawMessageResponse for StreamValue {
2608 fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2609 use base64::engine::general_purpose::STANDARD;
2610 use base64::Engine;
2611
2612 let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
2613 RawMessageError::with_source(
2614 RawMessageErrorKind::Other,
2615 Box::new(std::io::Error::other(err)),
2616 )
2617 })?;
2618
2619 Ok(StreamValue {
2620 data: decoded_payload.into(),
2621 })
2622 }
2623}
2624
2625pub struct RawMessageBuilder<T = WithHeaders> {
2626 context: Context,
2627 stream_name: String,
2628 request: RawMessageRequest,
2629 _phantom: std::marker::PhantomData<T>,
2630}
2631
2632impl RawMessageBuilder<WithHeaders> {
2633 fn new(context: Context, stream_name: String) -> Self {
2634 RawMessageBuilder {
2635 context,
2636 stream_name,
2637 request: RawMessageRequest::default(),
2638 _phantom: std::marker::PhantomData,
2639 }
2640 }
2641}
2642
2643impl<T> RawMessageBuilder<T> {
2644 async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
2646 for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
2648 .into_iter()
2649 .flatten()
2650 {
2651 if !is_valid_subject(subject) {
2652 return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
2653 }
2654 }
2655
2656 let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
2657
2658 let response: Response<GetRawMessage> = self
2659 .context
2660 .request(subject, &self.request)
2661 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2662 .await?;
2663
2664 match response {
2665 Response::Err { error } => {
2666 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
2667 Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
2668 } else {
2669 Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
2670 }
2671 }
2672 Response::Ok(value) => R::from_raw_message(value.message),
2673 }
2674 }
2675
2676 pub fn sequence(mut self, seq: u64) -> Self {
2678 self.request.sequence = Some(seq);
2679 self
2680 }
2681
2682 pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2684 self.request.last_by_subject = Some(subject.into());
2685 self
2686 }
2687
2688 pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2690 self.request.next_by_subject = Some(subject.into());
2691 self
2692 }
2693}
2694
2695impl RawMessageBuilder<WithHeaders> {
2696 pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
2698 self.send_internal::<StreamMessage>().await
2699 }
2700}
2701
2702impl RawMessageBuilder<WithoutHeaders> {
2703 pub async fn send(self) -> Result<StreamValue, RawMessageError> {
2705 self.send_internal::<StreamValue>().await
2706 }
2707}
2708
2709#[cfg(test)]
2710mod tests {
2711 use super::*;
2712
2713 #[test]
2714 fn consumer_limits_de() {
2715 let config = Config {
2716 ..Default::default()
2717 };
2718
2719 let roundtrip: Config = {
2720 let ser = serde_json::to_string(&config).unwrap();
2721 serde_json::from_str(&ser).unwrap()
2722 };
2723 assert_eq!(config, roundtrip);
2724 }
2725}