1use crate::error::Error;
17use crate::jetstream::account::Account;
18use crate::jetstream::message::PublishMessage;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{is_valid_subject, jetstream, Client, Command, Message, StatusCode};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25use futures_util::{Future, StreamExt, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Debug;
31use std::fmt::Display;
32use std::future::IntoFuture;
33use std::pin::Pin;
34use std::str::from_utf8;
35use std::sync::Arc;
36use std::task::Poll;
37use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
38use tokio::time::Duration;
39use tokio_stream::wrappers::ReceiverStream;
40use tracing::debug;
41
42use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
43use super::errors::ErrorCode;
44use super::is_valid_name;
45#[cfg(feature = "kv")]
46use super::kv::{Store, MAX_HISTORY};
47#[cfg(feature = "object-store")]
48use super::object_store::{is_valid_bucket_name, ObjectStore};
49#[cfg(any(feature = "kv", feature = "object-store"))]
50use super::stream::DiscardPolicy;
51#[cfg(feature = "server_2_10")]
52use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
53use super::stream::{
54 Config, ConsumerError, ConsumerErrorKind, DeleteStatus, External, Info, Stream,
55};
56
/// Capability traits for a JetStream context.
///
/// Each trait isolates a single capability (requesting, publishing, exposing the
/// client or the timeout) so generic code can depend only on what it needs.
pub mod traits {
    use std::{future::Future, time::Duration};

    use bytes::Bytes;
    use serde::{de::DeserializeOwned, Serialize};

    use crate::{jetstream::message, subject::ToSubject, Request};

    use super::RequestError;

    /// Something that can send a serializable payload to a subject and resolve
    /// to a deserialized response of type `V`.
    pub trait Requester {
        /// Sends `payload` to `subject` and resolves to the decoded response,
        /// or a [`RequestError`] on failure.
        fn request<S, T, V>(
            &self,
            subject: S,
            payload: &T,
        ) -> impl Future<Output = Result<V, RequestError>>
        where
            S: ToSubject,
            T: ?Sized + Serialize,
            V: DeserializeOwned;
    }

    /// Something that can send a pre-built [`Request`] without waiting for a response.
    pub trait RequestSender {
        /// Sends `request` to `subject`; the future resolves to `()` on success
        /// or a `crate::PublishError` on failure.
        fn send_request<S: ToSubject>(
            &self,
            subject: S,
            request: Request,
        ) -> impl Future<Output = Result<(), crate::PublishError>>;
    }

    /// Something that can publish messages and hand back a future for the publish ack.
    pub trait Publisher {
        /// Publishes `payload` to `subject`.
        fn publish<S: ToSubject>(
            &self,
            subject: S,
            payload: Bytes,
        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;

        /// Publishes a fully assembled outbound message.
        fn publish_message(
            &self,
            message: message::OutboundMessage,
        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
    }

    /// Gives access to a `crate::Client`.
    pub trait ClientProvider {
        /// Returns the client by value.
        fn client(&self) -> crate::Client;
    }

    /// Exposes a configured timeout.
    pub trait TimeoutProvider {
        /// Returns the timeout.
        fn timeout(&self) -> Duration;
    }
}
108
/// A JetStream context: the handle used to publish messages and to issue
/// JetStream API requests.
///
/// Cloning shares the ack semaphore and the ack channel with the original.
#[derive(Debug, Clone)]
pub struct Context {
    /// Client used for every request and publish.
    pub(crate) client: Client,
    /// API prefix prepended (followed by a `.`) to request subjects.
    pub(crate) prefix: String,
    /// Timeout applied when enqueueing a publish; also copied into each ack future.
    pub(crate) timeout: Duration,
    /// One permit is taken per publish, bounding the number of in-flight acks.
    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
    /// Channel to the background acker task; a clone is stored in every ack future.
    pub(crate) ack_sender:
        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
    /// `true`: publishing waits for a free permit; `false`: it fails immediately
    /// when none is available.
    pub(crate) backpressure_on_inflight: bool,
    /// Total number of permits the semaphore was created with.
    pub(crate) semaphore_capacity: usize,
}
121
122fn spawn_acker(
123 rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
124 ack_timeout: Duration,
125 concurrency: Option<usize>,
126) -> tokio::task::JoinHandle<()> {
127 tokio::spawn(async move {
128 rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
129 tokio::time::timeout(ack_timeout, subscription).await.ok();
130 drop(permit);
131 })
132 .await;
133 debug!("Acker task exited");
134 })
135}
136
137use std::marker::PhantomData;
138
/// Typestate marker: the API prefix has not been chosen yet, so
/// `api_prefix` / `domain` are still available on the builder.
#[derive(Debug, Default)]
pub struct Yes;
/// Typestate marker: returned by `api_prefix` / `domain`, after which those
/// two methods are no longer available.
#[derive(Debug, Default)]
pub struct No;

/// Marker trait implemented by the typestate markers [`Yes`] and [`No`].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
148
/// Builder for a [`Context`].
///
/// The `PREFIX` type parameter is a typestate marker controlling whether the
/// API prefix can still be set (see [`Yes`] and [`No`]).
pub struct ContextBuilder<PREFIX: ToAssign> {
    /// API prefix the built context prepends to request subjects.
    prefix: String,
    /// Timeout stored in the built context.
    timeout: Duration,
    /// Number of semaphore permits and capacity of the ack channel.
    semaphore_capacity: usize,
    /// How long the acker task waits for each individual ack.
    ack_timeout: Duration,
    /// Whether publishing waits for a permit (`true`) or fails fast (`false`).
    backpressure_on_inflight: bool,
    /// Concurrency limit handed to the acker task; `None` is passed through as is.
    concurrency_limit: Option<usize>,
    /// Zero-sized carrier for the typestate marker.
    _phantom: PhantomData<PREFIX>,
}
176
177impl Default for ContextBuilder<Yes> {
178 fn default() -> Self {
179 ContextBuilder {
180 prefix: "$JS.API".to_string(),
181 timeout: Duration::from_secs(5),
182 semaphore_capacity: 5_000,
183 ack_timeout: Duration::from_secs(30),
184 backpressure_on_inflight: true,
185 concurrency_limit: None,
186 _phantom: PhantomData {},
187 }
188 }
189}
190
191impl ContextBuilder<Yes> {
192 pub fn new() -> ContextBuilder<Yes> {
194 ContextBuilder::default()
195 }
196}
197
198impl ContextBuilder<Yes> {
199 pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
201 ContextBuilder {
202 prefix: prefix.into(),
203 timeout: self.timeout,
204 semaphore_capacity: self.semaphore_capacity,
205 ack_timeout: self.ack_timeout,
206 backpressure_on_inflight: self.backpressure_on_inflight,
207 concurrency_limit: self.concurrency_limit,
208 _phantom: PhantomData,
209 }
210 }
211
212 pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
215 ContextBuilder {
216 prefix: format!("$JS.{}.API", domain.into()),
217 timeout: self.timeout,
218 semaphore_capacity: self.semaphore_capacity,
219 ack_timeout: self.ack_timeout,
220 backpressure_on_inflight: self.backpressure_on_inflight,
221 concurrency_limit: self.concurrency_limit,
222 _phantom: PhantomData,
223 }
224 }
225}
226
227impl<PREFIX> ContextBuilder<PREFIX>
228where
229 PREFIX: ToAssign,
230{
231 pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
233 where
234 Yes: ToAssign,
235 {
236 ContextBuilder {
237 prefix: self.prefix,
238 timeout,
239 semaphore_capacity: self.semaphore_capacity,
240 ack_timeout: self.ack_timeout,
241 backpressure_on_inflight: self.backpressure_on_inflight,
242 concurrency_limit: self.concurrency_limit,
243 _phantom: PhantomData,
244 }
245 }
246
247 pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
250 where
251 Yes: ToAssign,
252 {
253 ContextBuilder {
254 prefix: self.prefix,
255 timeout: self.timeout,
256 semaphore_capacity: self.semaphore_capacity,
257 ack_timeout,
258 backpressure_on_inflight: self.backpressure_on_inflight,
259 concurrency_limit: self.concurrency_limit,
260 _phantom: PhantomData,
261 }
262 }
263
264 pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
267 where
268 Yes: ToAssign,
269 {
270 ContextBuilder {
271 prefix: self.prefix,
272 timeout: self.timeout,
273 semaphore_capacity: capacity,
274 ack_timeout: self.ack_timeout,
275 backpressure_on_inflight: self.backpressure_on_inflight,
276 concurrency_limit: self.concurrency_limit,
277 _phantom: PhantomData,
278 }
279 }
280
281 pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
286 where
287 Yes: ToAssign,
288 {
289 ContextBuilder {
290 prefix: self.prefix,
291 timeout: self.timeout,
292 semaphore_capacity: self.semaphore_capacity,
293 ack_timeout: self.ack_timeout,
294 backpressure_on_inflight: enabled,
295 concurrency_limit: self.concurrency_limit,
296 _phantom: PhantomData,
297 }
298 }
299
300 pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
303 where
304 Yes: ToAssign,
305 {
306 ContextBuilder {
307 prefix: self.prefix,
308 timeout: self.timeout,
309 semaphore_capacity: self.semaphore_capacity,
310 ack_timeout: self.ack_timeout,
311 backpressure_on_inflight: self.backpressure_on_inflight,
312 concurrency_limit: limit,
313 _phantom: PhantomData,
314 }
315 }
316
317 pub fn build(self, client: Client) -> Context {
319 let (tx, rx) = tokio::sync::mpsc::channel::<(
320 oneshot::Receiver<Message>,
321 OwnedSemaphorePermit,
322 )>(self.semaphore_capacity);
323 let stream = ReceiverStream::new(rx);
324 spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
325 Context {
326 client,
327 prefix: self.prefix,
328 timeout: self.timeout,
329 max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
330 ack_sender: tx,
331 backpressure_on_inflight: self.backpressure_on_inflight,
332 semaphore_capacity: self.semaphore_capacity,
333 }
334 }
335}
336
337impl Context {
338 pub(crate) fn new(client: Client) -> Context {
339 ContextBuilder::default().build(client)
340 }
341
342 pub fn set_timeout(&mut self, timeout: Duration) {
344 self.timeout = timeout
345 }
346
347 pub fn client(&self) -> Client {
349 self.client.clone()
350 }
351
352 pub async fn wait_for_acks(&self) {
358 self.max_ack_semaphore
359 .acquire_many(self.semaphore_capacity as u32)
360 .await
361 .ok();
362 }
363
364 pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
366 ContextBuilder::new()
367 .api_prefix(prefix.to_string())
368 .build(client)
369 }
370
371 pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
373 ContextBuilder::new().domain(domain.as_ref()).build(client)
374 }
375
376 pub async fn publish<S: ToSubject>(
417 &self,
418 subject: S,
419 payload: Bytes,
420 ) -> Result<PublishAckFuture, PublishError> {
421 self.send_publish(subject, PublishMessage::build().payload(payload))
422 .await
423 }
424
425 pub async fn publish_with_headers<S: ToSubject>(
447 &self,
448 subject: S,
449 headers: crate::header::HeaderMap,
450 payload: Bytes,
451 ) -> Result<PublishAckFuture, PublishError> {
452 self.send_publish(
453 subject,
454 PublishMessage::build().payload(payload).headers(headers),
455 )
456 .await
457 }
458
459 pub async fn send_publish<S: ToSubject>(
482 &self,
483 subject: S,
484 publish: PublishMessage,
485 ) -> Result<PublishAckFuture, PublishError> {
486 let permit = if self.backpressure_on_inflight {
487 self.max_ack_semaphore
489 .clone()
490 .acquire_owned()
491 .await
492 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
493 } else {
494 self.max_ack_semaphore
496 .clone()
497 .try_acquire_owned()
498 .map_err(|err| match err {
499 TryAcquireError::NoPermits => {
500 PublishError::new(PublishErrorKind::MaxAckPending)
501 }
502 _ => PublishError::with_source(PublishErrorKind::Other, err),
503 })?
504 };
505 let subject = subject.to_subject();
506 let (sender, receiver) = oneshot::channel();
507
508 let respond = self.client.new_inbox().into();
509
510 let send_fut = self
511 .client
512 .sender
513 .send(Command::Request {
514 subject,
515 payload: publish.payload,
516 respond,
517 headers: publish.headers,
518 sender,
519 })
520 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
521
522 tokio::time::timeout(self.timeout, send_fut)
523 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
524 .await??;
525
526 Ok(PublishAckFuture {
527 timeout: self.timeout,
528 subscription: Some(receiver),
529 permit: Some(permit),
530 tx: self.ack_sender.clone(),
531 })
532 }
533
534 pub async fn query_account(&self) -> Result<Account, AccountError> {
536 let response: Response<Account> = self.request("INFO", b"").await?;
537
538 match response {
539 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
540 Response::Ok(account) => Ok(account),
541 }
542 }
543
544 pub async fn create_stream<S>(
569 &self,
570 stream_config: S,
571 ) -> Result<Stream<Info>, CreateStreamError>
572 where
573 Config: From<S>,
574 {
575 let mut config: Config = stream_config.into();
576 if config.name.is_empty() {
577 return Err(CreateStreamError::new(
578 CreateStreamErrorKind::EmptyStreamName,
579 ));
580 }
581 if !is_valid_name(config.name.as_str()) {
582 return Err(CreateStreamError::new(
583 CreateStreamErrorKind::InvalidStreamName,
584 ));
585 }
586 if let Some(ref mut mirror) = config.mirror {
587 if let Some(ref mut domain) = mirror.domain {
588 if mirror.external.is_some() {
589 return Err(CreateStreamError::new(
590 CreateStreamErrorKind::DomainAndExternalSet,
591 ));
592 }
593 mirror.external = Some(External {
594 api_prefix: format!("$JS.{domain}.API"),
595 delivery_prefix: None,
596 })
597 }
598 }
599
600 if let Some(ref mut sources) = config.sources {
601 for source in sources {
602 if let Some(ref mut domain) = source.domain {
603 if source.external.is_some() {
604 return Err(CreateStreamError::new(
605 CreateStreamErrorKind::DomainAndExternalSet,
606 ));
607 }
608 source.external = Some(External {
609 api_prefix: format!("$JS.{domain}.API"),
610 delivery_prefix: None,
611 })
612 }
613 }
614 }
615 let subject = format!("STREAM.CREATE.{}", config.name);
616 let response: Response<Info> = self.request(subject, &config).await?;
617
618 match response {
619 Response::Err { error } => Err(error.into()),
620 Response::Ok(info) => Ok(Stream {
621 context: self.clone(),
622 info,
623 name: config.name,
624 }),
625 }
626 }
627
628 pub async fn get_stream_no_info<T: AsRef<str>>(
648 &self,
649 stream: T,
650 ) -> Result<Stream<()>, GetStreamError> {
651 let stream = stream.as_ref();
652 if stream.is_empty() {
653 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
654 }
655
656 if !is_valid_name(stream) {
657 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
658 }
659
660 Ok(Stream {
661 context: self.clone(),
662 info: (),
663 name: stream.to_string(),
664 })
665 }
666
667 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
683 let stream = stream.as_ref();
684 if stream.is_empty() {
685 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
686 }
687
688 if !is_valid_name(stream) {
689 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
690 }
691
692 let subject = format!("STREAM.INFO.{stream}");
693 let request: Response<Info> = self
694 .request(subject, &())
695 .await
696 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
697 match request {
698 Response::Err { error } => {
699 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
700 }
701 Response::Ok(info) => Ok(Stream {
702 context: self.clone(),
703 info,
704 name: stream.to_string(),
705 }),
706 }
707 }
708
709 pub async fn get_or_create_stream<S>(
733 &self,
734 stream_config: S,
735 ) -> Result<Stream, CreateStreamError>
736 where
737 S: Into<Config>,
738 {
739 let config: Config = stream_config.into();
740
741 if config.name.is_empty() {
742 return Err(CreateStreamError::new(
743 CreateStreamErrorKind::EmptyStreamName,
744 ));
745 }
746
747 if !is_valid_name(config.name.as_str()) {
748 return Err(CreateStreamError::new(
749 CreateStreamErrorKind::InvalidStreamName,
750 ));
751 }
752 let subject = format!("STREAM.INFO.{}", config.name);
753
754 let request: Response<Info> = self.request(subject, &()).await?;
755 match request {
756 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
757 Response::Err { error } => Err(error.into()),
758 Response::Ok(info) => Ok(Stream {
759 context: self.clone(),
760 info,
761 name: config.name,
762 }),
763 }
764 }
765
766 pub async fn delete_stream<T: AsRef<str>>(
782 &self,
783 stream: T,
784 ) -> Result<DeleteStatus, DeleteStreamError> {
785 let stream = stream.as_ref();
786 if stream.is_empty() {
787 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
788 }
789
790 if !is_valid_name(stream) {
791 return Err(DeleteStreamError::new(
792 DeleteStreamErrorKind::InvalidStreamName,
793 ));
794 }
795
796 let subject = format!("STREAM.DELETE.{stream}");
797 match self
798 .request(subject, &json!({}))
799 .await
800 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
801 {
802 Response::Err { error } => Err(DeleteStreamError::new(
803 DeleteStreamErrorKind::JetStream(error),
804 )),
805 Response::Ok(delete_response) => Ok(delete_response),
806 }
807 }
808
809 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
834 where
835 S: Borrow<Config>,
836 {
837 let config = config.borrow();
838
839 if config.name.is_empty() {
840 return Err(CreateStreamError::new(
841 CreateStreamErrorKind::EmptyStreamName,
842 ));
843 }
844
845 if !is_valid_name(config.name.as_str()) {
846 return Err(CreateStreamError::new(
847 CreateStreamErrorKind::InvalidStreamName,
848 ));
849 }
850
851 let subject = format!("STREAM.UPDATE.{}", config.name);
852 match self.request(subject, config).await? {
853 Response::Err { error } => Err(error.into()),
854 Response::Ok(info) => Ok(info),
855 }
856 }
857
858 pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
882 match self.update_stream(config.clone()).await {
883 Ok(stream) => Ok(stream),
884 Err(err) => match err.kind() {
885 CreateStreamErrorKind::NotFound => {
886 let stream = self
887 .create_stream(config)
888 .await
889 .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
890 Ok(stream.info)
891 }
892 _ => Err(err),
893 },
894 }
895 }
896
897 pub async fn stream_by_subject<T: Into<String>>(
912 &self,
913 subject: T,
914 ) -> Result<String, GetStreamByNameError> {
915 let subject = subject.into();
916 if !is_valid_subject(subject.as_str()) {
917 return Err(GetStreamByNameError::new(
918 GetStreamByNameErrorKind::InvalidSubject,
919 ));
920 }
921 let mut names = StreamNames {
922 context: self.clone(),
923 offset: 0,
924 page_request: None,
925 streams: Vec::new(),
926 subject: Some(subject),
927 done: false,
928 };
929 match names.next().await {
930 Some(name) => match name {
931 Ok(name) => Ok(name),
932 Err(err) => Err(GetStreamByNameError::with_source(
933 GetStreamByNameErrorKind::Request,
934 err,
935 )),
936 },
937 None => Err(GetStreamByNameError::new(
938 GetStreamByNameErrorKind::NotFound,
939 )),
940 }
941 }
942
943 pub fn stream_names(&self) -> StreamNames {
961 StreamNames {
962 context: self.clone(),
963 offset: 0,
964 page_request: None,
965 streams: Vec::new(),
966 subject: None,
967 done: false,
968 }
969 }
970
971 pub fn streams(&self) -> Streams {
989 Streams {
990 context: self.clone(),
991 offset: 0,
992 page_request: None,
993 streams: Vec::new(),
994 done: false,
995 }
996 }
997 #[cfg(feature = "kv")]
1011 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1012 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
1013 let bucket: String = bucket.into();
1014 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
1015 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1016 }
1017
1018 let stream_name = format!("KV_{}", &bucket);
1019 let stream = self
1020 .get_stream(stream_name.clone())
1021 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
1022 .await?;
1023
1024 if stream.info.config.max_messages_per_subject < 1 {
1025 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1026 }
1027 let mut store = Store {
1028 prefix: format!("$KV.{}.", &bucket),
1029 name: bucket,
1030 stream_name,
1031 stream: stream.clone(),
1032 put_prefix: None,
1033 use_jetstream_prefix: self.prefix != "$JS.API",
1034 };
1035 if let Some(ref mirror) = stream.info.config.mirror {
1036 let bucket = mirror.name.trim_start_matches("KV_");
1037 if let Some(ref external) = mirror.external {
1038 if !external.api_prefix.is_empty() {
1039 store.use_jetstream_prefix = false;
1040 store.prefix = format!("$KV.{bucket}.");
1041 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
1042 } else {
1043 store.put_prefix = Some(format!("$KV.{bucket}."));
1044 }
1045 }
1046 };
1047
1048 Ok(store)
1049 }
1050
1051 #[cfg(feature = "kv")]
1071 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1072 pub async fn create_key_value(
1073 &self,
1074 config: crate::jetstream::kv::Config,
1075 ) -> Result<Store, CreateKeyValueError> {
1076 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1077 return Err(CreateKeyValueError::new(
1078 CreateKeyValueErrorKind::InvalidStoreName,
1079 ));
1080 }
1081 let info = self.query_account().await.map_err(|err| {
1082 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1083 })?;
1084
1085 let bucket_name = config.bucket.clone();
1086 let stream_config = kv_to_stream_config(config, info)?;
1087
1088 let stream = self.create_stream(stream_config).await.map_err(|err| {
1089 if err.kind() == CreateStreamErrorKind::TimedOut {
1090 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1091 } else {
1092 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1093 }
1094 })?;
1095
1096 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1097 }
1098
1099 #[cfg(feature = "kv")]
1119 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1120 pub async fn update_key_value(
1121 &self,
1122 config: crate::jetstream::kv::Config,
1123 ) -> Result<Store, UpdateKeyValueError> {
1124 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1125 return Err(UpdateKeyValueError::new(
1126 UpdateKeyValueErrorKind::InvalidStoreName,
1127 ));
1128 }
1129
1130 let stream_name = format!("KV_{}", config.bucket);
1131 let bucket_name = config.bucket.clone();
1132
1133 let account = self.query_account().await.map_err(|err| {
1134 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1135 })?;
1136 let stream = self
1137 .update_stream(kv_to_stream_config(config, account)?)
1138 .await
1139 .map_err(|err| match err.kind() {
1140 UpdateStreamErrorKind::NotFound => {
1141 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1142 }
1143 _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1144 })?;
1145
1146 let stream = Stream {
1147 context: self.clone(),
1148 info: stream,
1149 name: stream_name,
1150 };
1151
1152 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1153 }
1154
1155 #[cfg(feature = "kv")]
1175 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1176 pub async fn create_or_update_key_value(
1177 &self,
1178 config: crate::jetstream::kv::Config,
1179 ) -> Result<Store, CreateKeyValueError> {
1180 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1181 return Err(CreateKeyValueError::new(
1182 CreateKeyValueErrorKind::InvalidStoreName,
1183 ));
1184 }
1185
1186 let bucket_name = config.bucket.clone();
1187 let stream_name = format!("KV_{}", config.bucket);
1188
1189 let account = self.query_account().await.map_err(|err| {
1190 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1191 })?;
1192 let stream = self
1193 .create_or_update_stream(kv_to_stream_config(config, account)?)
1194 .await
1195 .map_err(|err| {
1196 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1197 })?;
1198
1199 let stream = Stream {
1200 context: self.clone(),
1201 info: stream,
1202 name: stream_name,
1203 };
1204
1205 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1206 }
1207
1208 #[cfg(feature = "kv")]
1228 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1229 pub async fn delete_key_value<T: AsRef<str>>(
1230 &self,
1231 bucket: T,
1232 ) -> Result<DeleteStatus, KeyValueError> {
1233 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1234 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1235 }
1236
1237 let stream_name = format!("KV_{}", bucket.as_ref());
1238 self.delete_stream(stream_name)
1239 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1240 .await
1241 }
1242
1243 pub async fn get_consumer_from_stream<T, C, S>(
1280 &self,
1281 consumer: C,
1282 stream: S,
1283 ) -> Result<Consumer<T>, ConsumerError>
1284 where
1285 T: FromConsumer + IntoConsumerConfig,
1286 S: AsRef<str>,
1287 C: AsRef<str>,
1288 {
1289 if !is_valid_name(stream.as_ref()) {
1290 return Err(ConsumerError::with_source(
1291 ConsumerErrorKind::InvalidName,
1292 "invalid stream",
1293 ));
1294 }
1295
1296 if !is_valid_name(consumer.as_ref()) {
1297 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1298 }
1299
1300 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1301
1302 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1303 Response::Ok(info) => info,
1304 Response::Err { error } => return Err(error.into()),
1305 };
1306
1307 Ok(Consumer::new(
1308 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1309 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1310 })?,
1311 info,
1312 self.clone(),
1313 ))
1314 }
1315
1316 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1339 &self,
1340 consumer: C,
1341 stream: S,
1342 ) -> Result<DeleteStatus, ConsumerError> {
1343 if !is_valid_name(consumer.as_ref()) {
1344 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1345 }
1346
1347 if !is_valid_name(stream.as_ref()) {
1348 return Err(ConsumerError::with_source(
1349 ConsumerErrorKind::Other,
1350 "invalid stream name",
1351 ));
1352 }
1353
1354 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1355
1356 match self.request(subject, &json!({})).await? {
1357 Response::Ok(delete_status) => Ok(delete_status),
1358 Response::Err { error } => Err(error.into()),
1359 }
1360 }
1361
1362 pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1388 &self,
1389 config: C,
1390 stream: S,
1391 ) -> Result<Consumer<C>, ConsumerError> {
1392 self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1393 .await
1394 }
1395
1396 #[cfg(feature = "server_2_10")]
1423 pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1424 &self,
1425 config: C,
1426 stream: S,
1427 ) -> Result<Consumer<C>, ConsumerUpdateError> {
1428 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1429 .await
1430 .map_err(|err| err.into())
1431 }
1432
1433 #[cfg(feature = "server_2_10")]
1460 pub async fn create_consumer_strict_on_stream<
1461 C: IntoConsumerConfig + FromConsumer,
1462 S: AsRef<str>,
1463 >(
1464 &self,
1465 config: C,
1466 stream: S,
1467 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1468 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1469 .await
1470 .map_err(|err| err.into())
1471 }
1472
1473 async fn create_consumer_on_stream_action<
1474 C: IntoConsumerConfig + FromConsumer,
1475 S: AsRef<str>,
1476 >(
1477 &self,
1478 config: C,
1479 stream: S,
1480 action: ConsumerAction,
1481 ) -> Result<Consumer<C>, ConsumerError> {
1482 let config = config.into_consumer_config();
1483
1484 let subject = {
1485 let filter = if config.filter_subject.is_empty() {
1486 "".to_string()
1487 } else {
1488 format!(".{}", config.filter_subject)
1489 };
1490 config
1491 .name
1492 .as_ref()
1493 .or(config.durable_name.as_ref())
1494 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1495 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1496 };
1497
1498 match self
1499 .request(
1500 subject,
1501 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1502 )
1503 .await?
1504 {
1505 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1506 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1507 FromConsumer::try_from_consumer_config(info.clone().config)
1508 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1509 info,
1510 self.clone(),
1511 )),
1512 }
1513 }
1514
1515 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1535 where
1536 S: ToSubject,
1537 T: ?Sized + Serialize,
1538 V: DeserializeOwned,
1539 {
1540 let subject = subject.to_subject();
1541 let request = serde_json::to_vec(&payload)
1542 .map(Bytes::from)
1543 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1544
1545 debug!("JetStream request sent: {:?}", request);
1546
1547 let message = self
1548 .client
1549 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1550 .await;
1551 let message = message?;
1552 debug!(
1553 "JetStream request response: {:?}",
1554 from_utf8(&message.payload)
1555 );
1556 let response = serde_json::from_slice(message.payload.as_ref())
1557 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1558
1559 Ok(response)
1560 }
1561
1562 pub async fn send_request<S: ToSubject>(
1571 &self,
1572 subject: S,
1573 request: crate::client::Request,
1574 ) -> Result<(), crate::PublishError> {
1575 let prefixed_subject = format!("{}.{}", self.prefix, subject.to_subject());
1576 let inbox = request.inbox.unwrap_or_else(|| self.client.new_inbox());
1577 let payload = request.payload.unwrap_or_default();
1578
1579 match request.headers {
1580 Some(headers) => {
1581 self.client
1582 .publish_with_reply_and_headers(prefixed_subject, inbox, headers, payload)
1583 .await
1584 }
1585 None => {
1586 self.client
1587 .publish_with_reply(prefixed_subject, inbox, payload)
1588 .await
1589 }
1590 }
1591 }
1592
1593 #[cfg(feature = "object-store")]
1612 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1613 pub async fn create_object_store(
1614 &self,
1615 config: super::object_store::Config,
1616 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1617 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1618 return Err(CreateObjectStoreError::new(
1619 CreateKeyValueErrorKind::InvalidStoreName,
1620 ));
1621 }
1622
1623 let bucket_name = config.bucket.clone();
1624 let stream_name = format!("OBJ_{bucket_name}");
1625 let chunk_subject = format!("$O.{bucket_name}.C.>");
1626 let meta_subject = format!("$O.{bucket_name}.M.>");
1627
1628 let stream = self
1629 .create_stream(super::stream::Config {
1630 name: stream_name,
1631 description: config.description.clone(),
1632 subjects: vec![chunk_subject, meta_subject],
1633 max_age: config.max_age,
1634 max_bytes: config.max_bytes,
1635 storage: config.storage,
1636 num_replicas: config.num_replicas,
1637 discard: DiscardPolicy::New,
1638 allow_rollup: true,
1639 allow_direct: true,
1640 #[cfg(feature = "server_2_10")]
1641 compression: if config.compression {
1642 Some(Compression::S2)
1643 } else {
1644 None
1645 },
1646 placement: config.placement,
1647 ..Default::default()
1648 })
1649 .await
1650 .map_err(|err| {
1651 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1652 })?;
1653
1654 Ok(ObjectStore {
1655 name: bucket_name,
1656 stream,
1657 })
1658 }
1659
1660 #[cfg(feature = "object-store")]
1674 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1675 pub async fn get_object_store<T: AsRef<str>>(
1676 &self,
1677 bucket_name: T,
1678 ) -> Result<ObjectStore, ObjectStoreError> {
1679 let bucket_name = bucket_name.as_ref();
1680 if !is_valid_bucket_name(bucket_name) {
1681 return Err(ObjectStoreError::new(
1682 ObjectStoreErrorKind::InvalidBucketName,
1683 ));
1684 }
1685 let stream_name = format!("OBJ_{bucket_name}");
1686 let stream = self
1687 .get_stream(stream_name)
1688 .await
1689 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1690
1691 Ok(ObjectStore {
1692 name: bucket_name.to_string(),
1693 stream,
1694 })
1695 }
1696
1697 #[cfg(feature = "object-store")]
1711 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1712 pub async fn delete_object_store<T: AsRef<str>>(
1713 &self,
1714 bucket_name: T,
1715 ) -> Result<(), DeleteObjectStore> {
1716 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1717 self.delete_stream(stream_name)
1718 .await
1719 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1720 Ok(())
1721 }
1722}
1723
1724impl crate::client::traits::Requester for Context {
1725 fn send_request<S: ToSubject>(
1726 &self,
1727 subject: S,
1728 request: crate::Request,
1729 ) -> impl Future<Output = Result<Message, crate::RequestError>> {
1730 self.client.send_request(subject, request)
1731 }
1732}
1733
1734impl crate::client::traits::Publisher for Context {
1735 fn publish_with_reply<S: ToSubject, R: ToSubject>(
1736 &self,
1737 subject: S,
1738 reply: R,
1739 payload: Bytes,
1740 ) -> impl Future<Output = Result<(), crate::PublishError>> {
1741 self.client.publish_with_reply(subject, reply, payload)
1742 }
1743
1744 fn publish_message(
1745 &self,
1746 msg: crate::message::OutboundMessage,
1747 ) -> impl Future<Output = Result<(), crate::PublishError>> {
1748 self.client.publish_message(msg)
1749 }
1750}
1751
1752impl traits::ClientProvider for Context {
1753 fn client(&self) -> crate::Client {
1754 self.client()
1755 }
1756}
1757
1758impl traits::Requester for Context {
1759 fn request<S, T, V>(
1760 &self,
1761 subject: S,
1762 payload: &T,
1763 ) -> impl Future<Output = Result<V, RequestError>>
1764 where
1765 S: ToSubject,
1766 T: ?Sized + Serialize,
1767 V: DeserializeOwned,
1768 {
1769 self.request(subject, payload)
1770 }
1771}
1772
1773impl traits::TimeoutProvider for Context {
1774 fn timeout(&self) -> Duration {
1775 self.timeout
1776 }
1777}
1778
1779impl traits::RequestSender for Context {
1780 fn send_request<S: ToSubject>(
1781 &self,
1782 subject: S,
1783 request: crate::client::Request,
1784 ) -> impl Future<Output = Result<(), crate::PublishError>> {
1785 self.send_request(subject, request)
1786 }
1787}
1788
1789impl traits::Publisher for Context {
1790 fn publish<S: ToSubject>(
1791 &self,
1792 subject: S,
1793 payload: Bytes,
1794 ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1795 self.publish(subject, payload)
1796 }
1797
1798 fn publish_message(
1799 &self,
1800 message: jetstream::message::OutboundMessage,
1801 ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1802 self.send_publish(
1803 message.subject,
1804 PublishMessage {
1805 payload: message.payload,
1806 headers: message.headers,
1807 },
1808 )
1809 }
1810}
1811
/// Reasons a JetStream publish, or waiting for its acknowledgement, can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream is bound to the subject that was published to.
    StreamNotFound,
    /// The server reported a mismatch on the expected last message id.
    WrongLastMessageId,
    /// The server reported a mismatch on the expected last sequence.
    WrongLastSequence,
    /// No acknowledgement arrived within the timeout.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// The limit of pending acknowledgements was reached.
    MaxAckPending,
    /// Any other failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1836
