1use crate::error::Error;
17use crate::jetstream::account::Account;
18use crate::jetstream::message::PublishMessage;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{is_valid_subject, jetstream, Client, Command, Message, StatusCode};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25use futures_util::{Future, StreamExt, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Debug;
31use std::fmt::Display;
32use std::future::IntoFuture;
33use std::pin::Pin;
34use std::str::from_utf8;
35use std::sync::Arc;
36use std::task::Poll;
37use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
38use tokio::time::Duration;
39use tokio_stream::wrappers::ReceiverStream;
40use tracing::debug;
41
42use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
43use super::errors::ErrorCode;
44use super::is_valid_name;
45#[cfg(feature = "kv")]
46use super::kv::{Store, MAX_HISTORY};
47#[cfg(feature = "object-store")]
48use super::object_store::{is_valid_bucket_name, ObjectStore};
49#[cfg(any(feature = "kv", feature = "object-store"))]
50use super::stream::DiscardPolicy;
51#[cfg(feature = "server_2_10")]
52use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
53use super::stream::{
54 Config, ConsumerError, ConsumerErrorKind, DeleteStatus, External, Info, Stream,
55};
56
/// Capability traits that describe what a JetStream context can do.
///
/// Each trait isolates one capability (requesting, publishing, exposing the client or the
/// timeout) so that code can be written against the capability instead of a concrete type.
pub mod traits {
    use std::{future::Future, time::Duration};

    use bytes::Bytes;
    use serde::{de::DeserializeOwned, Serialize};

    use crate::{jetstream::message, subject::ToSubject, Request};

    use super::RequestError;

    /// Something that can issue a request with a serializable payload and decode the reply.
    pub trait Requester {
        /// Sends `payload` to `subject` and resolves to a value of type `V`, or to a
        /// [`RequestError`] on failure.
        fn request<S, T, V>(
            &self,
            subject: S,
            payload: &T,
        ) -> impl Future<Output = Result<V, RequestError>>
        where
            S: ToSubject,
            T: ?Sized + Serialize,
            V: DeserializeOwned;
    }

    /// Something that can send a pre-built [`Request`] without yielding a response value.
    pub trait RequestSender {
        /// Sends `request` to `subject`; the future resolves to `()` on success or to a
        /// `crate::PublishError` on failure.
        fn send_request<S: ToSubject>(
            &self,
            subject: S,
            request: Request,
        ) -> impl Future<Output = Result<(), crate::PublishError>>;
    }

    /// Something that can publish messages and hand back a future for the acknowledgement.
    pub trait Publisher {
        /// Publishes `payload` to `subject`, resolving to a `PublishAckFuture`.
        fn publish<S>(
            &self,
            subject: S,
            payload: Bytes,
        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>
        where
            S: ToSubject;

        /// Publishes an already assembled outbound message, resolving to a `PublishAckFuture`.
        fn publish_message(
            &self,
            message: message::OutboundMessage,
        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
    }

    /// Gives access to a [`crate::Client`].
    pub trait ClientProvider {
        /// Returns a client by value.
        fn client(&self) -> crate::Client;
    }

    /// Gives access to a configured timeout.
    pub trait TimeoutProvider {
        /// Returns the timeout as a [`Duration`].
        fn timeout(&self) -> Duration;
    }
}
110
/// A JetStream context: a [`Client`] bundled with the settings used for JetStream API
/// requests and acknowledged publishes.
#[derive(Debug, Clone)]
pub struct Context {
    /// Client used for every request and publish issued through this context.
    pub(crate) client: Client,
    /// Subject prefix that `request` and `send_request` prepend to the API subject.
    pub(crate) prefix: String,
    /// Timeout used by `send_publish` when queueing the publish command; it is also copied
    /// into the returned `PublishAckFuture`.
    pub(crate) timeout: Duration,
    /// Semaphore from which `send_publish` takes one permit per publish.
    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
    /// Sender half of the channel drained by the acker task spawned in `ContextBuilder::build`.
    pub(crate) ack_sender:
        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
    /// When `true`, `send_publish` waits for a permit; when `false`, it fails with
    /// `PublishErrorKind::MaxAckPending` if no permit is available.
    pub(crate) backpressure_on_inflight: bool,
    /// Number of permits `max_ack_semaphore` was created with.
    pub(crate) semaphore_capacity: usize,
}
123
124fn spawn_acker(
125 rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
126 ack_timeout: Duration,
127 concurrency: Option<usize>,
128) -> tokio::task::JoinHandle<()> {
129 tokio::spawn(async move {
130 rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
131 tokio::time::timeout(ack_timeout, subscription).await.ok();
132 drop(permit);
133 })
134 .await;
135 debug!("Acker task exited");
136 })
137}
138
139use std::marker::PhantomData;
140
/// Type-state marker carried by [`ContextBuilder`]: the builder still offers
/// `api_prefix` and `domain`.
#[derive(Debug, Default)]
pub struct Yes;
/// Type-state marker carried by [`ContextBuilder`] after `api_prefix` or `domain` was called.
#[derive(Debug, Default)]
pub struct No;

/// Marker trait implemented by the type-state markers [`Yes`] and [`No`].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
150
/// Builder for [`Context`].
///
/// The `PREFIX` parameter is a compile-time marker ([`Yes`] or [`No`]) that controls whether
/// `api_prefix` / `domain` are available on the builder.
pub struct ContextBuilder<PREFIX: ToAssign> {
    /// API subject prefix handed to the built context.
    prefix: String,
    /// Timeout handed to the built context.
    timeout: Duration,
    /// Capacity used for both the ack channel and the ack semaphore in `build`.
    semaphore_capacity: usize,
    /// How long the acker task waits for each acknowledgement.
    ack_timeout: Duration,
    /// Whether publishing waits for a permit (`true`) or fails fast (`false`).
    backpressure_on_inflight: bool,
    /// Concurrency limit forwarded to the acker task; `None` is passed through as-is.
    concurrency_limit: Option<usize>,
    /// Zero-sized carrier of the `PREFIX` marker.
    _phantom: PhantomData<PREFIX>,
}
178
179impl Default for ContextBuilder<Yes> {
180 fn default() -> Self {
181 ContextBuilder {
182 prefix: "$JS.API".to_string(),
183 timeout: Duration::from_secs(5),
184 semaphore_capacity: 5_000,
185 ack_timeout: Duration::from_secs(30),
186 backpressure_on_inflight: true,
187 concurrency_limit: None,
188 _phantom: PhantomData {},
189 }
190 }
191}
192
193impl ContextBuilder<Yes> {
194 pub fn new() -> ContextBuilder<Yes> {
196 ContextBuilder::default()
197 }
198}
199
200impl ContextBuilder<Yes> {
201 pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
203 ContextBuilder {
204 prefix: prefix.into(),
205 timeout: self.timeout,
206 semaphore_capacity: self.semaphore_capacity,
207 ack_timeout: self.ack_timeout,
208 backpressure_on_inflight: self.backpressure_on_inflight,
209 concurrency_limit: self.concurrency_limit,
210 _phantom: PhantomData,
211 }
212 }
213
214 pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
217 ContextBuilder {
218 prefix: format!("$JS.{}.API", domain.into()),
219 timeout: self.timeout,
220 semaphore_capacity: self.semaphore_capacity,
221 ack_timeout: self.ack_timeout,
222 backpressure_on_inflight: self.backpressure_on_inflight,
223 concurrency_limit: self.concurrency_limit,
224 _phantom: PhantomData,
225 }
226 }
227}
228
229impl<PREFIX> ContextBuilder<PREFIX>
230where
231 PREFIX: ToAssign,
232{
233 pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
235 where
236 Yes: ToAssign,
237 {
238 ContextBuilder {
239 prefix: self.prefix,
240 timeout,
241 semaphore_capacity: self.semaphore_capacity,
242 ack_timeout: self.ack_timeout,
243 backpressure_on_inflight: self.backpressure_on_inflight,
244 concurrency_limit: self.concurrency_limit,
245 _phantom: PhantomData,
246 }
247 }
248
249 pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
252 where
253 Yes: ToAssign,
254 {
255 ContextBuilder {
256 prefix: self.prefix,
257 timeout: self.timeout,
258 semaphore_capacity: self.semaphore_capacity,
259 ack_timeout,
260 backpressure_on_inflight: self.backpressure_on_inflight,
261 concurrency_limit: self.concurrency_limit,
262 _phantom: PhantomData,
263 }
264 }
265
266 pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
269 where
270 Yes: ToAssign,
271 {
272 ContextBuilder {
273 prefix: self.prefix,
274 timeout: self.timeout,
275 semaphore_capacity: capacity,
276 ack_timeout: self.ack_timeout,
277 backpressure_on_inflight: self.backpressure_on_inflight,
278 concurrency_limit: self.concurrency_limit,
279 _phantom: PhantomData,
280 }
281 }
282
283 pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
288 where
289 Yes: ToAssign,
290 {
291 ContextBuilder {
292 prefix: self.prefix,
293 timeout: self.timeout,
294 semaphore_capacity: self.semaphore_capacity,
295 ack_timeout: self.ack_timeout,
296 backpressure_on_inflight: enabled,
297 concurrency_limit: self.concurrency_limit,
298 _phantom: PhantomData,
299 }
300 }
301
302 pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
305 where
306 Yes: ToAssign,
307 {
308 ContextBuilder {
309 prefix: self.prefix,
310 timeout: self.timeout,
311 semaphore_capacity: self.semaphore_capacity,
312 ack_timeout: self.ack_timeout,
313 backpressure_on_inflight: self.backpressure_on_inflight,
314 concurrency_limit: limit,
315 _phantom: PhantomData,
316 }
317 }
318
319 pub fn build(self, client: Client) -> Context {
321 let (tx, rx) = tokio::sync::mpsc::channel::<(
322 oneshot::Receiver<Message>,
323 OwnedSemaphorePermit,
324 )>(self.semaphore_capacity);
325 let stream = ReceiverStream::new(rx);
326 spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
327 Context {
328 client,
329 prefix: self.prefix,
330 timeout: self.timeout,
331 max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
332 ack_sender: tx,
333 backpressure_on_inflight: self.backpressure_on_inflight,
334 semaphore_capacity: self.semaphore_capacity,
335 }
336 }
337}
338
339impl Context {
340 pub(crate) fn new(client: Client) -> Context {
341 ContextBuilder::default().build(client)
342 }
343
    /// Replaces the timeout stored on this context.
    pub fn set_timeout(&mut self, timeout: Duration) {
        self.timeout = timeout
    }
348
    /// Returns a clone of the underlying [`Client`].
    pub fn client(&self) -> Client {
        self.client.clone()
    }
353
354 pub async fn wait_for_acks(&self) {
360 self.max_ack_semaphore
361 .acquire_many(self.semaphore_capacity as u32)
362 .await
363 .ok();
364 }
365
    /// Builds a context for `client` whose API prefix is `prefix` rendered via `ToString`;
    /// every other setting keeps its builder default.
    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
        ContextBuilder::new()
            .api_prefix(prefix.to_string())
            .build(client)
    }
