1use std::{
17 collections::{self, HashMap},
18 fmt::{self, Debug, Display},
19 future::IntoFuture,
20 io::{self, ErrorKind},
21 pin::Pin,
22 str::FromStr,
23 task::Poll,
24 time::Duration,
25};
26
27use crate::{
28 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40 context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind},
41 errors::ErrorCode,
42 message::{StreamMessage, StreamMessageError},
43 response::Response,
44 Context, Message,
45};
46
/// Error type returned by the stream-info requests in this module; an alias for
/// [`RequestError`].
pub type InfoError = RequestError;
48
/// Classifies the ways a direct-get request can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// The requested message was not found.
    NotFound,
    /// The subject supplied by the caller was rejected as invalid.
    InvalidSubject,
    /// The request timed out.
    TimedOut,
    /// The request itself failed.
    Request,
    /// A response carrying a status code and description that is not mapped to a
    /// more specific kind.
    ErrorResponse(StatusCode, String),
    /// Any other failure (for example serialization or message conversion errors).
    Other,
}
58
59impl Display for DirectGetErrorKind {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 match self {
62 Self::InvalidSubject => write!(f, "invalid subject"),
63 Self::NotFound => write!(f, "message not found"),
64 Self::ErrorResponse(status, description) => {
65 write!(f, "unable to get message: {} {}", status, description)
66 }
67 Self::Other => write!(f, "error getting message"),
68 Self::TimedOut => write!(f, "timed out"),
69 Self::Request => write!(f, "request failed"),
70 }
71 }
72}
73
/// Error returned by the direct-get methods; wraps a [`DirectGetErrorKind`].
pub type DirectGetError = Error<DirectGetErrorKind>;
75
76impl From<crate::RequestError> for DirectGetError {
77 fn from(err: crate::RequestError) -> Self {
78 match err.kind() {
79 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
80 crate::RequestErrorKind::NoResponders => {
81 DirectGetError::new(DirectGetErrorKind::ErrorResponse(
82 StatusCode::NO_RESPONDERS,
83 "no responders".to_string(),
84 ))
85 }
86 crate::RequestErrorKind::Other => {
87 DirectGetError::with_source(DirectGetErrorKind::Other, err)
88 }
89 }
90 }
91}
92
93impl From<serde_json::Error> for DirectGetError {
94 fn from(err: serde_json::Error) -> Self {
95 DirectGetError::with_source(DirectGetErrorKind::Other, err)
96 }
97}
98
99impl From<StreamMessageError> for DirectGetError {
100 fn from(err: StreamMessageError) -> Self {
101 DirectGetError::with_source(DirectGetErrorKind::Other, err)
102 }
103}
104
/// Classifies the ways deleting a message from a stream can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
111
112impl Display for DeleteMessageErrorKind {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 match self {
115 Self::Request => write!(f, "request failed"),
116 Self::TimedOut => write!(f, "timed out"),
117 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
118 }
119 }
120}
121
/// Error returned by [`Stream::delete_message`]; wraps a [`DeleteMessageErrorKind`].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
123
/// Handle to a single stream.
///
/// The type parameter `T` is the kind of cached information carried by the handle
/// and defaults to [`Info`].
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    /// Cached stream information.
    pub(crate) info: T,
    /// Context used to issue requests for this stream.
    pub(crate) context: Context,
    /// Name of the stream; used to build request subjects.
    pub(crate) name: String,
}
133
134impl Stream<Info> {
135 pub async fn info(&mut self) -> Result<&Info, InfoError> {
153 let subject = format!("STREAM.INFO.{}", self.info.config.name);
154
155 match self.context.request(subject, &json!({})).await? {
156 Response::Ok::<Info>(info) => {
157 self.info = info;
158 Ok(&self.info)
159 }
160 Response::Err { error } => Err(error.into()),
161 }
162 }
163
164 pub fn cached_info(&self) -> &Info {
183 &self.info
184 }
185}
186
187impl<I> Stream<I> {
188 pub async fn get_info(&self) -> Result<Info, InfoError> {
191 let subject = format!("STREAM.INFO.{}", self.name);
192
193 match self.context.request(subject, &json!({})).await? {
194 Response::Ok::<Info>(info) => Ok(info),
195 Response::Err { error } => Err(error.into()),
196 }
197 }
198
199 pub async fn info_with_subjects<F: AsRef<str>>(
222 &self,
223 subjects_filter: F,
224 ) -> Result<InfoWithSubjects, InfoError> {
225 let subjects_filter = subjects_filter.as_ref().to_string();
226 let info = stream_info_with_details(
228 self.context.clone(),
229 self.name.clone(),
230 0,
231 false,
232 subjects_filter.clone(),
233 )
234 .await?;
235
236 Ok(InfoWithSubjects::new(
237 self.context.clone(),
238 info,
239 subjects_filter,
240 ))
241 }
242
    /// Creates a [`StreamInfoBuilder`] for this stream, handing it a clone of the
    /// context and of the stream name.
    pub fn info_builder(&self) -> StreamInfoBuilder {
        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
    }
271
272 pub async fn direct_get_next_for_subject<T: AsRef<str>>(
307 &self,
308 subject: T,
309 sequence: Option<u64>,
310 ) -> Result<Message, DirectGetError> {
311 if !is_valid_subject(&subject) {
312 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
313 }
314 let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
315 let payload;
316 if let Some(sequence) = sequence {
317 payload = json!({
318 "seq": sequence,
319 "next_by_subj": subject.as_ref(),
320 });
321 } else {
322 payload = json!({
323 "next_by_subj": subject.as_ref(),
324 });
325 }
326
327 let response = self
328 .context
329 .client
330 .request(
331 request_subject,
332 serde_json::to_vec(&payload).map(Bytes::from)?,
333 )
334 .await
335 .map(|message| Message {
336 message,
337 context: self.context.clone(),
338 })?;
339
340 if let Some(status) = response.status {
341 if let Some(ref description) = response.description {
342 match status {
343 StatusCode::NOT_FOUND => {
344 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
345 }
346 StatusCode::TIMEOUT => {
348 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
349 }
350 _ => {
351 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
352 status,
353 description.to_string(),
354 )));
355 }
356 }
357 }
358 }
359 Ok(response)
360 }
361
362 pub async fn direct_get_first_for_subject<T: AsRef<str>>(
394 &self,
395 subject: T,
396 ) -> Result<Message, DirectGetError> {
397 if !is_valid_subject(&subject) {
398 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
399 }
400 let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
401 let payload = json!({
402 "next_by_subj": subject.as_ref(),
403 });
404
405 let response = self
406 .context
407 .client
408 .request(
409 request_subject,
410 serde_json::to_vec(&payload).map(Bytes::from)?,
411 )
412 .await
413 .map(|message| Message {
414 message,
415 context: self.context.clone(),
416 })?;
417 if let Some(status) = response.status {
418 if let Some(ref description) = response.description {
419 match status {
420 StatusCode::NOT_FOUND => {
421 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
422 }
423 StatusCode::TIMEOUT => {
425 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
426 }
427 _ => {
428 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
429 status,
430 description.to_string(),
431 )));
432 }
433 }
434 }
435 }
436 Ok(response)
437 }
438
439 pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
471 let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
472 let payload = json!({
473 "seq": sequence,
474 });
475
476 let response = self
477 .context
478 .client
479 .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
480 .await?;
481
482 if let Some(status) = response.status {
483 if let Some(ref description) = response.description {
484 match status {
485 StatusCode::NOT_FOUND => {
486 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
487 }
488 StatusCode::TIMEOUT => {
490 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
491 }
492 _ => {
493 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
494 status,
495 description.to_string(),
496 )));
497 }
498 }
499 }
500 }
501 StreamMessage::try_from(response).map_err(Into::into)
502 }
503
504 pub async fn direct_get_last_for_subject<T: AsRef<str>>(
536 &self,
537 subject: T,
538 ) -> Result<StreamMessage, DirectGetError> {
539 let subject = format!(
540 "{}.DIRECT.GET.{}.{}",
541 &self.context.prefix,
542 &self.name,
543 subject.as_ref()
544 );
545
546 let response = self.context.client.request(subject, "".into()).await?;
547 if let Some(status) = response.status {
548 if let Some(ref description) = response.description {
549 match status {
550 StatusCode::NOT_FOUND => {
551 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
552 }
553 StatusCode::TIMEOUT => {
555 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
556 }
557 _ => {
558 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
559 status,
560 description.to_string(),
561 )));
562 }
563 }
564 }
565 }
566 StreamMessage::try_from(response).map_err(Into::into)
567 }
568 pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
598 self.raw_message(StreamGetMessage {
599 sequence: Some(sequence),
600 last_by_subject: None,
601 next_by_subject: None,
602 })
603 .await
604 }
605
606 pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
630 &self,
631 subject: T,
632 sequence: u64,
633 ) -> Result<StreamMessage, RawMessageError> {
634 self.raw_message(StreamGetMessage {
635 sequence: Some(sequence),
636 last_by_subject: None,
637 next_by_subject: Some(subject.as_ref().to_string()),
638 })
639 .await
640 }
641
642 pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
666 &self,
667 subject: T,
668 ) -> Result<StreamMessage, RawMessageError> {
669 self.raw_message(StreamGetMessage {
670 sequence: None,
671 last_by_subject: None,
672 next_by_subject: Some(subject.as_ref().to_string()),
673 })
674 .await
675 }
676
677 async fn raw_message(
678 &self,
679 request: StreamGetMessage,
680 ) -> Result<StreamMessage, RawMessageError> {
681 for subject in [&request.last_by_subject, &request.next_by_subject]
682 .into_iter()
683 .flatten()
684 {
685 if !is_valid_subject(subject) {
686 return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
687 }
688 }
689 let subject = format!("STREAM.MSG.GET.{}", &self.name);
690
691 let response: Response<GetRawMessage> = self
692 .context
693 .request(subject, &request)
694 .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
695 .await?;
696
697 match response {
698 Response::Err { error } => {
699 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
700 Err(LastRawMessageError::new(
701 LastRawMessageErrorKind::NoMessageFound,
702 ))
703 } else {
704 Err(LastRawMessageError::new(
705 LastRawMessageErrorKind::JetStream(error),
706 ))
707 }
708 }
709 Response::Ok(value) => StreamMessage::try_from(value.message)
710 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)),
711 }
712 }
713
714 pub async fn get_last_raw_message_by_subject(
738 &self,
739 stream_subject: &str,
740 ) -> Result<StreamMessage, LastRawMessageError> {
741 self.raw_message(StreamGetMessage {
742 sequence: None,
743 last_by_subject: Some(stream_subject.to_string()),
744 next_by_subject: None,
745 })
746 .await
747 }
748
749 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
773 let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
774 let payload = json!({
775 "seq": sequence,
776 });
777
778 let response: Response<DeleteStatus> = self
779 .context
780 .request(subject, &payload)
781 .map_err(|err| match err.kind() {
782 RequestErrorKind::TimedOut => {
783 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
784 }
785 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
786 })
787 .await?;
788
789 match response {
790 Response::Err { error } => Err(DeleteMessageError::new(
791 DeleteMessageErrorKind::JetStream(error),
792 )),
793 Response::Ok(value) => Ok(value.success),
794 }
795 }
796
    /// Starts a purge of this stream by returning a [`Purge`] builder created with
    /// `Purge::build(self)`.
    pub fn purge(&self) -> Purge<No, No> {
        Purge::build(self)
    }
