1use std::{
17 collections::{self, HashMap},
18 fmt::{self, Debug, Display},
19 future::IntoFuture,
20 io::{self, ErrorKind},
21 pin::Pin,
22 str::FromStr,
23 task::Poll,
24 time::Duration,
25};
26
27use crate::{
28 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40 context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind},
41 errors::ErrorCode,
42 message::{StreamMessage, StreamMessageError},
43 response::Response,
44 Context, Message,
45};
46
/// Error type returned by the stream-info requests in this module.
///
/// This is a plain alias of [`RequestError`].
pub type InfoError = RequestError;
48
/// Failure categories reported by the direct-get operations on a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// The response carried the `NOT_FOUND` status.
    NotFound,
    /// The subject failed local validation, or the response carried the
    /// `TIMEOUT` status (the direct-get methods map that status to this kind).
    InvalidSubject,
    /// The underlying request timed out.
    TimedOut,
    /// The request itself failed.
    // NOTE(review): nothing in the visible part of this file constructs this variant.
    Request,
    /// Any other status returned together with a description.
    ErrorResponse(StatusCode, String),
    /// Catch-all; used when wrapping a source error (request, JSON or message conversion).
    Other,
}
58
59impl Display for DirectGetErrorKind {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 match self {
62 Self::InvalidSubject => write!(f, "invalid subject"),
63 Self::NotFound => write!(f, "message not found"),
64 Self::ErrorResponse(status, description) => {
65 write!(f, "unable to get message: {} {}", status, description)
66 }
67 Self::Other => write!(f, "error getting message"),
68 Self::TimedOut => write!(f, "timed out"),
69 Self::Request => write!(f, "request failed"),
70 }
71 }
72}
73
/// Error returned by the direct-get operations; its kind is a [`DirectGetErrorKind`].
pub type DirectGetError = Error<DirectGetErrorKind>;
75
76impl From<crate::RequestError> for DirectGetError {
77 fn from(err: crate::RequestError) -> Self {
78 match err.kind() {
79 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
80 crate::RequestErrorKind::NoResponders => {
81 DirectGetError::new(DirectGetErrorKind::ErrorResponse(
82 StatusCode::NO_RESPONDERS,
83 "no responders".to_string(),
84 ))
85 }
86 crate::RequestErrorKind::Other => {
87 DirectGetError::with_source(DirectGetErrorKind::Other, err)
88 }
89 }
90 }
91}
92
93impl From<serde_json::Error> for DirectGetError {
94 fn from(err: serde_json::Error) -> Self {
95 DirectGetError::with_source(DirectGetErrorKind::Other, err)
96 }
97}
98
99impl From<StreamMessageError> for DirectGetError {
100 fn from(err: StreamMessageError) -> Self {
101 DirectGetError::with_source(DirectGetErrorKind::Other, err)
102 }
103}
104
/// Failure categories reported by [`Stream::delete_message`].
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream API error.
    JetStream(super::errors::Error),
}
111
112impl Display for DeleteMessageErrorKind {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 match self {
115 Self::Request => write!(f, "request failed"),
116 Self::TimedOut => write!(f, "timed out"),
117 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
118 }
119 }
120}
121
/// Error returned by [`Stream::delete_message`]; its kind is a [`DeleteMessageErrorKind`].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
123
/// Handle used to operate on a single JetStream stream.
///
/// `T` is the type of the cached info payload and defaults to [`Info`].
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    /// Cached info; `Stream<Info>::info` refreshes it and `cached_info` reads it.
    pub(crate) info: T,
    /// JetStream context through which all requests are sent.
    pub(crate) context: Context,
    /// Name of the stream, used to build API request subjects.
    pub(crate) name: String,
}
133
134impl Stream<Info> {
135 pub async fn info(&mut self) -> Result<&Info, InfoError> {
153 let subject = format!("STREAM.INFO.{}", self.info.config.name);
154
155 match self.context.request(subject, &json!({})).await? {
156 Response::Ok::<Info>(info) => {
157 self.info = info;
158 Ok(&self.info)
159 }
160 Response::Err { error } => Err(error.into()),
161 }
162 }
163
164 pub fn cached_info(&self) -> &Info {
183 &self.info
184 }
185}
186
187impl<I> Stream<I> {
188 pub async fn get_info(&self) -> Result<Info, InfoError> {
191 let subject = format!("STREAM.INFO.{}", self.name);
192
193 match self.context.request(subject, &json!({})).await? {
194 Response::Ok::<Info>(info) => Ok(info),
195 Response::Err { error } => Err(error.into()),
196 }
197 }
198
199 pub async fn info_with_subjects<F: AsRef<str>>(
222 &self,
223 subjects_filter: F,
224 ) -> Result<InfoWithSubjects, InfoError> {
225 let subjects_filter = subjects_filter.as_ref().to_string();
226 let info = stream_info_with_details(
228 self.context.clone(),
229 self.name.clone(),
230 0,
231 false,
232 subjects_filter.clone(),
233 )
234 .await?;
235
236 Ok(InfoWithSubjects::new(
237 self.context.clone(),
238 info,
239 subjects_filter,
240 ))
241 }
242
243 pub fn info_builder(&self) -> StreamInfoBuilder {
269 StreamInfoBuilder::new(self.context.clone(), self.name.clone())
270 }
271
272 pub async fn direct_get_next_for_subject<T: AsRef<str>>(
307 &self,
308 subject: T,
309 sequence: Option<u64>,
310 ) -> Result<Message, DirectGetError> {
311 if !is_valid_subject(&subject) {
312 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
313 }
314 let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
315 let payload;
316 if let Some(sequence) = sequence {
317 payload = json!({
318 "seq": sequence,
319 "next_by_subj": subject.as_ref(),
320 });
321 } else {
322 payload = json!({
323 "next_by_subj": subject.as_ref(),
324 });
325 }
326
327 let response = self
328 .context
329 .client
330 .request(
331 request_subject,
332 serde_json::to_vec(&payload).map(Bytes::from)?,
333 )
334 .await
335 .map(|message| Message {
336 message,
337 context: self.context.clone(),
338 })?;
339
340 if let Some(status) = response.status {
341 if let Some(ref description) = response.description {
342 match status {
343 StatusCode::NOT_FOUND => {
344 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
345 }
346 StatusCode::TIMEOUT => {
348 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
349 }
350 _ => {
351 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
352 status,
353 description.to_string(),
354 )));
355 }
356 }
357 }
358 }
359 Ok(response)
360 }
361
362 pub async fn direct_get_first_for_subject<T: AsRef<str>>(
394 &self,
395 subject: T,
396 ) -> Result<Message, DirectGetError> {
397 if !is_valid_subject(&subject) {
398 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
399 }
400 let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
401 let payload = json!({
402 "next_by_subj": subject.as_ref(),
403 });
404
405 let response = self
406 .context
407 .client
408 .request(
409 request_subject,
410 serde_json::to_vec(&payload).map(Bytes::from)?,
411 )
412 .await
413 .map(|message| Message {
414 message,
415 context: self.context.clone(),
416 })?;
417 if let Some(status) = response.status {
418 if let Some(ref description) = response.description {
419 match status {
420 StatusCode::NOT_FOUND => {
421 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
422 }
423 StatusCode::TIMEOUT => {
425 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
426 }
427 _ => {
428 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
429 status,
430 description.to_string(),
431 )));
432 }
433 }
434 }
435 }
436 Ok(response)
437 }
438
439 pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
471 let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
472 let payload = json!({
473 "seq": sequence,
474 });
475
476 let response = self
477 .context
478 .client
479 .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
480 .await?;
481
482 if let Some(status) = response.status {
483 if let Some(ref description) = response.description {
484 match status {
485 StatusCode::NOT_FOUND => {
486 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
487 }
488 StatusCode::TIMEOUT => {
490 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
491 }
492 _ => {
493 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
494 status,
495 description.to_string(),
496 )));
497 }
498 }
499 }
500 }
501 StreamMessage::try_from(response).map_err(Into::into)
502 }
503
504 pub async fn direct_get_last_for_subject<T: AsRef<str>>(
536 &self,
537 subject: T,
538 ) -> Result<StreamMessage, DirectGetError> {
539 let subject = format!(
540 "{}.DIRECT.GET.{}.{}",
541 &self.context.prefix,
542 &self.name,
543 subject.as_ref()
544 );
545
546 let response = self.context.client.request(subject, "".into()).await?;
547 if let Some(status) = response.status {
548 if let Some(ref description) = response.description {
549 match status {
550 StatusCode::NOT_FOUND => {
551 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
552 }
553 StatusCode::TIMEOUT => {
555 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
556 }
557 _ => {
558 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
559 status,
560 description.to_string(),
561 )));
562 }
563 }
564 }
565 }
566 StreamMessage::try_from(response).map_err(Into::into)
567 }
568 pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
598 self.raw_message(StreamGetMessage {
599 sequence: Some(sequence),
600 last_by_subject: None,
601 next_by_subject: None,
602 })
603 .await
604 }
605
606 pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
630 &self,
631 subject: T,
632 sequence: u64,
633 ) -> Result<StreamMessage, RawMessageError> {
634 self.raw_message(StreamGetMessage {
635 sequence: Some(sequence),
636 last_by_subject: None,
637 next_by_subject: Some(subject.as_ref().to_string()),
638 })
639 .await
640 }
641
642 pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
666 &self,
667 subject: T,
668 ) -> Result<StreamMessage, RawMessageError> {
669 self.raw_message(StreamGetMessage {
670 sequence: None,
671 last_by_subject: None,
672 next_by_subject: Some(subject.as_ref().to_string()),
673 })
674 .await
675 }
676
677 async fn raw_message(
678 &self,
679 request: StreamGetMessage,
680 ) -> Result<StreamMessage, RawMessageError> {
681 for subject in [&request.last_by_subject, &request.next_by_subject]
682 .into_iter()
683 .flatten()
684 {
685 if !is_valid_subject(subject) {
686 return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
687 }
688 }
689 let subject = format!("STREAM.MSG.GET.{}", &self.name);
690
691 let response: Response<GetRawMessage> = self
692 .context
693 .request(subject, &request)
694 .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
695 .await?;
696
697 match response {
698 Response::Err { error } => {
699 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
700 Err(LastRawMessageError::new(
701 LastRawMessageErrorKind::NoMessageFound,
702 ))
703 } else {
704 Err(LastRawMessageError::new(
705 LastRawMessageErrorKind::JetStream(error),
706 ))
707 }
708 }
709 Response::Ok(value) => StreamMessage::try_from(value.message)
710 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)),
711 }
712 }
713
714 pub async fn get_last_raw_message_by_subject(
738 &self,
739 stream_subject: &str,
740 ) -> Result<StreamMessage, LastRawMessageError> {
741 self.raw_message(StreamGetMessage {
742 sequence: None,
743 last_by_subject: Some(stream_subject.to_string()),
744 next_by_subject: None,
745 })
746 .await
747 }
748
749 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
773 let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
774 let payload = json!({
775 "seq": sequence,
776 });
777
778 let response: Response<DeleteStatus> = self
779 .context
780 .request(subject, &payload)
781 .map_err(|err| match err.kind() {
782 RequestErrorKind::TimedOut => {
783 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
784 }
785 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
786 })
787 .await?;
788
789 match response {
790 Response::Err { error } => Err(DeleteMessageError::new(
791 DeleteMessageErrorKind::JetStream(error),
792 )),
793 Response::Ok(value) => Ok(value.success),
794 }
795 }
796
    /// Starts a [`Purge`] builder for this stream with neither the sequence
    /// nor the keep option assigned yet (both type parameters are [`No`]).
    pub fn purge(&self) -> Purge<No, No> {
        Purge::build(self)
    }