372
    /// Builds a context for `client` whose API prefix is derived from `domain` by
    /// `ContextBuilder::domain`; every other setting keeps its builder default.
    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
        ContextBuilder::new().domain(domain.as_ref()).build(client)
    }
377
378 pub async fn publish<S: ToSubject>(
419 &self,
420 subject: S,
421 payload: Bytes,
422 ) -> Result<PublishAckFuture, PublishError> {
423 self.send_publish(subject, PublishMessage::build().payload(payload))
424 .await
425 }
426
427 pub async fn publish_with_headers<S: ToSubject>(
449 &self,
450 subject: S,
451 headers: crate::header::HeaderMap,
452 payload: Bytes,
453 ) -> Result<PublishAckFuture, PublishError> {
454 self.send_publish(
455 subject,
456 PublishMessage::build().payload(payload).headers(headers),
457 )
458 .await
459 }
460
461 pub async fn send_publish<S: ToSubject>(
484 &self,
485 subject: S,
486 publish: PublishMessage,
487 ) -> Result<PublishAckFuture, PublishError> {
488 let permit = if self.backpressure_on_inflight {
489 self.max_ack_semaphore
491 .clone()
492 .acquire_owned()
493 .await
494 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
495 } else {
496 self.max_ack_semaphore
498 .clone()
499 .try_acquire_owned()
500 .map_err(|err| match err {
501 TryAcquireError::NoPermits => {
502 PublishError::new(PublishErrorKind::MaxAckPending)
503 }
504 _ => PublishError::with_source(PublishErrorKind::Other, err),
505 })?
506 };
507 let subject = self
508 .client
509 .maybe_validate_publish_subject(subject)
510 .map_err(|e| PublishError::with_source(PublishErrorKind::Other, e))?;
511 let (sender, receiver) = oneshot::channel();
512
513 let respond = self.client.new_inbox().into();
514
515 let send_fut = self
516 .client
517 .sender
518 .send(Command::Request {
519 subject,
520 payload: publish.payload,
521 respond,
522 headers: publish.headers,
523 sender,
524 })
525 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
526
527 tokio::time::timeout(self.timeout, send_fut)
528 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
529 .await??;
530
531 Ok(PublishAckFuture {
532 timeout: self.timeout,
533 subscription: Some(receiver),
534 permit: Some(permit),
535 tx: self.ack_sender.clone(),
536 })
537 }
538
539 pub async fn query_account(&self) -> Result<Account, AccountError> {
541 let response: Response<Account> = self.request("INFO", b"").await?;
542
543 match response {
544 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
545 Response::Ok(account) => Ok(account),
546 }
547 }
548
549 pub async fn create_stream<S>(
574 &self,
575 stream_config: S,
576 ) -> Result<Stream<Info>, CreateStreamError>
577 where
578 Config: From<S>,
579 {
580 let mut config: Config = stream_config.into();
581 if config.name.is_empty() {
582 return Err(CreateStreamError::new(
583 CreateStreamErrorKind::EmptyStreamName,
584 ));
585 }
586 if !is_valid_name(config.name.as_str()) {
587 return Err(CreateStreamError::new(
588 CreateStreamErrorKind::InvalidStreamName,
589 ));
590 }
591 if let Some(ref mut mirror) = config.mirror {
592 if let Some(ref mut domain) = mirror.domain {
593 if mirror.external.is_some() {
594 return Err(CreateStreamError::new(
595 CreateStreamErrorKind::DomainAndExternalSet,
596 ));
597 }
598 mirror.external = Some(External {
599 api_prefix: format!("$JS.{domain}.API"),
600 delivery_prefix: None,
601 })
602 }
603 }
604
605 if let Some(ref mut sources) = config.sources {
606 for source in sources {
607 if let Some(ref mut domain) = source.domain {
608 if source.external.is_some() {
609 return Err(CreateStreamError::new(
610 CreateStreamErrorKind::DomainAndExternalSet,
611 ));
612 }
613 source.external = Some(External {
614 api_prefix: format!("$JS.{domain}.API"),
615 delivery_prefix: None,
616 })
617 }
618 }
619 }
620 let subject = format!("STREAM.CREATE.{}", config.name);
621 let response: Response<Info> = self.request(subject, &config).await?;
622
623 match response {
624 Response::Err { error } => Err(error.into()),
625 Response::Ok(info) => Ok(Stream {
626 context: self.clone(),
627 info,
628 name: config.name,
629 }),
630 }
631 }
632
633 pub async fn get_stream_no_info<T: AsRef<str>>(
653 &self,
654 stream: T,
655 ) -> Result<Stream<()>, GetStreamError> {
656 let stream = stream.as_ref();
657 if stream.is_empty() {
658 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
659 }
660
661 if !is_valid_name(stream) {
662 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
663 }
664
665 Ok(Stream {
666 context: self.clone(),
667 info: (),
668 name: stream.to_string(),
669 })
670 }
671
672 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
688 let stream = stream.as_ref();
689 if stream.is_empty() {
690 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
691 }
692
693 if !is_valid_name(stream) {
694 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
695 }
696
697 let subject = format!("STREAM.INFO.{stream}");
698 let request: Response<Info> = self
699 .request(subject, &())
700 .await
701 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
702 match request {
703 Response::Err { error } => {
704 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
705 }
706 Response::Ok(info) => Ok(Stream {
707 context: self.clone(),
708 info,
709 name: stream.to_string(),
710 }),
711 }
712 }
713
714 pub async fn get_or_create_stream<S>(
738 &self,
739 stream_config: S,
740 ) -> Result<Stream, CreateStreamError>
741 where
742 S: Into<Config>,
743 {
744 let config: Config = stream_config.into();
745
746 if config.name.is_empty() {
747 return Err(CreateStreamError::new(
748 CreateStreamErrorKind::EmptyStreamName,
749 ));
750 }
751
752 if !is_valid_name(config.name.as_str()) {
753 return Err(CreateStreamError::new(
754 CreateStreamErrorKind::InvalidStreamName,
755 ));
756 }
757 let subject = format!("STREAM.INFO.{}", config.name);
758
759 let request: Response<Info> = self.request(subject, &()).await?;
760 match request {
761 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
762 Response::Err { error } => Err(error.into()),
763 Response::Ok(info) => Ok(Stream {
764 context: self.clone(),
765 info,
766 name: config.name,
767 }),
768 }
769 }
770
771 pub async fn delete_stream<T: AsRef<str>>(
787 &self,
788 stream: T,
789 ) -> Result<DeleteStatus, DeleteStreamError> {
790 let stream = stream.as_ref();
791 if stream.is_empty() {
792 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
793 }
794
795 if !is_valid_name(stream) {
796 return Err(DeleteStreamError::new(
797 DeleteStreamErrorKind::InvalidStreamName,
798 ));
799 }
800
801 let subject = format!("STREAM.DELETE.{stream}");
802 match self
803 .request(subject, &json!({}))
804 .await
805 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
806 {
807 Response::Err { error } => Err(DeleteStreamError::new(
808 DeleteStreamErrorKind::JetStream(error),
809 )),
810 Response::Ok(delete_response) => Ok(delete_response),
811 }
812 }
813
814 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
839 where
840 S: Borrow<Config>,
841 {
842 let config = config.borrow();
843
844 if config.name.is_empty() {
845 return Err(CreateStreamError::new(
846 CreateStreamErrorKind::EmptyStreamName,
847 ));
848 }
849
850 if !is_valid_name(config.name.as_str()) {
851 return Err(CreateStreamError::new(
852 CreateStreamErrorKind::InvalidStreamName,
853 ));
854 }
855
856 let subject = format!("STREAM.UPDATE.{}", config.name);
857 match self.request(subject, config).await? {
858 Response::Err { error } => Err(error.into()),
859 Response::Ok(info) => Ok(info),
860 }
861 }
862
863 pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
887 match self.update_stream(config.clone()).await {
888 Ok(stream) => Ok(stream),
889 Err(err) => match err.kind() {
890 CreateStreamErrorKind::NotFound => {
891 let stream = self
892 .create_stream(config)
893 .await
894 .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
895 Ok(stream.info)
896 }
897 _ => Err(err),
898 },
899 }
900 }
901
902 pub async fn stream_by_subject<T: Into<String>>(
917 &self,
918 subject: T,
919 ) -> Result<String, GetStreamByNameError> {
920 let subject = subject.into();
921 if !is_valid_subject(subject.as_str()) {
922 return Err(GetStreamByNameError::new(
923 GetStreamByNameErrorKind::InvalidSubject,
924 ));
925 }
926 let mut names = StreamNames {
927 context: self.clone(),
928 offset: 0,
929 page_request: None,
930 streams: Vec::new(),
931 subject: Some(subject),
932 done: false,
933 };
934 match names.next().await {
935 Some(name) => match name {
936 Ok(name) => Ok(name),
937 Err(err) => Err(GetStreamByNameError::with_source(
938 GetStreamByNameErrorKind::Request,
939 err,
940 )),
941 },
942 None => Err(GetStreamByNameError::new(
943 GetStreamByNameErrorKind::NotFound,
944 )),
945 }
946 }
947
    /// Returns a fresh [`StreamNames`] bound to a clone of this context, starting at
    /// offset 0 with no subject filter and nothing buffered.
    pub fn stream_names(&self) -> StreamNames {
        StreamNames {
            context: self.clone(),
            offset: 0,
            page_request: None,
            streams: Vec::new(),
            subject: None,
            done: false,
        }
    }