/// Error returned by JetStream publish operations; its kind is a
/// [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1838
/// Handle to a pending JetStream publish acknowledgement.
///
/// Awaiting it (through its `IntoFuture` impl) waits up to `timeout` for the
/// reply and parses it into a [`PublishAck`]. If it is dropped while it still
/// holds both the receiver and the permit, the pair is handed to `tx` instead.
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Upper bound on how long awaiting the acknowledgement may take.
    timeout: Duration,
    /// Receiver for the reply message; `None` once it has been taken.
    subscription: Option<oneshot::Receiver<Message>>,
    /// Semaphore permit tied to this publish.
    /// NOTE(review): presumably bounds the number of in-flight acks — confirm
    /// against the code that creates this future.
    permit: Option<OwnedSemaphorePermit>,
    /// Channel that receives the un-awaited receiver and permit on drop.
    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
}
1846
1847impl Drop for PublishAckFuture {
1848 fn drop(&mut self) {
1849 if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1850 if let Err(err) = self.tx.try_send((sub, permit)) {
1851 tracing::warn!("failed to pass future permit to the acker: {}", err);
1852 }
1853 }
1854 }
1855}
1856
1857impl PublishAckFuture {
1858 async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1859 let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1860 .await
1861 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1862 next.map_or_else(
1863 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1864 |m| {
1865 if m.status == Some(StatusCode::NO_RESPONDERS) {
1866 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1867 }
1868 let response = serde_json::from_slice(m.payload.as_ref())
1869 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1870 match response {
1871 Response::Err { error } => match error.error_code() {
1872 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1873 PublishErrorKind::WrongLastMessageId,
1874 error,
1875 )),
1876 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1877 PublishErrorKind::WrongLastSequence,
1878 error,
1879 )),
1880 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1881 },
1882 Response::Ok(publish_ack) => Ok(publish_ack),
1883 }
1884 },
1885 )
1886 }
1887}
1888impl IntoFuture for PublishAckFuture {
1889 type Output = Result<PublishAck, PublishError>;
1890
1891 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1892
1893 fn into_future(self) -> Self::IntoFuture {
1894 Box::pin(std::future::IntoFuture::into_future(
1895 self.next_with_timeout(),
1896 ))
1897 }
1898}
1899
/// One page of the `STREAM.NAMES` response, as consumed by [`StreamNames`].
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total number of entries; compared against the running offset to detect
    /// the last page.
    total: usize,
    /// Stream names on this page; may be absent from the response.
    streams: Option<Vec<String>>,
}
1905
/// One page of the `STREAM.LIST` response, as consumed by [`Streams`].
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total number of entries; compared against the running offset to detect
    /// the last page.
    total: usize,
    /// Stream infos on this page; may be absent from the response.
    streams: Option<Vec<super::stream::Info>>,
}
1911
/// Boxed in-flight request for one page of stream names.
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1913
/// Paginated stream of stream names, fetched page by page via `STREAM.NAMES`.
pub struct StreamNames {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of names received so far; sent as the offset of the next page.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Optional subject sent along with each page request.
    subject: Option<String>,
    /// Names of the current page that have not been yielded yet.
    streams: Vec<String>,
    /// Set once the offset has reached the total reported by the server.
    done: bool,
}
1922
1923impl futures_util::Stream for StreamNames {
1924 type Item = Result<String, StreamsError>;
1925
1926 fn poll_next(
1927 mut self: Pin<&mut Self>,
1928 cx: &mut std::task::Context<'_>,
1929 ) -> std::task::Poll<Option<Self::Item>> {
1930 match self.page_request.as_mut() {
1931 Some(page) => match page.try_poll_unpin(cx) {
1932 std::task::Poll::Ready(page) => {
1933 self.page_request = None;
1934 let page = page
1935 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1936 if let Some(streams) = page.streams {
1937 self.offset += streams.len();
1938 self.streams = streams;
1939 if self.offset >= page.total {
1940 self.done = true;
1941 }
1942 match self.streams.pop() {
1943 Some(stream) => Poll::Ready(Some(Ok(stream))),
1944 None => Poll::Ready(None),
1945 }
1946 } else {
1947 Poll::Ready(None)
1948 }
1949 }
1950 std::task::Poll::Pending => std::task::Poll::Pending,
1951 },
1952 None => {
1953 if let Some(stream) = self.streams.pop() {
1954 Poll::Ready(Some(Ok(stream)))
1955 } else {
1956 if self.done {
1957 return Poll::Ready(None);
1958 }
1959 let context = self.context.clone();
1960 let offset = self.offset;
1961 let subject = self.subject.clone();
1962 self.page_request = Some(Box::pin(async move {
1963 match context
1964 .request(
1965 "STREAM.NAMES",
1966 &json!({
1967 "offset": offset,
1968 "subject": subject
1969 }),
1970 )
1971 .await?
1972 {
1973 Response::Err { error } => {
1974 Err(RequestError::with_source(RequestErrorKind::Other, error))
1975 }
1976 Response::Ok(page) => Ok(page),
1977 }
1978 }));
1979 self.poll_next(cx)
1980 }
1981 }
1982 }
1983 }
1984}
1985
/// Boxed in-flight request for one page of full stream infos.
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