815
    /// Purges the messages matching `subject`.
    ///
    /// Deprecated shorthand for `self.purge().filter(subject)`, which is what
    /// this method awaits.
    #[deprecated(
        since = "0.25.0",
        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
    )]
    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
    where
        T: Into<String>,
    {
        self.purge().filter(subject).await
    }
842
843 pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
867 &self,
868 config: C,
869 ) -> Result<Consumer<C>, ConsumerError> {
870 self.context
871 .create_consumer_on_stream(config, self.name.clone())
872 .await
873 }
874
875 #[cfg(feature = "server_2_10")]
899 pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
900 &self,
901 config: C,
902 ) -> Result<Consumer<C>, ConsumerUpdateError> {
903 self.context
904 .update_consumer_on_stream(config, self.name.clone())
905 .await
906 }
907
908 #[cfg(feature = "server_2_10")]
933 pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
934 &self,
935 config: C,
936 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
937 self.context
938 .create_consumer_strict_on_stream(config, self.name.clone())
939 .await
940 }
941
942 pub async fn consumer_info<T: AsRef<str>>(
959 &self,
960 name: T,
961 ) -> Result<consumer::Info, crate::Error> {
962 let name = name.as_ref();
963
964 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
965
966 match self.context.request(subject, &json!({})).await? {
967 Response::Ok(info) => Ok(info),
968 Response::Err { error } => Err(Box::new(std::io::Error::new(
969 ErrorKind::Other,
970 format!("nats: error while getting consumer info: {}", error),
971 ))),
972 }
973 }
974
975 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
994 &self,
995 name: &str,
996 ) -> Result<Consumer<T>, crate::Error> {
997 let info = self.consumer_info(name).await?;
998
999 Ok(Consumer::new(
1000 T::try_from_consumer_config(info.config.clone())?,
1001 info,
1002 self.context.clone(),
1003 ))
1004 }
1005
1006 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
1034 &self,
1035 name: &str,
1036 config: T,
1037 ) -> Result<Consumer<T>, ConsumerError> {
1038 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
1039
1040 match self.context.request(subject, &json!({})).await? {
1041 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
1042 Response::Err { error } => Err(error.into()),
1043 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1044 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1045 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1046 })?,
1047 info,
1048 self.context.clone(),
1049 )),
1050 }
1051 }
1052
1053 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
1074 let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
1075
1076 match self.context.request(subject, &json!({})).await? {
1077 Response::Ok(delete_status) => Ok(delete_status),
1078 Response::Err { error } => Err(error.into()),
1079 }
1080 }
1081
1082 pub fn consumer_names(&self) -> ConsumerNames {
1101 ConsumerNames {
1102 context: self.context.clone(),
1103 stream: self.name.clone(),
1104 offset: 0,
1105 page_request: None,
1106 consumers: Vec::new(),
1107 done: false,
1108 }
1109 }
1110
1111 pub fn consumers(&self) -> Consumers {
1130 Consumers {
1131 context: self.context.clone(),
1132 stream: self.name.clone(),
1133 offset: 0,
1134 page_request: None,
1135 consumers: Vec::new(),
1136 done: false,
1137 }
1138 }
1139}
1140
/// Builder for a stream-info request; obtained from `Stream::info_builder`
/// and executed with `fetch`.
pub struct StreamInfoBuilder {
    /// Context used to send the request.
    pub(crate) context: Context,
    /// Name of the stream to query.
    pub(crate) name: String,
    /// Value sent as `deleted_details`; starts as `false`.
    pub(crate) deleted: bool,
    /// Value sent as `subjects_filter`; starts as the empty string.
    pub(crate) subject: String,
}
1147
1148impl StreamInfoBuilder {
1149 fn new(context: Context, name: String) -> Self {
1150 Self {
1151 context,
1152 name,
1153 deleted: false,
1154 subject: "".to_string(),
1155 }
1156 }
1157
1158 pub fn with_deleted(mut self, deleted: bool) -> Self {
1159 self.deleted = deleted;
1160 self
1161 }
1162
1163 pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1164 self.subject = subject.into();
1165 self
1166 }
1167
1168 pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1169 let info = stream_info_with_details(
1170 self.context.clone(),
1171 self.name.clone(),
1172 0,
1173 self.deleted,
1174 self.subject.clone(),
1175 )
1176 .await?;
1177
1178 Ok(InfoWithSubjects::new(self.context, info, self.subject))
1179 }
1180}
1181
/// Stream configuration as exchanged with the server in JSON form.
///
/// Field names map to JSON keys of the same name unless a `rename` is
/// given. Most optional fields are omitted from the serialized form when
/// they hold their default value.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Name of the stream.
    pub name: String,
    /// Byte limit of the stream (signed; the meaning of non-positive values is not defined here).
    pub max_bytes: i64,
    /// Message-count limit; JSON key `max_msgs`.
    #[serde(rename = "max_msgs")]
    pub max_messages: i64,
    /// Per-subject message-count limit; JSON key `max_msgs_per_subject`.
    #[serde(rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// Discard policy; see [`DiscardPolicy`].
    pub discard: DiscardPolicy,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Subjects of the stream; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// Retention policy; see [`RetentionPolicy`].
    pub retention: RetentionPolicy,
    /// Consumer-count limit.
    pub max_consumers: i32,
    /// Maximum message age, (de)serialized through `serde_nanos`.
    #[serde(with = "serde_nanos")]
    pub max_age: Duration,
    /// Message-size limit; JSON key `max_msg_size`, omitted when `0`.
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// Storage backend; see [`StorageType`].
    pub storage: StorageType,
    /// Number of replicas.
    pub num_replicas: usize,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// Duplicate-tracking window, (de)serialized through `serde_nanos`; omitted when zero.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// Free-form description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    /// JSON key `allow_rollup_hdrs`; omitted from the serialized form when `false`.
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    pub allow_rollup: bool,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_delete: bool,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Republish settings; see [`Republish`]. Omitted when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Mirror source; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Stream sources; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    /// String key/value metadata; omitted when empty. Requires the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    /// Subject transform; see [`SubjectTransform`]. Requires the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    /// Compression setting; see [`Compression`]. Requires the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    /// Consumer limits; a received value equal to `ConsumerLimits::default()`
    /// is deserialized as `None`. Requires the `server_2_10` feature.
    // NOTE(review): unlike its neighbours this field has no `skip_serializing_if`,
    // so `None` is serialized as `null` — confirm that is intended.
    #[cfg(feature = "server_2_10")]
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    /// JSON key `first_seq`; omitted when `None`. Requires the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Placement hints; see [`Placement`]. Omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,
}
1302
1303impl From<&Config> for Config {
1304 fn from(sc: &Config) -> Config {
1305 sc.clone()
1306 }
1307}
1308
1309impl From<&str> for Config {
1310 fn from(s: &str) -> Config {
1311 Config {
1312 name: s.to_string(),
1313 ..Default::default()
1314 }
1315 }
1316}
1317
/// Deserializes optional [`ConsumerLimits`], turning a value equal to
/// `ConsumerLimits::default()` into `None`.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
    // Keep only limits that differ from the all-default value.
    Ok(limits.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Consumer limits carried in [`Config::consumer_limits`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// Inactivity threshold, (de)serialized through `serde_nanos`; defaults to zero when absent.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Defaults to `0` when absent.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1345
/// Compression setting of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// Serialized as `"s2"`.
    #[serde(rename = "s2")]
    S2,
    /// Serialized as `"none"`.
    #[serde(rename = "none")]
    None,
}
1353
/// A subject transform, made of a source and a destination string.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// JSON key `src`.
    #[serde(rename = "src")]
    pub source: String,

    /// JSON key `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
}
1363
/// Republish settings carried in [`Config::republish`].
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// JSON key `src`.
    #[serde(rename = "src")]
    pub source: String,
    /// JSON key `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub headers_only: bool,
}
1377
/// Placement hints carried in [`Config::placement`].
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Cluster name; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Tags; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1388
/// Discard policy of a stream; the default is [`DiscardPolicy::Old`].
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Serialized as `"old"`.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Serialized as `"new"`.
    #[serde(rename = "new")]
    New = 1,
}
1402
/// Retention policy of a stream; the default is [`RetentionPolicy::Limits`].
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// Serialized as `"limits"`.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// Serialized as `"interest"`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// Serialized as `"workqueue"`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1419
/// Storage backend of a stream; the default is [`StorageType::File`].
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Serialized as `"file"`.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Serialized as `"memory"`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1432
1433async fn stream_info_with_details(
1434 context: Context,
1435 stream: String,
1436 offset: usize,
1437 deleted_details: bool,
1438 subjects_filter: String,
1439) -> Result<Info, InfoError> {
1440 let subject = format!("STREAM.INFO.{}", stream);
1441
1442 let payload = StreamInfoRequest {
1443 offset,
1444 deleted_details,
1445 subjects_filter,
1446 };
1447
1448 let response: Response<Info> = context.request(subject, &payload).await?;
1449
1450 match response {
1451 Response::Ok(info) => Ok(info),
1452 Response::Err { error } => Err(error.into()),
1453 }
1454}
1455
/// Boxed, `'static` future that resolves to one stream-info response.
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1457
/// Payload of a stream-info request, serialized with the field names as JSON keys.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    /// Paging offset.
    offset: usize,
    /// Whether deleted details are requested.
    deleted_details: bool,
    /// Subjects filter; the empty string is sent when no filter was set.
    subjects_filter: String,
}
1464
/// Stream info plus a `futures::Stream` over its `(subject, count)` entries.
///
/// Entries of the current page are yielded first; further pages are
/// requested as needed until `pages_done` is set.
pub struct InfoWithSubjects {
    /// Stream name used for follow-up page requests.
    stream: String,
    /// Context used for follow-up page requests.
    context: Context,
    /// Info from the first response; its subjects map has been moved out into `subjects`.
    pub info: Info,
    /// Number of subject entries received so far; sent as the next request's offset.
    offset: usize,
    /// Entries of the current page that have not been yielded yet.
    subjects: collections::hash_map::IntoIter<String, usize>,
    /// In-flight request for the next page, if any.
    info_request: Option<InfoRequest>,
    /// Subjects filter sent with every page request.
    subjects_filter: String,
    /// Set once no further page will be requested.
    pages_done: bool,
}
1475
1476impl InfoWithSubjects {
1477 pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1478 let subjects = info.state.subjects.take().unwrap_or_default();
1479 let name = info.config.name.clone();
1480 InfoWithSubjects {
1481 context,
1482 info,
1483 pages_done: subjects.is_empty(),
1484 offset: subjects.len(),
1485 subjects: subjects.into_iter(),
1486 subjects_filter: subject,
1487 stream: name,
1488 info_request: None,
1489 }
1490 }
1491}
1492
1493impl futures::Stream for InfoWithSubjects {
1494 type Item = Result<(String, usize), InfoError>;
1495
1496 fn poll_next(
1497 mut self: Pin<&mut Self>,
1498 cx: &mut std::task::Context<'_>,
1499 ) -> Poll<Option<Self::Item>> {
1500 match self.subjects.next() {
1501 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1502 None => {
1503 if self.pages_done {
1505 return Poll::Ready(None);
1506 }
1507 let stream = self.stream.clone();
1508 let context = self.context.clone();
1509 let subjects_filter = self.subjects_filter.clone();
1510 let offset = self.offset;
1511 match self
1512 .info_request
1513 .get_or_insert_with(|| {
1514 Box::pin(stream_info_with_details(
1515 context,
1516 stream,
1517 offset,
1518 false,
1519 subjects_filter,
1520 ))
1521 })
1522 .poll_unpin(cx)
1523 {
1524 Poll::Ready(resp) => match resp {
1525 Ok(info) => {
1526 let subjects = info.state.subjects.clone();
1527 self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1528 self.info_request = None;
1529 let subjects = subjects.unwrap_or_default();
1530 self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1531 let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1532 if total <= self.offset || subjects.is_empty() {
1533 self.pages_done = true;
1534 }
1535 match self.subjects.next() {
1536 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1537 None => Poll::Ready(None),
1538 }
1539 }
1540 Err(err) => Poll::Ready(Some(Err(err))),
1541 },
1542 Poll::Pending => Poll::Pending,
1543 }
1544 }
1545 }
1546 }
1547}
1548
/// Stream information as returned by the server.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// Configuration of the stream.
    pub config: Config,
    /// Creation time, parsed as RFC 3339.
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// Current state of the stream.
    pub state: State,
    /// Cluster information, when present.
    pub cluster: Option<ClusterInfo>,
    /// Mirror information; `None` when absent.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Source information; empty when absent.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    /// Paging fields (`offset`, `total`, `limit`) flattened into the same JSON object.
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1570
/// Paging fields of a paged API response.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    /// Offset of the returned page.
    offset: usize,
    /// Total number of entries; compared against the running offset to detect the last page.
    total: usize,
    /// Page size limit.
    limit: usize,
}
1577
/// Reply to a delete request.
#[derive(Deserialize)]
pub struct DeleteStatus {
    /// The `success` flag reported by the server.
    pub success: bool,
}
1582
/// State section of a stream-info response.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
    /// Number of messages.
    pub messages: u64,
    /// Number of bytes.
    pub bytes: u64,
    /// JSON key `first_seq`.
    #[serde(rename = "first_seq")]
    pub first_sequence: u64,
    /// JSON key `first_ts`, parsed as RFC 3339.
    #[serde(with = "rfc3339", rename = "first_ts")]
    pub first_timestamp: time::OffsetDateTime,
    /// JSON key `last_seq`.
    #[serde(rename = "last_seq")]
    pub last_sequence: u64,
    /// JSON key `last_ts`, parsed as RFC 3339.
    #[serde(with = "rfc3339", rename = "last_ts")]
    pub last_timestamp: time::OffsetDateTime,
    /// Number of consumers.
    pub consumer_count: usize,
    /// JSON key `num_subjects`; `0` when absent.
    #[serde(default, rename = "num_subjects")]
    pub subjects_count: u64,
    /// JSON key `num_deleted`; `None` when absent.
    #[serde(default, rename = "num_deleted")]
    pub deleted_count: Option<u64>,
    /// Deleted entries as a list of `u64`; `None` when absent.
    #[serde(default)]
    pub deleted: Option<Vec<u64>>,

    /// Per-subject counts; `InfoWithSubjects` moves this map out and iterates it.
    pub(crate) subjects: Option<HashMap<String, usize>>,
}
1617
/// A stored message in its wire form; payload and headers are base64 text
/// that `TryFrom<RawMessage> for StreamMessage` decodes.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
    /// Subject of the message.
    #[serde(rename = "subject")]
    pub subject: String,

    /// JSON key `seq`.
    #[serde(rename = "seq")]
    pub sequence: u64,

    /// JSON key `data`; base64-encoded payload, empty when absent.
    #[serde(default, rename = "data")]
    pub payload: String,

    /// JSON key `hdrs`; base64-encoded header block, `None` when absent.
    #[serde(default, rename = "hdrs")]
    pub headers: Option<String>,

    /// JSON key `time`, RFC 3339.
    #[serde(rename = "time", with = "rfc3339")]
    pub time: time::OffsetDateTime,
}
1641
1642impl TryFrom<RawMessage> for StreamMessage {
1643 type Error = crate::Error;
1644
1645 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1646 let decoded_payload = STANDARD
1647 .decode(value.payload)
1648 .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
1649 let decoded_headers = value
1650 .headers
1651 .map(|header| STANDARD.decode(header))
1652 .map_or(Ok(None), |v| v.map(Some))?;
1653
1654 let (headers, _, _) = decoded_headers
1655 .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1656
1657 Ok(StreamMessage {
1658 subject: value.subject.into(),
1659 payload: decoded_payload.into(),
1660 headers,
1661 sequence: value.sequence,
1662 time: value.time,
1663 })
1664 }
1665}
1666
/// Returns `true` for the characters that mark a header continuation line:
/// a space or a horizontal tab.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
/// Prefix that the first line of a header block must start with.
const HEADER_LINE: &str = "NATS/1.0";
1671
1672#[allow(clippy::type_complexity)]
1673fn parse_headers(
1674 buf: &[u8],
1675) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1676 let mut headers = HeaderMap::new();
1677 let mut maybe_status: Option<StatusCode> = None;
1678 let mut maybe_description: Option<String> = None;
1679 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1680 line.lines().peekable()
1681 } else {
1682 return Err(Box::new(std::io::Error::new(
1683 ErrorKind::Other,
1684 "invalid header",
1685 )));
1686 };
1687
1688 if let Some(line) = lines.next() {
1689 let line = line
1690 .strip_prefix(HEADER_LINE)
1691 .ok_or_else(|| {
1692 Box::new(std::io::Error::new(
1693 ErrorKind::Other,
1694 "version line does not start with NATS/1.0",
1695 ))
1696 })?
1697 .trim();
1698
1699 match line.split_once(' ') {
1700 Some((status, description)) => {
1701 if !status.is_empty() {
1702 maybe_status = Some(status.parse()?);
1703 }
1704
1705 if !description.is_empty() {
1706 maybe_description = Some(description.trim().to_string());
1707 }
1708 }
1709 None => {
1710 if !line.is_empty() {
1711 maybe_status = Some(line.parse()?);
1712 }
1713 }
1714 }
1715 } else {
1716 return Err(Box::new(std::io::Error::new(
1717 ErrorKind::Other,
1718 "expected header information not found",
1719 )));
1720 };
1721
1722 while let Some(line) = lines.next() {
1723 if line.is_empty() {
1724 continue;
1725 }
1726
1727 if let Some((k, v)) = line.split_once(':').to_owned() {
1728 let mut s = String::from(v.trim());
1729 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1730 s.push(' ');
1731 s.push_str(v.trim());
1732 }
1733
1734 headers.insert(
1735 HeaderName::from_str(k)?,
1736 HeaderValue::from_str(&s)
1737 .map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
1738 );
1739 } else {
1740 return Err(Box::new(std::io::Error::new(
1741 ErrorKind::Other,
1742 "malformed header line",
1743 )));
1744 }
1745 }
1746
1747 if headers.is_empty() {
1748 Ok((HeaderMap::new(), maybe_status, maybe_description))
1749 } else {
1750 Ok((headers, maybe_status, maybe_description))
1751 }
1752}
1753
/// Envelope of a `STREAM.MSG.GET` response; wraps the raw message under the `message` key.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
    /// The returned message in wire form.
    pub(crate) message: RawMessage,
}
1758
/// Returns `true` when `t` equals its type's `Default` value.
///
/// Used as a `skip_serializing_if` predicate by the serde attributes in this file.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
/// Cluster section of a stream-info response.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
    /// Cluster name, when present.
    pub name: Option<String>,
    /// Leader name, when present.
    pub leader: Option<String>,
    /// Replica peers; empty when absent.
    #[serde(default)]
    pub replicas: Vec<PeerInfo>,
}
1773
/// One replica peer listed in [`ClusterInfo::replicas`].
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// Peer name.
    pub name: String,
    /// The `current` flag reported by the server.
    pub current: bool,
    /// Deserialized through `serde_nanos`.
    #[serde(with = "serde_nanos")]
    pub active: Duration,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub offline: bool,
    /// Lag, when reported.
    pub lag: Option<u64>,
}
1790
/// Information about a mirror or source of a stream.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
    /// Name of the source.
    pub name: String,
    /// Lag reported for the source.
    pub lag: u64,
    /// Read as signed nanoseconds; a negative value becomes `None`.
    #[serde(deserialize_with = "negative_duration_as_none")]
    pub active: Option<std::time::Duration>,
    /// `None` when absent.
    #[serde(default)]
    pub filter_subject: Option<String>,
    /// `None` when absent.
    #[serde(default)]
    pub subject_transform_dest: Option<String>,
    /// Empty when absent.
    #[serde(default)]
    pub subject_transforms: Vec<SubjectTransform>,
}
1810
1811fn negative_duration_as_none<'de, D>(
1812 deserializer: D,
1813) -> Result<Option<std::time::Duration>, D::Error>
1814where
1815 D: Deserializer<'de>,
1816{
1817 let n = i64::deserialize(deserializer)?;
1818 if n.is_negative() {
1819 Ok(None)
1820 } else {
1821 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1822 }
1823}
1824
/// Reply to a purge request.
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
    /// The `success` flag reported by the server.
    pub success: bool,
    /// The `purged` count reported by the server.
    pub purged: u64,
}
/// Payload sent with a purge request. Every field is optional and is left
/// out of the serialized form when it is `None`.
#[derive(Default, Debug, Serialize, Clone)]
pub struct PurgeRequest {
    /// Sequence bound for the purge; serialized under the key `seq`.
    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
    pub sequence: Option<u64>,