975
    /// Returns a fresh [`Streams`] bound to a clone of this context, starting at offset 0
    /// with nothing buffered.
    pub fn streams(&self) -> Streams {
        Streams {
            context: self.clone(),
            offset: 0,
            page_request: None,
            streams: Vec::new(),
            done: false,
        }
    }
1002 #[cfg(feature = "kv")]
1016 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1017 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
1018 let bucket: String = bucket.into();
1019 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
1020 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1021 }
1022
1023 let stream_name = format!("KV_{}", &bucket);
1024 let stream = self
1025 .get_stream(stream_name.clone())
1026 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
1027 .await?;
1028
1029 if stream.info.config.max_messages_per_subject < 1 {
1030 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1031 }
1032 let mut store = Store {
1033 prefix: format!("$KV.{}.", &bucket),
1034 name: bucket,
1035 stream_name,
1036 stream: stream.clone(),
1037 put_prefix: None,
1038 use_jetstream_prefix: self.prefix != "$JS.API",
1039 };
1040 if let Some(ref mirror) = stream.info.config.mirror {
1041 let bucket = mirror.name.trim_start_matches("KV_");
1042 if let Some(ref external) = mirror.external {
1043 if !external.api_prefix.is_empty() {
1044 store.use_jetstream_prefix = false;
1045 store.prefix = format!("$KV.{bucket}.");
1046 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
1047 } else {
1048 store.put_prefix = Some(format!("$KV.{bucket}."));
1049 }
1050 }
1051 };
1052
1053 Ok(store)
1054 }
1055
1056 #[cfg(feature = "kv")]
1076 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1077 pub async fn create_key_value(
1078 &self,
1079 config: crate::jetstream::kv::Config,
1080 ) -> Result<Store, CreateKeyValueError> {
1081 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1082 return Err(CreateKeyValueError::new(
1083 CreateKeyValueErrorKind::InvalidStoreName,
1084 ));
1085 }
1086 let info = self.query_account().await.map_err(|err| {
1087 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1088 })?;
1089
1090 let bucket_name = config.bucket.clone();
1091 let stream_config = kv_to_stream_config(config, info)?;
1092
1093 let stream = self.create_stream(stream_config).await.map_err(|err| {
1094 if err.kind() == CreateStreamErrorKind::TimedOut {
1095 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1096 } else {
1097 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1098 }
1099 })?;
1100
1101 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1102 }
1103
1104 #[cfg(feature = "kv")]
1124 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1125 pub async fn update_key_value(
1126 &self,
1127 config: crate::jetstream::kv::Config,
1128 ) -> Result<Store, UpdateKeyValueError> {
1129 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1130 return Err(UpdateKeyValueError::new(
1131 UpdateKeyValueErrorKind::InvalidStoreName,
1132 ));
1133 }
1134
1135 let stream_name = format!("KV_{}", config.bucket);
1136 let bucket_name = config.bucket.clone();
1137
1138 let account = self.query_account().await.map_err(|err| {
1139 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1140 })?;
1141 let stream = self
1142 .update_stream(kv_to_stream_config(config, account)?)
1143 .await
1144 .map_err(|err| match err.kind() {
1145 UpdateStreamErrorKind::NotFound => {
1146 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1147 }
1148 _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1149 })?;
1150
1151 let stream = Stream {
1152 context: self.clone(),
1153 info: stream,
1154 name: stream_name,
1155 };
1156
1157 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1158 }
1159
1160 #[cfg(feature = "kv")]
1180 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1181 pub async fn create_or_update_key_value(
1182 &self,
1183 config: crate::jetstream::kv::Config,
1184 ) -> Result<Store, CreateKeyValueError> {
1185 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1186 return Err(CreateKeyValueError::new(
1187 CreateKeyValueErrorKind::InvalidStoreName,
1188 ));
1189 }
1190
1191 let bucket_name = config.bucket.clone();
1192 let stream_name = format!("KV_{}", config.bucket);
1193
1194 let account = self.query_account().await.map_err(|err| {
1195 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1196 })?;
1197 let stream = self
1198 .create_or_update_stream(kv_to_stream_config(config, account)?)
1199 .await
1200 .map_err(|err| {
1201 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1202 })?;
1203
1204 let stream = Stream {
1205 context: self.clone(),
1206 info: stream,
1207 name: stream_name,
1208 };
1209
1210 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1211 }
1212
1213 #[cfg(feature = "kv")]
1233 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1234 pub async fn delete_key_value<T: AsRef<str>>(
1235 &self,
1236 bucket: T,
1237 ) -> Result<DeleteStatus, KeyValueError> {
1238 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1239 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1240 }
1241
1242 let stream_name = format!("KV_{}", bucket.as_ref());
1243 self.delete_stream(stream_name)
1244 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1245 .await
1246 }
1247
1248 pub async fn get_consumer_from_stream<T, C, S>(
1285 &self,
1286 consumer: C,
1287 stream: S,
1288 ) -> Result<Consumer<T>, ConsumerError>
1289 where
1290 T: FromConsumer + IntoConsumerConfig,
1291 S: AsRef<str>,
1292 C: AsRef<str>,
1293 {
1294 if !is_valid_name(stream.as_ref()) {
1295 return Err(ConsumerError::with_source(
1296 ConsumerErrorKind::InvalidName,
1297 "invalid stream",
1298 ));
1299 }
1300
1301 if !is_valid_name(consumer.as_ref()) {
1302 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1303 }
1304
1305 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1306
1307 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1308 Response::Ok(info) => info,
1309 Response::Err { error } => return Err(error.into()),
1310 };
1311
1312 Ok(Consumer::new(
1313 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1314 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1315 })?,
1316 info,
1317 self.clone(),
1318 ))
1319 }
1320
1321 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1344 &self,
1345 consumer: C,
1346 stream: S,
1347 ) -> Result<DeleteStatus, ConsumerError> {
1348 if !is_valid_name(consumer.as_ref()) {
1349 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1350 }
1351
1352 if !is_valid_name(stream.as_ref()) {
1353 return Err(ConsumerError::with_source(
1354 ConsumerErrorKind::Other,
1355 "invalid stream name",
1356 ));
1357 }
1358
1359 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1360
1361 match self.request(subject, &json!({})).await? {
1362 Response::Ok(delete_status) => Ok(delete_status),
1363 Response::Err { error } => Err(error.into()),
1364 }
1365 }
1366
    /// Creates or updates a consumer on `stream` from `config`.
    ///
    /// Delegates to `create_consumer_on_stream_action` with
    /// `ConsumerAction::CreateOrUpdate`.
    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
        &self,
        config: C,
        stream: S,
    ) -> Result<Consumer<C>, ConsumerError> {
        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
            .await
    }
