1use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{header, Client, Command, HeaderMap, HeaderValue, Message, StatusCode};
23use bytes::Bytes;
24use futures::future::BoxFuture;
25use futures::{Future, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Display;
31use std::future::IntoFuture;
32use std::pin::Pin;
33use std::str::from_utf8;
34use std::task::Poll;
35use std::time::Duration;
36use tokio::sync::oneshot;
37use tracing::debug;
38
39use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
40use super::errors::ErrorCode;
41use super::is_valid_name;
42use super::kv::{Store, MAX_HISTORY};
43use super::object_store::{is_valid_bucket_name, ObjectStore};
44use super::stream::{
45 self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
46 Stream,
47};
48#[cfg(feature = "server_2_10")]
49use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
50
51#[derive(Debug, Clone)]
53pub struct Context {
54 pub(crate) client: Client,
55 pub(crate) prefix: String,
56 pub(crate) timeout: Duration,
57}
58
59impl Context {
60 pub(crate) fn new(client: Client) -> Context {
61 Context {
62 client,
63 prefix: "$JS.API".to_string(),
64 timeout: Duration::from_secs(5),
65 }
66 }
67
68 pub fn set_timeout(&mut self, timeout: Duration) {
69 self.timeout = timeout
70 }
71
72 pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
73 Context {
74 client,
75 prefix: prefix.to_string(),
76 timeout: Duration::from_secs(5),
77 }
78 }
79
80 pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
81 Context {
82 client,
83 prefix: format!("$JS.{}.API", domain.as_ref()),
84 timeout: Duration::from_secs(5),
85 }
86 }
87
88 pub async fn publish<S: ToSubject>(
129 &self,
130 subject: S,
131 payload: Bytes,
132 ) -> Result<PublishAckFuture, PublishError> {
133 self.send_publish(subject, Publish::build().payload(payload))
134 .await
135 }
136
137 pub async fn publish_with_headers<S: ToSubject>(
159 &self,
160 subject: S,
161 headers: crate::header::HeaderMap,
162 payload: Bytes,
163 ) -> Result<PublishAckFuture, PublishError> {
164 self.send_publish(subject, Publish::build().payload(payload).headers(headers))
165 .await
166 }
167
168 pub async fn send_publish<S: ToSubject>(
191 &self,
192 subject: S,
193 publish: Publish,
194 ) -> Result<PublishAckFuture, PublishError> {
195 let subject = subject.to_subject();
196 let (sender, receiver) = oneshot::channel();
197
198 let respond = self.client.new_inbox().into();
199
200 let send_fut = self
201 .client
202 .sender
203 .send(Command::Request {
204 subject,
205 payload: publish.payload,
206 respond,
207 headers: publish.headers,
208 sender,
209 })
210 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
211
212 tokio::time::timeout(self.timeout, send_fut)
213 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
214 .await??;
215
216 Ok(PublishAckFuture {
217 timeout: self.timeout,
218 subscription: receiver,
219 })
220 }
221
222 pub async fn query_account(&self) -> Result<Account, AccountError> {
224 let response: Response<Account> = self.request("INFO", b"").await?;
225
226 match response {
227 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
228 Response::Ok(account) => Ok(account),
229 }
230 }
231
232 pub async fn create_stream<S>(&self, stream_config: S) -> Result<Stream, CreateStreamError>
257 where
258 Config: From<S>,
259 {
260 let mut config: Config = stream_config.into();
261 if config.name.is_empty() {
262 return Err(CreateStreamError::new(
263 CreateStreamErrorKind::EmptyStreamName,
264 ));
265 }
266 if !is_valid_name(config.name.as_str()) {
267 return Err(CreateStreamError::new(
268 CreateStreamErrorKind::InvalidStreamName,
269 ));
270 }
271 if let Some(ref mut mirror) = config.mirror {
272 if let Some(ref mut domain) = mirror.domain {
273 if mirror.external.is_some() {
274 return Err(CreateStreamError::new(
275 CreateStreamErrorKind::DomainAndExternalSet,
276 ));
277 }
278 mirror.external = Some(External {
279 api_prefix: format!("$JS.{domain}.API"),
280 delivery_prefix: None,
281 })
282 }
283 }
284
285 if let Some(ref mut sources) = config.sources {
286 for source in sources {
287 if let Some(ref mut domain) = source.domain {
288 if source.external.is_some() {
289 return Err(CreateStreamError::new(
290 CreateStreamErrorKind::DomainAndExternalSet,
291 ));
292 }
293 source.external = Some(External {
294 api_prefix: format!("$JS.{domain}.API"),
295 delivery_prefix: None,
296 })
297 }
298 }
299 }
300 let subject = format!("STREAM.CREATE.{}", config.name);
301 let response: Response<Info> = self.request(subject, &config).await?;
302
303 match response {
304 Response::Err { error } => Err(error.into()),
305 Response::Ok(info) => Ok(Stream {
306 context: self.clone(),
307 info,
308 }),
309 }
310 }
311
312 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
328 let stream = stream.as_ref();
329 if stream.is_empty() {
330 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
331 }
332
333 if !is_valid_name(stream) {
334 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
335 }
336
337 let subject = format!("STREAM.INFO.{stream}");
338 let request: Response<Info> = self
339 .request(subject, &())
340 .await
341 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
342 match request {
343 Response::Err { error } => {
344 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
345 }
346 Response::Ok(info) => Ok(Stream {
347 context: self.clone(),
348 info,
349 }),
350 }
351 }
352
353 pub async fn get_or_create_stream<S>(
377 &self,
378 stream_config: S,
379 ) -> Result<Stream, CreateStreamError>
380 where
381 S: Into<Config>,
382 {
383 let config: Config = stream_config.into();
384
385 if config.name.is_empty() {
386 return Err(CreateStreamError::new(
387 CreateStreamErrorKind::EmptyStreamName,
388 ));
389 }
390
391 if !is_valid_name(config.name.as_str()) {
392 return Err(CreateStreamError::new(
393 CreateStreamErrorKind::InvalidStreamName,
394 ));
395 }
396 let subject = format!("STREAM.INFO.{}", config.name);
397
398 let request: Response<Info> = self.request(subject, &()).await?;
399 match request {
400 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
401 Response::Err { error } => Err(error.into()),
402 Response::Ok(info) => Ok(Stream {
403 context: self.clone(),
404 info,
405 }),
406 }
407 }
408
409 pub async fn delete_stream<T: AsRef<str>>(
425 &self,
426 stream: T,
427 ) -> Result<DeleteStatus, DeleteStreamError> {
428 let stream = stream.as_ref();
429 if stream.is_empty() {
430 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
431 }
432
433 if !is_valid_name(stream) {
434 return Err(DeleteStreamError::new(
435 DeleteStreamErrorKind::InvalidStreamName,
436 ));
437 }
438
439 let subject = format!("STREAM.DELETE.{stream}");
440 match self
441 .request(subject, &json!({}))
442 .await
443 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
444 {
445 Response::Err { error } => Err(DeleteStreamError::new(
446 DeleteStreamErrorKind::JetStream(error),
447 )),
448 Response::Ok(delete_response) => Ok(delete_response),
449 }
450 }
451
452 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
477 where
478 S: Borrow<Config>,
479 {
480 let config = config.borrow();
481
482 if config.name.is_empty() {
483 return Err(CreateStreamError::new(
484 CreateStreamErrorKind::EmptyStreamName,
485 ));
486 }
487
488 if !is_valid_name(config.name.as_str()) {
489 return Err(CreateStreamError::new(
490 CreateStreamErrorKind::InvalidStreamName,
491 ));
492 }
493
494 let subject = format!("STREAM.UPDATE.{}", config.name);
495 match self.request(subject, config).await? {
496 Response::Err { error } => Err(error.into()),
497 Response::Ok(info) => Ok(info),
498 }
499 }
500
501 pub fn stream_names(&self) -> StreamNames {
519 StreamNames {
520 context: self.clone(),
521 offset: 0,
522 page_request: None,
523 streams: Vec::new(),
524 done: false,
525 }
526 }
527
528 pub fn streams(&self) -> Streams {
546 Streams {
547 context: self.clone(),
548 offset: 0,
549 page_request: None,
550 streams: Vec::new(),
551 done: false,
552 }
553 }
554 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
568 let bucket: String = bucket.into();
569 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
570 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
571 }
572
573 let stream_name = format!("KV_{}", &bucket);
574 let stream = self
575 .get_stream(stream_name.clone())
576 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
577 .await?;
578
579 if stream.info.config.max_messages_per_subject < 1 {
580 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
581 }
582 let mut store = Store {
583 prefix: format!("$KV.{}.", &bucket),
584 name: bucket,
585 stream_name,
586 stream: stream.clone(),
587 put_prefix: None,
588 use_jetstream_prefix: self.prefix != "$JS.API",
589 };
590 if let Some(ref mirror) = stream.info.config.mirror {
591 let bucket = mirror.name.trim_start_matches("KV_");
592 if let Some(ref external) = mirror.external {
593 if !external.api_prefix.is_empty() {
594 store.use_jetstream_prefix = false;
595 store.prefix = format!("$KV.{bucket}.");
596 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
597 } else {
598 store.put_prefix = Some(format!("$KV.{bucket}."));
599 }
600 }
601 };
602
603 Ok(store)
604 }
605
606 pub async fn create_key_value(
626 &self,
627 mut config: crate::jetstream::kv::Config,
628 ) -> Result<Store, CreateKeyValueError> {
629 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
630 return Err(CreateKeyValueError::new(
631 CreateKeyValueErrorKind::InvalidStoreName,
632 ));
633 }
634
635 let history = if config.history > 0 {
636 if config.history > MAX_HISTORY {
637 return Err(CreateKeyValueError::new(
638 CreateKeyValueErrorKind::TooLongHistory,
639 ));
640 }
641 config.history
642 } else {
643 1
644 };
645
646 let num_replicas = if config.num_replicas == 0 {
647 1
648 } else {
649 config.num_replicas
650 };
651
652 let mut subjects = Vec::new();
653 if let Some(ref mut mirror) = config.mirror {
654 if !mirror.name.starts_with("KV_") {
655 mirror.name = format!("KV_{}", mirror.name);
656 }
657 config.mirror_direct = true;
658 } else if let Some(ref mut sources) = config.sources {
659 for source in sources {
660 if !source.name.starts_with("KV_") {
661 source.name = format!("KV_{}", source.name);
662 }
663 }
664 } else {
665 subjects = vec![format!("$KV.{}.>", config.bucket)];
666 }
667
668 let stream = self
669 .create_stream(stream::Config {
670 name: format!("KV_{}", config.bucket),
671 description: Some(config.description),
672 subjects,
673 max_messages_per_subject: history,
674 max_bytes: config.max_bytes,
675 max_age: config.max_age,
676 max_message_size: config.max_value_size,
677 storage: config.storage,
678 republish: config.republish,
679 allow_rollup: true,
680 deny_delete: true,
681 deny_purge: false,
682 allow_direct: true,
683 sources: config.sources,
684 mirror: config.mirror,
685 num_replicas,
686 discard: stream::DiscardPolicy::New,
687 mirror_direct: config.mirror_direct,
688 #[cfg(feature = "server_2_10")]
689 compression: if config.compression {
690 Some(stream::Compression::S2)
691 } else {
692 None
693 },
694 placement: config.placement,
695 ..Default::default()
696 })
697 .await
698 .map_err(|err| {
699 if err.kind() == CreateStreamErrorKind::TimedOut {
700 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
701 } else {
702 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
703 }
704 })?;
705
706 let mut store = Store {
707 prefix: format!("$KV.{}.", &config.bucket),
708 name: config.bucket,
709 stream: stream.clone(),
710 stream_name: stream.info.config.name,
711 put_prefix: None,
712 use_jetstream_prefix: self.prefix != "$JS.API",
713 };
714 if let Some(ref mirror) = stream.info.config.mirror {
715 let bucket = mirror.name.trim_start_matches("KV_");
716 if let Some(ref external) = mirror.external {
717 if !external.api_prefix.is_empty() {
718 store.use_jetstream_prefix = false;
719 store.prefix = format!("$KV.{bucket}.");
720 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
721 } else {
722 store.put_prefix = Some(format!("$KV.{bucket}."));
723 }
724 }
725 };
726
727 Ok(store)
728 }
729
730 pub async fn delete_key_value<T: AsRef<str>>(
750 &self,
751 bucket: T,
752 ) -> Result<DeleteStatus, KeyValueError> {
753 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
754 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
755 }
756
757 let stream_name = format!("KV_{}", bucket.as_ref());
758 self.delete_stream(stream_name)
759 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
760 .await
761 }
762
763 pub async fn get_consumer_from_stream<T, C, S>(
801 &self,
802 consumer: C,
803 stream: S,
804 ) -> Result<Consumer<T>, ConsumerError>
805 where
806 T: FromConsumer + IntoConsumerConfig,
807 S: AsRef<str>,
808 C: AsRef<str>,
809 {
810 if !is_valid_name(stream.as_ref()) {
811 return Err(ConsumerError::with_source(
812 ConsumerErrorKind::InvalidName,
813 "invalid stream",
814 ));
815 }
816
817 if !is_valid_name(consumer.as_ref()) {
818 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
819 }
820
821 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
822
823 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
824 Response::Ok(info) => info,
825 Response::Err { error } => return Err(error.into()),
826 };
827
828 Ok(Consumer::new(
829 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
830 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
831 })?,
832 info,
833 self.clone(),
834 ))
835 }
836
837 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
860 &self,
861 consumer: C,
862 stream: S,
863 ) -> Result<DeleteStatus, ConsumerError> {
864 if !is_valid_name(consumer.as_ref()) {
865 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
866 }
867
868 if !is_valid_name(stream.as_ref()) {
869 return Err(ConsumerError::with_source(
870 ConsumerErrorKind::Other,
871 "invalid stream name",
872 ));
873 }
874
875 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
876
877 match self.request(subject, &json!({})).await? {
878 Response::Ok(delete_status) => Ok(delete_status),
879 Response::Err { error } => Err(error.into()),
880 }
881 }
882
883 pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
909 &self,
910 config: C,
911 stream: S,
912 ) -> Result<Consumer<C>, ConsumerError> {
913 self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
914 .await
915 }
916
/// Updates a consumer on `stream` using [`ConsumerAction::Update`].
///
/// Only compiled with the `server_2_10` feature.
#[cfg(feature = "server_2_10")]
pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
    &self,
    config: C,
    stream: S,
) -> Result<Consumer<C>, ConsumerUpdateError> {
    let outcome = self
        .create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
        .await;
    outcome.map_err(Into::into)
}
953
/// Creates a consumer on `stream` using [`ConsumerAction::Create`].
///
/// Only compiled with the `server_2_10` feature.
#[cfg(feature = "server_2_10")]
pub async fn create_consumer_strict_on_stream<
    C: IntoConsumerConfig + FromConsumer,
    S: AsRef<str>,