/// Error kind reported while listing streams; an alias of [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error yielded by [`StreamNames`] and [`Streams`]; an alias of [`RequestError`].
pub type StreamsError = RequestError;
1990
/// Paginated stream of stream infos, fetched page by page via `STREAM.LIST`.
pub struct Streams {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of infos received so far; sent as the offset of the next page.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    streams: Vec<super::stream::Info>,
    /// Set once the offset has reached the total reported by the server.
    done: bool,
}
1998
1999impl futures_util::Stream for Streams {
2000 type Item = Result<super::stream::Info, StreamsError>;
2001
2002 fn poll_next(
2003 mut self: Pin<&mut Self>,
2004 cx: &mut std::task::Context<'_>,
2005 ) -> std::task::Poll<Option<Self::Item>> {
2006 match self.page_request.as_mut() {
2007 Some(page) => match page.try_poll_unpin(cx) {
2008 std::task::Poll::Ready(page) => {
2009 self.page_request = None;
2010 let page = page
2011 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
2012 if let Some(streams) = page.streams {
2013 self.offset += streams.len();
2014 self.streams = streams;
2015 if self.offset >= page.total {
2016 self.done = true;
2017 }
2018 match self.streams.pop() {
2019 Some(stream) => Poll::Ready(Some(Ok(stream))),
2020 None => Poll::Ready(None),
2021 }
2022 } else {
2023 Poll::Ready(None)
2024 }
2025 }
2026 std::task::Poll::Pending => std::task::Poll::Pending,
2027 },
2028 None => {
2029 if let Some(stream) = self.streams.pop() {
2030 Poll::Ready(Some(Ok(stream)))
2031 } else {
2032 if self.done {
2033 return Poll::Ready(None);
2034 }
2035 let context = self.context.clone();
2036 let offset = self.offset;
2037 self.page_request = Some(Box::pin(async move {
2038 match context
2039 .request(
2040 "STREAM.LIST",
2041 &json!({
2042 "offset": offset,
2043 }),
2044 )
2045 .await?
2046 {
2047 Response::Err { error } => {
2048 Err(RequestError::with_source(RequestErrorKind::Other, error))
2049 }
2050 Response::Ok(page) => Ok(page),
2051 }
2052 }));
2053 self.poll_next(cx)
2054 }
2055 }
2056 }
2057 }
2058}
2059
/// Deprecated alias kept for backwards compatibility; it names the same type
/// as `jetstream::message::PublishMessage`.
#[deprecated(
    note = "use jetstream::message::PublishMessage instead",
    since = "0.44.0"
)]
pub type Publish = super::message::PublishMessage;
2067
/// Reasons a JetStream API request can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody answered the request.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
2084
/// Error returned by JetStream API requests; its kind is a
/// [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
2086
2087impl From<crate::RequestError> for RequestError {
2088 fn from(error: crate::RequestError) -> Self {
2089 match error.kind() {
2090 crate::RequestErrorKind::TimedOut => {
2091 RequestError::with_source(RequestErrorKind::TimedOut, error)
2092 }
2093 crate::RequestErrorKind::NoResponders => {
2094 RequestError::new(RequestErrorKind::NoResponders)
2095 }
2096 crate::RequestErrorKind::Other => {
2097 RequestError::with_source(RequestErrorKind::Other, error)
2098 }
2099 }
2100 }
2101}
2102
2103impl From<super::errors::Error> for RequestError {
2104 fn from(err: super::errors::Error) -> Self {
2105 RequestError::with_source(RequestErrorKind::Other, err)
2106 }
2107}
2108
2109pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
2110
2111#[derive(Clone, Debug, PartialEq)]
2112pub enum ConsumerInfoErrorKind {
2113 InvalidName,
2114 Offline,
2115 NotFound,
2116 StreamNotFound,
2117 Request,
2118 JetStream(super::errors::Error),
2119 TimedOut,
2120 NoResponders,
2121}
2122
2123impl Display for ConsumerInfoErrorKind {
2124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2125 match self {
2126 Self::InvalidName => write!(f, "invalid consumer name"),
2127 Self::Offline => write!(f, "consumer is offline"),
2128 Self::NotFound => write!(f, "consumer not found"),
2129 Self::StreamNotFound => write!(f, "stream not found"),
2130 Self::Request => write!(f, "request error"),
2131 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2132 Self::TimedOut => write!(f, "timed out"),
2133 Self::NoResponders => write!(f, "no responders"),
2134 }
2135 }
2136}
2137
2138impl From<super::errors::Error> for ConsumerInfoError {
2139 fn from(error: super::errors::Error) -> Self {
2140 match error.error_code() {
2141 ErrorCode::CONSUMER_NOT_FOUND => {
2142 ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2143 }
2144 ErrorCode::STREAM_NOT_FOUND => {
2145 ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2146 }
2147 ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2148 _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2149 }
2150 }
2151}
2152
2153impl From<RequestError> for ConsumerInfoError {
2154 fn from(error: RequestError) -> Self {
2155 match error.kind() {
2156 RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2157 RequestErrorKind::Other => {
2158 ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2159 }
2160 RequestErrorKind::NoResponders => {
2161 ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2162 }
2163 }
2164 }
2165}
2166
2167#[derive(Clone, Debug, PartialEq)]
2168pub enum CreateStreamErrorKind {
2169 EmptyStreamName,
2170 InvalidStreamName,
2171 DomainAndExternalSet,
2172 JetStreamUnavailable,
2173 JetStream(super::errors::Error),
2174 TimedOut,
2175 Response,
2176 NotFound,
2177 ResponseParse,
2178}
2179
2180impl Display for CreateStreamErrorKind {
2181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2182 match self {
2183 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2184 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2185 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2186 Self::NotFound => write!(f, "stream not found"),
2187 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2188 Self::TimedOut => write!(f, "jetstream request timed out"),
2189 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2190 Self::ResponseParse => write!(f, "failed to parse server response"),
2191 Self::Response => write!(f, "response error"),
2192 }
2193 }
2194}
2195
2196pub type CreateStreamError = Error<CreateStreamErrorKind>;
2197
2198impl From<super::errors::Error> for CreateStreamError {
2199 fn from(error: super::errors::Error) -> Self {
2200 match error.kind() {
2201 super::errors::ErrorCode::STREAM_NOT_FOUND => {
2202 CreateStreamError::new(CreateStreamErrorKind::NotFound)
2203 }
2204 _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2205 }
2206 }
2207}
2208
2209impl From<RequestError> for CreateStreamError {
2210 fn from(error: RequestError) -> Self {
2211 match error.kind() {
2212 RequestErrorKind::NoResponders => {
2213 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2214 }
2215 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2216 RequestErrorKind::Other => {
2217 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2218 }
2219 }
2220 }
2221}
2222
2223#[derive(Clone, Debug, PartialEq)]
2224pub enum GetStreamErrorKind {
2225 EmptyName,
2226 Request,
2227 InvalidStreamName,
2228 JetStream(super::errors::Error),
2229}
2230
2231impl Display for GetStreamErrorKind {
2232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2233 match self {
2234 Self::EmptyName => write!(f, "empty name cannot be empty"),
2235 Self::Request => write!(f, "request error"),
2236 Self::InvalidStreamName => write!(f, "invalid stream name"),
2237 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2238 }
2239 }
2240}
2241
2242#[derive(Clone, Debug, PartialEq)]
2243pub enum GetStreamByNameErrorKind {
2244 Request,
2245 NotFound,
2246 InvalidSubject,
2247 JetStream(super::errors::Error),
2248}
2249
2250impl Display for GetStreamByNameErrorKind {
2251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2252 match self {
2253 Self::Request => write!(f, "request error"),
2254 Self::NotFound => write!(f, "stream not found"),
2255 Self::InvalidSubject => write!(f, "invalid subject"),
2256 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2257 }
2258 }
2259}
2260
/// Error whose kind is a [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error whose kind is a [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Updating a stream reports the same errors as creating one.
pub type UpdateStreamError = CreateStreamError;
/// Updating a stream reports the same error kinds as creating one.
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Deleting a stream reports the same errors as getting one.
pub type DeleteStreamError = GetStreamError;
/// Deleting a stream reports the same error kinds as getting one.
pub type DeleteStreamErrorKind = GetStreamErrorKind;
2268
/// Error kinds behind [`KeyValueError`].
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