1400
    /// Updates a consumer on `stream` from `config`.
    ///
    /// Delegates to `create_consumer_on_stream_action` with `ConsumerAction::Update` and
    /// converts the error into [`ConsumerUpdateError`].
    #[cfg(feature = "server_2_10")]
    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
        &self,
        config: C,
        stream: S,
    ) -> Result<Consumer<C>, ConsumerUpdateError> {
        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
            .await
            .map_err(|err| err.into())
    }
1437
    /// Creates a consumer on `stream` from `config` using the strict create action.
    ///
    /// Delegates to `create_consumer_on_stream_action` with `ConsumerAction::Create` and
    /// converts the error into [`ConsumerCreateStrictError`].
    #[cfg(feature = "server_2_10")]
    pub async fn create_consumer_strict_on_stream<
        C: IntoConsumerConfig + FromConsumer,
        S: AsRef<str>,
    >(
        &self,
        config: C,
        stream: S,
    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
            .await
            .map_err(|err| err.into())
    }
1477
1478 async fn create_consumer_on_stream_action<
1479 C: IntoConsumerConfig + FromConsumer,
1480 S: AsRef<str>,
1481 >(
1482 &self,
1483 config: C,
1484 stream: S,
1485 action: ConsumerAction,
1486 ) -> Result<Consumer<C>, ConsumerError> {
1487 let config = config.into_consumer_config();
1488
1489 let subject = {
1490 let filter = if config.filter_subject.is_empty() {
1491 "".to_string()
1492 } else {
1493 format!(".{}", config.filter_subject)
1494 };
1495 config
1496 .name
1497 .as_ref()
1498 .or(config.durable_name.as_ref())
1499 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1500 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1501 };
1502
1503 match self
1504 .request(
1505 subject,
1506 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1507 )
1508 .await?
1509 {
1510 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1511 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1512 FromConsumer::try_from_consumer_config(info.clone().config)
1513 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1514 info,
1515 self.clone(),
1516 )),
1517 }
1518 }
1519
1520 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1540 where
1541 S: ToSubject,
1542 T: ?Sized + Serialize,
1543 V: DeserializeOwned,
1544 {
1545 let subject = subject.to_subject();
1546 let request = serde_json::to_vec(&payload)
1547 .map(Bytes::from)
1548 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1549
1550 debug!("JetStream request sent: {:?}", request);
1551
1552 let message = self
1553 .client
1554 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1555 .await;
1556 let message = message?;
1557 debug!(
1558 "JetStream request response: {:?}",
1559 from_utf8(&message.payload)
1560 );
1561 let response = serde_json::from_slice(message.payload.as_ref())
1562 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1563
1564 Ok(response)
1565 }
1566
1567 pub async fn send_request<S: ToSubject>(
1576 &self,
1577 subject: S,
1578 request: crate::client::Request,
1579 ) -> Result<(), crate::PublishError> {
1580 let prefixed_subject = format!("{}.{}", self.prefix, subject.to_subject());
1581 let inbox = request.inbox.unwrap_or_else(|| self.client.new_inbox());
1582 let payload = request.payload.unwrap_or_default();
1583
1584 match request.headers {
1585 Some(headers) => {
1586 self.client
1587 .publish_with_reply_and_headers(prefixed_subject, inbox, headers, payload)
1588 .await
1589 }
1590 None => {
1591 self.client
1592 .publish_with_reply(prefixed_subject, inbox, payload)
1593 .await
1594 }
1595 }
1596 }
1597
1598 #[cfg(feature = "object-store")]
1617 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1618 pub async fn create_object_store(
1619 &self,
1620 config: super::object_store::Config,
1621 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1622 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1623 return Err(CreateObjectStoreError::new(
1624 CreateKeyValueErrorKind::InvalidStoreName,
1625 ));
1626 }
1627
1628 let bucket_name = config.bucket.clone();
1629 let stream_name = format!("OBJ_{bucket_name}");
1630 let chunk_subject = format!("$O.{bucket_name}.C.>");
1631 let meta_subject = format!("$O.{bucket_name}.M.>");
1632
1633 let stream = self
1634 .create_stream(super::stream::Config {
1635 name: stream_name,
1636 description: config.description.clone(),
1637 subjects: vec![chunk_subject, meta_subject],
1638 max_age: config.max_age,
1639 max_bytes: config.max_bytes,
1640 storage: config.storage,
1641 num_replicas: config.num_replicas,
1642 discard: DiscardPolicy::New,
1643 allow_rollup: true,
1644 allow_direct: true,
1645 #[cfg(feature = "server_2_10")]
1646 compression: if config.compression {
1647 Some(Compression::S2)
1648 } else {
1649 None
1650 },
1651 placement: config.placement,
1652 ..Default::default()
1653 })
1654 .await
1655 .map_err(|err| {
1656 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1657 })?;
1658
1659 Ok(ObjectStore {
1660 name: bucket_name,
1661 stream,
1662 })
1663 }
1664
1665 #[cfg(feature = "object-store")]
1679 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1680 pub async fn get_object_store<T: AsRef<str>>(
1681 &self,
1682 bucket_name: T,
1683 ) -> Result<ObjectStore, ObjectStoreError> {
1684 let bucket_name = bucket_name.as_ref();
1685 if !is_valid_bucket_name(bucket_name) {
1686 return Err(ObjectStoreError::new(
1687 ObjectStoreErrorKind::InvalidBucketName,
1688 ));
1689 }
1690 let stream_name = format!("OBJ_{bucket_name}");
1691 let stream = self
1692 .get_stream(stream_name)
1693 .await
1694 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1695
1696 Ok(ObjectStore {
1697 name: bucket_name.to_string(),
1698 stream,
1699 })
1700 }
1701
1702 #[cfg(feature = "object-store")]
1716 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1717 pub async fn delete_object_store<T: AsRef<str>>(
1718 &self,
1719 bucket_name: T,
1720 ) -> Result<(), DeleteObjectStore> {
1721 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1722 self.delete_stream(stream_name)
1723 .await
1724 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1725 Ok(())
1726 }
1727}
1728
/// Core-client request capability, forwarded to the wrapped [`Client`].
impl crate::client::traits::Requester for Context {
    /// Forwards `subject` and `request` unchanged to the wrapped client's `send_request`;
    /// this method itself does not add the context's API prefix.
    fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: crate::Request,
    ) -> impl Future<Output = Result<Message, crate::RequestError>> {
        self.client.send_request(subject, request)
    }
}
1738
/// Core-client publish capability, forwarded to the wrapped [`Client`].
impl crate::client::traits::Publisher for Context {
    /// Forwards to the wrapped client's `publish_with_reply` with unchanged arguments.
    fn publish_with_reply<S, R>(
        &self,
        subject: S,
        reply: R,
        payload: Bytes,
    ) -> impl Future<Output = Result<(), crate::PublishError>>
    where
        S: ToSubject,
        R: ToSubject,
    {
        self.client.publish_with_reply(subject, reply, payload)
    }