>(
    &self,
    config: C,
    stream: S,
) -> Result<Consumer<C>, ConsumerCreateStrictError> {
    let outcome = self
        .create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
        .await;
    outcome.map_err(Into::into)
}
993
994 async fn create_consumer_on_stream_action<
995 C: IntoConsumerConfig + FromConsumer,
996 S: AsRef<str>,
997 >(
998 &self,
999 config: C,
1000 stream: S,
1001 action: ConsumerAction,
1002 ) -> Result<Consumer<C>, ConsumerError> {
1003 let config = config.into_consumer_config();
1004
1005 let subject = {
1006 if self.client.is_server_compatible(2, 9, 0) {
1007 let filter = if config.filter_subject.is_empty() {
1008 "".to_string()
1009 } else {
1010 format!(".{}", config.filter_subject)
1011 };
1012 config
1013 .name
1014 .as_ref()
1015 .or(config.durable_name.as_ref())
1016 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1017 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1018 } else if config.name.is_some() {
1019 return Err(ConsumerError::with_source(
1020 ConsumerErrorKind::Other,
1021 "can't use consumer name with server < 2.9.0",
1022 ));
1023 } else if let Some(ref durable_name) = config.durable_name {
1024 format!(
1025 "CONSUMER.DURABLE.CREATE.{}.{}",
1026 stream.as_ref(),
1027 durable_name
1028 )
1029 } else {
1030 format!("CONSUMER.CREATE.{}", stream.as_ref())
1031 }
1032 };
1033
1034 match self
1035 .request(
1036 subject,
1037 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1038 )
1039 .await?
1040 {
1041 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1042 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1043 FromConsumer::try_from_consumer_config(info.clone().config)
1044 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1045 info,
1046 self.clone(),
1047 )),
1048 }
1049 }
1050
1051 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1071 where
1072 S: ToSubject,
1073 T: ?Sized + Serialize,
1074 V: DeserializeOwned,
1075 {
1076 let subject = subject.to_subject();
1077 let request = serde_json::to_vec(&payload)
1078 .map(Bytes::from)
1079 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1080
1081 debug!("JetStream request sent: {:?}", request);
1082
1083 let message = self
1084 .client
1085 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1086 .await;
1087 let message = message?;
1088 debug!(
1089 "JetStream request response: {:?}",
1090 from_utf8(&message.payload)
1091 );
1092 let response = serde_json::from_slice(message.payload.as_ref())
1093 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1094
1095 Ok(response)
1096 }
1097
1098 pub async fn create_object_store(
1117 &self,
1118 config: super::object_store::Config,
1119 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1120 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1121 return Err(CreateObjectStoreError::new(
1122 CreateKeyValueErrorKind::InvalidStoreName,
1123 ));
1124 }
1125
1126 let bucket_name = config.bucket.clone();
1127 let stream_name = format!("OBJ_{bucket_name}");
1128 let chunk_subject = format!("$O.{bucket_name}.C.>");
1129 let meta_subject = format!("$O.{bucket_name}.M.>");
1130
1131 let stream = self
1132 .create_stream(super::stream::Config {
1133 name: stream_name,
1134 description: config.description.clone(),
1135 subjects: vec![chunk_subject, meta_subject],
1136 max_age: config.max_age,
1137 storage: config.storage,
1138 num_replicas: config.num_replicas,
1139 discard: DiscardPolicy::New,
1140 allow_rollup: true,
1141 allow_direct: true,
1142 #[cfg(feature = "server_2_10")]
1143 compression: if config.compression {
1144 Some(Compression::S2)
1145 } else {
1146 None
1147 },
1148 placement: config.placement,
1149 ..Default::default()
1150 })
1151 .await
1152 .map_err(|err| {
1153 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1154 })?;
1155
1156 Ok(ObjectStore {
1157 name: bucket_name,
1158 stream,
1159 })
1160 }
1161
1162 pub async fn get_object_store<T: AsRef<str>>(
1176 &self,
1177 bucket_name: T,
1178 ) -> Result<ObjectStore, ObjectStoreError> {
1179 let bucket_name = bucket_name.as_ref();
1180 if !is_valid_bucket_name(bucket_name) {
1181 return Err(ObjectStoreError::new(
1182 ObjectStoreErrorKind::InvalidBucketName,
1183 ));
1184 }
1185 let stream_name = format!("OBJ_{bucket_name}");
1186 let stream = self
1187 .get_stream(stream_name)
1188 .await
1189 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1190
1191 Ok(ObjectStore {
1192 name: bucket_name.to_string(),
1193 stream,
1194 })
1195 }
1196
1197 pub async fn delete_object_store<T: AsRef<str>>(
1211 &self,
1212 bucket_name: T,
1213 ) -> Result<(), DeleteObjectStore> {
1214 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1215 self.delete_stream(stream_name)
1216 .await
1217 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1218 Ok(())
1219 }
1220}
1221
/// Kinds of failures reported when publishing to JetStream.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the published subject.
    StreamNotFound,
    /// The server reported a wrong last message id.
    WrongLastMessageId,
    /// The server reported a wrong last sequence.
    WrongLastSequence,
    /// The operation did not complete within the timeout.
    TimedOut,
    /// The acknowledgement channel was closed before an answer arrived.
    BrokenPipe,
    /// Any other publish failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1244