#[cfg(feature = "kv")]
impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(description)
    }
}

/// Error whose kind is a [`KeyValueErrorKind`].
#[cfg(feature = "kv")]
pub type KeyValueError = Error<KeyValueErrorKind>;
2290
/// Reasons creating a key-value bucket can fail; also reused for object store
/// creation.
#[cfg(any(feature = "kv", feature = "object-store"))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}

#[cfg(any(feature = "kv", feature = "object-store"))]
impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(description)
    }
}
2317
/// Reasons updating a key-value bucket can fail.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

#[cfg(feature = "kv")]
impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            // Used to print "bucket creation failed", copied over from
            // `CreateKeyValueErrorKind::BucketCreate`.
            Self::BucketUpdate => "bucket update failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
            Self::NotFound => "bucket does not exist",
        };
        f.write_str(description)
    }
}
/// Error whose kind is a [`CreateKeyValueErrorKind`].
#[cfg(feature = "kv")]
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error whose kind is an [`UpdateKeyValueErrorKind`].
#[cfg(feature = "kv")]
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Object store creation reuses the key-value creation error kinds.
#[cfg(feature = "object-store")]
pub type CreateObjectStoreError = Error<CreateKeyValueErrorKind>;
/// Alias of [`CreateKeyValueErrorKind`] for object store creation.
#[cfg(feature = "object-store")]
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2355
/// Error kinds behind [`ObjectStoreError`].
#[cfg(feature = "object-store")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is invalid.
    InvalidBucketName,
    /// The object store could not be fetched.
    GetStore,
}