815
    /// Purges the messages matching `subject`.
    ///
    /// Deprecated shortcut that is equivalent to `self.purge().filter(subject).await`.
    #[deprecated(
        since = "0.25.0",
        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
    )]
    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
    where
        T: Into<String>,
    {
        self.purge().filter(subject).await
    }
842
    /// Creates a consumer on this stream from `config`.
    ///
    /// Delegates to `Context::create_consumer_on_stream` with a clone of the stream
    /// name.
    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerError> {
        self.context
            .create_consumer_on_stream(config, self.name.clone())
            .await
    }
874
    /// Updates a consumer on this stream from `config`.
    ///
    /// Only available with the `server_2_10` feature. Delegates to
    /// `Context::update_consumer_on_stream` with a clone of the stream name.
    #[cfg(feature = "server_2_10")]
    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerUpdateError> {
        self.context
            .update_consumer_on_stream(config, self.name.clone())
            .await
    }
907
    /// Creates a consumer on this stream from `config` using the strict variant.
    ///
    /// Only available with the `server_2_10` feature. Delegates to
    /// `Context::create_consumer_strict_on_stream` with a clone of the stream name.
    #[cfg(feature = "server_2_10")]
    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
        self.context
            .create_consumer_strict_on_stream(config, self.name.clone())
            .await
    }
