1use std::{
17 collections::{self, HashMap},
18 fmt::{self, Debug, Display},
19 future::IntoFuture,
20 io::{self},
21 pin::Pin,
22 str::FromStr,
23 task::Poll,
24 time::Duration,
25};
26
27use crate::{
28 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40 context::{
41 ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42 StreamsErrorKind,
43 },
44 errors::ErrorCode,
45 is_valid_name,
46 message::{StreamMessage, StreamMessageError},
47 response::Response,
48 Context, Message,
49};
50
51pub type InfoError = RequestError;
52
53#[derive(Clone, Debug, PartialEq)]
54pub enum DirectGetErrorKind {
55 NotFound,
56 InvalidSubject,
57 TimedOut,
58 Request,
59 ErrorResponse(StatusCode, String),
60 Other,
61}
62
63impl Display for DirectGetErrorKind {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 Self::InvalidSubject => write!(f, "invalid subject"),
67 Self::NotFound => write!(f, "message not found"),
68 Self::ErrorResponse(status, description) => {
69 write!(f, "unable to get message: {} {}", status, description)
70 }
71 Self::Other => write!(f, "error getting message"),
72 Self::TimedOut => write!(f, "timed out"),
73 Self::Request => write!(f, "request failed"),
74 }
75 }
76}
77
78pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81 fn from(err: crate::RequestError) -> Self {
82 match err.kind() {
83 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84 crate::RequestErrorKind::NoResponders => {
85 DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86 StatusCode::NO_RESPONDERS,
87 "no responders".to_string(),
88 ))
89 }
90 crate::RequestErrorKind::Other => {
91 DirectGetError::with_source(DirectGetErrorKind::Other, err)
92 }
93 }
94 }
95}
96
97impl From<serde_json::Error> for DirectGetError {
98 fn from(err: serde_json::Error) -> Self {
99 DirectGetError::with_source(DirectGetErrorKind::Other, err)
100 }
101}
102
103impl From<StreamMessageError> for DirectGetError {
104 fn from(err: StreamMessageError) -> Self {
105 DirectGetError::with_source(DirectGetErrorKind::Other, err)
106 }
107}
108
109#[derive(Clone, Debug, PartialEq)]
110pub enum DeleteMessageErrorKind {
111 Request,
112 TimedOut,
113 JetStream(super::errors::Error),
114}
115
116impl Display for DeleteMessageErrorKind {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 match self {
119 Self::Request => write!(f, "request failed"),
120 Self::TimedOut => write!(f, "timed out"),
121 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
122 }
123 }
124}
125
126pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
127
128#[derive(Debug, Clone)]
132pub struct Stream<T = Info> {
133 pub(crate) info: T,
134 pub(crate) context: Context,
135 pub(crate) name: String,
136}
137
138impl Stream<Info> {
139 pub async fn info(&mut self) -> Result<&Info, InfoError> {
157 let subject = format!("STREAM.INFO.{}", self.info.config.name);
158
159 match self.context.request(subject, &json!({})).await? {
160 Response::Ok::<Info>(info) => {
161 self.info = info;
162 Ok(&self.info)
163 }
164 Response::Err { error } => Err(error.into()),
165 }
166 }
167
168 pub fn cached_info(&self) -> &Info {
187 &self.info
188 }
189}
190
191impl<I> Stream<I> {
192 pub async fn get_info(&self) -> Result<Info, InfoError> {
195 let subject = format!("STREAM.INFO.{}", self.name);
196
197 match self.context.request(subject, &json!({})).await? {
198 Response::Ok::<Info>(info) => Ok(info),
199 Response::Err { error } => Err(error.into()),
200 }
201 }
202
203 pub async fn info_with_subjects<F: AsRef<str>>(
226 &self,
227 subjects_filter: F,
228 ) -> Result<InfoWithSubjects, InfoError> {
229 let subjects_filter = subjects_filter.as_ref().to_string();
230 let info = stream_info_with_details(
232 self.context.clone(),
233 self.name.clone(),
234 0,
235 false,
236 subjects_filter.clone(),
237 )
238 .await?;
239
240 Ok(InfoWithSubjects::new(
241 self.context.clone(),
242 info,
243 subjects_filter,
244 ))
245 }
246
247 pub fn info_builder(&self) -> StreamInfoBuilder {
273 StreamInfoBuilder::new(self.context.clone(), self.name.clone())
274 }
275
276 pub async fn direct_get_next_for_subject<T: AsRef<str>>(
311 &self,
312 subject: T,
313 sequence: Option<u64>,
314 ) -> Result<Message, DirectGetError> {
315 if !is_valid_subject(&subject) {
316 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
317 }
318 let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
319 let payload;
320 if let Some(sequence) = sequence {
321 payload = json!({
322 "seq": sequence,
323 "next_by_subj": subject.as_ref(),
324 });
325 } else {
326 payload = json!({
327 "next_by_subj": subject.as_ref(),
328 });
329 }
330
331 let response = self
332 .context
333 .client
334 .request(
335 request_subject,
336 serde_json::to_vec(&payload).map(Bytes::from)?,
337 )
338 .await
339 .map(|message| Message {
340 message,
341 context: self.context.clone(),
342 })?;
343
344 if let Some(status) = response.status {
345 if let Some(ref description) = response.description {
346 match status {
347 StatusCode::NOT_FOUND => {
348 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
349 }
350 StatusCode::TIMEOUT => {
352 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
353 }
354 _ => {
355 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
356 status,
357 description.to_string(),
358 )));
359 }
360 }
361 }
362 }
363 Ok(response)
364 }
365
366 pub async fn direct_get_first_for_subject<T: AsRef<str>>(
398 &self,
399 subject: T,
400 ) -> Result<Message, DirectGetError> {
401 if !is_valid_subject(&subject) {
402 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
403 }
404 let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
405 let payload = json!({
406 "next_by_subj": subject.as_ref(),
407 });
408
409 let response = self
410 .context
411 .client
412 .request(
413 request_subject,
414 serde_json::to_vec(&payload).map(Bytes::from)?,
415 )
416 .await
417 .map(|message| Message {
418 message,
419 context: self.context.clone(),
420 })?;
421 if let Some(status) = response.status {
422 if let Some(ref description) = response.description {
423 match status {
424 StatusCode::NOT_FOUND => {
425 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
426 }
427 StatusCode::TIMEOUT => {
429 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
430 }
431 _ => {
432 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
433 status,
434 description.to_string(),
435 )));
436 }
437 }
438 }
439 }
440 Ok(response)
441 }
442
443 pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
475 let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
476 let payload = json!({
477 "seq": sequence,
478 });
479
480 let response = self
481 .context
482 .client
483 .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
484 .await?;
485
486 if let Some(status) = response.status {
487 if let Some(ref description) = response.description {
488 match status {
489 StatusCode::NOT_FOUND => {
490 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
491 }
492 StatusCode::TIMEOUT => {
494 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
495 }
496 _ => {
497 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
498 status,
499 description.to_string(),
500 )));
501 }
502 }
503 }
504 }
505 StreamMessage::try_from(response).map_err(Into::into)
506 }
507
508 pub async fn direct_get_last_for_subject<T: AsRef<str>>(
540 &self,
541 subject: T,
542 ) -> Result<StreamMessage, DirectGetError> {
543 let subject = format!(
544 "{}.DIRECT.GET.{}.{}",
545 &self.context.prefix,
546 &self.name,
547 subject.as_ref()
548 );
549
550 let response = self.context.client.request(subject, "".into()).await?;
551 if let Some(status) = response.status {
552 if let Some(ref description) = response.description {
553 match status {
554 StatusCode::NOT_FOUND => {
555 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
556 }
557 StatusCode::TIMEOUT => {
559 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
560 }
561 _ => {
562 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
563 status,
564 description.to_string(),
565 )));
566 }
567 }
568 }
569 }
570 StreamMessage::try_from(response).map_err(Into::into)
571 }
572 pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
602 self.raw_message(StreamGetMessage {
603 sequence: Some(sequence),
604 last_by_subject: None,
605 next_by_subject: None,
606 })
607 .await
608 }
609
610 pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
634 &self,
635 subject: T,
636 sequence: u64,
637 ) -> Result<StreamMessage, RawMessageError> {
638 self.raw_message(StreamGetMessage {
639 sequence: Some(sequence),
640 last_by_subject: None,
641 next_by_subject: Some(subject.as_ref().to_string()),
642 })
643 .await
644 }
645
646 pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
670 &self,
671 subject: T,
672 ) -> Result<StreamMessage, RawMessageError> {
673 self.raw_message(StreamGetMessage {
674 sequence: None,
675 last_by_subject: None,
676 next_by_subject: Some(subject.as_ref().to_string()),
677 })
678 .await
679 }
680
681 async fn raw_message(
682 &self,
683 request: StreamGetMessage,
684 ) -> Result<StreamMessage, RawMessageError> {
685 for subject in [&request.last_by_subject, &request.next_by_subject]
686 .into_iter()
687 .flatten()
688 {
689 if !is_valid_subject(subject) {
690 return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
691 }
692 }
693 let subject = format!("STREAM.MSG.GET.{}", &self.name);
694
695 let response: Response<GetRawMessage> = self
696 .context
697 .request(subject, &request)
698 .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
699 .await?;
700
701 match response {
702 Response::Err { error } => {
703 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
704 Err(LastRawMessageError::new(
705 LastRawMessageErrorKind::NoMessageFound,
706 ))
707 } else {
708 Err(LastRawMessageError::new(
709 LastRawMessageErrorKind::JetStream(error),
710 ))
711 }
712 }
713 Response::Ok(value) => StreamMessage::try_from(value.message)
714 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)),
715 }
716 }
717
718 pub async fn get_last_raw_message_by_subject(
742 &self,
743 stream_subject: &str,
744 ) -> Result<StreamMessage, LastRawMessageError> {
745 self.raw_message(StreamGetMessage {
746 sequence: None,
747 last_by_subject: Some(stream_subject.to_string()),
748 next_by_subject: None,
749 })
750 .await
751 }
752
753 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
777 let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
778 let payload = json!({
779 "seq": sequence,
780 });
781
782 let response: Response<DeleteStatus> = self
783 .context
784 .request(subject, &payload)
785 .map_err(|err| match err.kind() {
786 RequestErrorKind::TimedOut => {
787 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
788 }
789 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
790 })
791 .await?;
792
793 match response {
794 Response::Err { error } => Err(DeleteMessageError::new(
795 DeleteMessageErrorKind::JetStream(error),
796 )),
797 Response::Ok(value) => Ok(value.success),
798 }
799 }
800
801 pub fn purge(&self) -> Purge<No, No> {
817 Purge::build(self)
818 }
819
820 #[deprecated(
837 since = "0.25.0",
838 note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
839 )]
840 pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
841 where
842 T: Into<String>,
843 {
844 self.purge().filter(subject).await
845 }
846
847 pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
871 &self,
872 config: C,
873 ) -> Result<Consumer<C>, ConsumerError> {
874 self.context
875 .create_consumer_on_stream(config, self.name.clone())
876 .await
877 }
878
879 #[cfg(feature = "server_2_10")]
903 pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
904 &self,
905 config: C,
906 ) -> Result<Consumer<C>, ConsumerUpdateError> {
907 self.context
908 .update_consumer_on_stream(config, self.name.clone())
909 .await
910 }
911
912 #[cfg(feature = "server_2_10")]
937 pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
938 &self,
939 config: C,
940 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
941 self.context
942 .create_consumer_strict_on_stream(config, self.name.clone())
943 .await
944 }
945
946 pub async fn consumer_info<T: AsRef<str>>(
963 &self,
964 name: T,
965 ) -> Result<consumer::Info, ConsumerInfoError> {
966 let name = name.as_ref();
967
968 if !is_valid_name(name) {
969 return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
970 }
971
972 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
973
974 match self.context.request(subject, &json!({})).await? {
975 Response::Ok(info) => Ok(info),
976 Response::Err { error } => Err(error.into()),
977 }
978 }
979
980 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
999 &self,
1000 name: &str,
1001 ) -> Result<Consumer<T>, crate::Error> {
1002 let info = self.consumer_info(name).await?;
1003
1004 Ok(Consumer::new(
1005 T::try_from_consumer_config(info.config.clone())?,
1006 info,
1007 self.context.clone(),
1008 ))
1009 }
1010
1011 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
1039 &self,
1040 name: &str,
1041 config: T,
1042 ) -> Result<Consumer<T>, ConsumerError> {
1043 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
1044
1045 match self.context.request(subject, &json!({})).await? {
1046 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
1047 Response::Err { error } => Err(error.into()),
1048 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1049 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1050 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1051 })?,
1052 info,
1053 self.context.clone(),
1054 )),
1055 }
1056 }
1057
1058 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
1079 let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
1080
1081 match self.context.request(subject, &json!({})).await? {
1082 Response::Ok(delete_status) => Ok(delete_status),
1083 Response::Err { error } => Err(error.into()),
1084 }
1085 }
1086
1087 #[cfg(feature = "server_2_11")]
1111 pub async fn pause_consumer(
1112 &self,
1113 name: &str,
1114 pause_until: OffsetDateTime,
1115 ) -> Result<PauseResponse, ConsumerError> {
1116 self.request_pause_consumer(name, Some(pause_until)).await
1117 }
1118
1119 #[cfg(feature = "server_2_11")]
1140 pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
1141 self.request_pause_consumer(name, None).await
1142 }
1143
1144 #[cfg(feature = "server_2_11")]
1145 async fn request_pause_consumer(
1146 &self,
1147 name: &str,
1148 pause_until: Option<OffsetDateTime>,
1149 ) -> Result<PauseResponse, ConsumerError> {
1150 let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1151 let payload = &PauseResumeConsumerRequest { pause_until };
1152
1153 match self.context.request(subject, payload).await? {
1154 Response::Ok::<PauseResponse>(resp) => Ok(resp),
1155 Response::Err { error } => Err(error.into()),
1156 }
1157 }
1158
1159 pub fn consumer_names(&self) -> ConsumerNames {
1178 ConsumerNames {
1179 context: self.context.clone(),
1180 stream: self.name.clone(),
1181 offset: 0,
1182 page_request: None,
1183 consumers: Vec::new(),
1184 done: false,
1185 }
1186 }
1187
1188 pub fn consumers(&self) -> Consumers {
1207 Consumers {
1208 context: self.context.clone(),
1209 stream: self.name.clone(),
1210 offset: 0,
1211 page_request: None,
1212 consumers: Vec::new(),
1213 done: false,
1214 }
1215 }
1216}
1217
1218pub struct StreamInfoBuilder {
1219 pub(crate) context: Context,
1220 pub(crate) name: String,
1221 pub(crate) deleted: bool,
1222 pub(crate) subject: String,
1223}
1224
1225impl StreamInfoBuilder {
1226 fn new(context: Context, name: String) -> Self {
1227 Self {
1228 context,
1229 name,
1230 deleted: false,
1231 subject: "".to_string(),
1232 }
1233 }
1234
1235 pub fn with_deleted(mut self, deleted: bool) -> Self {
1236 self.deleted = deleted;
1237 self
1238 }
1239
1240 pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1241 self.subject = subject.into();
1242 self
1243 }
1244
1245 pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1246 let info = stream_info_with_details(
1247 self.context.clone(),
1248 self.name.clone(),
1249 0,
1250 self.deleted,
1251 self.subject.clone(),
1252 )
1253 .await?;
1254
1255 Ok(InfoWithSubjects::new(self.context, info, self.subject))
1256 }
1257}
1258
1259#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1263pub struct Config {
1264 pub name: String,
1266 pub max_bytes: i64,
1268 #[serde(rename = "max_msgs")]
1270 pub max_messages: i64,
1271 #[serde(rename = "max_msgs_per_subject")]
1273 pub max_messages_per_subject: i64,
1274 pub discard: DiscardPolicy,
1277 #[serde(default, skip_serializing_if = "is_default")]
1279 pub discard_new_per_subject: bool,
1280 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1283 pub subjects: Vec<String>,
1284 pub retention: RetentionPolicy,
1286 pub max_consumers: i32,
1288 #[serde(with = "serde_nanos")]
1290 pub max_age: Duration,
1291 #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1293 pub max_message_size: i32,
1294 pub storage: StorageType,
1296 pub num_replicas: usize,
1298 #[serde(default, skip_serializing_if = "is_default")]
1300 pub no_ack: bool,
1301 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1303 pub duplicate_window: Duration,
1304 #[serde(default, skip_serializing_if = "is_default")]
1306 pub template_owner: String,
1307 #[serde(default, skip_serializing_if = "is_default")]
1309 pub sealed: bool,
1310 #[serde(default, skip_serializing_if = "is_default")]
1312 pub description: Option<String>,
1313 #[serde(
1314 default,
1315 rename = "allow_rollup_hdrs",
1316 skip_serializing_if = "is_default"
1317 )]
1318 pub allow_rollup: bool,
1320 #[serde(default, skip_serializing_if = "is_default")]
1321 pub deny_delete: bool,
1323 #[serde(default, skip_serializing_if = "is_default")]
1325 pub deny_purge: bool,
1326
1327 #[serde(default, skip_serializing_if = "is_default")]
1329 pub republish: Option<Republish>,
1330
1331 #[serde(default, skip_serializing_if = "is_default")]
1334 pub allow_direct: bool,
1335
1336 #[serde(default, skip_serializing_if = "is_default")]
1338 pub mirror_direct: bool,
1339
1340 #[serde(default, skip_serializing_if = "Option::is_none")]
1342 pub mirror: Option<Source>,
1343
1344 #[serde(default, skip_serializing_if = "Option::is_none")]
1346 pub sources: Option<Vec<Source>>,
1347
1348 #[cfg(feature = "server_2_10")]
1349 #[serde(default, skip_serializing_if = "is_default")]
1351 pub metadata: HashMap<String, String>,
1352
1353 #[cfg(feature = "server_2_10")]
1354 #[serde(default, skip_serializing_if = "Option::is_none")]
1356 pub subject_transform: Option<SubjectTransform>,
1357
1358 #[cfg(feature = "server_2_10")]
1359 #[serde(default, skip_serializing_if = "Option::is_none")]
1364 pub compression: Option<Compression>,
1365 #[cfg(feature = "server_2_10")]
1366 #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1368 pub consumer_limits: Option<ConsumerLimits>,
1369
1370 #[cfg(feature = "server_2_10")]
1371 #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1373 pub first_sequence: Option<u64>,
1374
1375 #[serde(default, skip_serializing_if = "Option::is_none")]
1377 pub placement: Option<Placement>,
1378 #[cfg(feature = "server_2_11")]
1380 #[serde(
1381 default,
1382 with = "rfc3339::option",
1383 skip_serializing_if = "Option::is_none"
1384 )]
1385 pub pause_until: Option<OffsetDateTime>,
1386
1387 #[cfg(feature = "server_2_11")]
1389 #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
1390 pub allow_message_ttl: bool,
1391
1392 #[cfg(feature = "server_2_11")]
1395 #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
1396 pub subject_delete_marker_ttl: Option<Duration>,
1397}
1398
1399impl From<&Config> for Config {
1400 fn from(sc: &Config) -> Config {
1401 sc.clone()
1402 }
1403}
1404
1405impl From<&str> for Config {
1406 fn from(s: &str) -> Config {
1407 Config {
1408 name: s.to_string(),
1409 ..Default::default()
1410 }
1411 }
1412}
1413
1414#[cfg(feature = "server_2_10")]
1415fn default_consumer_limits_as_none<'de, D>(
1416 deserializer: D,
1417) -> Result<Option<ConsumerLimits>, D::Error>
1418where
1419 D: Deserializer<'de>,
1420{
1421 let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
1422 if let Some(cl) = consumer_limits {
1423 if cl == ConsumerLimits::default() {
1424 Ok(None)
1425 } else {
1426 Ok(Some(cl))
1427 }
1428 } else {
1429 Ok(None)
1430 }
1431}
1432#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1433pub struct ConsumerLimits {
1434 #[serde(default, with = "serde_nanos")]
1436 pub inactive_threshold: std::time::Duration,
1437 #[serde(default)]
1439 pub max_ack_pending: i64,
1440}
1441
1442#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1443pub enum Compression {
1444 #[serde(rename = "s2")]
1445 S2,
1446 #[serde(rename = "none")]
1447 None,
1448}
1449
1450#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1452pub struct SubjectTransform {
1453 #[serde(rename = "src")]
1454 pub source: String,
1455
1456 #[serde(rename = "dest")]
1457 pub destination: String,
1458}
1459
1460#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1462pub struct Republish {
1463 #[serde(rename = "src")]
1465 pub source: String,
1466 #[serde(rename = "dest")]
1468 pub destination: String,
1469 #[serde(default)]
1471 pub headers_only: bool,
1472}
1473
1474#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1476pub struct Placement {
1477 #[serde(default, skip_serializing_if = "is_default")]
1479 pub cluster: Option<String>,
1480 #[serde(default, skip_serializing_if = "is_default")]
1482 pub tags: Vec<String>,
1483}
1484
1485#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1488#[repr(u8)]
1489pub enum DiscardPolicy {
1490 #[default]
1492 #[serde(rename = "old")]
1493 Old = 0,
1494 #[serde(rename = "new")]
1496 New = 1,
1497}
1498
1499#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1501#[repr(u8)]
1502pub enum RetentionPolicy {
1503 #[default]
1506 #[serde(rename = "limits")]
1507 Limits = 0,
1508 #[serde(rename = "interest")]
1510 Interest = 1,
1511 #[serde(rename = "workqueue")]
1513 WorkQueue = 2,
1514}
1515
1516#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1518#[repr(u8)]
1519pub enum StorageType {
1520 #[default]
1522 #[serde(rename = "file")]
1523 File = 0,
1524 #[serde(rename = "memory")]
1526 Memory = 1,
1527}
1528
1529async fn stream_info_with_details(
1530 context: Context,
1531 stream: String,
1532 offset: usize,
1533 deleted_details: bool,
1534 subjects_filter: String,
1535) -> Result<Info, InfoError> {
1536 let subject = format!("STREAM.INFO.{}", stream);
1537
1538 let payload = StreamInfoRequest {
1539 offset,
1540 deleted_details,
1541 subjects_filter,
1542 };
1543
1544 let response: Response<Info> = context.request(subject, &payload).await?;
1545
1546 match response {
1547 Response::Ok(info) => Ok(info),
1548 Response::Err { error } => Err(error.into()),
1549 }
1550}
1551
1552type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1553
1554#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1555pub struct StreamInfoRequest {
1556 offset: usize,
1557 deleted_details: bool,
1558 subjects_filter: String,
1559}
1560
1561pub struct InfoWithSubjects {
1562 stream: String,
1563 context: Context,
1564 pub info: Info,
1565 offset: usize,
1566 subjects: collections::hash_map::IntoIter<String, usize>,
1567 info_request: Option<InfoRequest>,
1568 subjects_filter: String,
1569 pages_done: bool,
1570}
1571
1572impl InfoWithSubjects {
1573 pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1574 let subjects = info.state.subjects.take().unwrap_or_default();
1575 let name = info.config.name.clone();
1576 InfoWithSubjects {
1577 context,
1578 info,
1579 pages_done: subjects.is_empty(),
1580 offset: subjects.len(),
1581 subjects: subjects.into_iter(),
1582 subjects_filter: subject,
1583 stream: name,
1584 info_request: None,
1585 }
1586 }
1587}
1588
1589impl futures::Stream for InfoWithSubjects {
1590 type Item = Result<(String, usize), InfoError>;
1591
1592 fn poll_next(
1593 mut self: Pin<&mut Self>,
1594 cx: &mut std::task::Context<'_>,
1595 ) -> Poll<Option<Self::Item>> {
1596 match self.subjects.next() {
1597 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1598 None => {
1599 if self.pages_done {
1601 return Poll::Ready(None);
1602 }
1603 let stream = self.stream.clone();
1604 let context = self.context.clone();
1605 let subjects_filter = self.subjects_filter.clone();
1606 let offset = self.offset;
1607 match self
1608 .info_request
1609 .get_or_insert_with(|| {
1610 Box::pin(stream_info_with_details(
1611 context,
1612 stream,
1613 offset,
1614 false,
1615 subjects_filter,
1616 ))
1617 })
1618 .poll_unpin(cx)
1619 {
1620 Poll::Ready(resp) => match resp {
1621 Ok(info) => {
1622 let subjects = info.state.subjects.clone();
1623 self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1624 self.info_request = None;
1625 let subjects = subjects.unwrap_or_default();
1626 self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1627 let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1628 if total <= self.offset || subjects.is_empty() {
1629 self.pages_done = true;
1630 }
1631 match self.subjects.next() {
1632 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1633 None => Poll::Ready(None),
1634 }
1635 }
1636 Err(err) => Poll::Ready(Some(Err(err))),
1637 },
1638 Poll::Pending => Poll::Pending,
1639 }
1640 }
1641 }
1642 }
1643}
1644
1645#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1647pub struct Info {
1648 pub config: Config,
1650 #[serde(with = "rfc3339")]
1652 pub created: time::OffsetDateTime,
1653 pub state: State,
1655 pub cluster: Option<ClusterInfo>,
1657 #[serde(default)]
1659 pub mirror: Option<SourceInfo>,
1660 #[serde(default)]
1662 pub sources: Vec<SourceInfo>,
1663 #[serde(flatten)]
1664 paged_info: Option<PagedInfo>,
1665}
1666
1667#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1668pub struct PagedInfo {
1669 offset: usize,
1670 total: usize,
1671 limit: usize,
1672}
1673
1674#[derive(Deserialize)]
1675pub struct DeleteStatus {
1676 pub success: bool,
1677}
1678
1679#[cfg(feature = "server_2_11")]
1680#[derive(Deserialize)]
1681pub struct PauseResponse {
1682 pub paused: bool,
1683 #[serde(with = "rfc3339")]
1684 pub pause_until: OffsetDateTime,
1685 #[serde(default, with = "serde_nanos")]
1686 pub pause_remaining: Option<Duration>,
1687}
1688
1689#[cfg(feature = "server_2_11")]
1690#[derive(Serialize, Debug)]
1691struct PauseResumeConsumerRequest {
1692 #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
1693 pause_until: Option<OffsetDateTime>,
1694}
1695
1696#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1698pub struct State {
1699 pub messages: u64,
1701 pub bytes: u64,
1703 #[serde(rename = "first_seq")]
1705 pub first_sequence: u64,
1706 #[serde(with = "rfc3339", rename = "first_ts")]
1708 pub first_timestamp: time::OffsetDateTime,
1709 #[serde(rename = "last_seq")]
1711 pub last_sequence: u64,
1712 #[serde(with = "rfc3339", rename = "last_ts")]
1714 pub last_timestamp: time::OffsetDateTime,
1715 pub consumer_count: usize,
1717 #[serde(default, rename = "num_subjects")]
1719 pub subjects_count: u64,
1720 #[serde(default, rename = "num_deleted")]
1722 pub deleted_count: Option<u64>,
1723 #[serde(default)]
1726 pub deleted: Option<Vec<u64>>,
1727
1728 pub(crate) subjects: Option<HashMap<String, usize>>,
1729}
1730
1731#[derive(Debug, Serialize, Deserialize, Clone)]
1733pub struct RawMessage {
1734 #[serde(rename = "subject")]
1736 pub subject: String,
1737
1738 #[serde(rename = "seq")]
1740 pub sequence: u64,
1741
1742 #[serde(default, rename = "data")]
1744 pub payload: String,
1745
1746 #[serde(default, rename = "hdrs")]
1748 pub headers: Option<String>,
1749
1750 #[serde(rename = "time", with = "rfc3339")]
1752 pub time: time::OffsetDateTime,
1753}
1754
1755impl TryFrom<RawMessage> for StreamMessage {
1756 type Error = crate::Error;
1757
1758 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1759 let decoded_payload = STANDARD
1760 .decode(value.payload)
1761 .map_err(|err| Box::new(std::io::Error::other(err)))?;
1762 let decoded_headers = value
1763 .headers
1764 .map(|header| STANDARD.decode(header))
1765 .map_or(Ok(None), |v| v.map(Some))?;
1766
1767 let (headers, _, _) = decoded_headers
1768 .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1769
1770 Ok(StreamMessage {
1771 subject: value.subject.into(),
1772 payload: decoded_payload.into(),
1773 headers,
1774 sequence: value.sequence,
1775 time: value.time,
1776 })
1777 }
1778}
1779
1780fn is_continuation(c: char) -> bool {
1781 c == ' ' || c == '\t'
1782}
1783const HEADER_LINE: &str = "NATS/1.0";
1784
1785#[allow(clippy::type_complexity)]
1786fn parse_headers(
1787 buf: &[u8],
1788) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1789 let mut headers = HeaderMap::new();
1790 let mut maybe_status: Option<StatusCode> = None;
1791 let mut maybe_description: Option<String> = None;
1792 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1793 line.lines().peekable()
1794 } else {
1795 return Err(Box::new(std::io::Error::other("invalid header")));
1796 };
1797
1798 if let Some(line) = lines.next() {
1799 let line = line
1800 .strip_prefix(HEADER_LINE)
1801 .ok_or_else(|| {
1802 Box::new(std::io::Error::other(
1803 "version line does not start with NATS/1.0",
1804 ))
1805 })?
1806 .trim();
1807
1808 match line.split_once(' ') {
1809 Some((status, description)) => {
1810 if !status.is_empty() {
1811 maybe_status = Some(status.parse()?);
1812 }
1813
1814 if !description.is_empty() {
1815 maybe_description = Some(description.trim().to_string());
1816 }
1817 }
1818 None => {
1819 if !line.is_empty() {
1820 maybe_status = Some(line.parse()?);
1821 }
1822 }
1823 }
1824 } else {
1825 return Err(Box::new(std::io::Error::other(
1826 "expected header information not found",
1827 )));
1828 };
1829
1830 while let Some(line) = lines.next() {
1831 if line.is_empty() {
1832 continue;
1833 }
1834
1835 if let Some((k, v)) = line.split_once(':').to_owned() {
1836 let mut s = String::from(v.trim());
1837 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1838 s.push(' ');
1839 s.push_str(v.trim());
1840 }
1841
1842 headers.insert(
1843 HeaderName::from_str(k)?,
1844 HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1845 );
1846 } else {
1847 return Err(Box::new(std::io::Error::other("malformed header line")));
1848 }
1849 }
1850
1851 if headers.is_empty() {
1852 Ok((HeaderMap::new(), maybe_status, maybe_description))
1853 } else {
1854 Ok((headers, maybe_status, maybe_description))
1855 }
1856}
1857
1858#[derive(Debug, Serialize, Deserialize, Clone)]
1859struct GetRawMessage {
1860 pub(crate) message: RawMessage,
1861}
1862
1863fn is_default<T: Default + Eq>(t: &T) -> bool {
1864 t == &T::default()
1865}
1866#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1868pub struct ClusterInfo {
1869 pub name: Option<String>,
1871 pub leader: Option<String>,
1873 #[serde(default)]
1875 pub replicas: Vec<PeerInfo>,
1876}
1877
1878#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1880pub struct PeerInfo {
1881 pub name: String,
1883 pub current: bool,
1885 #[serde(with = "serde_nanos")]
1887 pub active: Duration,
1888 #[serde(default)]
1890 pub offline: bool,
1891 pub lag: Option<u64>,
1893}
1894
1895#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1896pub struct SourceInfo {
1897 pub name: String,
1899 pub lag: u64,
1901 #[serde(deserialize_with = "negative_duration_as_none")]
1903 pub active: Option<std::time::Duration>,
1904 #[serde(default)]
1906 pub filter_subject: Option<String>,
1907 #[serde(default)]
1909 pub subject_transform_dest: Option<String>,
1910 #[serde(default)]
1912 pub subject_transforms: Vec<SubjectTransform>,
1913}
1914
1915fn negative_duration_as_none<'de, D>(
1916 deserializer: D,
1917) -> Result<Option<std::time::Duration>, D::Error>
1918where
1919 D: Deserializer<'de>,
1920{
1921 let n = i64::deserialize(deserializer)?;
1922 if n.is_negative() {
1923 Ok(None)
1924 } else {
1925 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1926 }
1927}
1928
1929#[derive(Debug, Deserialize, Clone, Copy)]
1931pub struct PurgeResponse {
1932 pub success: bool,
1934 pub purged: u64,
1936}
1937#[derive(Default, Debug, Serialize, Clone)]
1939pub struct PurgeRequest {
1940 #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1942 pub sequence: Option<u64>,
1943
1944 #[serde(default, skip_serializing_if = "is_default")]
1946 pub filter: Option<String>,
1947
1948 #[serde(default, skip_serializing_if = "is_default")]
1950 pub keep: Option<u64>,
1951}
1952
1953#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1954pub struct Source {
1955 pub name: String,
1957 #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1959 pub start_sequence: Option<u64>,
1960 #[serde(
1961 default,
1962 rename = "opt_start_time",
1963 skip_serializing_if = "is_default",
1964 with = "rfc3339::option"
1965 )]
1966 pub start_time: Option<OffsetDateTime>,
1968 #[serde(default, skip_serializing_if = "is_default")]
1970 pub filter_subject: Option<String>,
1971 #[serde(default, skip_serializing_if = "Option::is_none")]
1973 pub external: Option<External>,
1974 #[serde(default, skip_serializing_if = "is_default")]
1976 pub domain: Option<String>,
1977 #[cfg(feature = "server_2_10")]
1979 #[serde(default, skip_serializing_if = "is_default")]
1980 pub subject_transforms: Vec<SubjectTransform>,
1981}
1982
1983#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1984pub struct External {
1985 #[serde(rename = "api")]
1987 pub api_prefix: String,
1988 #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1990 pub delivery_prefix: Option<String>,
1991}
1992
1993use std::marker::PhantomData;
1994
1995#[derive(Debug, Default)]
1996pub struct Yes;
1997#[derive(Debug, Default)]
1998pub struct No;
1999
2000pub trait ToAssign: Debug {}
2001
2002impl ToAssign for Yes {}
2003impl ToAssign for No {}
2004
2005#[derive(Debug)]
2006pub struct Purge<SEQUENCE, KEEP>
2007where
2008 SEQUENCE: ToAssign,
2009 KEEP: ToAssign,
2010{
2011 inner: PurgeRequest,
2012 sequence_set: PhantomData<SEQUENCE>,
2013 keep_set: PhantomData<KEEP>,
2014 context: Context,
2015 stream_name: String,
2016}
2017
2018impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
2019where
2020 SEQUENCE: ToAssign,
2021 KEEP: ToAssign,
2022{
2023 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
2025 self.inner.filter = Some(filter.into());
2026 self
2027 }
2028}
2029
2030impl Purge<No, No> {
2031 pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
2032 Purge {
2033 context: stream.context.clone(),
2034 stream_name: stream.name.clone(),
2035 inner: Default::default(),
2036 sequence_set: PhantomData {},
2037 keep_set: PhantomData {},
2038 }
2039 }
2040}
2041
2042impl<KEEP> Purge<No, KEEP>
2043where
2044 KEEP: ToAssign,
2045{
2046 pub fn keep(self, keep: u64) -> Purge<No, Yes> {
2049 Purge {
2050 context: self.context.clone(),
2051 stream_name: self.stream_name.clone(),
2052 sequence_set: PhantomData {},
2053 keep_set: PhantomData {},
2054 inner: PurgeRequest {
2055 keep: Some(keep),
2056 ..self.inner
2057 },
2058 }
2059 }
2060}
2061impl<SEQUENCE> Purge<SEQUENCE, No>
2062where
2063 SEQUENCE: ToAssign,
2064{
2065 pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2068 Purge {
2069 context: self.context.clone(),
2070 stream_name: self.stream_name.clone(),
2071 sequence_set: PhantomData {},
2072 keep_set: PhantomData {},
2073 inner: PurgeRequest {
2074 sequence: Some(sequence),
2075 ..self.inner
2076 },
2077 }
2078 }
2079}
2080
2081#[derive(Clone, Debug, PartialEq)]
2082pub enum PurgeErrorKind {
2083 Request,
2084 TimedOut,
2085 JetStream(super::errors::Error),
2086}
2087
2088impl Display for PurgeErrorKind {
2089 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2090 match self {
2091 Self::Request => write!(f, "request failed"),
2092 Self::TimedOut => write!(f, "timed out"),
2093 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2094 }
2095 }
2096}
2097
2098pub type PurgeError = Error<PurgeErrorKind>;
2099
2100impl<S, K> IntoFuture for Purge<S, K>
2101where
2102 S: ToAssign + std::marker::Send,
2103 K: ToAssign + std::marker::Send,
2104{
2105 type Output = Result<PurgeResponse, PurgeError>;
2106
2107 type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2108
2109 fn into_future(self) -> Self::IntoFuture {
2110 Box::pin(std::future::IntoFuture::into_future(async move {
2111 let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2112 let response: Response<PurgeResponse> = self
2113 .context
2114 .request(request_subject, &self.inner)
2115 .map_err(|err| match err.kind() {
2116 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2117 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2118 })
2119 .await?;
2120
2121 match response {
2122 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2123 Response::Ok(response) => Ok(response),
2124 }
2125 }))
2126 }
2127}
2128
2129#[derive(Deserialize, Debug)]
2130struct ConsumerPage {
2131 total: usize,
2132 consumers: Option<Vec<String>>,
2133}
2134
2135#[derive(Deserialize, Debug)]
2136struct ConsumerInfoPage {
2137 total: usize,
2138 consumers: Option<Vec<super::consumer::Info>>,
2139}
2140
2141type ConsumerNamesErrorKind = StreamsErrorKind;
2142type ConsumerNamesError = StreamsError;
2143type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2144
2145pub struct ConsumerNames {
2146 context: Context,
2147 stream: String,
2148 offset: usize,
2149 page_request: Option<PageRequest>,
2150 consumers: Vec<String>,
2151 done: bool,
2152}
2153
2154impl futures::Stream for ConsumerNames {
2155 type Item = Result<String, ConsumerNamesError>;
2156
2157 fn poll_next(
2158 mut self: Pin<&mut Self>,
2159 cx: &mut std::task::Context<'_>,
2160 ) -> std::task::Poll<Option<Self::Item>> {
2161 match self.page_request.as_mut() {
2162 Some(page) => match page.try_poll_unpin(cx) {
2163 std::task::Poll::Ready(page) => {
2164 self.page_request = None;
2165 let page = page.map_err(|err| {
2166 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2167 })?;
2168
2169 if let Some(consumers) = page.consumers {
2170 self.offset += consumers.len();
2171 self.consumers = consumers;
2172 if self.offset >= page.total {
2173 self.done = true;
2174 }
2175 match self.consumers.pop() {
2176 Some(stream) => Poll::Ready(Some(Ok(stream))),
2177 None => Poll::Ready(None),
2178 }
2179 } else {
2180 Poll::Ready(None)
2181 }
2182 }
2183 std::task::Poll::Pending => std::task::Poll::Pending,
2184 },
2185 None => {
2186 if let Some(stream) = self.consumers.pop() {
2187 Poll::Ready(Some(Ok(stream)))
2188 } else {
2189 if self.done {
2190 return Poll::Ready(None);
2191 }
2192 let context = self.context.clone();
2193 let offset = self.offset;
2194 let stream = self.stream.clone();
2195 self.page_request = Some(Box::pin(async move {
2196 match context
2197 .request(
2198 format!("CONSUMER.NAMES.{stream}"),
2199 &json!({
2200 "offset": offset,
2201 }),
2202 )
2203 .await?
2204 {
2205 Response::Err { error } => Err(RequestError::with_source(
2206 super::context::RequestErrorKind::Other,
2207 error,
2208 )),
2209 Response::Ok(page) => Ok(page),
2210 }
2211 }));
2212 self.poll_next(cx)
2213 }
2214 }
2215 }
2216 }
2217}
2218
2219pub type ConsumersErrorKind = StreamsErrorKind;
2220pub type ConsumersError = StreamsError;
2221type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2222
2223pub struct Consumers {
2224 context: Context,
2225 stream: String,
2226 offset: usize,
2227 page_request: Option<PageInfoRequest>,
2228 consumers: Vec<super::consumer::Info>,
2229 done: bool,
2230}
2231
2232impl futures::Stream for Consumers {
2233 type Item = Result<super::consumer::Info, ConsumersError>;
2234
2235 fn poll_next(
2236 mut self: Pin<&mut Self>,
2237 cx: &mut std::task::Context<'_>,
2238 ) -> std::task::Poll<Option<Self::Item>> {
2239 match self.page_request.as_mut() {
2240 Some(page) => match page.try_poll_unpin(cx) {
2241 std::task::Poll::Ready(page) => {
2242 self.page_request = None;
2243 let page = page.map_err(|err| {
2244 ConsumersError::with_source(ConsumersErrorKind::Other, err)
2245 })?;
2246 if let Some(consumers) = page.consumers {
2247 self.offset += consumers.len();
2248 self.consumers = consumers;
2249 if self.offset >= page.total {
2250 self.done = true;
2251 }
2252 match self.consumers.pop() {
2253 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2254 None => Poll::Ready(None),
2255 }
2256 } else {
2257 Poll::Ready(None)
2258 }
2259 }
2260 std::task::Poll::Pending => std::task::Poll::Pending,
2261 },
2262 None => {
2263 if let Some(stream) = self.consumers.pop() {
2264 Poll::Ready(Some(Ok(stream)))
2265 } else {
2266 if self.done {
2267 return Poll::Ready(None);
2268 }
2269 let context = self.context.clone();
2270 let offset = self.offset;
2271 let stream = self.stream.clone();
2272 self.page_request = Some(Box::pin(async move {
2273 match context
2274 .request(
2275 format!("CONSUMER.LIST.{stream}"),
2276 &json!({
2277 "offset": offset,
2278 }),
2279 )
2280 .await?
2281 {
2282 Response::Err { error } => Err(RequestError::with_source(
2283 super::context::RequestErrorKind::Other,
2284 error,
2285 )),
2286 Response::Ok(page) => Ok(page),
2287 }
2288 }));
2289 self.poll_next(cx)
2290 }
2291 }
2292 }
2293 }
2294}
2295
2296#[derive(Clone, Debug, PartialEq)]
2297pub enum LastRawMessageErrorKind {
2298 NoMessageFound,
2299 InvalidSubject,
2300 JetStream(super::errors::Error),
2301 Other,
2302}
2303
2304impl Display for LastRawMessageErrorKind {
2305 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2306 match self {
2307 Self::NoMessageFound => write!(f, "no message found"),
2308 Self::InvalidSubject => write!(f, "invalid subject"),
2309 Self::Other => write!(f, "failed to get last raw message"),
2310 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2311 }
2312 }
2313}
2314
2315pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2316pub type RawMessageErrorKind = LastRawMessageErrorKind;
2317pub type RawMessageError = LastRawMessageError;
2318
2319#[derive(Clone, Debug, PartialEq)]
2320pub enum ConsumerErrorKind {
2321 TimedOut,
2323 Request,
2324 InvalidConsumerType,
2325 InvalidName,
2326 JetStream(super::errors::Error),
2327 Other,
2328}
2329
2330impl Display for ConsumerErrorKind {
2331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2332 match self {
2333 Self::TimedOut => write!(f, "timed out"),
2334 Self::Request => write!(f, "request failed"),
2335 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2336 Self::Other => write!(f, "consumer error"),
2337 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2338 Self::InvalidName => write!(f, "invalid consumer name"),
2339 }
2340 }
2341}
2342
2343pub type ConsumerError = Error<ConsumerErrorKind>;
2344
2345#[derive(Clone, Debug, PartialEq)]
2346pub enum ConsumerCreateStrictErrorKind {
2347 TimedOut,
2349 Request,
2350 InvalidConsumerType,
2351 InvalidName,
2352 AlreadyExists,
2353 JetStream(super::errors::Error),
2354 Other,
2355}
2356
2357impl Display for ConsumerCreateStrictErrorKind {
2358 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2359 match self {
2360 Self::TimedOut => write!(f, "timed out"),
2361 Self::Request => write!(f, "request failed"),
2362 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2363 Self::Other => write!(f, "consumer error"),
2364 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2365 Self::InvalidName => write!(f, "invalid consumer name"),
2366 Self::AlreadyExists => write!(f, "consumer already exists"),
2367 }
2368 }
2369}
2370
2371pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2372
2373#[derive(Clone, Debug, PartialEq)]
2374pub enum ConsumerUpdateErrorKind {
2375 TimedOut,
2377 Request,
2378 InvalidConsumerType,
2379 InvalidName,
2380 DoesNotExist,
2381 JetStream(super::errors::Error),
2382 Other,
2383}
2384
2385impl Display for ConsumerUpdateErrorKind {
2386 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2387 match self {
2388 Self::TimedOut => write!(f, "timed out"),
2389 Self::Request => write!(f, "request failed"),
2390 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2391 Self::Other => write!(f, "consumer error"),
2392 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2393 Self::InvalidName => write!(f, "invalid consumer name"),
2394 Self::DoesNotExist => write!(f, "consumer does not exist"),
2395 }
2396 }
2397}
2398
2399pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2400
2401impl From<super::errors::Error> for ConsumerError {
2402 fn from(err: super::errors::Error) -> Self {
2403 ConsumerError::new(ConsumerErrorKind::JetStream(err))
2404 }
2405}
2406impl From<super::errors::Error> for ConsumerCreateStrictError {
2407 fn from(err: super::errors::Error) -> Self {
2408 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2409 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2410 } else {
2411 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2412 }
2413 }
2414}
2415impl From<super::errors::Error> for ConsumerUpdateError {
2416 fn from(err: super::errors::Error) -> Self {
2417 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2418 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2419 } else {
2420 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2421 }
2422 }
2423}
2424impl From<ConsumerError> for ConsumerUpdateError {
2425 fn from(err: ConsumerError) -> Self {
2426 match err.kind() {
2427 ConsumerErrorKind::JetStream(err) => {
2428 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2429 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2430 } else {
2431 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2432 }
2433 }
2434 ConsumerErrorKind::Request => {
2435 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2436 }
2437 ConsumerErrorKind::TimedOut => {
2438 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2439 }
2440 ConsumerErrorKind::InvalidConsumerType => {
2441 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2442 }
2443 ConsumerErrorKind::InvalidName => {
2444 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2445 }
2446 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2447 }
2448 }
2449}
2450
2451impl From<ConsumerError> for ConsumerCreateStrictError {
2452 fn from(err: ConsumerError) -> Self {
2453 match err.kind() {
2454 ConsumerErrorKind::JetStream(err) => {
2455 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2456 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2457 } else {
2458 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2459 }
2460 }
2461 ConsumerErrorKind::Request => {
2462 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2463 }
2464 ConsumerErrorKind::TimedOut => {
2465 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2466 }
2467 ConsumerErrorKind::InvalidConsumerType => {
2468 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2469 }
2470 ConsumerErrorKind::InvalidName => {
2471 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2472 }
2473 ConsumerErrorKind::Other => {
2474 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2475 }
2476 }
2477 }
2478}
2479
2480impl From<super::context::RequestError> for ConsumerError {
2481 fn from(err: super::context::RequestError) -> Self {
2482 match err.kind() {
2483 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2484 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2485 }
2486 }
2487}
2488impl From<super::context::RequestError> for ConsumerUpdateError {
2489 fn from(err: super::context::RequestError) -> Self {
2490 match err.kind() {
2491 RequestErrorKind::TimedOut => {
2492 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2493 }
2494 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2495 }
2496 }
2497}
2498impl From<super::context::RequestError> for ConsumerCreateStrictError {
2499 fn from(err: super::context::RequestError) -> Self {
2500 match err.kind() {
2501 RequestErrorKind::TimedOut => {
2502 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2503 }
2504 _ => {
2505 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2506 }
2507 }
2508 }
2509}
2510
2511#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
2512pub struct StreamGetMessage {
2513 #[serde(rename = "seq", skip_serializing_if = "is_default")]
2514 sequence: Option<u64>,
2515 #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
2516 next_by_subject: Option<String>,
2517 #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
2518 last_by_subject: Option<String>,
2519}
2520
2521#[cfg(test)]
2522mod tests {
2523 use super::*;
2524
2525 #[test]
2526 fn consumer_limits_de() {
2527 let config = Config {
2528 ..Default::default()
2529 };
2530
2531 let roundtrip: Config = {
2532 let ser = serde_json::to_string(&config).unwrap();
2533 serde_json::from_str(&ser).unwrap()
2534 };
2535 assert_eq!(config, roundtrip);
2536 }
2537}