#[cfg(feature = "object-store")]
impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(description)
    }
}

/// Error whose kind is an [`ObjectStoreErrorKind`].
#[cfg(feature = "object-store")]
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Deleting an object store reports the same errors as getting one.
#[cfg(feature = "object-store")]
pub type DeleteObjectStore = ObjectStoreError;
/// Deleting an object store reports the same error kinds as getting one.
#[cfg(feature = "object-store")]
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2380
2381#[derive(Clone, Debug, PartialEq)]
2382pub enum AccountErrorKind {
2383 TimedOut,
2384 JetStream(super::errors::Error),
2385 JetStreamUnavailable,
2386 Other,
2387}
2388
2389impl Display for AccountErrorKind {
2390 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2391 match self {
2392 Self::TimedOut => write!(f, "timed out"),
2393 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2394 Self::Other => write!(f, "error"),
2395 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2396 }
2397 }
2398}
2399
2400pub type AccountError = Error<AccountErrorKind>;
2401
2402impl From<RequestError> for AccountError {
2403 fn from(err: RequestError) -> Self {
2404 match err.kind {
2405 RequestErrorKind::NoResponders => {
2406 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2407 }
2408 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2409 RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2410 }
2411 }
2412}
2413
/// Action value serialized into a request as `""`, `"create"` or `"update"`.
///
/// NOTE(review): the name suggests it accompanies consumer create/update
/// requests; the call sites are outside this view — confirm.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only available with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only available with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2425
#[cfg(feature = "kv")]
/// Builds the key-value [`Store`] handle for `bucket` on top of its backing
/// `stream`.
///
/// * `prefix` is the JetStream API prefix in use; anything other than
///   `$JS.API` turns on `use_jetstream_prefix`.
/// * By default keys live under `$KV.<bucket>.`.
/// * When the stream mirrors another bucket through an external configuration,
///   writes are redirected with `put_prefix` to the mirrored bucket. If that
///   configuration has a non-empty API prefix, reads use the mirrored bucket's
///   prefix as well and `use_jetstream_prefix` is turned off.
fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
    let mut key_prefix = format!("$KV.{}.", bucket.as_str());
    let mut put_prefix = None;
    let mut use_jetstream_prefix = prefix != "$JS.API";

    if let Some(mirror) = stream.info.config.mirror.as_ref() {
        // Remove the `KV_` stream prefix exactly once: a bucket whose own name
        // starts with `KV_` is backed by `KV_KV_...`, and stripping repeatedly
        // would yield the wrong bucket name.
        let mirrored_bucket = mirror
            .name
            .strip_prefix("KV_")
            .unwrap_or(mirror.name.as_str());
        if let Some(external) = mirror.external.as_ref() {
            if external.api_prefix.is_empty() {
                put_prefix = Some(format!("$KV.{mirrored_bucket}."));
            } else {
                use_jetstream_prefix = false;
                key_prefix = format!("$KV.{mirrored_bucket}.");
                put_prefix = Some(format!(
                    "{}.$KV.{}.",
                    external.api_prefix, mirrored_bucket
                ));
            }
        }
    }

    // `stream_name` is read before `stream` is moved into the store, so the
    // stream does not need to be cloned.
    Store {
        prefix: key_prefix,
        name: bucket,
        stream_name: stream.info.config.name.clone(),
        stream,
        put_prefix,
        use_jetstream_prefix,
    }
}
2451
/// Failures of `kv_to_stream_config`; converted into the public create/update
/// key-value errors through the `From` impls in this file.
#[cfg(feature = "kv")]
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but are not supported.
    // Only constructed when the `server_2_11` feature is enabled, hence the
    // `dead_code` allowance.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2458