    /// Forwards to the wrapped client's `publish_message` with the message unchanged.
    fn publish_message(
        &self,
        msg: crate::message::OutboundMessage,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        self.client.publish_message(msg)
    }
}
1760
1761impl traits::ClientProvider for Context {
1762 fn client(&self) -> crate::Client {
1763 self.client()
1764 }
1765}
1766
1767impl traits::Requester for Context {
1768 fn request<S, T, V>(
1769 &self,
1770 subject: S,
1771 payload: &T,
1772 ) -> impl Future<Output = Result<V, RequestError>>
1773 where
1774 S: ToSubject,
1775 T: ?Sized + Serialize,
1776 V: DeserializeOwned,
1777 {
1778 self.request(subject, payload)
1779 }
1780}
1781
1782impl traits::TimeoutProvider for Context {
1783 fn timeout(&self) -> Duration {
1784 self.timeout
1785 }
1786}
1787
1788impl traits::RequestSender for Context {
1789 fn send_request<S: ToSubject>(
1790 &self,
1791 subject: S,
1792 request: crate::client::Request,
1793 ) -> impl Future<Output = Result<(), crate::PublishError>> {
1794 self.send_request(subject, request)
1795 }
1796}
1797
1798impl traits::Publisher for Context {
1799 fn publish<S: ToSubject>(
1800 &self,
1801 subject: S,
1802 payload: Bytes,
1803 ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1804 self.publish(subject, payload)
1805 }
1806
1807 fn publish_message(
1808 &self,
1809 message: jetstream::message::OutboundMessage,
1810 ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1811 self.send_publish(
1812 message.subject,
1813 PublishMessage {
1814 payload: message.payload,
1815 headers: message.headers,
1816 },
1817 )
1818 }
1819}
1820
/// Classifies why a JetStream publish (or waiting for its ack) failed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The ack reply carried a "no responders" status.
    StreamNotFound,
    /// The server rejected the publish because of the last message id.
    WrongLastMessageId,
    /// The server rejected the publish because of the last sequence.
    WrongLastSequence,
    /// No ack arrived within the configured timeout.
    TimedOut,
    /// The channel delivering the ack was closed before an ack arrived.
    BrokenPipe,
    /// Max ack pending reached.
    MaxAckPending,
    /// Any other failure, including unparsable or unrecognized responses.
    Other,
}