941
942 pub async fn consumer_info<T: AsRef<str>>(
959 &self,
960 name: T,
961 ) -> Result<consumer::Info, crate::Error> {
962 let name = name.as_ref();
963
964 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
965
966 match self.context.request(subject, &json!({})).await? {
967 Response::Ok(info) => Ok(info),
968 Response::Err { error } => Err(Box::new(std::io::Error::new(
969 ErrorKind::Other,
970 format!("nats: error while getting consumer info: {}", error),
971 ))),
972 }
973 }
974
975 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
994 &self,
995 name: &str,
996 ) -> Result<Consumer<T>, crate::Error> {
997 let info = self.consumer_info(name).await?;
998
999 Ok(Consumer::new(
1000 T::try_from_consumer_config(info.config.clone())?,
1001 info,
1002 self.context.clone(),
1003 ))
1004 }
1005
1006 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
1034 &self,
1035 name: &str,
1036 config: T,
1037 ) -> Result<Consumer<T>, ConsumerError> {
1038 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
1039
1040 match self.context.request(subject, &json!({})).await? {
1041 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
1042 Response::Err { error } => Err(error.into()),
1043 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1044 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1045 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1046 })?,
1047 info,
1048 self.context.clone(),
1049 )),
1050 }
1051 }
1052
1053 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
1074 let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
1075
1076 match self.context.request(subject, &json!({})).await? {
1077 Response::Ok(delete_status) => Ok(delete_status),
1078 Response::Err { error } => Err(error.into()),
1079 }
1080 }
1081
    /// Pauses the consumer called `name` until `pause_until`.
    ///
    /// Only available with the `server_2_11` feature. Delegates to
    /// `request_pause_consumer` with `Some(pause_until)`.
    #[cfg(feature = "server_2_11")]
    pub async fn pause_consumer(
        &self,
        name: &str,
        pause_until: OffsetDateTime,
    ) -> Result<PauseResponse, ConsumerError> {
        self.request_pause_consumer(name, Some(pause_until)).await
    }
1113
    /// Resumes the consumer called `name`.
    ///
    /// Only available with the `server_2_11` feature. Delegates to
    /// `request_pause_consumer` with no `pause_until` value.
    #[cfg(feature = "server_2_11")]
    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
        self.request_pause_consumer(name, None).await
    }
1138
1139 #[cfg(feature = "server_2_11")]
1140 async fn request_pause_consumer(
1141 &self,
1142 name: &str,
1143 pause_until: Option<OffsetDateTime>,
1144 ) -> Result<PauseResponse, ConsumerError> {
1145 let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1146 let payload = &PauseResumeConsumerRequest { pause_until };
1147
1148 match self.context.request(subject, payload).await? {
1149 Response::Ok::<PauseResponse>(resp) => Ok(resp),
1150 Response::Err { error } => Err(error.into()),
1151 }
1152 }
1153
    /// Creates a [`ConsumerNames`] value for this stream.
    ///
    /// It starts at offset 0 with no page request in flight, an empty buffer and
    /// `done` unset; no request is made here.
    pub fn consumer_names(&self) -> ConsumerNames {
        ConsumerNames {
            context: self.context.clone(),
            stream: self.name.clone(),
            offset: 0,
            page_request: None,
            consumers: Vec::new(),
            done: false,
        }
    }
1182
    /// Creates a [`Consumers`] value for this stream.
    ///
    /// It starts at offset 0 with no page request in flight, an empty buffer and
    /// `done` unset; no request is made here.
    pub fn consumers(&self) -> Consumers {
        Consumers {
            context: self.context.clone(),
            stream: self.name.clone(),
            offset: 0,
            page_request: None,
            consumers: Vec::new(),
            done: false,
        }
    }
