1#[cfg(feature = "server_2_10")]
17use std::collections::HashMap;
18use std::{
19 fmt::{self, Debug, Display},
20 future::IntoFuture,
21 io::{self, ErrorKind},
22 pin::Pin,
23 str::FromStr,
24 task::Poll,
25 time::Duration,
26};
27
28use crate::{
29 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
30};
31use base64::engine::general_purpose::STANDARD;
32use base64::engine::Engine;
33use bytes::Bytes;
34use futures::{future::BoxFuture, TryFutureExt};
35use serde::{Deserialize, Deserializer, Serialize};
36use serde_json::json;
37use time::{serde::rfc3339, OffsetDateTime};
38
39use super::{
40 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
41 context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind},
42 errors::ErrorCode,
43 response::Response,
44 Context, Message,
45};
46
47pub type InfoError = RequestError;
48
49#[derive(Clone, Debug, PartialEq)]
50pub enum DirectGetErrorKind {
51 NotFound,
52 InvalidSubject,
53 TimedOut,
54 Request,
55 ErrorResponse(StatusCode, String),
56 Other,
57}
58
59impl Display for DirectGetErrorKind {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 match self {
62 Self::InvalidSubject => write!(f, "invalid subject"),
63 Self::NotFound => write!(f, "message not found"),
64 Self::ErrorResponse(status, description) => {
65 write!(f, "unable to get message: {} {}", status, description)
66 }
67 Self::Other => write!(f, "error getting message"),
68 Self::TimedOut => write!(f, "timed out"),
69 Self::Request => write!(f, "request failed"),
70 }
71 }
72}
73
74pub type DirectGetError = Error<DirectGetErrorKind>;
75
76impl From<crate::RequestError> for DirectGetError {
77 fn from(err: crate::RequestError) -> Self {
78 match err.kind() {
79 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
80 crate::RequestErrorKind::NoResponders => DirectGetError::new(DirectGetErrorKind::Other),
81 crate::RequestErrorKind::Other => {
82 DirectGetError::with_source(DirectGetErrorKind::Other, err)
83 }
84 }
85 }
86}
87
88impl From<serde_json::Error> for DirectGetError {
89 fn from(err: serde_json::Error) -> Self {
90 DirectGetError::with_source(DirectGetErrorKind::Other, err)
91 }
92}
93
94#[derive(Clone, Debug, PartialEq)]
95pub enum DeleteMessageErrorKind {
96 Request,
97 TimedOut,
98 JetStream(super::errors::Error),
99}
100
101impl Display for DeleteMessageErrorKind {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 match self {
104 Self::Request => write!(f, "request failed"),
105 Self::TimedOut => write!(f, "timed out"),
106 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
107 }
108 }
109}
110
111pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
112
113#[derive(Debug, Clone)]
115pub struct Stream {
116 pub(crate) info: Info,
117 pub(crate) context: Context,
118}
119
120impl Stream {
121 pub async fn info(&mut self) -> Result<&Info, InfoError> {
139 let subject = format!("STREAM.INFO.{}", self.info.config.name);
140
141 match self.context.request(subject, &json!({})).await? {
142 Response::Ok::<Info>(info) => {
143 self.info = info;
144 Ok(&self.info)
145 }
146 Response::Err { error } => Err(error.into()),
147 }
148 }
149
150 pub fn cached_info(&self) -> &Info {
169 &self.info
170 }
171
172 pub async fn direct_get_next_for_subject<T: AsRef<str>>(
207 &self,
208 subject: T,
209 sequence: Option<u64>,
210 ) -> Result<Message, DirectGetError> {
211 if !is_valid_subject(&subject) {
212 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
213 }
214 let request_subject = format!(
215 "{}.DIRECT.GET.{}",
216 &self.context.prefix, &self.info.config.name
217 );
218 let payload;
219 if let Some(sequence) = sequence {
220 payload = json!({
221 "seq": sequence,
222 "next_by_subj": subject.as_ref(),
223 });
224 } else {
225 payload = json!({
226 "next_by_subj": subject.as_ref(),
227 });
228 }
229
230 let response = self
231 .context
232 .client
233 .request(
234 request_subject,
235 serde_json::to_vec(&payload).map(Bytes::from)?,
236 )
237 .await
238 .map(|message| Message {
239 message,
240 context: self.context.clone(),
241 })?;
242 if let Some(status) = response.status {
243 if let Some(ref description) = response.description {
244 match status {
245 StatusCode::NOT_FOUND => {
246 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
247 }
248 StatusCode::TIMEOUT => {
250 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
251 }
252 _ => {
253 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
254 status,
255 description.to_string(),
256 )));
257 }
258 }
259 }
260 }
261 Ok(response)
262 }
263
264 pub async fn direct_get_first_for_subject<T: AsRef<str>>(
296 &self,
297 subject: T,
298 ) -> Result<Message, DirectGetError> {
299 if !is_valid_subject(&subject) {
300 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
301 }
302 let request_subject = format!(
303 "{}.DIRECT.GET.{}",
304 &self.context.prefix, &self.info.config.name
305 );
306 let payload = json!({
307 "next_by_subj": subject.as_ref(),
308 });
309
310 let response = self
311 .context
312 .client
313 .request(
314 request_subject,
315 serde_json::to_vec(&payload).map(Bytes::from)?,
316 )
317 .await
318 .map(|message| Message {
319 message,
320 context: self.context.clone(),
321 })?;
322 if let Some(status) = response.status {
323 if let Some(ref description) = response.description {
324 match status {
325 StatusCode::NOT_FOUND => {
326 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
327 }
328 StatusCode::TIMEOUT => {
330 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
331 }
332 _ => {
333 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
334 status,
335 description.to_string(),
336 )));
337 }
338 }
339 }
340 }
341 Ok(response)
342 }
343
344 pub async fn direct_get(&self, sequence: u64) -> Result<Message, DirectGetError> {
376 let subject = format!(
377 "{}.DIRECT.GET.{}",
378 &self.context.prefix, &self.info.config.name
379 );
380 let payload = json!({
381 "seq": sequence,
382 });
383
384 let response = self
385 .context
386 .client
387 .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
388 .await
389 .map(|message| Message {
390 context: self.context.clone(),
391 message,
392 })?;
393
394 if let Some(status) = response.status {
395 if let Some(ref description) = response.description {
396 match status {
397 StatusCode::NOT_FOUND => {
398 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
399 }
400 StatusCode::TIMEOUT => {
402 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
403 }
404 _ => {
405 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
406 status,
407 description.to_string(),
408 )));
409 }
410 }
411 }
412 }
413 Ok(response)
414 }
415
416 pub async fn direct_get_last_for_subject<T: AsRef<str>>(
448 &self,
449 subject: T,
450 ) -> Result<Message, DirectGetError> {
451 let subject = format!(
452 "{}.DIRECT.GET.{}.{}",
453 &self.context.prefix,
454 &self.info.config.name,
455 subject.as_ref()
456 );
457
458 let response = self
459 .context
460 .client
461 .request(subject, "".into())
462 .await
463 .map(|message| Message {
464 context: self.context.clone(),
465 message,
466 })?;
467 if let Some(status) = response.status {
468 if let Some(ref description) = response.description {
469 match status {
470 StatusCode::NOT_FOUND => {
471 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
472 }
473 StatusCode::TIMEOUT => {
475 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
476 }
477 _ => {
478 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
479 status,
480 description.to_string(),
481 )));
482 }
483 }
484 }
485 }
486 Ok(response)
487 }
488 pub async fn get_raw_message(&self, sequence: u64) -> Result<RawMessage, crate::Error> {
516 let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name);
517 let payload = json!({
518 "seq": sequence,
519 });
520
521 let response: Response<GetRawMessage> = self.context.request(subject, &payload).await?;
522 match response {
523 Response::Err { error } => Err(Box::new(std::io::Error::new(
524 ErrorKind::Other,
525 format!("nats: error while getting message: {}", error),
526 ))),
527 Response::Ok(value) => Ok(value.message),
528 }
529 }
530
531 pub async fn get_last_raw_message_by_subject(
559 &self,
560 stream_subject: &str,
561 ) -> Result<RawMessage, LastRawMessageError> {
562 let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name);
563 let payload = json!({
564 "last_by_subj": stream_subject,
565 });
566
567 let response: Response<GetRawMessage> = self
568 .context
569 .request(subject, &payload)
570 .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
571 .await?;
572 match response {
573 Response::Err { error } => {
574 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
575 Err(LastRawMessageError::new(
576 LastRawMessageErrorKind::NoMessageFound,
577 ))
578 } else {
579 Err(LastRawMessageError::new(
580 LastRawMessageErrorKind::JetStream(error),
581 ))
582 }
583 }
584 Response::Ok(value) => Ok(value.message),
585 }
586 }
587
588 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
612 let subject = format!("STREAM.MSG.DELETE.{}", &self.info.config.name);
613 let payload = json!({
614 "seq": sequence,
615 });
616
617 let response: Response<DeleteStatus> = self
618 .context
619 .request(subject, &payload)
620 .map_err(|err| match err.kind() {
621 RequestErrorKind::TimedOut => {
622 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
623 }
624 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
625 })
626 .await?;
627
628 match response {
629 Response::Err { error } => Err(DeleteMessageError::new(
630 DeleteMessageErrorKind::JetStream(error),
631 )),
632 Response::Ok(value) => Ok(value.success),
633 }
634 }
635
636 pub fn purge(&self) -> Purge<No, No> {
652 Purge::build(self)
653 }
654
655 #[deprecated(
672 since = "0.25.0",
673 note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
674 )]
675 pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
676 where
677 T: Into<String>,
678 {
679 self.purge().filter(subject).await
680 }
681
682 pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
706 &self,
707 config: C,
708 ) -> Result<Consumer<C>, ConsumerError> {
709 self.context
710 .create_consumer_on_stream(config, self.info.config.name.clone())
711 .await
712 }
713
/// Updates a consumer on this stream from `config` by delegating to
/// `Context::update_consumer_on_stream` with this stream's name.
///
/// Only available with the `server_2_10` feature.
#[cfg(feature = "server_2_10")]
pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
    &self,
    config: C,
) -> Result<Consumer<C>, ConsumerUpdateError> {
    let stream_name = self.info.config.name.clone();
    self.context
        .update_consumer_on_stream(config, stream_name)
        .await
}
746
/// Creates a consumer on this stream from `config` by delegating to
/// `Context::create_consumer_strict_on_stream` with this stream's name.
///
/// Only available with the `server_2_10` feature.
#[cfg(feature = "server_2_10")]
pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
    &self,
    config: C,
) -> Result<Consumer<C>, ConsumerCreateStrictError> {
    let stream_name = self.info.config.name.clone();
    self.context
        .create_consumer_strict_on_stream(config, stream_name)
        .await
}
780
781 pub async fn consumer_info<T: AsRef<str>>(
798 &self,
799 name: T,
800 ) -> Result<consumer::Info, crate::Error> {
801 let name = name.as_ref();
802
803 let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name);
804
805 match self.context.request(subject, &json!({})).await? {
806 Response::Ok(info) => Ok(info),
807 Response::Err { error } => Err(Box::new(std::io::Error::new(
808 ErrorKind::Other,
809 format!("nats: error while getting consumer info: {}", error),
810 ))),
811 }
812 }
813
814 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
833 &self,
834 name: &str,
835 ) -> Result<Consumer<T>, crate::Error> {
836 let info = self.consumer_info(name).await?;
837
838 Ok(Consumer::new(
839 T::try_from_consumer_config(info.config.clone())?,
840 info,
841 self.context.clone(),
842 ))
843 }
844
845 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
873 &self,
874 name: &str,
875 config: T,
876 ) -> Result<Consumer<T>, ConsumerError> {
877 let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name);
878
879 match self.context.request(subject, &json!({})).await? {
880 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
881 Response::Err { error } => Err(error.into()),
882 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
883 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
884 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
885 })?,
886 info,
887 self.context.clone(),
888 )),
889 }
890 }
891
892 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
913 let subject = format!("CONSUMER.DELETE.{}.{}", self.info.config.name, name);
914
915 match self.context.request(subject, &json!({})).await? {
916 Response::Ok(delete_status) => Ok(delete_status),
917 Response::Err { error } => Err(error.into()),
918 }
919 }
920
921 pub fn consumer_names(&self) -> ConsumerNames {
940 ConsumerNames {
941 context: self.context.clone(),
942 stream: self.info.config.name.clone(),
943 offset: 0,
944 page_request: None,
945 consumers: Vec::new(),
946 done: false,
947 }
948 }
949
950 pub fn consumers(&self) -> Consumers {
969 Consumers {
970 context: self.context.clone(),
971 stream: self.info.config.name.clone(),
972 offset: 0,
973 page_request: None,
974 consumers: Vec::new(),
975 done: false,
976 }
977 }
978}
979
980#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
984pub struct Config {
985 pub name: String,
987 pub max_bytes: i64,
989 #[serde(rename = "max_msgs")]
991 pub max_messages: i64,
992 #[serde(rename = "max_msgs_per_subject")]
994 pub max_messages_per_subject: i64,
995 pub discard: DiscardPolicy,
998 #[serde(default, skip_serializing_if = "is_default")]
1000 pub discard_new_per_subject: bool,
1001 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1004 pub subjects: Vec<String>,
1005 pub retention: RetentionPolicy,
1007 pub max_consumers: i32,
1009 #[serde(with = "serde_nanos")]
1011 pub max_age: Duration,
1012 #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1014 pub max_message_size: i32,
1015 pub storage: StorageType,
1017 pub num_replicas: usize,
1019 #[serde(default, skip_serializing_if = "is_default")]
1021 pub no_ack: bool,
1022 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1024 pub duplicate_window: Duration,
1025 #[serde(default, skip_serializing_if = "is_default")]
1027 pub template_owner: String,
1028 #[serde(default, skip_serializing_if = "is_default")]
1030 pub sealed: bool,
1031 #[serde(default, skip_serializing_if = "is_default")]
1033 pub description: Option<String>,
1034 #[serde(
1035 default,
1036 rename = "allow_rollup_hdrs",
1037 skip_serializing_if = "is_default"
1038 )]
1039 pub allow_rollup: bool,
1041 #[serde(default, skip_serializing_if = "is_default")]
1042 pub deny_delete: bool,
1044 #[serde(default, skip_serializing_if = "is_default")]
1046 pub deny_purge: bool,
1047
1048 #[serde(default, skip_serializing_if = "is_default")]
1050 pub republish: Option<Republish>,
1051
1052 #[serde(default, skip_serializing_if = "is_default")]
1055 pub allow_direct: bool,
1056
1057 #[serde(default, skip_serializing_if = "is_default")]
1059 pub mirror_direct: bool,
1060
1061 #[serde(default, skip_serializing_if = "Option::is_none")]
1063 pub mirror: Option<Source>,
1064
1065 #[serde(default, skip_serializing_if = "Option::is_none")]
1067 pub sources: Option<Vec<Source>>,
1068
1069 #[cfg(feature = "server_2_10")]
1070 #[serde(default, skip_serializing_if = "is_default")]
1072 pub metadata: HashMap<String, String>,
1073
1074 #[cfg(feature = "server_2_10")]
1075 #[serde(default, skip_serializing_if = "Option::is_none")]
1077 pub subject_transform: Option<SubjectTransform>,
1078
1079 #[cfg(feature = "server_2_10")]
1080 #[serde(default, skip_serializing_if = "Option::is_none")]
1085 pub compression: Option<Compression>,
1086 #[cfg(feature = "server_2_10")]
1087 #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1089 pub consumer_limits: Option<ConsumerLimits>,
1090
1091 #[cfg(feature = "server_2_10")]
1092 #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1094 pub first_sequence: Option<u64>,
1095
1096 #[serde(default, skip_serializing_if = "Option::is_none")]
1098 pub placement: Option<Placement>,
1099}
1100
1101impl From<&Config> for Config {
1102 fn from(sc: &Config) -> Config {
1103 sc.clone()
1104 }
1105}
1106
1107impl From<&str> for Config {
1108 fn from(s: &str) -> Config {
1109 Config {
1110 name: s.to_string(),
1111 ..Default::default()
1112 }
1113 }
1114}
1115
/// Deserializes [`ConsumerLimits`], mapping a value equal to `ConsumerLimits::default()`
/// to `None` and any other value to `Some`.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let limits = ConsumerLimits::deserialize(deserializer)?;
    Ok(Some(limits).filter(|limits| *limits != ConsumerLimits::default()))
}
1130
1131#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1132pub struct ConsumerLimits {
1133 #[serde(default, with = "serde_nanos")]
1135 pub inactive_threshold: std::time::Duration,
1136 #[serde(default)]
1138 pub max_ack_pending: i64,
1139}
1140
1141#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1142pub enum Compression {
1143 #[serde(rename = "s2")]
1144 S2,
1145 #[serde(rename = "none")]
1146 None,
1147}
1148
1149#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1151pub struct SubjectTransform {
1152 #[serde(rename = "src")]
1153 pub source: String,
1154
1155 #[serde(rename = "dest")]
1156 pub destination: String,
1157}
1158
1159#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1161pub struct Republish {
1162 #[serde(rename = "src")]
1164 pub source: String,
1165 #[serde(rename = "dest")]
1167 pub destination: String,
1168 #[serde(default)]
1170 pub headers_only: bool,
1171}
1172
1173#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1175pub struct Placement {
1176 #[serde(default, skip_serializing_if = "is_default")]
1178 pub cluster: Option<String>,
1179 #[serde(default, skip_serializing_if = "is_default")]
1181 pub tags: Vec<String>,
1182}
1183
1184#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1187#[repr(u8)]
1188pub enum DiscardPolicy {
1189 #[default]
1191 #[serde(rename = "old")]
1192 Old = 0,
1193 #[serde(rename = "new")]
1195 New = 1,
1196}
1197
1198#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1200#[repr(u8)]
1201pub enum RetentionPolicy {
1202 #[default]
1205 #[serde(rename = "limits")]
1206 Limits = 0,
1207 #[serde(rename = "interest")]
1209 Interest = 1,
1210 #[serde(rename = "workqueue")]
1212 WorkQueue = 2,
1213}
1214
1215#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1217#[repr(u8)]
1218pub enum StorageType {
1219 #[default]
1221 #[serde(rename = "file")]
1222 File = 0,
1223 #[serde(rename = "memory")]
1225 Memory = 1,
1226}
1227
1228#[derive(Debug, Deserialize, Clone)]
1230pub struct Info {
1231 pub config: Config,
1233 #[serde(with = "rfc3339")]
1235 pub created: time::OffsetDateTime,
1236 pub state: State,
1238 pub cluster: Option<ClusterInfo>,
1240 #[serde(default)]
1242 pub mirror: Option<SourceInfo>,
1243 #[serde(default)]
1245 pub sources: Vec<SourceInfo>,
1246}
1247
1248#[derive(Deserialize)]
1249pub struct DeleteStatus {
1250 pub success: bool,
1251}
1252
1253#[derive(Debug, Deserialize, Clone, Copy)]
1255pub struct State {
1256 pub messages: u64,
1258 pub bytes: u64,
1260 #[serde(rename = "first_seq")]
1262 pub first_sequence: u64,
1263 #[serde(with = "rfc3339", rename = "first_ts")]
1265 pub first_timestamp: time::OffsetDateTime,
1266 #[serde(rename = "last_seq")]
1268 pub last_sequence: u64,
1269 #[serde(with = "rfc3339", rename = "last_ts")]
1271 pub last_timestamp: time::OffsetDateTime,
1272 pub consumer_count: usize,
1274}
1275
1276#[derive(Debug, Serialize, Deserialize, Clone)]
1278pub struct RawMessage {
1279 #[serde(rename = "subject")]
1281 pub subject: String,
1282
1283 #[serde(rename = "seq")]
1285 pub sequence: u64,
1286
1287 #[serde(default, rename = "data")]
1289 pub payload: String,
1290
1291 #[serde(default, rename = "hdrs")]
1293 pub headers: Option<String>,
1294
1295 #[serde(rename = "time", with = "rfc3339")]
1297 pub time: time::OffsetDateTime,
1298}
1299
1300impl TryFrom<RawMessage> for crate::Message {
1301 type Error = crate::Error;
1302
1303 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1304 let decoded_payload = STANDARD
1305 .decode(value.payload)
1306 .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
1307 let decoded_headers = value
1308 .headers
1309 .map(|header| STANDARD.decode(header))
1310 .map_or(Ok(None), |v| v.map(Some))?;
1311
1312 let length = decoded_headers
1313 .as_ref()
1314 .map_or_else(|| 0, |headers| headers.len())
1315 + decoded_payload.len()
1316 + value.subject.len();
1317
1318 let (headers, status, description) =
1319 decoded_headers.map_or_else(|| Ok((None, None, None)), |h| parse_headers(&h))?;
1320
1321 Ok(crate::Message {
1322 subject: value.subject.into(),
1323 reply: None,
1324 payload: decoded_payload.into(),
1325 headers,
1326 status,
1327 description,
1328 length,
1329 })
1330 }
1331}
1332
/// Returns `true` when `c` is a space or a horizontal tab — the characters that mark a
/// header continuation line in `parse_headers`.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
/// Prefix that the first line of an encoded header block must start with.
const HEADER_LINE: &str = "NATS/1.0";
1337
1338#[allow(clippy::type_complexity)]
1339fn parse_headers(
1340 buf: &[u8],
1341) -> Result<(Option<HeaderMap>, Option<StatusCode>, Option<String>), crate::Error> {
1342 let mut headers = HeaderMap::new();
1343 let mut maybe_status: Option<StatusCode> = None;
1344 let mut maybe_description: Option<String> = None;
1345 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1346 line.lines().peekable()
1347 } else {
1348 return Err(Box::new(std::io::Error::new(
1349 ErrorKind::Other,
1350 "invalid header",
1351 )));
1352 };
1353
1354 if let Some(line) = lines.next() {
1355 let line = line
1356 .strip_prefix(HEADER_LINE)
1357 .ok_or_else(|| {
1358 Box::new(std::io::Error::new(
1359 ErrorKind::Other,
1360 "version line does not start with NATS/1.0",
1361 ))
1362 })?
1363 .trim();
1364
1365 match line.split_once(' ') {
1366 Some((status, description)) => {
1367 if !status.is_empty() {
1368 maybe_status = Some(status.parse()?);
1369 }
1370
1371 if !description.is_empty() {
1372 maybe_description = Some(description.trim().to_string());
1373 }
1374 }
1375 None => {
1376 if !line.is_empty() {
1377 maybe_status = Some(line.parse()?);
1378 }
1379 }
1380 }
1381 } else {
1382 return Err(Box::new(std::io::Error::new(
1383 ErrorKind::Other,
1384 "expected header information not found",
1385 )));
1386 };
1387
1388 while let Some(line) = lines.next() {
1389 if line.is_empty() {
1390 continue;
1391 }
1392
1393 if let Some((k, v)) = line.split_once(':').to_owned() {
1394 let mut s = String::from(v.trim());
1395 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1396 s.push(' ');
1397 s.push_str(v.trim());
1398 }
1399
1400 headers.insert(
1401 HeaderName::from_str(k)?,
1402 HeaderValue::from_str(&s)
1403 .map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
1404 );
1405 } else {
1406 return Err(Box::new(std::io::Error::new(
1407 ErrorKind::Other,
1408 "malformed header line",
1409 )));
1410 }
1411 }
1412
1413 if headers.is_empty() {
1414 Ok((None, maybe_status, maybe_description))
1415 } else {
1416 Ok((Some(headers), maybe_status, maybe_description))
1417 }
1418}
1419
1420#[derive(Debug, Serialize, Deserialize, Clone)]
1421struct GetRawMessage {
1422 pub(crate) message: RawMessage,
1423}
1424
/// Returns `true` when `t` equals `T::default()`.
///
/// Used as a serde `skip_serializing_if` predicate throughout this file.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let baseline = T::default();
    *t == baseline
}
1428#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1430pub struct ClusterInfo {
1431 pub name: Option<String>,
1433 pub leader: Option<String>,
1435 #[serde(default)]
1437 pub replicas: Vec<PeerInfo>,
1438}
1439
1440#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1442pub struct PeerInfo {
1443 pub name: String,
1445 pub current: bool,
1447 #[serde(with = "serde_nanos")]
1449 pub active: Duration,
1450 #[serde(default)]
1452 pub offline: bool,
1453 pub lag: Option<u64>,
1455}
1456
1457#[derive(Debug, Clone, Deserialize)]
1458pub struct SourceInfo {
1459 pub name: String,
1461 pub lag: u64,
1463 #[serde(deserialize_with = "negative_duration_as_none")]
1465 pub active: Option<std::time::Duration>,
1466 #[serde(default)]
1468 pub filter_subject: Option<String>,
1469 #[serde(default)]
1471 pub subject_transform_dest: Option<String>,
1472 #[serde(default)]
1474 pub subject_transforms: Vec<SubjectTransform>,
1475}
1476
1477fn negative_duration_as_none<'de, D>(
1478 deserializer: D,
1479) -> Result<Option<std::time::Duration>, D::Error>
1480where
1481 D: Deserializer<'de>,
1482{
1483 let n = i64::deserialize(deserializer)?;
1484 if n.is_negative() {
1485 Ok(None)
1486 } else {
1487 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1488 }
1489}
1490
1491#[derive(Debug, Deserialize, Clone, Copy)]
1493pub struct PurgeResponse {
1494 pub success: bool,
1496 pub purged: u64,
1498}
1499#[derive(Default, Debug, Serialize, Clone)]
1501pub struct PurgeRequest {
1502 #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1504 pub sequence: Option<u64>,
1505
1506 #[serde(default, skip_serializing_if = "is_default")]
1508 pub filter: Option<String>,
1509
1510 #[serde(default, skip_serializing_if = "is_default")]
1512 pub keep: Option<u64>,
1513}
1514
1515#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1516pub struct Source {
1517 pub name: String,
1519 #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1521 pub start_sequence: Option<u64>,
1522 #[serde(
1523 default,
1524 rename = "opt_start_time",
1525 skip_serializing_if = "is_default",
1526 with = "rfc3339::option"
1527 )]
1528 pub start_time: Option<OffsetDateTime>,
1530 #[serde(default, skip_serializing_if = "is_default")]
1532 pub filter_subject: Option<String>,
1533 #[serde(default, skip_serializing_if = "Option::is_none")]
1535 pub external: Option<External>,
1536 #[serde(default, skip_serializing_if = "is_default")]
1538 pub domain: Option<String>,
1539 #[cfg(feature = "server_2_10")]
1541 #[serde(default, skip_serializing_if = "is_default")]
1542 pub subject_transforms: Vec<SubjectTransform>,
1543}
1544
1545#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1546pub struct External {
1547 #[serde(rename = "api")]
1549 pub api_prefix: String,
1550 #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1552 pub delivery_prefix: Option<String>,
1553}
1554
1555use std::marker::PhantomData;
1556
/// Type-level marker: the builder option has been set.
#[derive(Default, Debug)]
pub struct Yes;