impl Display for PublishErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant maps to a fixed message, so pick it first and write once.
        let message = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::Other => "publish failed",
        };
        f.write_str(message)
    }
}
1845
1846pub type PublishError = Error<PublishErrorKind>;
1847
1848#[derive(Debug)]
1849pub struct PublishAckFuture {
1850 timeout: Duration,
1851 subscription: Option<oneshot::Receiver<Message>>,
1852 permit: Option<OwnedSemaphorePermit>,
1853 tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
1854}
1855
1856impl Drop for PublishAckFuture {
1857 fn drop(&mut self) {
1858 if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1859 if let Err(err) = self.tx.try_send((sub, permit)) {
1860 tracing::warn!("failed to pass future permit to the acker: {}", err);
1861 }
1862 }
1863 }
1864}
1865
1866impl PublishAckFuture {
1867 async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1868 let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1869 .await
1870 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1871 next.map_or_else(
1872 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1873 |m| {
1874 if m.status == Some(StatusCode::NO_RESPONDERS) {
1875 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1876 }
1877 let response = serde_json::from_slice(m.payload.as_ref())
1878 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1879 match response {
1880 Response::Err { error } => match error.error_code() {
1881 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1882 PublishErrorKind::WrongLastMessageId,
1883 error,
1884 )),
1885 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1886 PublishErrorKind::WrongLastSequence,
1887 error,
1888 )),
1889 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1890 },
1891 Response::Ok(publish_ack) => Ok(publish_ack),
1892 }
1893 },
1894 )
1895 }
1896}
1897impl IntoFuture for PublishAckFuture {
1898 type Output = Result<PublishAck, PublishError>;
1899
1900 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1901
1902 fn into_future(self) -> Self::IntoFuture {
1903 Box::pin(std::future::IntoFuture::into_future(
1904 self.next_with_timeout(),
1905 ))
1906 }
1907}
1908
1909#[derive(Deserialize, Debug)]
1910struct StreamPage {
1911 total: usize,
1912 streams: Option<Vec<String>>,
1913}
1914
1915#[derive(Deserialize, Debug)]
1916struct StreamInfoPage {
1917 total: usize,
1918 streams: Option<Vec<super::stream::Info>>,
1919}
1920
1921type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1922
1923pub struct StreamNames {
1924 context: Context,
1925 offset: usize,
1926 page_request: Option<PageRequest>,
1927 subject: Option<String>,
1928 streams: Vec<String>,
1929 done: bool,
1930}
1931
1932impl futures_util::Stream for StreamNames {
1933 type Item = Result<String, StreamsError>;
1934
1935 fn poll_next(
1936 mut self: Pin<&mut Self>,
1937 cx: &mut std::task::Context<'_>,
1938 ) -> std::task::Poll<Option<Self::Item>> {
1939 match self.page_request.as_mut() {
1940 Some(page) => match page.try_poll_unpin(cx) {
1941 std::task::Poll::Ready(page) => {
1942 self.page_request = None;
1943 let page = page
1944 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1945 if let Some(streams) = page.streams {
1946 self.offset += streams.len();
1947 self.streams = streams;
1948 if self.offset >= page.total {
1949 self.done = true;
1950 }
1951 match self.streams.pop() {
1952 Some(stream) => Poll::Ready(Some(Ok(stream))),
1953 None => Poll::Ready(None),
1954 }
1955 } else {
1956 Poll::Ready(None)
1957 }
1958 }
1959 std::task::Poll::Pending => std::task::Poll::Pending,
1960 },
1961 None => {
1962 if let Some(stream) = self.streams.pop() {
1963 Poll::Ready(Some(Ok(stream)))
1964 } else {
1965 if self.done {
1966 return Poll::Ready(None);
1967 }
1968 let context = self.context.clone();
1969 let offset = self.offset;
1970 let subject = self.subject.clone();
1971 self.page_request = Some(Box::pin(async move {
1972 match context
1973 .request(
1974 "STREAM.NAMES",
1975 &json!({
1976 "offset": offset,
1977 "subject": subject
1978 }),
1979 )
1980 .await?
1981 {
1982 Response::Err { error } => {
1983 Err(RequestError::with_source(RequestErrorKind::Other, error))
1984 }
1985 Response::Ok(page) => Ok(page),
1986 }
1987 }));
1988 self.poll_next(cx)
1989 }
1990 }
1991 }
1992 }
1993}
1994
1995type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1996
1997pub type StreamsErrorKind = RequestErrorKind;
1998pub type StreamsError = RequestError;
1999
2000pub struct Streams {
2001 context: Context,
2002 offset: usize,
2003 page_request: Option<PageInfoRequest>,
2004 streams: Vec<super::stream::Info>,
2005 done: bool,
2006}
2007
2008impl futures_util::Stream for Streams {
2009 type Item = Result<super::stream::Info, StreamsError>;
2010
2011 fn poll_next(
2012 mut self: Pin<&mut Self>,
2013 cx: &mut std::task::Context<'_>,
2014 ) -> std::task::Poll<Option<Self::Item>> {
2015 match self.page_request.as_mut() {
2016 Some(page) => match page.try_poll_unpin(cx) {
2017 std::task::Poll::Ready(page) => {
2018 self.page_request = None;
2019 let page = page
2020 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
2021 if let Some(streams) = page.streams {
2022 self.offset += streams.len();
2023 self.streams = streams;
2024 if self.offset >= page.total {
2025 self.done = true;
2026 }
2027 match self.streams.pop() {
2028 Some(stream) => Poll::Ready(Some(Ok(stream))),
2029 None => Poll::Ready(None),
2030 }
2031 } else {
2032 Poll::Ready(None)
2033 }
2034 }
2035 std::task::Poll::Pending => std::task::Poll::Pending,
2036 },
2037 None => {
2038 if let Some(stream) = self.streams.pop() {
2039 Poll::Ready(Some(Ok(stream)))
2040 } else {
2041 if self.done {
2042 return Poll::Ready(None);
2043 }
2044 let context = self.context.clone();
2045 let offset = self.offset;
2046 self.page_request = Some(Box::pin(async move {
2047 match context
2048 .request(
2049 "STREAM.LIST",
2050 &json!({
2051 "offset": offset,
2052 }),
2053 )
2054 .await?
2055 {
2056 Response::Err { error } => {
2057 Err(RequestError::with_source(RequestErrorKind::Other, error))
2058 }
2059 Response::Ok(page) => Ok(page),
2060 }
2061 }));
2062 self.poll_next(cx)
2063 }
2064 }
2065 }
2066 }
2067}
2068
2069#[deprecated(
2072 note = "use jetstream::message::PublishMessage instead",
2073 since = "0.44.0"
2074)]
2075pub type Publish = super::message::PublishMessage;
2076
/// Classifies why a JetStream API request failed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody answered the request.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// The request subject was invalid.
    InvalidSubject,
    /// Any other failure.
    Other,
}