    /// Filter for the purge; serialized under the key `filter`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter: Option<String>,

    /// Keep count for the purge; serialized under the key `keep`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub keep: Option<u64>,
}
1848
/// Configuration of a source. Optional fields are omitted from the serialized
/// form when unset and fall back to their defaults when absent on input.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
    /// Source name.
    pub name: String,
    /// Optional start sequence; uses the wire key `opt_start_seq`.
    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
    pub start_sequence: Option<u64>,
    /// Optional start time; uses the wire key `opt_start_time` and is encoded
    /// as RFC 3339.
    #[serde(
        default,
        rename = "opt_start_time",
        skip_serializing_if = "is_default",
        with = "rfc3339::option"
    )]
    pub start_time: Option<OffsetDateTime>,
    /// Optional subject filter.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: Option<String>,
    /// Optional [`External`] configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external: Option<External>,
    /// Optional domain.
    #[serde(default, skip_serializing_if = "is_default")]
    pub domain: Option<String>,
    /// Subject transforms; only compiled in with the `server_2_10` feature and
    /// omitted from the serialized form when empty.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub subject_transforms: Vec<SubjectTransform>,
}
1878
/// External configuration referenced by [`Source::external`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
    /// API prefix; uses the wire key `api`.
    #[serde(rename = "api")]
    pub api_prefix: String,
    /// Optional delivery prefix; uses the wire key `deliver` and is omitted
    /// from the serialized form when `None`.
    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
    pub delivery_prefix: Option<String>,
}
1888
1889use std::marker::PhantomData;
1890
/// Marker type: the corresponding [`Purge`] option has been set.
#[derive(Debug, Default)]
pub struct Yes;
/// Marker type: the corresponding [`Purge`] option has not been set.
#[derive(Debug, Default)]
pub struct No;