/// Type-level marker: the builder option has not been set.
#[derive(Default, Debug)]
pub struct No;

/// Implemented by the [`Yes`] and [`No`] markers used as type parameters of [`Purge`].
pub trait ToAssign: Debug {}

impl ToAssign for No {}
impl ToAssign for Yes {}
1566
1567#[derive(Debug)]
1568pub struct Purge<'a, SEQUENCE, KEEP>
1569where
1570 SEQUENCE: ToAssign,
1571 KEEP: ToAssign,
1572{
1573 stream: &'a Stream,
1574 inner: PurgeRequest,
1575 sequence_set: PhantomData<SEQUENCE>,
1576 keep_set: PhantomData<KEEP>,
1577}
1578
1579impl<'a, SEQUENCE, KEEP> Purge<'a, SEQUENCE, KEEP>
1580where
1581 SEQUENCE: ToAssign,
1582 KEEP: ToAssign,
1583{
1584 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<'a, SEQUENCE, KEEP> {
1586 self.inner.filter = Some(filter.into());
1587 self
1588 }
1589}
1590
1591impl<'a> Purge<'a, No, No> {
1592 pub(crate) fn build(stream: &'a Stream) -> Purge<'a, No, No> {
1593 Purge {
1594 stream,
1595 inner: Default::default(),
1596 sequence_set: PhantomData {},
1597 keep_set: PhantomData {},
1598 }
1599 }
1600}
1601
1602impl<'a, KEEP> Purge<'a, No, KEEP>
1603where
1604 KEEP: ToAssign,
1605{
1606 pub fn keep(self, keep: u64) -> Purge<'a, No, Yes> {
1609 Purge {
1610 stream: self.stream,
1611 sequence_set: PhantomData {},
1612 keep_set: PhantomData {},
1613 inner: PurgeRequest {
1614 keep: Some(keep),
1615 ..self.inner
1616 },
1617 }
1618 }
1619}
1620impl<'a, SEQUENCE> Purge<'a, SEQUENCE, No>
1621where
1622 SEQUENCE: ToAssign,
1623{
1624 pub fn sequence(self, sequence: u64) -> Purge<'a, Yes, No> {
1627 Purge {
1628 stream: self.stream,
1629 sequence_set: PhantomData {},
1630 keep_set: PhantomData {},
1631 inner: PurgeRequest {
1632 sequence: Some(sequence),
1633 ..self.inner
1634 },
1635 }
1636 }
1637}
1638
1639#[derive(Clone, Debug, PartialEq)]
1640pub enum PurgeErrorKind {
1641 Request,
1642 TimedOut,
1643 JetStream(super::errors::Error),
1644}
1645
1646impl Display for PurgeErrorKind {
1647 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1648 match self {
1649 Self::Request => write!(f, "request failed"),
1650 Self::TimedOut => write!(f, "timed out"),
1651 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1652 }
1653 }
1654}
1655
1656pub type PurgeError = Error<PurgeErrorKind>;
1657
1658impl<'a, S, K> IntoFuture for Purge<'a, S, K>
1659where
1660 S: ToAssign + std::marker::Send,
1661 K: ToAssign + std::marker::Send,
1662{
1663 type Output = Result<PurgeResponse, PurgeError>;
1664
1665 type IntoFuture = BoxFuture<'a, Result<PurgeResponse, PurgeError>>;
1666
1667 fn into_future(self) -> Self::IntoFuture {
1668 Box::pin(std::future::IntoFuture::into_future(async move {
1669 let request_subject = format!("STREAM.PURGE.{}", self.stream.info.config.name);
1670 let response: Response<PurgeResponse> = self
1671 .stream
1672 .context
1673 .request(request_subject, &self.inner)
1674 .map_err(|err| match err.kind() {
1675 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
1676 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
1677 })
1678 .await?;
1679
1680 match response {
1681 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
1682 Response::Ok(response) => Ok(response),
1683 }
1684 }))
1685 }
1686}
1687
1688#[derive(Deserialize, Debug)]
1689struct ConsumerPage {
1690 total: usize,
1691 consumers: Option<Vec<String>>,
1692}
1693
1694#[derive(Deserialize, Debug)]
1695struct ConsumerInfoPage {
1696 total: usize,
1697 consumers: Option<Vec<super::consumer::Info>>,
1698}
1699
1700type ConsumerNamesErrorKind = StreamsErrorKind;
1701type ConsumerNamesError = StreamsError;
1702type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
1703
1704pub struct ConsumerNames {
1705 context: Context,
1706 stream: String,
1707 offset: usize,
1708 page_request: Option<PageRequest>,
1709 consumers: Vec<String>,
1710 done: bool,
1711}
1712
1713impl futures::Stream for ConsumerNames {
1714 type Item = Result<String, ConsumerNamesError>;
1715
1716 fn poll_next(
1717 mut self: Pin<&mut Self>,
1718 cx: &mut std::task::Context<'_>,
1719 ) -> std::task::Poll<Option<Self::Item>> {
1720 match self.page_request.as_mut() {
1721 Some(page) => match page.try_poll_unpin(cx) {
1722 std::task::Poll::Ready(page) => {
1723 self.page_request = None;
1724 let page = page.map_err(|err| {
1725 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
1726 })?;
1727
1728 if let Some(consumers) = page.consumers {
1729 self.offset += consumers.len();
1730 self.consumers = consumers;
1731 if self.offset >= page.total {
1732 self.done = true;
1733 }
1734 match self.consumers.pop() {
1735 Some(stream) => Poll::Ready(Some(Ok(stream))),
1736 None => Poll::Ready(None),
1737 }
1738 } else {
1739 Poll::Ready(None)
1740 }
1741 }
1742 std::task::Poll::Pending => std::task::Poll::Pending,
1743 },
1744 None => {
1745 if let Some(stream) = self.consumers.pop() {
1746 Poll::Ready(Some(Ok(stream)))
1747 } else {
1748 if self.done {
1749 return Poll::Ready(None);
1750 }
1751 let context = self.context.clone();
1752 let offset = self.offset;
1753 let stream = self.stream.clone();
1754 self.page_request = Some(Box::pin(async move {
1755 match context
1756 .request(
1757 format!("CONSUMER.NAMES.{stream}"),
1758 &json!({
1759 "offset": offset,
1760 }),
1761 )
1762 .await?
1763 {
1764 Response::Err { error } => Err(RequestError::with_source(
1765 super::context::RequestErrorKind::Other,
1766 error,
1767 )),
1768 Response::Ok(page) => Ok(page),
1769 }
1770 }));
1771 self.poll_next(cx)
1772 }
1773 }
1774 }
1775 }
1776}
1777
1778pub type ConsumersErrorKind = StreamsErrorKind;
1779pub type ConsumersError = StreamsError;
1780type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
1781
1782pub struct Consumers {
1783 context: Context,
1784 stream: String,
1785 offset: usize,
1786 page_request: Option<PageInfoRequest>,
1787 consumers: Vec<super::consumer::Info>,
1788 done: bool,
1789}
1790
1791impl futures::Stream for Consumers {
1792 type Item = Result<super::consumer::Info, ConsumersError>;
1793
1794 fn poll_next(
1795 mut self: Pin<&mut Self>,
1796 cx: &mut std::task::Context<'_>,
1797 ) -> std::task::Poll<Option<Self::Item>> {
1798 match self.page_request.as_mut() {
1799 Some(page) => match page.try_poll_unpin(cx) {
1800 std::task::Poll::Ready(page) => {
1801 self.page_request = None;
1802 let page = page.map_err(|err| {
1803 ConsumersError::with_source(ConsumersErrorKind::Other, err)
1804 })?;
1805 if let Some(consumers) = page.consumers {
1806 self.offset += consumers.len();
1807 self.consumers = consumers;
1808 if self.offset >= page.total {
1809 self.done = true;
1810 }
1811 match self.consumers.pop() {
1812 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
1813 None => Poll::Ready(None),
1814 }
1815 } else {
1816 Poll::Ready(None)
1817 }
1818 }
1819 std::task::Poll::Pending => std::task::Poll::Pending,
1820 },
1821 None => {
1822 if let Some(stream) = self.consumers.pop() {
1823 Poll::Ready(Some(Ok(stream)))
1824 } else {
1825 if self.done {
1826 return Poll::Ready(None);
1827 }
1828 let context = self.context.clone();
1829 let offset = self.offset;
1830 let stream = self.stream.clone();
1831 self.page_request = Some(Box::pin(async move {
1832 match context
1833 .request(
1834 format!("CONSUMER.LIST.{stream}"),
1835 &json!({
1836 "offset": offset,
1837 }),
1838 )
1839 .await?
1840 {
1841 Response::Err { error } => Err(RequestError::with_source(
1842 super::context::RequestErrorKind::Other,
1843 error,
1844 )),
1845 Response::Ok(page) => Ok(page),
1846 }
1847 }));
1848 self.poll_next(cx)
1849 }
1850 }
1851 }
1852 }
1853}
1854
1855#[derive(Clone, Debug, PartialEq)]
1856pub enum LastRawMessageErrorKind {
1857 NoMessageFound,
1858 JetStream(super::errors::Error),
1859 Other,
1860}
1861
1862impl Display for LastRawMessageErrorKind {
1863 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1864 match self {
1865 Self::NoMessageFound => write!(f, "no message found"),
1866 Self::Other => write!(f, "failed to get last raw message"),
1867 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1868 }
1869 }
1870}
1871
1872pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
1873
1874#[derive(Clone, Debug, PartialEq)]
1875pub enum ConsumerErrorKind {
1876 TimedOut,
1878 Request,
1879 InvalidConsumerType,
1880 InvalidName,
1881 JetStream(super::errors::Error),
1882 Other,
1883}
1884
1885impl Display for ConsumerErrorKind {
1886 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1887 match self {
1888 Self::TimedOut => write!(f, "timed out"),
1889 Self::Request => write!(f, "request failed"),
1890 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1891 Self::Other => write!(f, "consumer error"),
1892 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
1893 Self::InvalidName => write!(f, "invalid consumer name"),
1894 }
1895 }
1896}
1897
1898pub type ConsumerError = Error<ConsumerErrorKind>;
1899
1900#[derive(Clone, Debug, PartialEq)]
1901pub enum ConsumerCreateStrictErrorKind {
1902 TimedOut,
1904 Request,
1905 InvalidConsumerType,
1906 InvalidName,
1907 AlreadyExists,
1908 JetStream(super::errors::Error),
1909 Other,
1910}
1911
1912impl Display for ConsumerCreateStrictErrorKind {
1913 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1914 match self {
1915 Self::TimedOut => write!(f, "timed out"),
1916 Self::Request => write!(f, "request failed"),
1917 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1918 Self::Other => write!(f, "consumer error"),
1919 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
1920 Self::InvalidName => write!(f, "invalid consumer name"),
1921 Self::AlreadyExists => write!(f, "consumer already exists"),
1922 }
1923 }
1924}
1925
1926pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
1927
1928#[derive(Clone, Debug, PartialEq)]
1929pub enum ConsumerUpdateErrorKind {
1930 TimedOut,
1932 Request,
1933 InvalidConsumerType,
1934 InvalidName,
1935 DoesNotExist,
1936 JetStream(super::errors::Error),
1937 Other,
1938}
1939
1940impl Display for ConsumerUpdateErrorKind {
1941 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1942 match self {
1943 Self::TimedOut => write!(f, "timed out"),
1944 Self::Request => write!(f, "request failed"),
1945 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1946 Self::Other => write!(f, "consumer error"),
1947 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
1948 Self::InvalidName => write!(f, "invalid consumer name"),
1949 Self::DoesNotExist => write!(f, "consumer does not exist"),
1950 }
1951 }
1952}
1953
1954pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
1955
1956impl From<super::errors::Error> for ConsumerError {
1957 fn from(err: super::errors::Error) -> Self {
1958 ConsumerError::new(ConsumerErrorKind::JetStream(err))
1959 }
1960}
1961impl From<super::errors::Error> for ConsumerCreateStrictError {
1962 fn from(err: super::errors::Error) -> Self {
1963 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
1964 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
1965 } else {
1966 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
1967 }
1968 }
1969}
1970impl From<super::errors::Error> for ConsumerUpdateError {
1971 fn from(err: super::errors::Error) -> Self {
1972 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
1973 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
1974 } else {
1975 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
1976 }
1977 }
1978}
1979impl From<ConsumerError> for ConsumerUpdateError {
1980 fn from(err: ConsumerError) -> Self {
1981 match err.kind() {
1982 ConsumerErrorKind::JetStream(err) => {
1983 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
1984 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
1985 } else {
1986 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
1987 }
1988 }
1989 ConsumerErrorKind::Request => {
1990 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
1991 }
1992 ConsumerErrorKind::TimedOut => {
1993 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
1994 }
1995 ConsumerErrorKind::InvalidConsumerType => {
1996 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
1997 }
1998 ConsumerErrorKind::InvalidName => {
1999 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2000 }
2001 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2002 }
2003 }
2004}
2005
2006impl From<ConsumerError> for ConsumerCreateStrictError {
2007 fn from(err: ConsumerError) -> Self {
2008 match err.kind() {
2009 ConsumerErrorKind::JetStream(err) => {
2010 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2011 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2012 } else {
2013 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2014 }
2015 }
2016 ConsumerErrorKind::Request => {
2017 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2018 }
2019 ConsumerErrorKind::TimedOut => {
2020 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2021 }
2022 ConsumerErrorKind::InvalidConsumerType => {
2023 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2024 }
2025 ConsumerErrorKind::InvalidName => {
2026 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2027 }
2028 ConsumerErrorKind::Other => {
2029 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2030 }
2031 }
2032 }
2033}
2034
2035impl From<super::context::RequestError> for ConsumerError {
2036 fn from(err: super::context::RequestError) -> Self {
2037 match err.kind() {
2038 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2039 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2040 }
2041 }
2042}
2043impl From<super::context::RequestError> for ConsumerUpdateError {
2044 fn from(err: super::context::RequestError) -> Self {
2045 match err.kind() {
2046 RequestErrorKind::TimedOut => {
2047 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2048 }
2049 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2050 }
2051 }
2052}
2053impl From<super::context::RequestError> for ConsumerCreateStrictError {
2054 fn from(err: super::context::RequestError) -> Self {
2055 match err.kind() {
2056 RequestErrorKind::TimedOut => {
2057 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2058 }
2059 _ => {
2060 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2061 }
2062 }
2063 }
2064}