1211}
1212
/// Builder for a stream-info request with optional deleted details and a subject
/// filter; finished with [`StreamInfoBuilder::fetch`].
pub struct StreamInfoBuilder {
    /// Context used to issue the request.
    pub(crate) context: Context,
    /// Name of the stream to query.
    pub(crate) name: String,
    /// Whether deleted details are requested (defaults to `false`).
    pub(crate) deleted: bool,
    /// Subject filter sent with the request (defaults to the empty string).
    pub(crate) subject: String,
}
1219
1220impl StreamInfoBuilder {
1221 fn new(context: Context, name: String) -> Self {
1222 Self {
1223 context,
1224 name,
1225 deleted: false,
1226 subject: "".to_string(),
1227 }
1228 }
1229
1230 pub fn with_deleted(mut self, deleted: bool) -> Self {
1231 self.deleted = deleted;
1232 self
1233 }
1234
1235 pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1236 self.subject = subject.into();
1237 self
1238 }
1239
1240 pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1241 let info = stream_info_with_details(
1242 self.context.clone(),
1243 self.name.clone(),
1244 0,
1245 self.deleted,
1246 self.subject.clone(),
1247 )
1248 .await?;
1249
1250 Ok(InfoWithSubjects::new(self.context, info, self.subject))
1251 }
1252}
1253
/// Stream configuration as exchanged with the server.
///
/// The comments below describe the wire names and (de)serialization behaviour that
/// the serde attributes establish. NOTE(review): the precise server-side meaning
/// of each limit (for example how zero or negative values are treated) is not
/// visible in this file — confirm against the server documentation.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Name of the stream.
    pub name: String,
    /// Byte limit of the stream.
    pub max_bytes: i64,
    /// Message-count limit; serialized as `max_msgs`.
    #[serde(rename = "max_msgs")]
    pub max_messages: i64,
    /// Per-subject message-count limit; serialized as `max_msgs_per_subject`.
    #[serde(rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// Discard policy of the stream.
    pub discard: DiscardPolicy,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Subjects of the stream; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// Retention policy of the stream.
    pub retention: RetentionPolicy,
    /// Consumer-count limit.
    pub max_consumers: i32,
    /// Maximum age; (de)serialized through `serde_nanos`.
    #[serde(with = "serde_nanos")]
    pub max_age: Duration,
    /// Message-size limit; serialized as `max_msg_size`, omitted when zero.
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// Storage backend of the stream.
    pub storage: StorageType,
    /// Number of replicas.
    pub num_replicas: usize,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// Duplicate window; (de)serialized through `serde_nanos`, omitted when zero.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// Optional description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    /// Serialized as `allow_rollup_hdrs`; omitted when `false`.
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    pub allow_rollup: bool,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_delete: bool,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Optional republish settings; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Optional mirror source; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Optional list of sources; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    /// Metadata map (`server_2_10` feature); omitted from the serialized form when
    /// empty.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    /// Optional subject transform (`server_2_10` feature); omitted when `None`.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    /// Optional compression setting (`server_2_10` feature); omitted when `None`.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    /// Optional consumer limits (`server_2_10` feature). On deserialization,
    /// limits equal to `ConsumerLimits::default()` are turned into `None`.
    #[cfg(feature = "server_2_10")]
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    /// Optional first sequence (`server_2_10` feature); serialized as `first_seq`,
    /// omitted when `None`.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Optional placement settings; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,
    /// Optional RFC 3339 timestamp (`server_2_11` feature); omitted when `None`.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,

    /// Serialized as `allow_msg_ttl` (`server_2_11` feature); omitted when `false`.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
    pub allow_message_ttl: bool,

    /// Optional TTL (`server_2_11` feature); (de)serialized through `serde_nanos`,
    /// omitted when `None`.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub subject_delete_marker_ttl: Option<Duration>,
}
1393
impl From<&Config> for Config {
    /// Converts a borrowed configuration into an owned one by cloning it.
    fn from(sc: &Config) -> Config {
        sc.clone()
    }
}
1399
1400impl From<&str> for Config {
1401 fn from(s: &str) -> Config {
1402 Config {
1403 name: s.to_string(),
1404 ..Default::default()
1405 }
1406 }
1407}
1408
/// Deserializes optional consumer limits, mapping limits equal to
/// `ConsumerLimits::default()` to `None`.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
    // Keep only limits that differ from the all-default value.
    Ok(limits.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Consumer limits attached to a stream configuration.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// Inactivity threshold; (de)serialized through `serde_nanos`, defaults to zero
    /// when missing.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Maximum pending acknowledgements; defaults to zero when missing.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1436
/// Compression setting of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// Serialized as `"s2"`.
    #[serde(rename = "s2")]
    S2,
    /// Serialized as `"none"`.
    #[serde(rename = "none")]
    None,
}
1444
/// Subject transform made of a source and a destination.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// Source side of the transform; serialized as `src`.
    #[serde(rename = "src")]
    pub source: String,

    /// Destination side of the transform; serialized as `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
}
1454
/// Republish settings of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// Source of republished messages; serialized as `src`.
    #[serde(rename = "src")]
    pub source: String,
    /// Destination of republished messages; serialized as `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
    /// Defaults to `false` when missing from the input.
    #[serde(default)]
    pub headers_only: bool,
}
1468
/// Placement settings of a stream.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Optional cluster name; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Placement tags; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1479
/// Discard policy of a stream; `Old` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Default variant; serialized as `"old"`.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Serialized as `"new"`.
    #[serde(rename = "new")]
    New = 1,
}
1493
/// Retention policy of a stream; `Limits` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// Default variant; serialized as `"limits"`.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// Serialized as `"interest"`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// Serialized as `"workqueue"`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1510
/// Storage backend of a stream; `File` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Default variant; serialized as `"file"`.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Serialized as `"memory"`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1523
1524async fn stream_info_with_details(
1525 context: Context,
1526 stream: String,
1527 offset: usize,
1528 deleted_details: bool,
1529 subjects_filter: String,
1530) -> Result<Info, InfoError> {
1531 let subject = format!("STREAM.INFO.{}", stream);
1532
1533 let payload = StreamInfoRequest {
1534 offset,
1535 deleted_details,
1536 subjects_filter,
1537 };
1538
1539 let response: Response<Info> = context.request(subject, &payload).await?;
1540
1541 match response {
1542 Response::Ok(info) => Ok(info),
1543 Response::Err { error } => Err(error.into()),
1544 }
1545}
1546
/// Boxed, `'static` future resolving to a stream info response; used to hold an
/// in-flight page request.
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1548
/// Payload of a detailed stream-info request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    /// Paging offset into the subject details.
    offset: usize,
    /// Whether deleted details are requested.
    deleted_details: bool,
    /// Filter selecting the subjects to report.
    subjects_filter: String,
}
1555
/// Stream info combined with a paged stream of `(subject, count)` entries.
///
/// Implements `futures::Stream`; further pages are requested on demand while the
/// stream is polled.
pub struct InfoWithSubjects {
    /// Name of the stream, taken from the info's config.
    stream: String,
    /// Context used to request further pages.
    context: Context,
    /// Stream info from the first response; its subject map has been moved into
    /// the `subjects` iterator.
    pub info: Info,
    /// Number of subject entries received so far; used as the next page offset.
    offset: usize,
    /// Entries of the current page that have not been yielded yet.
    subjects: collections::hash_map::IntoIter<String, usize>,
    /// In-flight request for the next page, if any.
    info_request: Option<InfoRequest>,
    /// Subjects filter sent with every page request.
    subjects_filter: String,
    /// Set once no further pages should be requested.
    pages_done: bool,
}
1566
1567impl InfoWithSubjects {
1568 pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1569 let subjects = info.state.subjects.take().unwrap_or_default();
1570 let name = info.config.name.clone();
1571 InfoWithSubjects {
1572 context,
1573 info,
1574 pages_done: subjects.is_empty(),
1575 offset: subjects.len(),
1576 subjects: subjects.into_iter(),
1577 subjects_filter: subject,
1578 stream: name,
1579 info_request: None,
1580 }
1581 }
1582}
1583
1584impl futures::Stream for InfoWithSubjects {
1585 type Item = Result<(String, usize), InfoError>;
1586
1587 fn poll_next(
1588 mut self: Pin<&mut Self>,
1589 cx: &mut std::task::Context<'_>,
1590 ) -> Poll<Option<Self::Item>> {
1591 match self.subjects.next() {
1592 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1593 None => {
1594 if self.pages_done {
1596 return Poll::Ready(None);
1597 }
1598 let stream = self.stream.clone();
1599 let context = self.context.clone();
1600 let subjects_filter = self.subjects_filter.clone();
1601 let offset = self.offset;
1602 match self
1603 .info_request
1604 .get_or_insert_with(|| {
1605 Box::pin(stream_info_with_details(
1606 context,
1607 stream,
1608 offset,
1609 false,
1610 subjects_filter,
1611 ))
1612 })
1613 .poll_unpin(cx)
1614 {
1615 Poll::Ready(resp) => match resp {
1616 Ok(info) => {
1617 let subjects = info.state.subjects.clone();
1618 self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1619 self.info_request = None;
1620 let subjects = subjects.unwrap_or_default();
1621 self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1622 let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1623 if total <= self.offset || subjects.is_empty() {
1624 self.pages_done = true;
1625 }
1626 match self.subjects.next() {
1627 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1628 None => Poll::Ready(None),
1629 }
1630 }
1631 Err(err) => Poll::Ready(Some(Err(err))),
1632 },
1633 Poll::Pending => Poll::Pending,
1634 }
1635 }
1636 }
1637 }
1638}
1639
/// Stream information as returned by the server.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// Configuration of the stream.
    pub config: Config,
    /// Creation time, parsed from an RFC 3339 timestamp.
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// State of the stream.
    pub state: State,
    /// Optional cluster information.
    pub cluster: Option<ClusterInfo>,
    /// Optional mirror information; `None` when missing from the response.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Source information; empty when missing from the response.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    /// Paging fields flattened into the top level of the response, when present.
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1661
/// Paging fields of a paged response.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    /// Offset of the returned page.
    offset: usize,
    /// Total number of entries; compared against the received count to decide
    /// whether more pages are requested.
    total: usize,
    /// Page size limit.
    limit: usize,
}
1668
/// Response to a delete request.
#[derive(Deserialize)]
pub struct DeleteStatus {
    /// Success flag reported by the server.
    pub success: bool,
}
1673
/// Response to a consumer pause/resume request (`server_2_11` feature).
#[cfg(feature = "server_2_11")]
#[derive(Deserialize)]
pub struct PauseResponse {
    /// Paused flag reported by the server.
    pub paused: bool,
    /// Pause deadline, parsed from an RFC 3339 timestamp.
    #[serde(with = "rfc3339")]
    pub pause_until: OffsetDateTime,
    /// Optional remaining pause time; deserialized through `serde_nanos`, `None`
    /// when missing.
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}
1683
/// Payload of a consumer pause/resume request (`server_2_11` feature).
#[cfg(feature = "server_2_11")]
#[derive(Serialize, Debug)]
struct PauseResumeConsumerRequest {
    /// Pause deadline as an RFC 3339 timestamp; omitted from the payload when
    /// `None`.
    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
    pause_until: Option<OffsetDateTime>,
}
1690
/// State of a stream as reported by the server.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
    /// Number of messages.
    pub messages: u64,
    /// Number of bytes.
    pub bytes: u64,
    /// First sequence; deserialized from `first_seq`.
    #[serde(rename = "first_seq")]
    pub first_sequence: u64,
    /// First timestamp; deserialized from the RFC 3339 field `first_ts`.
    #[serde(with = "rfc3339", rename = "first_ts")]
    pub first_timestamp: time::OffsetDateTime,
    /// Last sequence; deserialized from `last_seq`.
    #[serde(rename = "last_seq")]
    pub last_sequence: u64,
    /// Last timestamp; deserialized from the RFC 3339 field `last_ts`.
    #[serde(with = "rfc3339", rename = "last_ts")]
    pub last_timestamp: time::OffsetDateTime,
    /// Number of consumers.
    pub consumer_count: usize,
    /// Number of subjects; deserialized from `num_subjects`, zero when missing.
    #[serde(default, rename = "num_subjects")]
    pub subjects_count: u64,
    /// Number of deleted messages; deserialized from `num_deleted`, `None` when
    /// missing.
    #[serde(default, rename = "num_deleted")]
    pub deleted_count: Option<u64>,
    /// Optional list of deleted entries; `None` when missing.
    #[serde(default)]
    pub deleted: Option<Vec<u64>>,

    /// Optional per-subject counts; consumed by `InfoWithSubjects`.
    pub(crate) subjects: Option<HashMap<String, usize>>,
}
1725
/// Raw message as returned by the server, with base64-encoded payload and headers.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
    /// Subject of the message.
    #[serde(rename = "subject")]
    pub subject: String,

    /// Sequence of the message; serialized as `seq`.
    #[serde(rename = "seq")]
    pub sequence: u64,

    /// Base64-encoded payload; serialized as `data`, empty when missing.
    #[serde(default, rename = "data")]
    pub payload: String,

    /// Optional base64-encoded headers; serialized as `hdrs`, `None` when missing.
    #[serde(default, rename = "hdrs")]
    pub headers: Option<String>,

    /// Timestamp of the message as an RFC 3339 string.
    #[serde(rename = "time", with = "rfc3339")]
    pub time: time::OffsetDateTime,
}
1749
1750impl TryFrom<RawMessage> for StreamMessage {
1751 type Error = crate::Error;
1752
1753 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1754 let decoded_payload = STANDARD
1755 .decode(value.payload)
1756 .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
1757 let decoded_headers = value
1758 .headers
1759 .map(|header| STANDARD.decode(header))
1760 .map_or(Ok(None), |v| v.map(Some))?;
1761
1762 let (headers, _, _) = decoded_headers
1763 .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1764
1765 Ok(StreamMessage {
1766 subject: value.subject.into(),
1767 payload: decoded_payload.into(),
1768 headers,
1769 sequence: value.sequence,
1770 time: value.time,
1771 })
1772 }
1773}
1774
/// Returns `true` when `c` is a space or a horizontal tab, i.e. a character that
/// marks a header continuation line.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
1778const HEADER_LINE: &str = "NATS/1.0";
1779
1780#[allow(clippy::type_complexity)]
1781fn parse_headers(
1782 buf: &[u8],
1783) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1784 let mut headers = HeaderMap::new();
1785 let mut maybe_status: Option<StatusCode> = None;
1786 let mut maybe_description: Option<String> = None;
1787 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1788 line.lines().peekable()
1789 } else {
1790 return Err(Box::new(std::io::Error::new(
1791 ErrorKind::Other,
1792 "invalid header",
1793 )));
1794 };
1795
1796 if let Some(line) = lines.next() {
1797 let line = line
1798 .strip_prefix(HEADER_LINE)
1799 .ok_or_else(|| {
1800 Box::new(std::io::Error::new(
1801 ErrorKind::Other,
1802 "version line does not start with NATS/1.0",
1803 ))
1804 })?
1805 .trim();
1806
1807 match line.split_once(' ') {
1808 Some((status, description)) => {
1809 if !status.is_empty() {
1810 maybe_status = Some(status.parse()?);
1811 }
1812
1813 if !description.is_empty() {
1814 maybe_description = Some(description.trim().to_string());
1815 }
1816 }
1817 None => {
1818 if !line.is_empty() {
1819 maybe_status = Some(line.parse()?);
1820 }
1821 }
1822 }
1823 } else {
1824 return Err(Box::new(std::io::Error::new(
1825 ErrorKind::Other,
1826 "expected header information not found",
1827 )));
1828 };
1829
1830 while let Some(line) = lines.next() {
1831 if line.is_empty() {
1832 continue;
1833 }
1834
1835 if let Some((k, v)) = line.split_once(':').to_owned() {
1836 let mut s = String::from(v.trim());
1837 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1838 s.push(' ');
1839 s.push_str(v.trim());
1840 }
1841
1842 headers.insert(
1843 HeaderName::from_str(k)?,
1844 HeaderValue::from_str(&s)
1845 .map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
1846 );
1847 } else {
1848 return Err(Box::new(std::io::Error::new(
1849 ErrorKind::Other,
1850 "malformed header line",
1851 )));
1852 }
1853 }
1854
1855 if headers.is_empty() {
1856 Ok((HeaderMap::new(), maybe_status, maybe_description))
1857 } else {
1858 Ok((headers, maybe_status, maybe_description))
1859 }
1860}
1861
1862#[derive(Debug, Serialize, Deserialize, Clone)]
1863struct GetRawMessage {
1864 pub(crate) message: RawMessage,
1865}
1866
/// Returns `true` when `t` equals the `Default` value of its type.
///
/// Used throughout this file as a serde `skip_serializing_if` predicate.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
1870#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1872pub struct ClusterInfo {
1873 pub name: Option<String>,
1875 pub leader: Option<String>,
1877 #[serde(default)]
1879 pub replicas: Vec<PeerInfo>,
1880}
1881
1882#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1884pub struct PeerInfo {
1885 pub name: String,
1887 pub current: bool,
1889 #[serde(with = "serde_nanos")]
1891 pub active: Duration,
1892 #[serde(default)]
1894 pub offline: bool,
1895 pub lag: Option<u64>,
1897}
1898
1899#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1900pub struct SourceInfo {
1901 pub name: String,
1903 pub lag: u64,
1905 #[serde(deserialize_with = "negative_duration_as_none")]
1907 pub active: Option<std::time::Duration>,
1908 #[serde(default)]
1910 pub filter_subject: Option<String>,
1911 #[serde(default)]
1913 pub subject_transform_dest: Option<String>,
1914 #[serde(default)]
1916 pub subject_transforms: Vec<SubjectTransform>,
1917}
1918
1919fn negative_duration_as_none<'de, D>(
1920 deserializer: D,
1921) -> Result<Option<std::time::Duration>, D::Error>
1922where
1923 D: Deserializer<'de>,
1924{
1925 let n = i64::deserialize(deserializer)?;
1926 if n.is_negative() {
1927 Ok(None)
1928 } else {
1929 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1930 }
1931}
1932
1933#[derive(Debug, Deserialize, Clone, Copy)]
1935pub struct PurgeResponse {
1936 pub success: bool,
1938 pub purged: u64,
1940}
1941#[derive(Default, Debug, Serialize, Clone)]
1943pub struct PurgeRequest {
1944 #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1946 pub sequence: Option<u64>,
1947
1948 #[serde(default, skip_serializing_if = "is_default")]
1950 pub filter: Option<String>,
1951
1952 #[serde(default, skip_serializing_if = "is_default")]
1954 pub keep: Option<u64>,
1955}
1956
1957#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1958pub struct Source {
1959 pub name: String,
1961 #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1963 pub start_sequence: Option<u64>,
1964 #[serde(
1965 default,
1966 rename = "opt_start_time",
1967 skip_serializing_if = "is_default",
1968 with = "rfc3339::option"
1969 )]
1970 pub start_time: Option<OffsetDateTime>,
1972 #[serde(default, skip_serializing_if = "is_default")]
1974 pub filter_subject: Option<String>,
1975 #[serde(default, skip_serializing_if = "Option::is_none")]
1977 pub external: Option<External>,
1978 #[serde(default, skip_serializing_if = "is_default")]
1980 pub domain: Option<String>,
1981 #[cfg(feature = "server_2_10")]
1983 #[serde(default, skip_serializing_if = "is_default")]
1984 pub subject_transforms: Vec<SubjectTransform>,
1985}
1986
1987#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1988pub struct External {
1989 #[serde(rename = "api")]
1991 pub api_prefix: String,
1992 #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1994 pub delivery_prefix: Option<String>,
1995}
1996
1997use std::marker::PhantomData;
1998
/// Type-level marker: the corresponding option has been assigned.
#[derive(Default, Debug)]
pub struct Yes;