1245pub type PublishError = Error<PublishErrorKind>;
1246
1247#[derive(Debug)]
1248pub struct PublishAckFuture {
1249 timeout: Duration,
1250 subscription: oneshot::Receiver<Message>,
1251}
1252
1253impl PublishAckFuture {
1254 async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
1255 let next = tokio::time::timeout(self.timeout, self.subscription)
1256 .await
1257 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1258 next.map_or_else(
1259 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1260 |m| {
1261 if m.status == Some(StatusCode::NO_RESPONDERS) {
1262 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1263 }
1264 let response = serde_json::from_slice(m.payload.as_ref())
1265 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1266 match response {
1267 Response::Err { error } => match error.error_code() {
1268 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1269 PublishErrorKind::WrongLastMessageId,
1270 error,
1271 )),
1272 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1273 PublishErrorKind::WrongLastSequence,
1274 error,
1275 )),
1276 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1277 },
1278 Response::Ok(publish_ack) => Ok(publish_ack),
1279 }
1280 },
1281 )
1282 }
1283}
1284impl IntoFuture for PublishAckFuture {
1285 type Output = Result<PublishAck, PublishError>;
1286
1287 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1288
1289 fn into_future(self) -> Self::IntoFuture {
1290 Box::pin(std::future::IntoFuture::into_future(
1291 self.next_with_timeout(),
1292 ))
1293 }
1294}
1295
1296#[derive(Deserialize, Debug)]
1297struct StreamPage {
1298 total: usize,
1299 streams: Option<Vec<String>>,
1300}
1301
1302#[derive(Deserialize, Debug)]
1303struct StreamInfoPage {
1304 total: usize,
1305 streams: Option<Vec<super::stream::Info>>,
1306}
1307
1308type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1309
1310pub struct StreamNames {
1311 context: Context,
1312 offset: usize,
1313 page_request: Option<PageRequest>,
1314 streams: Vec<String>,
1315 done: bool,
1316}
1317
1318impl futures::Stream for StreamNames {
1319 type Item = Result<String, StreamsError>;
1320
1321 fn poll_next(
1322 mut self: Pin<&mut Self>,
1323 cx: &mut std::task::Context<'_>,
1324 ) -> std::task::Poll<Option<Self::Item>> {
1325 match self.page_request.as_mut() {
1326 Some(page) => match page.try_poll_unpin(cx) {
1327 std::task::Poll::Ready(page) => {
1328 self.page_request = None;
1329 let page = page
1330 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1331 if let Some(streams) = page.streams {
1332 self.offset += streams.len();
1333 self.streams = streams;
1334 if self.offset >= page.total {
1335 self.done = true;
1336 }
1337 match self.streams.pop() {
1338 Some(stream) => Poll::Ready(Some(Ok(stream))),
1339 None => Poll::Ready(None),
1340 }
1341 } else {
1342 Poll::Ready(None)
1343 }
1344 }
1345 std::task::Poll::Pending => std::task::Poll::Pending,
1346 },
1347 None => {
1348 if let Some(stream) = self.streams.pop() {
1349 Poll::Ready(Some(Ok(stream)))
1350 } else {
1351 if self.done {
1352 return Poll::Ready(None);
1353 }
1354 let context = self.context.clone();
1355 let offset = self.offset;
1356 self.page_request = Some(Box::pin(async move {
1357 match context
1358 .request(
1359 "STREAM.NAMES",
1360 &json!({
1361 "offset": offset,
1362 }),
1363 )
1364 .await?
1365 {
1366 Response::Err { error } => {
1367 Err(RequestError::with_source(RequestErrorKind::Other, error))
1368 }
1369 Response::Ok(page) => Ok(page),
1370 }
1371 }));
1372 self.poll_next(cx)
1373 }
1374 }
1375 }
1376 }
1377}
1378
1379type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1380
1381pub type StreamsErrorKind = RequestErrorKind;
1382pub type StreamsError = RequestError;
1383
1384pub struct Streams {
1385 context: Context,
1386 offset: usize,
1387 page_request: Option<PageInfoRequest>,
1388 streams: Vec<super::stream::Info>,
1389 done: bool,
1390}
1391
1392impl futures::Stream for Streams {
1393 type Item = Result<super::stream::Info, StreamsError>;
1394
1395 fn poll_next(
1396 mut self: Pin<&mut Self>,
1397 cx: &mut std::task::Context<'_>,
1398 ) -> std::task::Poll<Option<Self::Item>> {
1399 match self.page_request.as_mut() {
1400 Some(page) => match page.try_poll_unpin(cx) {
1401 std::task::Poll::Ready(page) => {
1402 self.page_request = None;
1403 let page = page
1404 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1405 if let Some(streams) = page.streams {
1406 self.offset += streams.len();
1407 self.streams = streams;
1408 if self.offset >= page.total {
1409 self.done = true;
1410 }
1411 match self.streams.pop() {
1412 Some(stream) => Poll::Ready(Some(Ok(stream))),
1413 None => Poll::Ready(None),
1414 }
1415 } else {
1416 Poll::Ready(None)
1417 }
1418 }
1419 std::task::Poll::Pending => std::task::Poll::Pending,
1420 },
1421 None => {
1422 if let Some(stream) = self.streams.pop() {
1423 Poll::Ready(Some(Ok(stream)))
1424 } else {
1425 if self.done {
1426 return Poll::Ready(None);
1427 }
1428 let context = self.context.clone();
1429 let offset = self.offset;
1430 self.page_request = Some(Box::pin(async move {
1431 match context
1432 .request(
1433 "STREAM.LIST",
1434 &json!({
1435 "offset": offset,
1436 }),
1437 )
1438 .await?
1439 {
1440 Response::Err { error } => {
1441 Err(RequestError::with_source(RequestErrorKind::Other, error))
1442 }
1443 Response::Ok(page) => Ok(page),
1444 }
1445 }));
1446 self.poll_next(cx)
1447 }
1448 }
1449 }
1450 }
1451}
1452#[derive(Default, Clone, Debug)]
1454pub struct Publish {
1455 payload: Bytes,
1456 headers: Option<header::HeaderMap>,
1457}
1458impl Publish {
1459 pub fn build() -> Self {
1461 Default::default()
1462 }
1463
1464 pub fn payload(mut self, payload: Bytes) -> Self {
1466 self.payload = payload;
1467 self
1468 }
1469 pub fn headers(mut self, headers: HeaderMap) -> Self {
1471 self.headers = Some(headers);
1472 self
1473 }
1474 pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1476 self.headers
1477 .get_or_insert(header::HeaderMap::new())
1478 .insert(name, value);
1479 self
1480 }
1481 pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1483 self.header(header::NATS_MESSAGE_ID, id.as_ref())
1484 }
1485 pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1488 self.header(
1489 header::NATS_EXPECTED_LAST_MESSAGE_ID,
1490 last_message_id.as_ref(),
1491 )
1492 }
1493 pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1496 self.header(
1497 header::NATS_EXPECTED_LAST_SEQUENCE,
1498 HeaderValue::from(last_sequence),
1499 )
1500 }
1501 pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1504 self.header(
1505 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1506 HeaderValue::from(subject_sequence),
1507 )
1508 }
1509 pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1512 self.header(
1513 header::NATS_EXPECTED_STREAM,
1514 HeaderValue::from(stream.as_ref()),
1515 )
1516 }
1517}
1518
/// Kinds of failures reported by a JetStream API request.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody answered on the request subject.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other request failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1535
1536pub type RequestError = Error<RequestErrorKind>;
1537
1538impl From<crate::RequestError> for RequestError {
1539 fn from(error: crate::RequestError) -> Self {
1540 match error.kind() {
1541 crate::RequestErrorKind::TimedOut => {
1542 RequestError::with_source(RequestErrorKind::TimedOut, error)
1543 }
1544 crate::RequestErrorKind::NoResponders => {
1545 RequestError::new(RequestErrorKind::NoResponders)
1546 }
1547 crate::RequestErrorKind::Other => {
1548 RequestError::with_source(RequestErrorKind::Other, error)
1549 }
1550 }
1551 }
1552}
1553
1554impl From<super::errors::Error> for RequestError {
1555 fn from(err: super::errors::Error) -> Self {
1556 RequestError::with_source(RequestErrorKind::Other, err)
1557 }
1558}
1559
1560#[derive(Clone, Debug, PartialEq)]
1561pub enum CreateStreamErrorKind {
1562 EmptyStreamName,
1563 InvalidStreamName,
1564 DomainAndExternalSet,
1565 JetStreamUnavailable,
1566 JetStream(super::errors::Error),
1567 TimedOut,
1568 Response,
1569 ResponseParse,
1570}
1571
1572impl Display for CreateStreamErrorKind {
1573 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1574 match self {
1575 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
1576 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
1577 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
1578 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1579 Self::TimedOut => write!(f, "jetstream request timed out"),
1580 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
1581 Self::ResponseParse => write!(f, "failed to parse server response"),
1582 Self::Response => write!(f, "response error"),
1583 }
1584 }
1585}
1586
1587pub type CreateStreamError = Error<CreateStreamErrorKind>;
1588
1589impl From<super::errors::Error> for CreateStreamError {
1590 fn from(error: super::errors::Error) -> Self {
1591 CreateStreamError::new(CreateStreamErrorKind::JetStream(error))
1592 }
1593}
1594
1595impl From<RequestError> for CreateStreamError {
1596 fn from(error: RequestError) -> Self {
1597 match error.kind() {
1598 RequestErrorKind::NoResponders => {
1599 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
1600 }
1601 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
1602 RequestErrorKind::Other => {
1603 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
1604 }
1605 }
1606 }
1607}
1608
1609#[derive(Clone, Debug, PartialEq)]
1610pub enum GetStreamErrorKind {
1611 EmptyName,
1612 Request,
1613 InvalidStreamName,
1614 JetStream(super::errors::Error),
1615}
1616
1617impl Display for GetStreamErrorKind {
1618 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1619 match self {
1620 Self::EmptyName => write!(f, "empty name cannot be empty"),
1621 Self::Request => write!(f, "request error"),
1622 Self::InvalidStreamName => write!(f, "invalid stream name"),
1623 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1624 }
1625 }
1626}
1627
1628pub type GetStreamError = Error<GetStreamErrorKind>;
1629
1630pub type UpdateStreamError = CreateStreamError;
1631pub type UpdateStreamErrorKind = CreateStreamErrorKind;
1632pub type DeleteStreamError = GetStreamError;
1633pub type DeleteStreamErrorKind = GetStreamErrorKind;
1634
/// Reasons a Key Value store lookup can fail.
///
/// Carried as the kind of `KeyValueError`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is not valid.
    InvalidStoreName,
    /// The bucket could not be retrieved.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the human-readable description of this error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(description)
    }
}
1651
1652pub type KeyValueError = Error<KeyValueErrorKind>;
1653
/// Reasons creating a Key Value store can fail.
///
/// Carried as the kind of `CreateKeyValueError`; the same enum is also
/// exposed under the `CreateObjectStoreErrorKind` alias.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is not valid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the human-readable description of this error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
1674
1675pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
1676
1677pub type CreateObjectStoreError = CreateKeyValueError;
1678pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
1679
/// Reasons an Object Store lookup can fail.
///
/// Carried as the kind of `ObjectStoreError`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is not valid.
    InvalidBucketName,
    /// The Object Store could not be retrieved.
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the human-readable description of this error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(description)
    }
}
1694
1695pub type ObjectStoreError = Error<ObjectStoreErrorKind>;
1696
1697pub type DeleteObjectStore = ObjectStoreError;
1698pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
1699
1700#[derive(Clone, Debug, PartialEq)]
1701pub enum AccountErrorKind {
1702 TimedOut,
1703 JetStream(super::errors::Error),
1704 JetStreamUnavailable,
1705 Other,
1706}
1707
1708impl Display for AccountErrorKind {
1709 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1710 match self {
1711 Self::TimedOut => write!(f, "timed out"),
1712 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1713 Self::Other => write!(f, "error"),
1714 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
1715 }
1716 }
1717}
1718
1719pub type AccountError = Error<AccountErrorKind>;
1720
1721impl From<RequestError> for AccountError {
1722 fn from(err: RequestError) -> Self {
1723 match err.kind {
1724 RequestErrorKind::NoResponders => {
1725 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
1726 }
1727 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
1728 RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
1729 }
1730 }
1731}
1732
1733#[derive(Clone, Debug, Serialize)]
1734enum ConsumerAction {
1735 #[serde(rename = "")]
1736 CreateOrUpdate,
1737 #[serde(rename = "create")]
1738 #[cfg(feature = "server_2_10")]
1739 Create,
1740 #[serde(rename = "update")]
1741 #[cfg(feature = "server_2_10")]
1742 Update,
1743}