impl Display for RequestErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::InvalidSubject => "invalid subject",
            Self::Other => "request failed",
        };
        f.write_str(message)
    }
}
2095
2096pub type RequestError = Error<RequestErrorKind>;
2097
2098impl From<crate::RequestError> for RequestError {
2099 fn from(error: crate::RequestError) -> Self {
2100 match error.kind() {
2101 crate::RequestErrorKind::TimedOut => {
2102 RequestError::with_source(RequestErrorKind::TimedOut, error)
2103 }
2104 crate::RequestErrorKind::NoResponders => {
2105 RequestError::new(RequestErrorKind::NoResponders)
2106 }
2107 crate::RequestErrorKind::InvalidSubject => {
2108 RequestError::with_source(RequestErrorKind::InvalidSubject, error)
2109 }
2110 crate::RequestErrorKind::Other => {
2111 RequestError::with_source(RequestErrorKind::Other, error)
2112 }
2113 }
2114 }
2115}
2116
2117impl From<super::errors::Error> for RequestError {
2118 fn from(err: super::errors::Error) -> Self {
2119 RequestError::with_source(RequestErrorKind::Other, err)
2120 }
2121}
2122
2123pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
2124
2125#[derive(Clone, Debug, PartialEq)]
2126pub enum ConsumerInfoErrorKind {
2127 InvalidName,
2128 Offline,
2129 NotFound,
2130 StreamNotFound,
2131 Request,
2132 JetStream(super::errors::Error),
2133 TimedOut,
2134 NoResponders,
2135}
2136
2137impl Display for ConsumerInfoErrorKind {
2138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2139 match self {
2140 Self::InvalidName => write!(f, "invalid consumer name"),
2141 Self::Offline => write!(f, "consumer is offline"),
2142 Self::NotFound => write!(f, "consumer not found"),
2143 Self::StreamNotFound => write!(f, "stream not found"),
2144 Self::Request => write!(f, "request error"),
2145 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2146 Self::TimedOut => write!(f, "timed out"),
2147 Self::NoResponders => write!(f, "no responders"),
2148 }
2149 }
2150}
2151
2152impl From<super::errors::Error> for ConsumerInfoError {
2153 fn from(error: super::errors::Error) -> Self {
2154 match error.error_code() {
2155 ErrorCode::CONSUMER_NOT_FOUND => {
2156 ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2157 }
2158 ErrorCode::STREAM_NOT_FOUND => {
2159 ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2160 }
2161 ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2162 _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2163 }
2164 }
2165}
2166
2167impl From<RequestError> for ConsumerInfoError {
2168 fn from(error: RequestError) -> Self {
2169 match error.kind() {
2170 RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2171 RequestErrorKind::InvalidSubject => {
2172 ConsumerInfoError::with_source(ConsumerInfoErrorKind::InvalidName, error)
2173 }
2174 RequestErrorKind::Other => {
2175 ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2176 }
2177 RequestErrorKind::NoResponders => {
2178 ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2179 }
2180 }
2181 }
2182}
2183
2184#[derive(Clone, Debug, PartialEq)]
2185pub enum CreateStreamErrorKind {
2186 EmptyStreamName,
2187 InvalidStreamName,
2188 DomainAndExternalSet,
2189 JetStreamUnavailable,
2190 JetStream(super::errors::Error),
2191 TimedOut,
2192 Response,
2193 NotFound,
2194 ResponseParse,
2195}
2196
2197impl Display for CreateStreamErrorKind {
2198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2199 match self {
2200 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2201 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2202 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2203 Self::NotFound => write!(f, "stream not found"),
2204 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2205 Self::TimedOut => write!(f, "jetstream request timed out"),
2206 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2207 Self::ResponseParse => write!(f, "failed to parse server response"),
2208 Self::Response => write!(f, "response error"),
2209 }
2210 }
2211}
2212
2213pub type CreateStreamError = Error<CreateStreamErrorKind>;
2214
2215impl From<super::errors::Error> for CreateStreamError {
2216 fn from(error: super::errors::Error) -> Self {
2217 match error.kind() {
2218 super::errors::ErrorCode::STREAM_NOT_FOUND => {
2219 CreateStreamError::new(CreateStreamErrorKind::NotFound)
2220 }
2221 _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2222 }
2223 }
2224}
2225
2226impl From<RequestError> for CreateStreamError {
2227 fn from(error: RequestError) -> Self {
2228 match error.kind() {
2229 RequestErrorKind::NoResponders => {
2230 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2231 }
2232 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2233 RequestErrorKind::InvalidSubject => {
2234 CreateStreamError::with_source(CreateStreamErrorKind::InvalidStreamName, error)
2235 }
2236 RequestErrorKind::Other => {
2237 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2238 }
2239 }
2240 }
2241}
2242
2243#[derive(Clone, Debug, PartialEq)]
2244pub enum GetStreamErrorKind {
2245 EmptyName,
2246 Request,
2247 InvalidStreamName,
2248 JetStream(super::errors::Error),
2249}
2250
2251impl Display for GetStreamErrorKind {
2252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2253 match self {
2254 Self::EmptyName => write!(f, "empty name cannot be empty"),
2255 Self::Request => write!(f, "request error"),
2256 Self::InvalidStreamName => write!(f, "invalid stream name"),
2257 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2258 }
2259 }
2260}
2261
2262#[derive(Clone, Debug, PartialEq)]
2263pub enum GetStreamByNameErrorKind {
2264 Request,
2265 NotFound,
2266 InvalidSubject,
2267 JetStream(super::errors::Error),
2268}
2269
2270impl Display for GetStreamByNameErrorKind {
2271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2272 match self {
2273 Self::Request => write!(f, "request error"),
2274 Self::NotFound => write!(f, "stream not found"),
2275 Self::InvalidSubject => write!(f, "invalid subject"),
2276 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2277 }
2278 }
2279}
2280
2281pub type GetStreamError = Error<GetStreamErrorKind>;
2282pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;
2283
2284pub type UpdateStreamError = CreateStreamError;
2285pub type UpdateStreamErrorKind = CreateStreamErrorKind;
2286pub type DeleteStreamError = GetStreamError;
2287pub type DeleteStreamErrorKind = GetStreamErrorKind;
2288
/// Classifies why obtaining a key-value store failed.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name was invalid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}
2296
#[cfg(feature = "kv")]
impl Display for KeyValueErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(message)
    }
}
2307
/// Error returned when obtaining a key-value store; see `KeyValueErrorKind`.
#[cfg(feature = "kv")]
pub type KeyValueError = Error<KeyValueErrorKind>;
2310
/// Classifies why creating a key-value store failed.
///
/// Also serves as the error kind for object store creation, which is why it
/// is compiled for either feature.
#[cfg(any(feature = "kv", feature = "object-store"))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name was invalid.
    InvalidStoreName,
    /// The requested history is longer than allowed.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the backing bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}
2321
#[cfg(any(feature = "kv", feature = "object-store"))]
impl Display for CreateKeyValueErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(message)
    }
}
2337
/// Classifies why updating a key-value store failed.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name was invalid.
    InvalidStoreName,
    /// The requested history is longer than allowed.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Updating the backing bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}