/// Type-level marker: the corresponding option has not been assigned.
#[derive(Default, Debug)]
pub struct No;

/// Implemented by the [`Yes`] and [`No`] markers that parameterize the
/// `Purge` builder.
pub trait ToAssign: Debug {}

impl ToAssign for No {}
impl ToAssign for Yes {}
2008
2009#[derive(Debug)]
2010pub struct Purge<SEQUENCE, KEEP>
2011where
2012 SEQUENCE: ToAssign,
2013 KEEP: ToAssign,
2014{
2015 inner: PurgeRequest,
2016 sequence_set: PhantomData<SEQUENCE>,
2017 keep_set: PhantomData<KEEP>,
2018 context: Context,
2019 stream_name: String,
2020}
2021
2022impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
2023where
2024 SEQUENCE: ToAssign,
2025 KEEP: ToAssign,
2026{
2027 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
2029 self.inner.filter = Some(filter.into());
2030 self
2031 }
2032}
2033
2034impl Purge<No, No> {
2035 pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
2036 Purge {
2037 context: stream.context.clone(),
2038 stream_name: stream.name.clone(),
2039 inner: Default::default(),
2040 sequence_set: PhantomData {},
2041 keep_set: PhantomData {},
2042 }
2043 }
2044}
2045
2046impl<KEEP> Purge<No, KEEP>
2047where
2048 KEEP: ToAssign,
2049{
2050 pub fn keep(self, keep: u64) -> Purge<No, Yes> {
2053 Purge {
2054 context: self.context.clone(),
2055 stream_name: self.stream_name.clone(),
2056 sequence_set: PhantomData {},
2057 keep_set: PhantomData {},
2058 inner: PurgeRequest {
2059 keep: Some(keep),
2060 ..self.inner
2061 },
2062 }
2063 }
2064}
2065impl<SEQUENCE> Purge<SEQUENCE, No>
2066where
2067 SEQUENCE: ToAssign,
2068{
2069 pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2072 Purge {
2073 context: self.context.clone(),
2074 stream_name: self.stream_name.clone(),
2075 sequence_set: PhantomData {},
2076 keep_set: PhantomData {},
2077 inner: PurgeRequest {
2078 sequence: Some(sequence),
2079 ..self.inner
2080 },
2081 }
2082 }
2083}
2084
2085#[derive(Clone, Debug, PartialEq)]
2086pub enum PurgeErrorKind {
2087 Request,
2088 TimedOut,
2089 JetStream(super::errors::Error),
2090}
2091
2092impl Display for PurgeErrorKind {
2093 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2094 match self {
2095 Self::Request => write!(f, "request failed"),
2096 Self::TimedOut => write!(f, "timed out"),
2097 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2098 }
2099 }
2100}
2101
2102pub type PurgeError = Error<PurgeErrorKind>;
2103
2104impl<S, K> IntoFuture for Purge<S, K>
2105where
2106 S: ToAssign + std::marker::Send,
2107 K: ToAssign + std::marker::Send,
2108{
2109 type Output = Result<PurgeResponse, PurgeError>;
2110
2111 type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2112
2113 fn into_future(self) -> Self::IntoFuture {
2114 Box::pin(std::future::IntoFuture::into_future(async move {
2115 let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2116 let response: Response<PurgeResponse> = self
2117 .context
2118 .request(request_subject, &self.inner)
2119 .map_err(|err| match err.kind() {
2120 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2121 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2122 })
2123 .await?;
2124
2125 match response {
2126 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2127 Response::Ok(response) => Ok(response),
2128 }
2129 }))
2130 }
2131}
2132
2133#[derive(Deserialize, Debug)]
2134struct ConsumerPage {
2135 total: usize,
2136 consumers: Option<Vec<String>>,
2137}
2138
2139#[derive(Deserialize, Debug)]
2140struct ConsumerInfoPage {
2141 total: usize,
2142 consumers: Option<Vec<super::consumer::Info>>,
2143}
2144
2145type ConsumerNamesErrorKind = StreamsErrorKind;
2146type ConsumerNamesError = StreamsError;
2147type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2148
2149pub struct ConsumerNames {
2150 context: Context,
2151 stream: String,
2152 offset: usize,
2153 page_request: Option<PageRequest>,
2154 consumers: Vec<String>,
2155 done: bool,
2156}
2157
2158impl futures::Stream for ConsumerNames {
2159 type Item = Result<String, ConsumerNamesError>;
2160
2161 fn poll_next(
2162 mut self: Pin<&mut Self>,
2163 cx: &mut std::task::Context<'_>,
2164 ) -> std::task::Poll<Option<Self::Item>> {
2165 match self.page_request.as_mut() {
2166 Some(page) => match page.try_poll_unpin(cx) {
2167 std::task::Poll::Ready(page) => {
2168 self.page_request = None;
2169 let page = page.map_err(|err| {
2170 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2171 })?;
2172
2173 if let Some(consumers) = page.consumers {
2174 self.offset += consumers.len();
2175 self.consumers = consumers;
2176 if self.offset >= page.total {
2177 self.done = true;
2178 }
2179 match self.consumers.pop() {
2180 Some(stream) => Poll::Ready(Some(Ok(stream))),
2181 None => Poll::Ready(None),
2182 }
2183 } else {
2184 Poll::Ready(None)
2185 }
2186 }
2187 std::task::Poll::Pending => std::task::Poll::Pending,
2188 },
2189 None => {
2190 if let Some(stream) = self.consumers.pop() {
2191 Poll::Ready(Some(Ok(stream)))
2192 } else {
2193 if self.done {
2194 return Poll::Ready(None);
2195 }
2196 let context = self.context.clone();
2197 let offset = self.offset;
2198 let stream = self.stream.clone();
2199 self.page_request = Some(Box::pin(async move {
2200 match context
2201 .request(
2202 format!("CONSUMER.NAMES.{stream}"),
2203 &json!({
2204 "offset": offset,
2205 }),
2206 )
2207 .await?
2208 {
2209 Response::Err { error } => Err(RequestError::with_source(
2210 super::context::RequestErrorKind::Other,
2211 error,
2212 )),
2213 Response::Ok(page) => Ok(page),
2214 }
2215 }));
2216 self.poll_next(cx)
2217 }
2218 }
2219 }
2220 }
2221}
2222
2223pub type ConsumersErrorKind = StreamsErrorKind;
2224pub type ConsumersError = StreamsError;
2225type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2226
2227pub struct Consumers {
2228 context: Context,
2229 stream: String,
2230 offset: usize,
2231 page_request: Option<PageInfoRequest>,
2232 consumers: Vec<super::consumer::Info>,
2233 done: bool,
2234}
2235
2236impl futures::Stream for Consumers {
2237 type Item = Result<super::consumer::Info, ConsumersError>;
2238
2239 fn poll_next(
2240 mut self: Pin<&mut Self>,
2241 cx: &mut std::task::Context<'_>,
2242 ) -> std::task::Poll<Option<Self::Item>> {
2243 match self.page_request.as_mut() {
2244 Some(page) => match page.try_poll_unpin(cx) {
2245 std::task::Poll::Ready(page) => {
2246 self.page_request = None;
2247 let page = page.map_err(|err| {
2248 ConsumersError::with_source(ConsumersErrorKind::Other, err)
2249 })?;
2250 if let Some(consumers) = page.consumers {
2251 self.offset += consumers.len();
2252 self.consumers = consumers;
2253 if self.offset >= page.total {
2254 self.done = true;
2255 }
2256 match self.consumers.pop() {
2257 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2258 None => Poll::Ready(None),
2259 }
2260 } else {
2261 Poll::Ready(None)
2262 }
2263 }
2264 std::task::Poll::Pending => std::task::Poll::Pending,
2265 },
2266 None => {
2267 if let Some(stream) = self.consumers.pop() {
2268 Poll::Ready(Some(Ok(stream)))
2269 } else {
2270 if self.done {
2271 return Poll::Ready(None);
2272 }
2273 let context = self.context.clone();
2274 let offset = self.offset;
2275 let stream = self.stream.clone();
2276 self.page_request = Some(Box::pin(async move {
2277 match context
2278 .request(
2279 format!("CONSUMER.LIST.{stream}"),
2280 &json!({
2281 "offset": offset,
2282 }),
2283 )
2284 .await?
2285 {
2286 Response::Err { error } => Err(RequestError::with_source(
2287 super::context::RequestErrorKind::Other,
2288 error,
2289 )),
2290 Response::Ok(page) => Ok(page),
2291 }
2292 }));
2293 self.poll_next(cx)
2294 }
2295 }
2296 }
2297 }
2298}
2299
2300#[derive(Clone, Debug, PartialEq)]
2301pub enum LastRawMessageErrorKind {
2302 NoMessageFound,
2303 InvalidSubject,
2304 JetStream(super::errors::Error),
2305 Other,
2306}
2307
2308impl Display for LastRawMessageErrorKind {
2309 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2310 match self {
2311 Self::NoMessageFound => write!(f, "no message found"),
2312 Self::InvalidSubject => write!(f, "invalid subject"),
2313 Self::Other => write!(f, "failed to get last raw message"),
2314 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2315 }
2316 }
2317}
2318
2319pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2320pub type RawMessageErrorKind = LastRawMessageErrorKind;
2321pub type RawMessageError = LastRawMessageError;
2322
2323#[derive(Clone, Debug, PartialEq)]
2324pub enum ConsumerErrorKind {
2325 TimedOut,
2327 Request,
2328 InvalidConsumerType,
2329 InvalidName,
2330 JetStream(super::errors::Error),
2331 Other,
2332}
2333
2334impl Display for ConsumerErrorKind {
2335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2336 match self {
2337 Self::TimedOut => write!(f, "timed out"),
2338 Self::Request => write!(f, "request failed"),
2339 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2340 Self::Other => write!(f, "consumer error"),
2341 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2342 Self::InvalidName => write!(f, "invalid consumer name"),
2343 }
2344 }
2345}
2346
2347pub type ConsumerError = Error<ConsumerErrorKind>;
2348
2349#[derive(Clone, Debug, PartialEq)]
2350pub enum ConsumerCreateStrictErrorKind {
2351 TimedOut,
2353 Request,
2354 InvalidConsumerType,
2355 InvalidName,
2356 AlreadyExists,
2357 JetStream(super::errors::Error),
2358 Other,
2359}
2360
2361impl Display for ConsumerCreateStrictErrorKind {
2362 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2363 match self {
2364 Self::TimedOut => write!(f, "timed out"),
2365 Self::Request => write!(f, "request failed"),
2366 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2367 Self::Other => write!(f, "consumer error"),
2368 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2369 Self::InvalidName => write!(f, "invalid consumer name"),
2370 Self::AlreadyExists => write!(f, "consumer already exists"),
2371 }
2372 }
2373}
2374
2375pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2376
2377#[derive(Clone, Debug, PartialEq)]
2378pub enum ConsumerUpdateErrorKind {
2379 TimedOut,
2381 Request,
2382 InvalidConsumerType,
2383 InvalidName,
2384 DoesNotExist,
2385 JetStream(super::errors::Error),
2386 Other,
2387}
2388
2389impl Display for ConsumerUpdateErrorKind {
2390 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2391 match self {
2392 Self::TimedOut => write!(f, "timed out"),
2393 Self::Request => write!(f, "request failed"),
2394 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2395 Self::Other => write!(f, "consumer error"),
2396 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2397 Self::InvalidName => write!(f, "invalid consumer name"),
2398 Self::DoesNotExist => write!(f, "consumer does not exist"),
2399 }
2400 }
2401}
2402
2403pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2404
2405impl From<super::errors::Error> for ConsumerError {
2406 fn from(err: super::errors::Error) -> Self {
2407 ConsumerError::new(ConsumerErrorKind::JetStream(err))
2408 }
2409}
2410impl From<super::errors::Error> for ConsumerCreateStrictError {
2411 fn from(err: super::errors::Error) -> Self {
2412 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2413 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2414 } else {
2415 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2416 }
2417 }
2418}
2419impl From<super::errors::Error> for ConsumerUpdateError {
2420 fn from(err: super::errors::Error) -> Self {
2421 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2422 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2423 } else {
2424 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2425 }
2426 }
2427}
2428impl From<ConsumerError> for ConsumerUpdateError {
2429 fn from(err: ConsumerError) -> Self {
2430 match err.kind() {
2431 ConsumerErrorKind::JetStream(err) => {
2432 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2433 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2434 } else {
2435 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2436 }
2437 }
2438 ConsumerErrorKind::Request => {
2439 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2440 }
2441 ConsumerErrorKind::TimedOut => {
2442 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2443 }
2444 ConsumerErrorKind::InvalidConsumerType => {
2445 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2446 }
2447 ConsumerErrorKind::InvalidName => {
2448 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2449 }
2450 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2451 }
2452 }
2453}
2454
2455impl From<ConsumerError> for ConsumerCreateStrictError {
2456 fn from(err: ConsumerError) -> Self {
2457 match err.kind() {
2458 ConsumerErrorKind::JetStream(err) => {
2459 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2460 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2461 } else {
2462 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2463 }
2464 }
2465 ConsumerErrorKind::Request => {
2466 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2467 }
2468 ConsumerErrorKind::TimedOut => {
2469 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2470 }
2471 ConsumerErrorKind::InvalidConsumerType => {
2472 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2473 }
2474 ConsumerErrorKind::InvalidName => {
2475 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2476 }
2477 ConsumerErrorKind::Other => {
2478 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2479 }
2480 }
2481 }
2482}
2483
2484impl From<super::context::RequestError> for ConsumerError {
2485 fn from(err: super::context::RequestError) -> Self {
2486 match err.kind() {
2487 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2488 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2489 }
2490 }
2491}
2492impl From<super::context::RequestError> for ConsumerUpdateError {
2493 fn from(err: super::context::RequestError) -> Self {
2494 match err.kind() {
2495 RequestErrorKind::TimedOut => {
2496 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2497 }
2498 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2499 }
2500 }
2501}
2502impl From<super::context::RequestError> for ConsumerCreateStrictError {
2503 fn from(err: super::context::RequestError) -> Self {
2504 match err.kind() {
2505 RequestErrorKind::TimedOut => {
2506 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2507 }
2508 _ => {
2509 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2510 }
2511 }
2512 }
2513}
2514
2515#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
2516pub struct StreamGetMessage {
2517 #[serde(rename = "seq", skip_serializing_if = "is_default")]
2518 sequence: Option<u64>,
2519 #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
2520 next_by_subject: Option<String>,
2521 #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
2522 last_by_subject: Option<String>,
2523}
2524
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must survive a JSON round trip unchanged.
    #[test]
    fn consumer_limits_de() {
        let config = Config::default();

        let encoded = serde_json::to_string(&config).unwrap();
        let roundtrip: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(config, roundtrip);
    }
}