1use crate::error::Error;
17use crate::jetstream::account::Account;
18use crate::jetstream::message::PublishMessage;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{is_valid_subject, jetstream, Client, Command, Message, StatusCode};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25use futures_util::{Future, StreamExt, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Debug;
31use std::fmt::Display;
32use std::future::IntoFuture;
33use std::pin::Pin;
34use std::str::from_utf8;
35use std::sync::Arc;
36use std::task::Poll;
37use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
38use tokio::time::Duration;
39use tokio_stream::wrappers::ReceiverStream;
40use tracing::debug;
41
42use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
43use super::errors::ErrorCode;
44use super::is_valid_name;
45#[cfg(feature = "kv")]
46use super::kv::{Store, MAX_HISTORY};
47#[cfg(feature = "object-store")]
48use super::object_store::{is_valid_bucket_name, ObjectStore};
49#[cfg(any(feature = "kv", feature = "object-store"))]
50use super::stream::DiscardPolicy;
51#[cfg(feature = "server_2_10")]
52use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
53use super::stream::{
54 Config, ConsumerError, ConsumerErrorKind, DeleteStatus, External, Info, Stream,
55};
56
57pub mod traits {
58 use std::{future::Future, time::Duration};
59
60 use bytes::Bytes;
61 use serde::{de::DeserializeOwned, Serialize};
62
63 use crate::{jetstream::message, subject::ToSubject, Request};
64
65 use super::RequestError;
66
67 pub trait Requester {
68 fn request<S, T, V>(
69 &self,
70 subject: S,
71 payload: &T,
72 ) -> impl Future<Output = Result<V, RequestError>>
73 where
74 S: ToSubject,
75 T: ?Sized + Serialize,
76 V: DeserializeOwned;
77 }
78
79 pub trait RequestSender {
80 fn send_request<S: ToSubject>(
81 &self,
82 subject: S,
83 request: Request,
84 ) -> impl Future<Output = Result<(), crate::PublishError>>;
85 }
86
87 pub trait Publisher {
88 fn publish<S>(
89 &self,
90 subject: S,
91 payload: Bytes,
92 ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>
93 where
94 S: ToSubject;
95
96 fn publish_message(
97 &self,
98 message: message::OutboundMessage,
99 ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
100 }
101
102 pub trait ClientProvider {
103 fn client(&self) -> crate::Client;
104 }
105
106 pub trait TimeoutProvider {
107 fn timeout(&self) -> Duration;
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct Context {
114 pub(crate) client: Client,
115 pub(crate) prefix: String,
116 pub(crate) timeout: Duration,
117 pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
118 pub(crate) ack_sender:
119 tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
120 pub(crate) backpressure_on_inflight: bool,
121 pub(crate) semaphore_capacity: usize,
122}
123
124fn spawn_acker(
125 rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
126 ack_timeout: Duration,
127 concurrency: Option<usize>,
128) -> tokio::task::JoinHandle<()> {
129 tokio::spawn(async move {
130 rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
131 tokio::time::timeout(ack_timeout, subscription).await.ok();
132 drop(permit);
133 })
134 .await;
135 debug!("Acker task exited");
136 })
137}
138
139use std::marker::PhantomData;
140
141#[derive(Debug, Default)]
142pub struct Yes;
143#[derive(Debug, Default)]
144pub struct No;
145
146pub trait ToAssign: Debug {}
147
148impl ToAssign for Yes {}
149impl ToAssign for No {}
150
151pub struct ContextBuilder<PREFIX: ToAssign> {
170 prefix: String,
171 timeout: Duration,
172 semaphore_capacity: usize,
173 ack_timeout: Duration,
174 backpressure_on_inflight: bool,
175 concurrency_limit: Option<usize>,
176 _phantom: PhantomData<PREFIX>,
177}
178
179impl Default for ContextBuilder<Yes> {
180 fn default() -> Self {
181 ContextBuilder {
182 prefix: "$JS.API".to_string(),
183 timeout: Duration::from_secs(5),
184 semaphore_capacity: 5_000,
185 ack_timeout: Duration::from_secs(30),
186 backpressure_on_inflight: true,
187 concurrency_limit: None,
188 _phantom: PhantomData {},
189 }
190 }
191}
192
193impl ContextBuilder<Yes> {
194 pub fn new() -> ContextBuilder<Yes> {
196 ContextBuilder::default()
197 }
198}
199
200impl ContextBuilder<Yes> {
201 pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
203 ContextBuilder {
204 prefix: prefix.into(),
205 timeout: self.timeout,
206 semaphore_capacity: self.semaphore_capacity,
207 ack_timeout: self.ack_timeout,
208 backpressure_on_inflight: self.backpressure_on_inflight,
209 concurrency_limit: self.concurrency_limit,
210 _phantom: PhantomData,
211 }
212 }
213
214 pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
217 ContextBuilder {
218 prefix: format!("$JS.{}.API", domain.into()),
219 timeout: self.timeout,
220 semaphore_capacity: self.semaphore_capacity,
221 ack_timeout: self.ack_timeout,
222 backpressure_on_inflight: self.backpressure_on_inflight,
223 concurrency_limit: self.concurrency_limit,
224 _phantom: PhantomData,
225 }
226 }
227}
228
229impl<PREFIX> ContextBuilder<PREFIX>
230where
231 PREFIX: ToAssign,
232{
233 pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
235 where
236 Yes: ToAssign,
237 {
238 ContextBuilder {
239 prefix: self.prefix,
240 timeout,
241 semaphore_capacity: self.semaphore_capacity,
242 ack_timeout: self.ack_timeout,
243 backpressure_on_inflight: self.backpressure_on_inflight,
244 concurrency_limit: self.concurrency_limit,
245 _phantom: PhantomData,
246 }
247 }
248
249 pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
252 where
253 Yes: ToAssign,
254 {
255 ContextBuilder {
256 prefix: self.prefix,
257 timeout: self.timeout,
258 semaphore_capacity: self.semaphore_capacity,
259 ack_timeout,
260 backpressure_on_inflight: self.backpressure_on_inflight,
261 concurrency_limit: self.concurrency_limit,
262 _phantom: PhantomData,
263 }
264 }
265
266 pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
269 where
270 Yes: ToAssign,
271 {
272 ContextBuilder {
273 prefix: self.prefix,
274 timeout: self.timeout,
275 semaphore_capacity: capacity,
276 ack_timeout: self.ack_timeout,
277 backpressure_on_inflight: self.backpressure_on_inflight,
278 concurrency_limit: self.concurrency_limit,
279 _phantom: PhantomData,
280 }
281 }
282
283 pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
288 where
289 Yes: ToAssign,
290 {
291 ContextBuilder {
292 prefix: self.prefix,
293 timeout: self.timeout,
294 semaphore_capacity: self.semaphore_capacity,
295 ack_timeout: self.ack_timeout,
296 backpressure_on_inflight: enabled,
297 concurrency_limit: self.concurrency_limit,
298 _phantom: PhantomData,
299 }
300 }
301
302 pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
305 where
306 Yes: ToAssign,
307 {
308 ContextBuilder {
309 prefix: self.prefix,
310 timeout: self.timeout,
311 semaphore_capacity: self.semaphore_capacity,
312 ack_timeout: self.ack_timeout,
313 backpressure_on_inflight: self.backpressure_on_inflight,
314 concurrency_limit: limit,
315 _phantom: PhantomData,
316 }
317 }
318
319 pub fn build(self, client: Client) -> Context {
321 let (tx, rx) = tokio::sync::mpsc::channel::<(
322 oneshot::Receiver<Message>,
323 OwnedSemaphorePermit,
324 )>(self.semaphore_capacity);
325 let stream = ReceiverStream::new(rx);
326 spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
327 Context {
328 client,
329 prefix: self.prefix,
330 timeout: self.timeout,
331 max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
332 ack_sender: tx,
333 backpressure_on_inflight: self.backpressure_on_inflight,
334 semaphore_capacity: self.semaphore_capacity,
335 }
336 }
337}
338
339impl Context {
340 pub(crate) fn new(client: Client) -> Context {
341 ContextBuilder::default().build(client)
342 }
343
344 pub fn set_timeout(&mut self, timeout: Duration) {
346 self.timeout = timeout
347 }
348
349 pub fn client(&self) -> Client {
351 self.client.clone()
352 }
353
354 pub async fn wait_for_acks(&self) {
360 self.max_ack_semaphore
361 .acquire_many(self.semaphore_capacity as u32)
362 .await
363 .ok();
364 }
365
366 pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
368 ContextBuilder::new()
369 .api_prefix(prefix.to_string())
370 .build(client)
371 }
372
373 pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
375 ContextBuilder::new().domain(domain.as_ref()).build(client)
376 }
377
378 pub async fn publish<S: ToSubject>(
419 &self,
420 subject: S,
421 payload: Bytes,
422 ) -> Result<PublishAckFuture, PublishError> {
423 self.send_publish(subject, PublishMessage::build().payload(payload))
424 .await
425 }
426
427 pub async fn publish_with_headers<S: ToSubject>(
449 &self,
450 subject: S,
451 headers: crate::header::HeaderMap,
452 payload: Bytes,
453 ) -> Result<PublishAckFuture, PublishError> {
454 self.send_publish(
455 subject,
456 PublishMessage::build().payload(payload).headers(headers),
457 )
458 .await
459 }
460
461 pub async fn send_publish<S: ToSubject>(
484 &self,
485 subject: S,
486 publish: PublishMessage,
487 ) -> Result<PublishAckFuture, PublishError> {
488 let subject = self
489 .client
490 .maybe_validate_publish_subject(subject)
491 .map_err(|e| PublishError::with_source(PublishErrorKind::Other, e))?;
492
493 self.client
496 .check_payload_size(publish.headers.as_ref(), publish.payload.len())
497 .map_err(|sizes| {
498 PublishError::with_source(
499 PublishErrorKind::MaxPayloadExceeded,
500 crate::client::max_payload_message(sizes),
501 )
502 })?;
503
504 let permit = if self.backpressure_on_inflight {
505 self.max_ack_semaphore
507 .clone()
508 .acquire_owned()
509 .await
510 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
511 } else {
512 self.max_ack_semaphore
514 .clone()
515 .try_acquire_owned()
516 .map_err(|err| match err {
517 TryAcquireError::NoPermits => {
518 PublishError::new(PublishErrorKind::MaxAckPending)
519 }
520 _ => PublishError::with_source(PublishErrorKind::Other, err),
521 })?
522 };
523
524 let (sender, receiver) = oneshot::channel();
525
526 let respond = self.client.new_inbox().into();
527
528 let send_fut = self
529 .client
530 .sender
531 .send(Command::Request {
532 subject,
533 payload: publish.payload,
534 respond,
535 headers: publish.headers,
536 sender,
537 })
538 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
539
540 tokio::time::timeout(self.timeout, send_fut)
541 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
542 .await??;
543
544 Ok(PublishAckFuture {
545 timeout: self.timeout,
546 subscription: Some(receiver),
547 permit: Some(permit),
548 tx: self.ack_sender.clone(),
549 })
550 }
551
552 pub async fn query_account(&self) -> Result<Account, AccountError> {
554 let response: Response<Account> = self.request("INFO", b"").await?;
555
556 match response {
557 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
558 Response::Ok(account) => Ok(account),
559 }
560 }
561
562 pub async fn create_stream<S>(
587 &self,
588 stream_config: S,
589 ) -> Result<Stream<Info>, CreateStreamError>
590 where
591 Config: From<S>,
592 {
593 let mut config: Config = stream_config.into();
594 if config.name.is_empty() {
595 return Err(CreateStreamError::new(
596 CreateStreamErrorKind::EmptyStreamName,
597 ));
598 }
599 if !is_valid_name(config.name.as_str()) {
600 return Err(CreateStreamError::new(
601 CreateStreamErrorKind::InvalidStreamName,
602 ));
603 }
604 if let Some(ref mut mirror) = config.mirror {
605 if let Some(ref mut domain) = mirror.domain {
606 if mirror.external.is_some() {
607 return Err(CreateStreamError::new(
608 CreateStreamErrorKind::DomainAndExternalSet,
609 ));
610 }
611 mirror.external = Some(External {
612 api_prefix: format!("$JS.{domain}.API"),
613 delivery_prefix: None,
614 })
615 }
616 }
617
618 if let Some(ref mut sources) = config.sources {
619 for source in sources {
620 if let Some(ref mut domain) = source.domain {
621 if source.external.is_some() {
622 return Err(CreateStreamError::new(
623 CreateStreamErrorKind::DomainAndExternalSet,
624 ));
625 }
626 source.external = Some(External {
627 api_prefix: format!("$JS.{domain}.API"),
628 delivery_prefix: None,
629 })
630 }
631 }
632 }
633 let subject = format!("STREAM.CREATE.{}", config.name);
634 let response: Response<Info> = self.request(subject, &config).await?;
635
636 match response {
637 Response::Err { error } => Err(error.into()),
638 Response::Ok(info) => Ok(Stream {
639 context: self.clone(),
640 info,
641 name: config.name,
642 }),
643 }
644 }
645
646 pub async fn get_stream_no_info<T: AsRef<str>>(
666 &self,
667 stream: T,
668 ) -> Result<Stream<()>, GetStreamError> {
669 let stream = stream.as_ref();
670 if stream.is_empty() {
671 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
672 }
673
674 if !is_valid_name(stream) {
675 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
676 }
677
678 Ok(Stream {
679 context: self.clone(),
680 info: (),
681 name: stream.to_string(),
682 })
683 }
684
685 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
701 let stream = stream.as_ref();
702 if stream.is_empty() {
703 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
704 }
705
706 if !is_valid_name(stream) {
707 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
708 }
709
710 let subject = format!("STREAM.INFO.{stream}");
711 let request: Response<Info> = self
712 .request(subject, &())
713 .await
714 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
715 match request {
716 Response::Err { error } => {
717 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
718 }
719 Response::Ok(info) => Ok(Stream {
720 context: self.clone(),
721 info,
722 name: stream.to_string(),
723 }),
724 }
725 }
726
727 pub async fn get_or_create_stream<S>(
751 &self,
752 stream_config: S,
753 ) -> Result<Stream, CreateStreamError>
754 where
755 S: Into<Config>,
756 {
757 let config: Config = stream_config.into();
758
759 if config.name.is_empty() {
760 return Err(CreateStreamError::new(
761 CreateStreamErrorKind::EmptyStreamName,
762 ));
763 }
764
765 if !is_valid_name(config.name.as_str()) {
766 return Err(CreateStreamError::new(
767 CreateStreamErrorKind::InvalidStreamName,
768 ));
769 }
770 let subject = format!("STREAM.INFO.{}", config.name);
771
772 let request: Response<Info> = self.request(subject, &()).await?;
773 match request {
774 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
775 Response::Err { error } => Err(error.into()),
776 Response::Ok(info) => Ok(Stream {
777 context: self.clone(),
778 info,
779 name: config.name,
780 }),
781 }
782 }
783
784 pub async fn delete_stream<T: AsRef<str>>(
800 &self,
801 stream: T,
802 ) -> Result<DeleteStatus, DeleteStreamError> {
803 let stream = stream.as_ref();
804 if stream.is_empty() {
805 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
806 }
807
808 if !is_valid_name(stream) {
809 return Err(DeleteStreamError::new(
810 DeleteStreamErrorKind::InvalidStreamName,
811 ));
812 }
813
814 let subject = format!("STREAM.DELETE.{stream}");
815 match self
816 .request(subject, &json!({}))
817 .await
818 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
819 {
820 Response::Err { error } => Err(DeleteStreamError::new(
821 DeleteStreamErrorKind::JetStream(error),
822 )),
823 Response::Ok(delete_response) => Ok(delete_response),
824 }
825 }
826
827 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
852 where
853 S: Borrow<Config>,
854 {
855 let config = config.borrow();
856
857 if config.name.is_empty() {
858 return Err(CreateStreamError::new(
859 CreateStreamErrorKind::EmptyStreamName,
860 ));
861 }
862
863 if !is_valid_name(config.name.as_str()) {
864 return Err(CreateStreamError::new(
865 CreateStreamErrorKind::InvalidStreamName,
866 ));
867 }
868
869 let subject = format!("STREAM.UPDATE.{}", config.name);
870 match self.request(subject, config).await? {
871 Response::Err { error } => Err(error.into()),
872 Response::Ok(info) => Ok(info),
873 }
874 }
875
876 pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
900 match self.update_stream(config.clone()).await {
901 Ok(stream) => Ok(stream),
902 Err(err) => match err.kind() {
903 CreateStreamErrorKind::NotFound => {
904 let stream = self
905 .create_stream(config)
906 .await
907 .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
908 Ok(stream.info)
909 }
910 _ => Err(err),
911 },
912 }
913 }
914
915 pub async fn stream_by_subject<T: Into<String>>(
930 &self,
931 subject: T,
932 ) -> Result<String, GetStreamByNameError> {
933 let subject = subject.into();
934 if !is_valid_subject(subject.as_str()) {
935 return Err(GetStreamByNameError::new(
936 GetStreamByNameErrorKind::InvalidSubject,
937 ));
938 }
939 let mut names = StreamNames {
940 context: self.clone(),
941 offset: 0,
942 page_request: None,
943 streams: Vec::new(),
944 subject: Some(subject),
945 done: false,
946 };
947 match names.next().await {
948 Some(name) => match name {
949 Ok(name) => Ok(name),
950 Err(err) => Err(GetStreamByNameError::with_source(
951 GetStreamByNameErrorKind::Request,
952 err,
953 )),
954 },
955 None => Err(GetStreamByNameError::new(
956 GetStreamByNameErrorKind::NotFound,
957 )),
958 }
959 }
960
961 pub fn stream_names(&self) -> StreamNames {
979 StreamNames {
980 context: self.clone(),
981 offset: 0,
982 page_request: None,
983 streams: Vec::new(),
984 subject: None,
985 done: false,
986 }
987 }
988
989 pub fn streams(&self) -> Streams {
1007 Streams {
1008 context: self.clone(),
1009 offset: 0,
1010 page_request: None,
1011 streams: Vec::new(),
1012 done: false,
1013 }
1014 }
1015 #[cfg(feature = "kv")]
1029 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1030 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
1031 let bucket: String = bucket.into();
1032 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
1033 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1034 }
1035
1036 let stream_name = format!("KV_{}", &bucket);
1037 let stream = self
1038 .get_stream(stream_name.clone())
1039 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
1040 .await?;
1041
1042 if stream.info.config.max_messages_per_subject < 1 {
1043 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1044 }
1045 let mut store = Store {
1046 prefix: format!("$KV.{}.", &bucket),
1047 name: bucket,
1048 stream_name,
1049 stream: stream.clone(),
1050 put_prefix: None,
1051 use_jetstream_prefix: self.prefix != "$JS.API",
1052 };
1053 if let Some(ref mirror) = stream.info.config.mirror {
1054 let bucket = mirror.name.trim_start_matches("KV_");
1055 if let Some(ref external) = mirror.external {
1056 if !external.api_prefix.is_empty() {
1057 store.use_jetstream_prefix = false;
1058 store.prefix = format!("$KV.{bucket}.");
1059 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
1060 } else {
1061 store.put_prefix = Some(format!("$KV.{bucket}."));
1062 }
1063 }
1064 };
1065
1066 Ok(store)
1067 }
1068
1069 #[cfg(feature = "kv")]
1089 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1090 pub async fn create_key_value(
1091 &self,
1092 config: crate::jetstream::kv::Config,
1093 ) -> Result<Store, CreateKeyValueError> {
1094 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1095 return Err(CreateKeyValueError::new(
1096 CreateKeyValueErrorKind::InvalidStoreName,
1097 ));
1098 }
1099 let info = self.query_account().await.map_err(|err| {
1100 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1101 })?;
1102
1103 let bucket_name = config.bucket.clone();
1104 let stream_config = kv_to_stream_config(config, info)?;
1105
1106 let stream = self.create_stream(stream_config).await.map_err(|err| {
1107 if err.kind() == CreateStreamErrorKind::TimedOut {
1108 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1109 } else {
1110 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1111 }
1112 })?;
1113
1114 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1115 }
1116
1117 #[cfg(feature = "kv")]
1137 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1138 pub async fn update_key_value(
1139 &self,
1140 config: crate::jetstream::kv::Config,
1141 ) -> Result<Store, UpdateKeyValueError> {
1142 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1143 return Err(UpdateKeyValueError::new(
1144 UpdateKeyValueErrorKind::InvalidStoreName,
1145 ));
1146 }
1147
1148 let stream_name = format!("KV_{}", config.bucket);
1149 let bucket_name = config.bucket.clone();
1150
1151 let account = self.query_account().await.map_err(|err| {
1152 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1153 })?;
1154 let stream = self
1155 .update_stream(kv_to_stream_config(config, account)?)
1156 .await
1157 .map_err(|err| match err.kind() {
1158 UpdateStreamErrorKind::NotFound => {
1159 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1160 }
1161 _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1162 })?;
1163
1164 let stream = Stream {
1165 context: self.clone(),
1166 info: stream,
1167 name: stream_name,
1168 };
1169
1170 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1171 }
1172
1173 #[cfg(feature = "kv")]
1193 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1194 pub async fn create_or_update_key_value(
1195 &self,
1196 config: crate::jetstream::kv::Config,
1197 ) -> Result<Store, CreateKeyValueError> {
1198 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1199 return Err(CreateKeyValueError::new(
1200 CreateKeyValueErrorKind::InvalidStoreName,
1201 ));
1202 }
1203
1204 let bucket_name = config.bucket.clone();
1205 let stream_name = format!("KV_{}", config.bucket);
1206
1207 let account = self.query_account().await.map_err(|err| {
1208 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1209 })?;
1210 let stream = self
1211 .create_or_update_stream(kv_to_stream_config(config, account)?)
1212 .await
1213 .map_err(|err| {
1214 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1215 })?;
1216
1217 let stream = Stream {
1218 context: self.clone(),
1219 info: stream,
1220 name: stream_name,
1221 };
1222
1223 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1224 }
1225
1226 #[cfg(feature = "kv")]
1246 #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1247 pub async fn delete_key_value<T: AsRef<str>>(
1248 &self,
1249 bucket: T,
1250 ) -> Result<DeleteStatus, KeyValueError> {
1251 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1252 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1253 }
1254
1255 let stream_name = format!("KV_{}", bucket.as_ref());
1256 self.delete_stream(stream_name)
1257 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1258 .await
1259 }
1260
1261 pub async fn get_consumer_from_stream<T, C, S>(
1298 &self,
1299 consumer: C,
1300 stream: S,
1301 ) -> Result<Consumer<T>, ConsumerError>
1302 where
1303 T: FromConsumer + IntoConsumerConfig,
1304 S: AsRef<str>,
1305 C: AsRef<str>,
1306 {
1307 if !is_valid_name(stream.as_ref()) {
1308 return Err(ConsumerError::with_source(
1309 ConsumerErrorKind::InvalidName,
1310 "invalid stream",
1311 ));
1312 }
1313
1314 if !is_valid_name(consumer.as_ref()) {
1315 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1316 }
1317
1318 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1319
1320 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1321 Response::Ok(info) => info,
1322 Response::Err { error } => return Err(error.into()),
1323 };
1324
1325 Ok(Consumer::new(
1326 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1327 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1328 })?,
1329 info,
1330 self.clone(),
1331 ))
1332 }
1333
1334 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1357 &self,
1358 consumer: C,
1359 stream: S,
1360 ) -> Result<DeleteStatus, ConsumerError> {
1361 if !is_valid_name(consumer.as_ref()) {
1362 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1363 }
1364
1365 if !is_valid_name(stream.as_ref()) {
1366 return Err(ConsumerError::with_source(
1367 ConsumerErrorKind::Other,
1368 "invalid stream name",
1369 ));
1370 }
1371
1372 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1373
1374 match self.request(subject, &json!({})).await? {
1375 Response::Ok(delete_status) => Ok(delete_status),
1376 Response::Err { error } => Err(error.into()),
1377 }
1378 }
1379
1380 pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1406 &self,
1407 config: C,
1408 stream: S,
1409 ) -> Result<Consumer<C>, ConsumerError> {
1410 self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1411 .await
1412 }
1413
1414 #[cfg(feature = "server_2_10")]
1441 pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1442 &self,
1443 config: C,
1444 stream: S,
1445 ) -> Result<Consumer<C>, ConsumerUpdateError> {
1446 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1447 .await
1448 .map_err(|err| err.into())
1449 }
1450
1451 #[cfg(feature = "server_2_10")]
1478 pub async fn create_consumer_strict_on_stream<
1479 C: IntoConsumerConfig + FromConsumer,
1480 S: AsRef<str>,
1481 >(
1482 &self,
1483 config: C,
1484 stream: S,
1485 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1486 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1487 .await
1488 .map_err(|err| err.into())
1489 }
1490
1491 async fn create_consumer_on_stream_action<
1492 C: IntoConsumerConfig + FromConsumer,
1493 S: AsRef<str>,
1494 >(
1495 &self,
1496 config: C,
1497 stream: S,
1498 action: ConsumerAction,
1499 ) -> Result<Consumer<C>, ConsumerError> {
1500 let config = config.into_consumer_config();
1501
1502 let subject = {
1503 let filter = if config.filter_subject.is_empty() {
1504 "".to_string()
1505 } else {
1506 format!(".{}", config.filter_subject)
1507 };
1508 config
1509 .name
1510 .as_ref()
1511 .or(config.durable_name.as_ref())
1512 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1513 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1514 };
1515
1516 match self
1517 .request(
1518 subject,
1519 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1520 )
1521 .await?
1522 {
1523 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1524 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1525 FromConsumer::try_from_consumer_config(info.clone().config)
1526 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1527 info,
1528 self.clone(),
1529 )),
1530 }
1531 }
1532
1533 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1553 where
1554 S: ToSubject,
1555 T: ?Sized + Serialize,
1556 V: DeserializeOwned,
1557 {
1558 let subject = subject.to_subject();
1559 let request = serde_json::to_vec(&payload)
1560 .map(Bytes::from)
1561 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1562
1563 debug!("JetStream request sent: {:?}", request);
1564
1565 let message = self
1566 .client
1567 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1568 .await;
1569 let message = message?;
1570 debug!(
1571 "JetStream request response: {:?}",
1572 from_utf8(&message.payload)
1573 );
1574 let response = serde_json::from_slice(message.payload.as_ref())
1575 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1576
1577 Ok(response)
1578 }
1579
1580 pub async fn send_request<S: ToSubject>(
1589 &self,
1590 subject: S,
1591 request: crate::client::Request,
1592 ) -> Result<(), crate::PublishError> {
1593 let prefixed_subject = format!("{}.{}", self.prefix, subject.to_subject());
1594 let inbox = request.inbox.unwrap_or_else(|| self.client.new_inbox());
1595 let payload = request.payload.unwrap_or_default();
1596
1597 match request.headers {
1598 Some(headers) => {
1599 self.client
1600 .publish_with_reply_and_headers(prefixed_subject, inbox, headers, payload)
1601 .await
1602 }
1603 None => {
1604 self.client
1605 .publish_with_reply(prefixed_subject, inbox, payload)
1606 .await
1607 }
1608 }
1609 }
1610
1611 #[cfg(feature = "object-store")]
1630 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1631 pub async fn create_object_store(
1632 &self,
1633 config: super::object_store::Config,
1634 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1635 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1636 return Err(CreateObjectStoreError::new(
1637 CreateKeyValueErrorKind::InvalidStoreName,
1638 ));
1639 }
1640
1641 let bucket_name = config.bucket.clone();
1642 let stream_name = format!("OBJ_{bucket_name}");
1643 let chunk_subject = format!("$O.{bucket_name}.C.>");
1644 let meta_subject = format!("$O.{bucket_name}.M.>");
1645
1646 let stream = self
1647 .create_stream(super::stream::Config {
1648 name: stream_name,
1649 description: config.description.clone(),
1650 subjects: vec![chunk_subject, meta_subject],
1651 max_age: config.max_age,
1652 max_bytes: config.max_bytes,
1653 storage: config.storage,
1654 num_replicas: config.num_replicas,
1655 discard: DiscardPolicy::New,
1656 allow_rollup: true,
1657 allow_direct: true,
1658 #[cfg(feature = "server_2_10")]
1659 compression: if config.compression {
1660 Some(Compression::S2)
1661 } else {
1662 None
1663 },
1664 placement: config.placement,
1665 ..Default::default()
1666 })
1667 .await
1668 .map_err(|err| {
1669 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1670 })?;
1671
1672 Ok(ObjectStore {
1673 name: bucket_name,
1674 stream,
1675 })
1676 }
1677
1678 #[cfg(feature = "object-store")]
1692 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1693 pub async fn get_object_store<T: AsRef<str>>(
1694 &self,
1695 bucket_name: T,
1696 ) -> Result<ObjectStore, ObjectStoreError> {
1697 let bucket_name = bucket_name.as_ref();
1698 if !is_valid_bucket_name(bucket_name) {
1699 return Err(ObjectStoreError::new(
1700 ObjectStoreErrorKind::InvalidBucketName,
1701 ));
1702 }
1703 let stream_name = format!("OBJ_{bucket_name}");
1704 let stream = self
1705 .get_stream(stream_name)
1706 .await
1707 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1708
1709 Ok(ObjectStore {
1710 name: bucket_name.to_string(),
1711 stream,
1712 })
1713 }
1714
1715 #[cfg(feature = "object-store")]
1729 #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1730 pub async fn delete_object_store<T: AsRef<str>>(
1731 &self,
1732 bucket_name: T,
1733 ) -> Result<(), DeleteObjectStore> {
1734 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1735 self.delete_stream(stream_name)
1736 .await
1737 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1738 Ok(())
1739 }
1740}
1741
1742impl crate::client::traits::Requester for Context {
1743 fn send_request<S: ToSubject>(
1744 &self,
1745 subject: S,
1746 request: crate::Request,
1747 ) -> impl Future<Output = Result<Message, crate::RequestError>> {
1748 self.client.send_request(subject, request)
1749 }
1750}
1751
1752impl crate::client::traits::Publisher for Context {
1753 fn publish_with_reply<S, R>(
1754 &self,
1755 subject: S,
1756 reply: R,
1757 payload: Bytes,
1758 ) -> impl Future<Output = Result<(), crate::PublishError>>
1759 where
1760 S: ToSubject,
1761 R: ToSubject,
1762 {
1763 self.client.publish_with_reply(subject, reply, payload)
1764 }
1765
1766 fn publish_message(
1767 &self,
1768 msg: crate::message::OutboundMessage,
1769 ) -> impl Future<Output = Result<(), crate::PublishError>> {
1770 self.client.publish_message(msg)
1771 }
1772}
1773
1774impl traits::ClientProvider for Context {
1775 fn client(&self) -> crate::Client {
1776 self.client()
1777 }
1778}
1779
1780impl traits::Requester for Context {
1781 fn request<S, T, V>(
1782 &self,
1783 subject: S,
1784 payload: &T,
1785 ) -> impl Future<Output = Result<V, RequestError>>
1786 where
1787 S: ToSubject,
1788 T: ?Sized + Serialize,
1789 V: DeserializeOwned,
1790 {
1791 self.request(subject, payload)
1792 }
1793}
1794
1795impl traits::TimeoutProvider for Context {
1796 fn timeout(&self) -> Duration {
1797 self.timeout
1798 }
1799}
1800
1801impl traits::RequestSender for Context {
1802 fn send_request<S: ToSubject>(
1803 &self,
1804 subject: S,
1805 request: crate::client::Request,
1806 ) -> impl Future<Output = Result<(), crate::PublishError>> {
1807 self.send_request(subject, request)
1808 }
1809}
1810
1811impl traits::Publisher for Context {
1812 fn publish<S: ToSubject>(
1813 &self,
1814 subject: S,
1815 payload: Bytes,
1816 ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1817 self.publish(subject, payload)
1818 }
1819
1820 fn publish_message(
1821 &self,
1822 message: jetstream::message::OutboundMessage,
1823 ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1824 self.send_publish(
1825 message.subject,
1826 PublishMessage {
1827 payload: message.payload,
1828 headers: message.headers,
1829 },
1830 )
1831 }
1832}
1833
1834#[derive(Clone, Copy, Debug, PartialEq)]
1835pub enum PublishErrorKind {
1836 StreamNotFound,
1837 WrongLastMessageId,
1838 WrongLastSequence,
1839 TimedOut,
1840 BrokenPipe,
1841 MaxAckPending,
1842 MaxPayloadExceeded,
1844 Other,
1845}
1846
1847impl Display for PublishErrorKind {
1848 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1849 match self {
1850 Self::StreamNotFound => write!(f, "no stream found for given subject"),
1851 Self::TimedOut => write!(f, "timed out: didn't receive ack in time"),
1852 Self::Other => write!(f, "publish failed"),
1853 Self::BrokenPipe => write!(f, "broken pipe"),
1854 Self::WrongLastMessageId => write!(f, "wrong last message id"),
1855 Self::WrongLastSequence => write!(f, "wrong last sequence"),
1856 Self::MaxAckPending => write!(f, "max ack pending reached"),
1857 Self::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
1858 }
1859 }
1860}
1861
1862pub type PublishError = Error<PublishErrorKind>;
1863
1864#[derive(Debug)]
1865pub struct PublishAckFuture {
1866 timeout: Duration,
1867 subscription: Option<oneshot::Receiver<Message>>,
1868 permit: Option<OwnedSemaphorePermit>,
1869 tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
1870}
1871
1872impl Drop for PublishAckFuture {
1873 fn drop(&mut self) {
1874 if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1875 if let Err(err) = self.tx.try_send((sub, permit)) {
1876 tracing::warn!("failed to pass future permit to the acker: {}", err);
1877 }
1878 }
1879 }
1880}
1881
1882impl PublishAckFuture {
1883 async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1884 let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1885 .await
1886 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1887 next.map_or_else(
1888 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1889 |m| {
1890 if m.status == Some(StatusCode::NO_RESPONDERS) {
1891 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1892 }
1893 let response = serde_json::from_slice(m.payload.as_ref())
1894 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1895 match response {
1896 Response::Err { error } => match error.error_code() {
1897 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1898 PublishErrorKind::WrongLastMessageId,
1899 error,
1900 )),
1901 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1902 PublishErrorKind::WrongLastSequence,
1903 error,
1904 )),
1905 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1906 },
1907 Response::Ok(publish_ack) => Ok(publish_ack),
1908 }
1909 },
1910 )
1911 }
1912}
1913impl IntoFuture for PublishAckFuture {
1914 type Output = Result<PublishAck, PublishError>;
1915
1916 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1917
1918 fn into_future(self) -> Self::IntoFuture {
1919 Box::pin(std::future::IntoFuture::into_future(
1920 self.next_with_timeout(),
1921 ))
1922 }
1923}
1924
1925#[derive(Deserialize, Debug)]
1926struct StreamPage {
1927 total: usize,
1928 streams: Option<Vec<String>>,
1929}
1930
1931#[derive(Deserialize, Debug)]
1932struct StreamInfoPage {
1933 total: usize,
1934 streams: Option<Vec<super::stream::Info>>,
1935}
1936
1937type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1938
1939pub struct StreamNames {
1940 context: Context,
1941 offset: usize,
1942 page_request: Option<PageRequest>,
1943 subject: Option<String>,
1944 streams: Vec<String>,
1945 done: bool,
1946}
1947
1948impl futures_util::Stream for StreamNames {
1949 type Item = Result<String, StreamsError>;
1950
1951 fn poll_next(
1952 mut self: Pin<&mut Self>,
1953 cx: &mut std::task::Context<'_>,
1954 ) -> std::task::Poll<Option<Self::Item>> {
1955 match self.page_request.as_mut() {
1956 Some(page) => match page.try_poll_unpin(cx) {
1957 std::task::Poll::Ready(page) => {
1958 self.page_request = None;
1959 let page = page
1960 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1961 if let Some(streams) = page.streams {
1962 self.offset += streams.len();
1963 self.streams = streams;
1964 if self.offset >= page.total {
1965 self.done = true;
1966 }
1967 match self.streams.pop() {
1968 Some(stream) => Poll::Ready(Some(Ok(stream))),
1969 None => Poll::Ready(None),
1970 }
1971 } else {
1972 Poll::Ready(None)
1973 }
1974 }
1975 std::task::Poll::Pending => std::task::Poll::Pending,
1976 },
1977 None => {
1978 if let Some(stream) = self.streams.pop() {
1979 Poll::Ready(Some(Ok(stream)))
1980 } else {
1981 if self.done {
1982 return Poll::Ready(None);
1983 }
1984 let context = self.context.clone();
1985 let offset = self.offset;
1986 let subject = self.subject.clone();
1987 self.page_request = Some(Box::pin(async move {
1988 match context
1989 .request(
1990 "STREAM.NAMES",
1991 &json!({
1992 "offset": offset,
1993 "subject": subject
1994 }),
1995 )
1996 .await?
1997 {
1998 Response::Err { error } => {
1999 Err(RequestError::with_source(RequestErrorKind::Other, error))
2000 }
2001 Response::Ok(page) => Ok(page),
2002 }
2003 }));
2004 self.poll_next(cx)
2005 }
2006 }
2007 }
2008 }
2009}
2010
2011type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
2012
2013pub type StreamsErrorKind = RequestErrorKind;
2014pub type StreamsError = RequestError;
2015
2016pub struct Streams {
2017 context: Context,
2018 offset: usize,
2019 page_request: Option<PageInfoRequest>,
2020 streams: Vec<super::stream::Info>,
2021 done: bool,
2022}
2023
2024impl futures_util::Stream for Streams {
2025 type Item = Result<super::stream::Info, StreamsError>;
2026
2027 fn poll_next(
2028 mut self: Pin<&mut Self>,
2029 cx: &mut std::task::Context<'_>,
2030 ) -> std::task::Poll<Option<Self::Item>> {
2031 match self.page_request.as_mut() {
2032 Some(page) => match page.try_poll_unpin(cx) {
2033 std::task::Poll::Ready(page) => {
2034 self.page_request = None;
2035 let page = page
2036 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
2037 if let Some(streams) = page.streams {
2038 self.offset += streams.len();
2039 self.streams = streams;
2040 if self.offset >= page.total {
2041 self.done = true;
2042 }
2043 match self.streams.pop() {
2044 Some(stream) => Poll::Ready(Some(Ok(stream))),
2045 None => Poll::Ready(None),
2046 }
2047 } else {
2048 Poll::Ready(None)
2049 }
2050 }
2051 std::task::Poll::Pending => std::task::Poll::Pending,
2052 },
2053 None => {
2054 if let Some(stream) = self.streams.pop() {
2055 Poll::Ready(Some(Ok(stream)))
2056 } else {
2057 if self.done {
2058 return Poll::Ready(None);
2059 }
2060 let context = self.context.clone();
2061 let offset = self.offset;
2062 self.page_request = Some(Box::pin(async move {
2063 match context
2064 .request(
2065 "STREAM.LIST",
2066 &json!({
2067 "offset": offset,
2068 }),
2069 )
2070 .await?
2071 {
2072 Response::Err { error } => {
2073 Err(RequestError::with_source(RequestErrorKind::Other, error))
2074 }
2075 Response::Ok(page) => Ok(page),
2076 }
2077 }));
2078 self.poll_next(cx)
2079 }
2080 }
2081 }
2082 }
2083}
2084
2085#[deprecated(
2088 note = "use jetstream::message::PublishMessage instead",
2089 since = "0.44.0"
2090)]
2091pub type Publish = super::message::PublishMessage;
2092
2093#[derive(Clone, Copy, Debug, PartialEq)]
2094pub enum RequestErrorKind {
2095 NoResponders,
2096 TimedOut,
2097 InvalidSubject,
2098 Other,
2099}
2100
2101impl Display for RequestErrorKind {
2102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2103 match self {
2104 Self::TimedOut => write!(f, "timed out"),
2105 Self::Other => write!(f, "request failed"),
2106 Self::InvalidSubject => write!(f, "invalid subject"),
2107 Self::NoResponders => write!(f, "requested JetStream resource does not exist"),
2108 }
2109 }
2110}
2111
2112pub type RequestError = Error<RequestErrorKind>;
2113
2114impl From<crate::RequestError> for RequestError {
2115 fn from(error: crate::RequestError) -> Self {
2116 match error.kind() {
2117 crate::RequestErrorKind::TimedOut => {
2118 RequestError::with_source(RequestErrorKind::TimedOut, error)
2119 }
2120 crate::RequestErrorKind::NoResponders => {
2121 RequestError::new(RequestErrorKind::NoResponders)
2122 }
2123 crate::RequestErrorKind::InvalidSubject => {
2124 RequestError::with_source(RequestErrorKind::InvalidSubject, error)
2125 }
2126 crate::RequestErrorKind::MaxPayloadExceeded | crate::RequestErrorKind::Other => {
2127 RequestError::with_source(RequestErrorKind::Other, error)
2128 }
2129 }
2130 }
2131}
2132
2133impl From<super::errors::Error> for RequestError {
2134 fn from(err: super::errors::Error) -> Self {
2135 RequestError::with_source(RequestErrorKind::Other, err)
2136 }
2137}
2138
2139pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
2140
2141#[derive(Clone, Debug, PartialEq)]
2142pub enum ConsumerInfoErrorKind {
2143 InvalidName,
2144 Offline,
2145 NotFound,
2146 StreamNotFound,
2147 Request,
2148 JetStream(super::errors::Error),
2149 TimedOut,
2150 NoResponders,
2151}
2152
2153impl Display for ConsumerInfoErrorKind {
2154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2155 match self {
2156 Self::InvalidName => write!(f, "invalid consumer name"),
2157 Self::Offline => write!(f, "consumer is offline"),
2158 Self::NotFound => write!(f, "consumer not found"),
2159 Self::StreamNotFound => write!(f, "stream not found"),
2160 Self::Request => write!(f, "request error"),
2161 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2162 Self::TimedOut => write!(f, "timed out"),
2163 Self::NoResponders => write!(f, "no responders"),
2164 }
2165 }
2166}
2167
2168impl From<super::errors::Error> for ConsumerInfoError {
2169 fn from(error: super::errors::Error) -> Self {
2170 match error.error_code() {
2171 ErrorCode::CONSUMER_NOT_FOUND => {
2172 ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2173 }
2174 ErrorCode::STREAM_NOT_FOUND => {
2175 ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2176 }
2177 ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2178 _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2179 }
2180 }
2181}
2182
2183impl From<RequestError> for ConsumerInfoError {
2184 fn from(error: RequestError) -> Self {
2185 match error.kind() {
2186 RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2187 RequestErrorKind::InvalidSubject => {
2188 ConsumerInfoError::with_source(ConsumerInfoErrorKind::InvalidName, error)
2189 }
2190 RequestErrorKind::Other => {
2191 ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2192 }
2193 RequestErrorKind::NoResponders => {
2194 ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2195 }
2196 }
2197 }
2198}
2199
2200#[derive(Clone, Debug, PartialEq)]
2201pub enum CreateStreamErrorKind {
2202 EmptyStreamName,
2203 InvalidStreamName,
2204 DomainAndExternalSet,
2205 JetStreamUnavailable,
2206 JetStream(super::errors::Error),
2207 TimedOut,
2208 Response,
2209 NotFound,
2210 ResponseParse,
2211}
2212
2213impl Display for CreateStreamErrorKind {
2214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2215 match self {
2216 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2217 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2218 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2219 Self::NotFound => write!(f, "stream not found"),
2220 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2221 Self::TimedOut => write!(f, "jetstream request timed out"),
2222 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2223 Self::ResponseParse => write!(f, "failed to parse server response"),
2224 Self::Response => write!(f, "response error"),
2225 }
2226 }
2227}
2228
2229pub type CreateStreamError = Error<CreateStreamErrorKind>;
2230
2231impl From<super::errors::Error> for CreateStreamError {
2232 fn from(error: super::errors::Error) -> Self {
2233 match error.kind() {
2234 super::errors::ErrorCode::STREAM_NOT_FOUND => {
2235 CreateStreamError::new(CreateStreamErrorKind::NotFound)
2236 }
2237 _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2238 }
2239 }
2240}
2241
2242impl From<RequestError> for CreateStreamError {
2243 fn from(error: RequestError) -> Self {
2244 match error.kind() {
2245 RequestErrorKind::NoResponders => {
2246 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2247 }
2248 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2249 RequestErrorKind::InvalidSubject => {
2250 CreateStreamError::with_source(CreateStreamErrorKind::InvalidStreamName, error)
2251 }
2252 RequestErrorKind::Other => {
2253 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2254 }
2255 }
2256 }
2257}
2258
2259#[derive(Clone, Debug, PartialEq)]
2260pub enum GetStreamErrorKind {
2261 EmptyName,
2262 Request,
2263 InvalidStreamName,
2264 JetStream(super::errors::Error),
2265}
2266
2267impl Display for GetStreamErrorKind {
2268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2269 match self {
2270 Self::EmptyName => write!(f, "empty name cannot be empty"),
2271 Self::Request => write!(f, "request error"),
2272 Self::InvalidStreamName => write!(f, "invalid stream name"),
2273 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2274 }
2275 }
2276}
2277
2278#[derive(Clone, Debug, PartialEq)]
2279pub enum GetStreamByNameErrorKind {
2280 Request,
2281 NotFound,
2282 InvalidSubject,
2283 JetStream(super::errors::Error),
2284}
2285
2286impl Display for GetStreamByNameErrorKind {
2287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2288 match self {
2289 Self::Request => write!(f, "request error"),
2290 Self::NotFound => write!(f, "stream not found"),
2291 Self::InvalidSubject => write!(f, "invalid subject"),
2292 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2293 }
2294 }
2295}
2296
2297pub type GetStreamError = Error<GetStreamErrorKind>;
2298pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;
2299
2300pub type UpdateStreamError = CreateStreamError;
2301pub type UpdateStreamErrorKind = CreateStreamErrorKind;
2302pub type DeleteStreamError = GetStreamError;
2303pub type DeleteStreamErrorKind = GetStreamErrorKind;
2304
2305#[cfg(feature = "kv")]
2306#[derive(Clone, Copy, Debug, PartialEq)]
2307pub enum KeyValueErrorKind {
2308 InvalidStoreName,
2309 GetBucket,
2310 JetStream,
2311}
2312
2313#[cfg(feature = "kv")]
2314impl Display for KeyValueErrorKind {
2315 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2316 match self {
2317 Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
2318 Self::GetBucket => write!(f, "failed to get the bucket"),
2319 Self::JetStream => write!(f, "JetStream error"),
2320 }
2321 }
2322}
2323
2324#[cfg(feature = "kv")]
2325pub type KeyValueError = Error<KeyValueErrorKind>;
2326
2327#[cfg(any(feature = "kv", feature = "object-store"))]
2328#[derive(Clone, Copy, Debug, PartialEq)]
2329pub enum CreateKeyValueErrorKind {
2330 InvalidStoreName,
2331 TooLongHistory,
2332 JetStream,
2333 BucketCreate,
2334 TimedOut,
2335 LimitMarkersNotSupported,
2336}
2337
2338#[cfg(any(feature = "kv", feature = "object-store"))]
2339impl Display for CreateKeyValueErrorKind {
2340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2341 match self {
2342 Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
2343 Self::TooLongHistory => write!(f, "too long history"),
2344 Self::JetStream => write!(f, "JetStream error"),
2345 Self::BucketCreate => write!(f, "bucket creation failed"),
2346 Self::TimedOut => write!(f, "timed out"),
2347 Self::LimitMarkersNotSupported => {
2348 write!(f, "limit markers not supported")
2349 }
2350 }
2351 }
2352}
2353
2354#[cfg(feature = "kv")]
2355#[derive(Clone, Copy, Debug, PartialEq)]
2356pub enum UpdateKeyValueErrorKind {
2357 InvalidStoreName,
2358 TooLongHistory,
2359 JetStream,
2360 BucketUpdate,
2361 TimedOut,
2362 LimitMarkersNotSupported,
2363 NotFound,
2364}
2365
2366#[cfg(feature = "kv")]
2367impl Display for UpdateKeyValueErrorKind {
2368 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2369 match self {
2370 Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
2371 Self::TooLongHistory => write!(f, "too long history"),
2372 Self::JetStream => write!(f, "JetStream error"),
2373 Self::BucketUpdate => write!(f, "bucket creation failed"),
2374 Self::TimedOut => write!(f, "timed out"),
2375 Self::LimitMarkersNotSupported => {
2376 write!(f, "limit markers not supported")
2377 }
2378 Self::NotFound => write!(f, "bucket does not exist"),
2379 }
2380 }
2381}
2382#[cfg(feature = "kv")]
2383pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
2384#[cfg(feature = "kv")]
2385pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;
2386
2387#[cfg(feature = "object-store")]
2388pub type CreateObjectStoreError = Error<CreateKeyValueErrorKind>;
2389#[cfg(feature = "object-store")]
2390pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2391
2392#[cfg(feature = "object-store")]
2393#[derive(Clone, Copy, Debug, PartialEq)]
2394pub enum ObjectStoreErrorKind {
2395 InvalidBucketName,
2396 GetStore,
2397}
2398
2399#[cfg(feature = "object-store")]
2400impl Display for ObjectStoreErrorKind {
2401 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2402 match self {
2403 Self::InvalidBucketName => write!(f, "invalid Object Store bucket name"),
2404 Self::GetStore => write!(f, "failed to get Object Store"),
2405 }
2406 }
2407}
2408
2409#[cfg(feature = "object-store")]
2410pub type ObjectStoreError = Error<ObjectStoreErrorKind>;
2411
2412#[cfg(feature = "object-store")]
2413pub type DeleteObjectStore = ObjectStoreError;
2414#[cfg(feature = "object-store")]
2415pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2416
2417#[derive(Clone, Debug, PartialEq)]
2418pub enum AccountErrorKind {
2419 TimedOut,
2420 JetStream(super::errors::Error),
2421 JetStreamUnavailable,
2422 Other,
2423}
2424
2425impl Display for AccountErrorKind {
2426 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2427 match self {
2428 Self::TimedOut => write!(f, "timed out"),
2429 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2430 Self::Other => write!(f, "error"),
2431 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2432 }
2433 }
2434}
2435
2436pub type AccountError = Error<AccountErrorKind>;
2437
2438impl From<RequestError> for AccountError {
2439 fn from(err: RequestError) -> Self {
2440 match err.kind {
2441 RequestErrorKind::NoResponders => {
2442 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2443 }
2444 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2445 RequestErrorKind::Other | RequestErrorKind::InvalidSubject => {
2446 AccountError::with_source(AccountErrorKind::Other, err)
2447 }
2448 }
2449 }
2450}
2451
2452#[derive(Clone, Debug, Serialize)]
2453enum ConsumerAction {
2454 #[serde(rename = "")]
2455 CreateOrUpdate,
2456 #[serde(rename = "create")]
2457 #[cfg(feature = "server_2_10")]
2458 Create,
2459 #[serde(rename = "update")]
2460 #[cfg(feature = "server_2_10")]
2461 Update,
2462}
2463
2464#[cfg(feature = "kv")]
2465fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2467 let mut store = Store {
2468 prefix: format!("$KV.{}.", bucket.as_str()),
2469 name: bucket,
2470 stream: stream.clone(),
2471 stream_name: stream.info.config.name.clone(),
2472 put_prefix: None,
2473 use_jetstream_prefix: prefix != "$JS.API",
2474 };
2475 if let Some(ref mirror) = stream.info.config.mirror {
2476 let bucket = mirror.name.trim_start_matches("KV_");
2477 if let Some(ref external) = mirror.external {
2478 if !external.api_prefix.is_empty() {
2479 store.use_jetstream_prefix = false;
2480 store.prefix = format!("$KV.{bucket}.");
2481 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2482 } else {
2483 store.put_prefix = Some(format!("$KV.{bucket}."));
2484 }
2485 }
2486 };
2487 store
2488}
2489
2490#[cfg(feature = "kv")]
2491enum KvToStreamConfigError {
2492 TooLongHistory,
2493 #[allow(dead_code)]
2494 LimitMarkersNotSupported,
2495}
2496
2497#[cfg(feature = "kv")]
2498impl From<KvToStreamConfigError> for CreateKeyValueError {
2499 fn from(err: KvToStreamConfigError) -> Self {
2500 match err {
2501 KvToStreamConfigError::TooLongHistory => {
2502 CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2503 }
2504 KvToStreamConfigError::LimitMarkersNotSupported => {
2505 CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2506 }
2507 }
2508 }
2509}
2510
2511#[cfg(feature = "kv")]
2512impl From<KvToStreamConfigError> for UpdateKeyValueError {
2513 fn from(err: KvToStreamConfigError) -> Self {
2514 match err {
2515 KvToStreamConfigError::TooLongHistory => {
2516 UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2517 }
2518 KvToStreamConfigError::LimitMarkersNotSupported => {
2519 UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2520 }
2521 }
2522 }
2523}
2524
2525#[cfg(feature = "kv")]
2526fn kv_to_stream_config(
2528 config: crate::jetstream::kv::Config,
2529 _account: Account,
2530) -> Result<super::stream::Config, KvToStreamConfigError> {
2531 let history = if config.history > 0 {
2532 if config.history > MAX_HISTORY {
2533 return Err(KvToStreamConfigError::TooLongHistory);
2534 }
2535 config.history
2536 } else {
2537 1
2538 };
2539
2540 let num_replicas = if config.num_replicas == 0 {
2541 1
2542 } else {
2543 config.num_replicas
2544 };
2545
2546 #[cfg(feature = "server_2_11")]
2547 let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2548
2549 #[cfg(feature = "server_2_11")]
2550 if let Some(duration) = config.limit_markers {
2551 if _account.requests.level < 1 {
2552 return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2553 }
2554 allow_message_ttl = true;
2555 subject_delete_marker_ttl = Some(duration);
2556 }
2557
2558 let mut mirror = config.mirror.clone();
2559 let mut sources = config.sources.clone();
2560 let mut mirror_direct = config.mirror_direct;
2561
2562 let mut subjects = Vec::new();
2563 if let Some(ref mut mirror) = mirror {
2564 if !mirror.name.starts_with("KV_") {
2565 mirror.name = format!("KV_{}", mirror.name);
2566 }
2567 mirror_direct = true;
2568 } else if let Some(ref mut sources) = sources {
2569 for source in sources {
2570 if !source.name.starts_with("KV_") {
2571 source.name = format!("KV_{}", source.name);
2572 }
2573 }
2574 } else {
2575 subjects = vec![format!("$KV.{}.>", config.bucket)];
2576 }
2577
2578 Ok(Config {
2579 name: format!("KV_{}", config.bucket),
2580 description: Some(config.description),
2581 subjects,
2582 max_messages_per_subject: history,
2583 max_bytes: config.max_bytes,
2584 max_age: config.max_age,
2585 max_message_size: config.max_value_size,
2586 storage: config.storage,
2587 republish: config.republish,
2588 allow_rollup: true,
2589 deny_delete: true,
2590 deny_purge: false,
2591 allow_direct: true,
2592 sources,
2593 mirror,
2594 num_replicas,
2595 discard: DiscardPolicy::New,
2596 mirror_direct,
2597 #[cfg(feature = "server_2_10")]
2598 compression: if config.compression {
2599 Some(Compression::S2)
2600 } else {
2601 None
2602 },
2603 placement: config.placement,
2604 #[cfg(feature = "server_2_11")]
2605 allow_message_ttl,
2606 #[cfg(feature = "server_2_11")]
2607 subject_delete_marker_ttl,
2608 ..Default::default()
2609 })
2610}