2349
#[cfg(feature = "kv")]
impl Display for UpdateKeyValueErrorKind {
    /// Writes the human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // Previously printed the create message ("bucket creation
            // failed"), which misreported a failed update.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error returned when creating a key-value store.
#[cfg(feature = "kv")]
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error returned when updating a key-value store.
#[cfg(feature = "kv")]
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Error returned when creating an object store; shares its kinds with
/// key-value store creation.
#[cfg(feature = "object-store")]
pub type CreateObjectStoreError = Error<CreateKeyValueErrorKind>;
/// Error kind for object store creation; identical to `CreateKeyValueErrorKind`.
#[cfg(feature = "object-store")]
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2375
/// Classifies why obtaining (or deleting) an object store failed.
#[cfg(feature = "object-store")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name was rejected by validation.
    InvalidBucketName,
    /// The operation on the backing stream failed.
    GetStore,
}
2382
#[cfg(feature = "object-store")]
impl Display for ObjectStoreErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(message)
    }
}
2392
/// Error returned when obtaining an object store; see `ObjectStoreErrorKind`.
#[cfg(feature = "object-store")]
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Error returned when deleting an object store; identical to `ObjectStoreError`.
#[cfg(feature = "object-store")]
pub type DeleteObjectStore = ObjectStoreError;
/// Error kind for object store deletion; identical to `ObjectStoreErrorKind`.
#[cfg(feature = "object-store")]
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2400
2401#[derive(Clone, Debug, PartialEq)]
2402pub enum AccountErrorKind {
2403 TimedOut,
2404 JetStream(super::errors::Error),
2405 JetStreamUnavailable,
2406 Other,
2407}
2408
2409impl Display for AccountErrorKind {
2410 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2411 match self {
2412 Self::TimedOut => write!(f, "timed out"),
2413 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2414 Self::Other => write!(f, "error"),
2415 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2416 }
2417 }
2418}
2419
2420pub type AccountError = Error<AccountErrorKind>;
2421
2422impl From<RequestError> for AccountError {
2423 fn from(err: RequestError) -> Self {
2424 match err.kind {
2425 RequestErrorKind::NoResponders => {
2426 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2427 }
2428 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2429 RequestErrorKind::Other | RequestErrorKind::InvalidSubject => {
2430 AccountError::with_source(AccountErrorKind::Other, err)
2431 }
2432 }
2433 }
2434}
2435
2436#[derive(Clone, Debug, Serialize)]
2437enum ConsumerAction {
2438 #[serde(rename = "")]
2439 CreateOrUpdate,
2440 #[serde(rename = "create")]
2441 #[cfg(feature = "server_2_10")]
2442 Create,
2443 #[serde(rename = "update")]
2444 #[cfg(feature = "server_2_10")]
2445 Update,
2446}
2447
/// Builds the key-value `Store` handle for `bucket` on top of its backing
/// `stream`.
///
/// * `prefix` – the JetStream API prefix in use; the store is flagged to use
///   the JetStream prefix whenever it differs from `$JS.API`.
/// * `bucket` – bucket name, used for the store name and the default
///   `$KV.<bucket>.` subject prefix.
///
/// When the stream mirrors another one that has an `external` configuration,
/// the prefixes are derived from the mirrored bucket (the mirror name with
/// leading `KV_` removed):
/// * non-empty external API prefix – reads use `$KV.<mirrored>.`, writes use
///   `<api_prefix>.$KV.<mirrored>.`, and the JetStream prefix is not used;
/// * empty external API prefix – only the write prefix is set, to
///   `$KV.<mirrored>.`.
#[cfg(feature = "kv")]
fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
    let mut kv_prefix = format!("$KV.{}.", bucket.as_str());
    let mut put_prefix = None;
    let mut use_jetstream_prefix = prefix != "$JS.API";

    // Inspect the stream by reference first so it can be moved into the store
    // afterwards without cloning it.
    if let Some(mirror) = stream.info.config.mirror.as_ref() {
        let mirrored_bucket = mirror.name.trim_start_matches("KV_");
        if let Some(external) = mirror.external.as_ref() {
            if external.api_prefix.is_empty() {
                put_prefix = Some(format!("$KV.{mirrored_bucket}."));
            } else {
                use_jetstream_prefix = false;
                kv_prefix = format!("$KV.{mirrored_bucket}.");
                put_prefix = Some(format!(
                    "{}.$KV.{}.",
                    external.api_prefix, mirrored_bucket
                ));
            }
        }
    }

    let stream_name = stream.info.config.name.clone();
    Store {
        prefix: kv_prefix,
        name: bucket,
        stream,
        stream_name,
        put_prefix,
        use_jetstream_prefix,
    }
}
2473
/// Reasons why a key-value configuration cannot be turned into a stream
/// configuration.
#[cfg(feature = "kv")]
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but the account does not support them.
    // Only constructed when the `server_2_11` feature is enabled.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2480
/// Maps a configuration translation failure onto the create-store error of
/// the same name.
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for CreateKeyValueError {
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => CreateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                CreateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        Self::new(kind)
    }
}
2494
/// Maps a configuration translation failure onto the update-store error of
/// the same name.
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for UpdateKeyValueError {
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => UpdateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                UpdateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        Self::new(kind)
    }
}
2508
/// Translates a key-value bucket configuration into the configuration of the
/// stream that backs it.
///
/// * The stream is named `KV_<bucket>`.
/// * A `history` that is not greater than zero becomes 1; a history above
///   `MAX_HISTORY` is rejected.
/// * `num_replicas == 0` becomes 1.
/// * With a mirror, its name is prefixed with `KV_` (unless already present)
///   and `mirror_direct` is forced on. Otherwise, with sources, each source
///   name gets the same prefix treatment. Only when neither is set does the
///   stream listen on `$KV.<bucket>.>`.
/// * With the `server_2_11` feature, `limit_markers` enables message TTLs and
///   the subject delete marker TTL, provided `_account.requests.level` is at
///   least 1.
///
/// # Errors
///
/// Returns `TooLongHistory` or (with `server_2_11`) `LimitMarkersNotSupported`
/// as described above.
#[cfg(feature = "kv")]
fn kv_to_stream_config(
    config: crate::jetstream::kv::Config,
    _account: Account,
) -> Result<super::stream::Config, KvToStreamConfigError> {
    let history = if config.history > 0 {
        if config.history > MAX_HISTORY {
            return Err(KvToStreamConfigError::TooLongHistory);
        }
        config.history
    } else {
        1
    };

    let num_replicas = if config.num_replicas == 0 {
        1
    } else {
        config.num_replicas
    };

    #[cfg(feature = "server_2_11")]
    let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);

    #[cfg(feature = "server_2_11")]
    if let Some(duration) = config.limit_markers {
        if _account.requests.level < 1 {
            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
        }
        allow_message_ttl = true;
        subject_delete_marker_ttl = Some(duration);
    }

    // `config` is owned, so these fields are moved out rather than cloned.
    let mut mirror = config.mirror;
    let mut sources = config.sources;
    let mut mirror_direct = config.mirror_direct;

    let mut subjects = Vec::new();
    if let Some(ref mut mirror) = mirror {
        if !mirror.name.starts_with("KV_") {
            mirror.name = format!("KV_{}", mirror.name);
        }
        mirror_direct = true;
    } else if let Some(ref mut sources) = sources {
        for source in sources {
            if !source.name.starts_with("KV_") {
                source.name = format!("KV_{}", source.name);
            }
        }
    } else {
        // A plain bucket (no mirror, no sources) owns its own subject space.
        subjects = vec![format!("$KV.{}.>", config.bucket)];
    }

    Ok(Config {
        name: format!("KV_{}", config.bucket),
        description: Some(config.description),
        subjects,
        max_messages_per_subject: history,
        max_bytes: config.max_bytes,
        max_age: config.max_age,
        max_message_size: config.max_value_size,
        storage: config.storage,
        republish: config.republish,
        allow_rollup: true,
        deny_delete: true,
        deny_purge: false,
        allow_direct: true,
        sources,
        mirror,
        num_replicas,
        discard: DiscardPolicy::New,
        mirror_direct,
        #[cfg(feature = "server_2_10")]
        compression: if config.compression {
            Some(Compression::S2)
        } else {
            None
        },
        placement: config.placement,
        #[cfg(feature = "server_2_11")]
        allow_message_ttl,
        #[cfg(feature = "server_2_11")]
        subject_delete_marker_ttl,
        ..Default::default()
    })
}