/// Bound for the marker types ([`Yes`] and [`No`]) accepted as the
/// `SEQUENCE` / `KEEP` type parameters of [`Purge`].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
1900
/// Builder for a purge request against one stream.
///
/// The `SEQUENCE` and `KEEP` type parameters record, at the type level,
/// whether `sequence` or `keep` has already been set. `keep` is only
/// available while `SEQUENCE` is [`No`], and `sequence` only while `KEEP` is
/// [`No`], so the two cannot be combined. The request is sent when the
/// builder is awaited (see its `IntoFuture` implementation).
#[derive(Debug)]
pub struct Purge<SEQUENCE, KEEP>
where
    SEQUENCE: ToAssign,
    KEEP: ToAssign,
{
    /// Payload accumulated by the builder methods.
    inner: PurgeRequest,
    /// Type-level record of whether `sequence` was set.
    sequence_set: PhantomData<SEQUENCE>,
    /// Type-level record of whether `keep` was set.
    keep_set: PhantomData<KEEP>,
    /// Context used to issue the request.
    context: Context,
    /// Name of the stream to purge.
    stream_name: String,
}
1913
1914impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
1915where
1916 SEQUENCE: ToAssign,
1917 KEEP: ToAssign,
1918{
1919 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
1921 self.inner.filter = Some(filter.into());
1922 self
1923 }
1924}
1925
1926impl Purge<No, No> {
1927 pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
1928 Purge {
1929 context: stream.context.clone(),
1930 stream_name: stream.name.clone(),
1931 inner: Default::default(),
1932 sequence_set: PhantomData {},
1933 keep_set: PhantomData {},
1934 }
1935 }
1936}
1937
1938impl<KEEP> Purge<No, KEEP>
1939where
1940 KEEP: ToAssign,
1941{
1942 pub fn keep(self, keep: u64) -> Purge<No, Yes> {
1945 Purge {
1946 context: self.context.clone(),
1947 stream_name: self.stream_name.clone(),
1948 sequence_set: PhantomData {},
1949 keep_set: PhantomData {},
1950 inner: PurgeRequest {
1951 keep: Some(keep),
1952 ..self.inner
1953 },
1954 }
1955 }
1956}
1957impl<SEQUENCE> Purge<SEQUENCE, No>
1958where
1959 SEQUENCE: ToAssign,
1960{
1961 pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
1964 Purge {
1965 context: self.context.clone(),
1966 stream_name: self.stream_name.clone(),
1967 sequence_set: PhantomData {},
1968 keep_set: PhantomData {},
1969 inner: PurgeRequest {
1970 sequence: Some(sequence),
1971 ..self.inner
1972 },
1973 }
1974 }
1975}
1976
/// Kinds of failure reported when awaiting a [`Purge`].
#[derive(Clone, Debug, PartialEq)]
pub enum PurgeErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
1983
1984impl Display for PurgeErrorKind {
1985 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1986 match self {
1987 Self::Request => write!(f, "request failed"),
1988 Self::TimedOut => write!(f, "timed out"),
1989 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1990 }
1991 }
1992}
1993
/// Error returned when awaiting a [`Purge`]; see [`PurgeErrorKind`] for the
/// possible kinds.
pub type PurgeError = Error<PurgeErrorKind>;
1995
1996impl<S, K> IntoFuture for Purge<S, K>
1997where
1998 S: ToAssign + std::marker::Send,
1999 K: ToAssign + std::marker::Send,
2000{
2001 type Output = Result<PurgeResponse, PurgeError>;
2002
2003 type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2004
2005 fn into_future(self) -> Self::IntoFuture {
2006 Box::pin(std::future::IntoFuture::into_future(async move {
2007 let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2008 let response: Response<PurgeResponse> = self
2009 .context
2010 .request(request_subject, &self.inner)
2011 .map_err(|err| match err.kind() {
2012 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2013 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2014 })
2015 .await?;
2016
2017 match response {
2018 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2019 Response::Ok(response) => Ok(response),
2020 }
2021 }))
2022 }
2023}
2024
/// One page of a consumer-names listing, as consumed by [`ConsumerNames`].
#[derive(Deserialize, Debug)]
struct ConsumerPage {
    /// Total reported by the server; paging stops once the running offset
    /// reaches it.
    total: usize,
    /// Names contained in this page, if any were returned.
    consumers: Option<Vec<String>>,
}
2030
/// One page of a consumer-info listing, as consumed by [`Consumers`].
#[derive(Deserialize, Debug)]
struct ConsumerInfoPage {
    /// Total reported by the server; paging stops once the running offset
    /// reaches it.
    total: usize,
    /// Consumer infos contained in this page, if any were returned.
    consumers: Option<Vec<super::consumer::Info>>,
}
2036
/// Error kind used by [`ConsumerNames`]; shared with the streams listing.
type ConsumerNamesErrorKind = StreamsErrorKind;
/// Error yielded by [`ConsumerNames`]; shared with the streams listing.
type ConsumerNamesError = StreamsError;
/// Boxed in-flight request for a single [`ConsumerPage`].
type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2040
/// Paginated stream over the consumer names of one stream.
///
/// Pages are requested lazily on `CONSUMER.NAMES.<stream>` as the stream is
/// polled.
pub struct ConsumerNames {
    /// Context used to issue page requests.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of names received so far; sent as the `offset` of the next page.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Names of the current page that have not been yielded yet.
    consumers: Vec<String>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
2049
2050impl futures::Stream for ConsumerNames {
2051 type Item = Result<String, ConsumerNamesError>;
2052
2053 fn poll_next(
2054 mut self: Pin<&mut Self>,
2055 cx: &mut std::task::Context<'_>,
2056 ) -> std::task::Poll<Option<Self::Item>> {
2057 match self.page_request.as_mut() {
2058 Some(page) => match page.try_poll_unpin(cx) {
2059 std::task::Poll::Ready(page) => {
2060 self.page_request = None;
2061 let page = page.map_err(|err| {
2062 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2063 })?;
2064
2065 if let Some(consumers) = page.consumers {
2066 self.offset += consumers.len();
2067 self.consumers = consumers;
2068 if self.offset >= page.total {
2069 self.done = true;
2070 }
2071 match self.consumers.pop() {
2072 Some(stream) => Poll::Ready(Some(Ok(stream))),
2073 None => Poll::Ready(None),
2074 }
2075 } else {
2076 Poll::Ready(None)
2077 }
2078 }
2079 std::task::Poll::Pending => std::task::Poll::Pending,
2080 },
2081 None => {
2082 if let Some(stream) = self.consumers.pop() {
2083 Poll::Ready(Some(Ok(stream)))
2084 } else {
2085 if self.done {
2086 return Poll::Ready(None);
2087 }
2088 let context = self.context.clone();
2089 let offset = self.offset;
2090 let stream = self.stream.clone();
2091 self.page_request = Some(Box::pin(async move {
2092 match context
2093 .request(
2094 format!("CONSUMER.NAMES.{stream}"),
2095 &json!({
2096 "offset": offset,
2097 }),
2098 )
2099 .await?
2100 {
2101 Response::Err { error } => Err(RequestError::with_source(
2102 super::context::RequestErrorKind::Other,
2103 error,
2104 )),
2105 Response::Ok(page) => Ok(page),
2106 }
2107 }));
2108 self.poll_next(cx)
2109 }
2110 }
2111 }
2112 }
2113}
2114
/// Error kind used by [`Consumers`]; shared with the streams listing.
pub type ConsumersErrorKind = StreamsErrorKind;
/// Error yielded by [`Consumers`]; shared with the streams listing.
pub type ConsumersError = StreamsError;
/// Boxed in-flight request for a single [`ConsumerInfoPage`].
type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2118
/// Paginated stream over the consumer infos of one stream.
///
/// Pages are requested lazily on `CONSUMER.LIST.<stream>` as the stream is
/// polled.
pub struct Consumers {
    /// Context used to issue page requests.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of infos received so far; sent as the `offset` of the next page.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    consumers: Vec<super::consumer::Info>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
2127
2128impl futures::Stream for Consumers {
2129 type Item = Result<super::consumer::Info, ConsumersError>;
2130
2131 fn poll_next(
2132 mut self: Pin<&mut Self>,
2133 cx: &mut std::task::Context<'_>,
2134 ) -> std::task::Poll<Option<Self::Item>> {
2135 match self.page_request.as_mut() {
2136 Some(page) => match page.try_poll_unpin(cx) {
2137 std::task::Poll::Ready(page) => {
2138 self.page_request = None;
2139 let page = page.map_err(|err| {
2140 ConsumersError::with_source(ConsumersErrorKind::Other, err)
2141 })?;
2142 if let Some(consumers) = page.consumers {
2143 self.offset += consumers.len();
2144 self.consumers = consumers;
2145 if self.offset >= page.total {
2146 self.done = true;
2147 }
2148 match self.consumers.pop() {
2149 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2150 None => Poll::Ready(None),
2151 }
2152 } else {
2153 Poll::Ready(None)
2154 }
2155 }
2156 std::task::Poll::Pending => std::task::Poll::Pending,
2157 },
2158 None => {
2159 if let Some(stream) = self.consumers.pop() {
2160 Poll::Ready(Some(Ok(stream)))
2161 } else {
2162 if self.done {
2163 return Poll::Ready(None);
2164 }
2165 let context = self.context.clone();
2166 let offset = self.offset;
2167 let stream = self.stream.clone();
2168 self.page_request = Some(Box::pin(async move {
2169 match context
2170 .request(
2171 format!("CONSUMER.LIST.{stream}"),
2172 &json!({
2173 "offset": offset,
2174 }),
2175 )
2176 .await?
2177 {
2178 Response::Err { error } => Err(RequestError::with_source(
2179 super::context::RequestErrorKind::Other,
2180 error,
2181 )),
2182 Response::Ok(page) => Ok(page),
2183 }
2184 }));
2185 self.poll_next(cx)
2186 }
2187 }
2188 }
2189 }
2190}
2191
/// Kinds of failure reported when fetching a raw message.
#[derive(Clone, Debug, PartialEq)]
pub enum LastRawMessageErrorKind {
    /// No message was found.
    NoMessageFound,
    /// The subject is invalid.
    InvalidSubject,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2199
2200impl Display for LastRawMessageErrorKind {
2201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2202 match self {
2203 Self::NoMessageFound => write!(f, "no message found"),
2204 Self::InvalidSubject => write!(f, "invalid subject"),
2205 Self::Other => write!(f, "failed to get last raw message"),
2206 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2207 }
2208 }
2209}
2210
/// Error whose kind is a [`LastRawMessageErrorKind`].
pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
/// Alias of [`LastRawMessageErrorKind`].
pub type RawMessageErrorKind = LastRawMessageErrorKind;
/// Alias of [`LastRawMessageError`].
pub type RawMessageError = LastRawMessageError;
2214
/// Kinds of failure reported by consumer operations.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2225
2226impl Display for ConsumerErrorKind {
2227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2228 match self {
2229 Self::TimedOut => write!(f, "timed out"),
2230 Self::Request => write!(f, "request failed"),
2231 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2232 Self::Other => write!(f, "consumer error"),
2233 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2234 Self::InvalidName => write!(f, "invalid consumer name"),
2235 }
2236 }
2237}
2238
/// Error whose kind is a [`ConsumerErrorKind`].
pub type ConsumerError = Error<ConsumerErrorKind>;
2240
/// Kinds of failure reported by strict consumer creation. Mirrors
/// [`ConsumerErrorKind`] and adds [`AlreadyExists`](Self::AlreadyExists).
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerCreateStrictErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The consumer already exists (JetStream error code
    /// `CONSUMER_ALREADY_EXISTS`).
    AlreadyExists,
    /// The server answered with any other JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2252
2253impl Display for ConsumerCreateStrictErrorKind {
2254 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2255 match self {
2256 Self::TimedOut => write!(f, "timed out"),
2257 Self::Request => write!(f, "request failed"),
2258 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2259 Self::Other => write!(f, "consumer error"),
2260 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2261 Self::InvalidName => write!(f, "invalid consumer name"),
2262 Self::AlreadyExists => write!(f, "consumer already exists"),
2263 }
2264 }
2265}
2266
/// Error whose kind is a [`ConsumerCreateStrictErrorKind`].
pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2268
/// Kinds of failure reported by consumer updates. Mirrors
/// [`ConsumerErrorKind`] and adds [`DoesNotExist`](Self::DoesNotExist).
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerUpdateErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The consumer does not exist (JetStream error code
    /// `CONSUMER_DOES_NOT_EXIST`).
    DoesNotExist,
    /// The server answered with any other JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2280
2281impl Display for ConsumerUpdateErrorKind {
2282 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2283 match self {
2284 Self::TimedOut => write!(f, "timed out"),
2285 Self::Request => write!(f, "request failed"),
2286 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2287 Self::Other => write!(f, "consumer error"),
2288 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2289 Self::InvalidName => write!(f, "invalid consumer name"),
2290 Self::DoesNotExist => write!(f, "consumer does not exist"),
2291 }
2292 }
2293}
2294
/// Error whose kind is a [`ConsumerUpdateErrorKind`].
pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2296
2297impl From<super::errors::Error> for ConsumerError {
2298 fn from(err: super::errors::Error) -> Self {
2299 ConsumerError::new(ConsumerErrorKind::JetStream(err))
2300 }
2301}
2302impl From<super::errors::Error> for ConsumerCreateStrictError {
2303 fn from(err: super::errors::Error) -> Self {
2304 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2305 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2306 } else {
2307 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2308 }
2309 }
2310}
2311impl From<super::errors::Error> for ConsumerUpdateError {
2312 fn from(err: super::errors::Error) -> Self {
2313 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2314 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2315 } else {
2316 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2317 }
2318 }
2319}
2320impl From<ConsumerError> for ConsumerUpdateError {
2321 fn from(err: ConsumerError) -> Self {
2322 match err.kind() {
2323 ConsumerErrorKind::JetStream(err) => {
2324 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2325 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2326 } else {
2327 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2328 }
2329 }
2330 ConsumerErrorKind::Request => {
2331 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2332 }
2333 ConsumerErrorKind::TimedOut => {
2334 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2335 }
2336 ConsumerErrorKind::InvalidConsumerType => {
2337 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2338 }
2339 ConsumerErrorKind::InvalidName => {
2340 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2341 }
2342 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2343 }
2344 }
2345}
2346
2347impl From<ConsumerError> for ConsumerCreateStrictError {
2348 fn from(err: ConsumerError) -> Self {
2349 match err.kind() {
2350 ConsumerErrorKind::JetStream(err) => {
2351 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2352 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2353 } else {
2354 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2355 }
2356 }
2357 ConsumerErrorKind::Request => {
2358 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2359 }
2360 ConsumerErrorKind::TimedOut => {
2361 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2362 }
2363 ConsumerErrorKind::InvalidConsumerType => {
2364 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2365 }
2366 ConsumerErrorKind::InvalidName => {
2367 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2368 }
2369 ConsumerErrorKind::Other => {
2370 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2371 }
2372 }
2373 }
2374}
2375
2376impl From<super::context::RequestError> for ConsumerError {
2377 fn from(err: super::context::RequestError) -> Self {
2378 match err.kind() {
2379 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2380 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2381 }
2382 }
2383}
2384impl From<super::context::RequestError> for ConsumerUpdateError {
2385 fn from(err: super::context::RequestError) -> Self {
2386 match err.kind() {
2387 RequestErrorKind::TimedOut => {
2388 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2389 }
2390 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2391 }
2392 }
2393}
2394impl From<super::context::RequestError> for ConsumerCreateStrictError {
2395 fn from(err: super::context::RequestError) -> Self {
2396 match err.kind() {
2397 RequestErrorKind::TimedOut => {
2398 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2399 }
2400 _ => {
2401 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2402 }
2403 }
2404 }
2405}
2406
/// Serialize-only request payload for fetching a message from a stream.
/// Each field is left out of the serialized form when it is `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamGetMessage {
    /// Serialized under the key `seq`.
    #[serde(rename = "seq", skip_serializing_if = "is_default")]
    sequence: Option<u64>,
    /// Serialized under the key `next_by_subj`.
    #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
    next_by_subject: Option<String>,
    /// Serialized under the key `last_by_subj`.
    #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
    last_by_subject: Option<String>,
}
2416
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must survive a JSON serialize/deserialize round
    /// trip unchanged.
    #[test]
    fn consumer_limits_de() {
        let config = Config::default();

        let encoded = serde_json::to_string(&config).unwrap();
        let roundtrip: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(config, roundtrip);
    }
}