/// Translates a config conversion failure into the matching
/// [`CreateKeyValueError`].
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for CreateKeyValueError {
    /// Each variant maps to the creation error kind of the same name.
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => CreateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                CreateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        CreateKeyValueError::new(kind)
    }
}
2472
/// Translates a config conversion failure into the matching
/// [`UpdateKeyValueError`].
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for UpdateKeyValueError {
    /// Each variant maps to the update error kind of the same name.
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => UpdateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                UpdateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        UpdateKeyValueError::new(kind)
    }
}
2486
#[cfg(feature = "kv")]
/// Translates a key-value bucket configuration into the configuration of its
/// backing stream, `KV_<bucket>`.
///
/// * A history that is not greater than zero falls back to `1`, and zero
///   replicas fall back to `1`.
/// * Mirror and source names are prefixed with `KV_` unless they already
///   start with it; configuring a mirror forces `mirror_direct` on.
/// * The stream only gets its own subject, `$KV.<bucket>.>`, when it neither
///   mirrors nor sources another bucket.
/// * With the `server_2_11` feature, limit markers enable message TTLs and set
///   the subject delete marker TTL.
///
/// `_account` is only read when the `server_2_11` feature is enabled.
///
/// # Errors
///
/// * [`KvToStreamConfigError::TooLongHistory`] when the history exceeds
///   `MAX_HISTORY`.
/// * [`KvToStreamConfigError::LimitMarkersNotSupported`] (`server_2_11` only)
///   when limit markers are requested and `_account.requests.level` is below 1.
fn kv_to_stream_config(
    config: crate::jetstream::kv::Config,
    _account: Account,
) -> Result<super::stream::Config, KvToStreamConfigError> {
    let history = if config.history > 0 {
        if config.history > MAX_HISTORY {
            return Err(KvToStreamConfigError::TooLongHistory);
        }
        config.history
    } else {
        1
    };

    let num_replicas = if config.num_replicas == 0 {
        1
    } else {
        config.num_replicas
    };

    #[cfg(feature = "server_2_11")]
    let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);

    #[cfg(feature = "server_2_11")]
    if let Some(duration) = config.limit_markers {
        if _account.requests.level < 1 {
            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
        }
        allow_message_ttl = true;
        subject_delete_marker_ttl = Some(duration);
    }

    // `config` is owned, so the mirror and sources are moved out of it rather
    // than deep-cloned.
    let mut mirror = config.mirror;
    let mut sources = config.sources;
    let mut mirror_direct = config.mirror_direct;

    let mut subjects = Vec::new();
    if let Some(mirror_source) = mirror.as_mut() {
        if !mirror_source.name.starts_with("KV_") {
            mirror_source.name = format!("KV_{}", mirror_source.name);
        }
        mirror_direct = true;
    } else if let Some(all_sources) = sources.as_mut() {
        for source in all_sources {
            if !source.name.starts_with("KV_") {
                source.name = format!("KV_{}", source.name);
            }
        }
    } else {
        subjects = vec![format!("$KV.{}.>", config.bucket)];
    }

    Ok(Config {
        name: format!("KV_{}", config.bucket),
        description: Some(config.description),
        subjects,
        max_messages_per_subject: history,
        max_bytes: config.max_bytes,
        max_age: config.max_age,
        max_message_size: config.max_value_size,
        storage: config.storage,
        republish: config.republish,
        allow_rollup: true,
        deny_delete: true,
        deny_purge: false,
        allow_direct: true,
        sources,
        mirror,
        num_replicas,
        discard: DiscardPolicy::New,
        mirror_direct,
        #[cfg(feature = "server_2_10")]
        compression: if config.compression {
            Some(Compression::S2)
        } else {
            None
        },
        placement: config.placement,
        #[cfg(feature = "server_2_11")]
        allow_message_ttl,
        #[cfg(feature = "server_2_11")]
        subject_delete_marker_ttl,
        ..Default::default()
    })
}