1use crate::error::Error;
17use crate::jetstream::account::Account;
18use crate::jetstream::message::PublishMessage;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{is_valid_subject, jetstream, Client, Command, Message, StatusCode};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25use futures_util::{Future, StreamExt, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Debug;
31use std::fmt::Display;
32use std::future::IntoFuture;
33use std::pin::Pin;
34use std::str::from_utf8;
35use std::sync::Arc;
36use std::task::Poll;
37use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
38use tokio::time::Duration;
39use tokio_stream::wrappers::ReceiverStream;
40use tracing::debug;
41
42use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
43use super::errors::ErrorCode;
44use super::kv::{Store, MAX_HISTORY};
45use super::object_store::{is_valid_bucket_name, ObjectStore};
46use super::stream::{
47 self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
48 Stream,
49};
50#[cfg(feature = "server_2_10")]
51use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
52use super::{is_valid_name, kv};
53
/// Traits that describe individual capabilities of a JetStream context, so
/// code can depend on a single capability instead of a concrete type.
pub mod traits {
    use std::{future::Future, time::Duration};

    use bytes::Bytes;
    use serde::{de::DeserializeOwned, Serialize};

    use crate::{jetstream::message, subject::ToSubject, Request};

    use super::RequestError;

    /// A type that can issue a request and decode the reply.
    pub trait Requester {
        /// Issues a request for `subject` carrying the serializable `payload`.
        ///
        /// The returned future resolves to a deserialized `V`, or to a
        /// [`RequestError`] on failure.
        fn request<S, T, V>(
            &self,
            subject: S,
            payload: &T,
        ) -> impl Future<Output = Result<V, RequestError>>
        where
            S: ToSubject,
            T: ?Sized + Serialize,
            V: DeserializeOwned;
    }

    /// A type that can send a pre-built [`Request`] without returning a reply.
    pub trait RequestSender {
        /// Sends `request` for `subject`; the future resolves to `()` on
        /// success or to a `crate::PublishError`.
        fn send_request<S: ToSubject>(
            &self,
            subject: S,
            request: Request,
        ) -> impl Future<Output = Result<(), crate::PublishError>>;
    }

    /// A type that can publish messages and hand back an ack future.
    pub trait Publisher {
        /// Publishes `payload` to `subject`, resolving to a
        /// `PublishAckFuture` or a JetStream `PublishError`.
        fn publish<S: ToSubject>(
            &self,
            subject: S,
            payload: Bytes,
        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;

        /// Publishes a fully-formed outbound message, resolving to a
        /// `PublishAckFuture` or a JetStream `PublishError`.
        fn publish_message(
            &self,
            message: message::OutboundMessage,
        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
    }

    /// A type that can hand out a [`crate::Client`].
    pub trait ClientProvider {
        /// Returns a client by value.
        fn client(&self) -> crate::Client;
    }

    /// A type that exposes a timeout setting.
    pub trait TimeoutProvider {
        /// Returns the timeout as a [`Duration`].
        fn timeout(&self) -> Duration;
    }
}
105
/// Handle for talking to the JetStream API through a [`Client`].
///
/// Instances are assembled by `ContextBuilder::build`.
#[derive(Debug, Clone)]
pub struct Context {
    /// Client used for every request and publish.
    pub(crate) client: Client,
    /// API subject prefix prepended to request subjects (`$JS.API` by default).
    pub(crate) prefix: String,
    /// Timeout applied when sending a publish and carried into the ack future.
    pub(crate) timeout: Duration,
    /// Semaphore from which `send_publish` takes one permit per publish.
    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
    /// Sender half of the channel consumed by the task started in `spawn_acker`.
    pub(crate) ack_sender:
        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
    /// When `true`, `send_publish` waits for a permit; when `false`, it fails
    /// with `MaxAckPending` if none is available.
    pub(crate) backpressure_on_inflight: bool,
    /// Number of permits the semaphore was created with.
    pub(crate) semaphore_capacity: usize,
}
118
119fn spawn_acker(
120 rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
121 ack_timeout: Duration,
122 concurrency: Option<usize>,
123) -> tokio::task::JoinHandle<()> {
124 tokio::spawn(async move {
125 rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
126 tokio::time::timeout(ack_timeout, subscription).await.ok();
127 drop(permit);
128 })
129 .await;
130 debug!("Acker task exited");
131 })
132}
133
134use std::marker::PhantomData;
135
/// Type-level marker used as the `PREFIX` parameter of `ContextBuilder`.
///
/// `ContextBuilder<Yes>` is the state in which `api_prefix` and `domain` are
/// available.
#[derive(Debug, Default)]
pub struct Yes;
/// Type-level marker used as the `PREFIX` parameter of `ContextBuilder`.
///
/// `ContextBuilder<No>` is the state returned by `api_prefix` and `domain`.
#[derive(Debug, Default)]
pub struct No;
140
/// Marker trait bounding the `PREFIX` type parameter of `ContextBuilder`.
pub trait ToAssign: Debug {}

// The two type-state markers are the implementors visible in this file.
impl ToAssign for Yes {}
impl ToAssign for No {}
145
/// Builder for a JetStream `Context`.
///
/// `PREFIX` is a type-level marker (`Yes` or `No`) that controls whether the
/// prefix-setting methods are available.
pub struct ContextBuilder<PREFIX: ToAssign> {
    /// API subject prefix handed to the built `Context`.
    prefix: String,
    /// Timeout handed to the built `Context`.
    timeout: Duration,
    /// Size of both the ack channel and the ack semaphore created by `build`.
    semaphore_capacity: usize,
    /// How long the acker task waits for each acknowledgement.
    ack_timeout: Duration,
    /// Whether publishing waits for a free permit instead of failing.
    backpressure_on_inflight: bool,
    /// Concurrency limit passed to the acker task (`None` means unbounded).
    concurrency_limit: Option<usize>,
    /// Carries the `PREFIX` marker without storing a value.
    _phantom: PhantomData<PREFIX>,
}
173
174impl Default for ContextBuilder<Yes> {
175 fn default() -> Self {
176 ContextBuilder {
177 prefix: "$JS.API".to_string(),
178 timeout: Duration::from_secs(5),
179 semaphore_capacity: 5_000,
180 ack_timeout: Duration::from_secs(30),
181 backpressure_on_inflight: true,
182 concurrency_limit: None,
183 _phantom: PhantomData {},
184 }
185 }
186}
187
188impl ContextBuilder<Yes> {
189 pub fn new() -> ContextBuilder<Yes> {
191 ContextBuilder::default()
192 }
193}
194
195impl ContextBuilder<Yes> {
196 pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
198 ContextBuilder {
199 prefix: prefix.into(),
200 timeout: self.timeout,
201 semaphore_capacity: self.semaphore_capacity,
202 ack_timeout: self.ack_timeout,
203 backpressure_on_inflight: self.backpressure_on_inflight,
204 concurrency_limit: self.concurrency_limit,
205 _phantom: PhantomData,
206 }
207 }
208
209 pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
212 ContextBuilder {
213 prefix: format!("$JS.{}.API", domain.into()),
214 timeout: self.timeout,
215 semaphore_capacity: self.semaphore_capacity,
216 ack_timeout: self.ack_timeout,
217 backpressure_on_inflight: self.backpressure_on_inflight,
218 concurrency_limit: self.concurrency_limit,
219 _phantom: PhantomData,
220 }
221 }
222}
223
224impl<PREFIX> ContextBuilder<PREFIX>
225where
226 PREFIX: ToAssign,
227{
228 pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
230 where
231 Yes: ToAssign,
232 {
233 ContextBuilder {
234 prefix: self.prefix,
235 timeout,
236 semaphore_capacity: self.semaphore_capacity,
237 ack_timeout: self.ack_timeout,
238 backpressure_on_inflight: self.backpressure_on_inflight,
239 concurrency_limit: self.concurrency_limit,
240 _phantom: PhantomData,
241 }
242 }
243
244 pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
247 where
248 Yes: ToAssign,
249 {
250 ContextBuilder {
251 prefix: self.prefix,
252 timeout: self.timeout,
253 semaphore_capacity: self.semaphore_capacity,
254 ack_timeout,
255 backpressure_on_inflight: self.backpressure_on_inflight,
256 concurrency_limit: self.concurrency_limit,
257 _phantom: PhantomData,
258 }
259 }
260
261 pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
264 where
265 Yes: ToAssign,
266 {
267 ContextBuilder {
268 prefix: self.prefix,
269 timeout: self.timeout,
270 semaphore_capacity: capacity,
271 ack_timeout: self.ack_timeout,
272 backpressure_on_inflight: self.backpressure_on_inflight,
273 concurrency_limit: self.concurrency_limit,
274 _phantom: PhantomData,
275 }
276 }
277
278 pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
283 where
284 Yes: ToAssign,
285 {
286 ContextBuilder {
287 prefix: self.prefix,
288 timeout: self.timeout,
289 semaphore_capacity: self.semaphore_capacity,
290 ack_timeout: self.ack_timeout,
291 backpressure_on_inflight: enabled,
292 concurrency_limit: self.concurrency_limit,
293 _phantom: PhantomData,
294 }
295 }
296
297 pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
300 where
301 Yes: ToAssign,
302 {
303 ContextBuilder {
304 prefix: self.prefix,
305 timeout: self.timeout,
306 semaphore_capacity: self.semaphore_capacity,
307 ack_timeout: self.ack_timeout,
308 backpressure_on_inflight: self.backpressure_on_inflight,
309 concurrency_limit: limit,
310 _phantom: PhantomData,
311 }
312 }
313
314 pub fn build(self, client: Client) -> Context {
316 let (tx, rx) = tokio::sync::mpsc::channel::<(
317 oneshot::Receiver<Message>,
318 OwnedSemaphorePermit,
319 )>(self.semaphore_capacity);
320 let stream = ReceiverStream::new(rx);
321 spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
322 Context {
323 client,
324 prefix: self.prefix,
325 timeout: self.timeout,
326 max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
327 ack_sender: tx,
328 backpressure_on_inflight: self.backpressure_on_inflight,
329 semaphore_capacity: self.semaphore_capacity,
330 }
331 }
332}
333
334impl Context {
    /// Creates a `Context` for `client` using the default builder settings.
    pub(crate) fn new(client: Client) -> Context {
        ContextBuilder::default().build(client)
    }
338
    /// Replaces the timeout stored on this context.
    pub fn set_timeout(&mut self, timeout: Duration) {
        self.timeout = timeout
    }
343
    /// Returns a clone of the underlying [`Client`].
    pub fn client(&self) -> Client {
        self.client.clone()
    }
348
349 pub async fn wait_for_acks(&self) {
355 self.max_ack_semaphore
356 .acquire_many(self.semaphore_capacity as u32)
357 .await
358 .ok();
359 }
360
    /// Creates a `Context` for `client` whose API prefix is `prefix`; all
    /// other settings are the builder defaults.
    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
        ContextBuilder::new()
            .api_prefix(prefix.to_string())
            .build(client)
    }
367
    /// Creates a `Context` for `client` whose API prefix is derived from
    /// `domain` (`$JS.<domain>.API`); other settings are the builder defaults.
    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
        ContextBuilder::new().domain(domain.as_ref()).build(client)
    }
372
    /// Publishes `payload` to `subject` without headers.
    ///
    /// Delegates to [`Context::send_publish`]; see it for the error cases.
    pub async fn publish<S: ToSubject>(
        &self,
        subject: S,
        payload: Bytes,
    ) -> Result<PublishAckFuture, PublishError> {
        self.send_publish(subject, PublishMessage::build().payload(payload))
            .await
    }
421
    /// Publishes `payload` to `subject` together with `headers`.
    ///
    /// Delegates to [`Context::send_publish`]; see it for the error cases.
    pub async fn publish_with_headers<S: ToSubject>(
        &self,
        subject: S,
        headers: crate::header::HeaderMap,
        payload: Bytes,
    ) -> Result<PublishAckFuture, PublishError> {
        self.send_publish(
            subject,
            PublishMessage::build().payload(payload).headers(headers),
        )
        .await
    }
455
456 pub async fn send_publish<S: ToSubject>(
479 &self,
480 subject: S,
481 publish: PublishMessage,
482 ) -> Result<PublishAckFuture, PublishError> {
483 let permit = if self.backpressure_on_inflight {
484 self.max_ack_semaphore
486 .clone()
487 .acquire_owned()
488 .await
489 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
490 } else {
491 self.max_ack_semaphore
493 .clone()
494 .try_acquire_owned()
495 .map_err(|err| match err {
496 TryAcquireError::NoPermits => {
497 PublishError::new(PublishErrorKind::MaxAckPending)
498 }
499 _ => PublishError::with_source(PublishErrorKind::Other, err),
500 })?
501 };
502 let subject = subject.to_subject();
503 let (sender, receiver) = oneshot::channel();
504
505 let respond = self.client.new_inbox().into();
506
507 let send_fut = self
508 .client
509 .sender
510 .send(Command::Request {
511 subject,
512 payload: publish.payload,
513 respond,
514 headers: publish.headers,
515 sender,
516 })
517 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
518
519 tokio::time::timeout(self.timeout, send_fut)
520 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
521 .await??;
522
523 Ok(PublishAckFuture {
524 timeout: self.timeout,
525 subscription: Some(receiver),
526 permit: Some(permit),
527 tx: self.ack_sender.clone(),
528 })
529 }
530
531 pub async fn query_account(&self) -> Result<Account, AccountError> {
533 let response: Response<Account> = self.request("INFO", b"").await?;
534
535 match response {
536 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
537 Response::Ok(account) => Ok(account),
538 }
539 }
540
541 pub async fn create_stream<S>(
566 &self,
567 stream_config: S,
568 ) -> Result<Stream<Info>, CreateStreamError>
569 where
570 Config: From<S>,
571 {
572 let mut config: Config = stream_config.into();
573 if config.name.is_empty() {
574 return Err(CreateStreamError::new(
575 CreateStreamErrorKind::EmptyStreamName,
576 ));
577 }
578 if !is_valid_name(config.name.as_str()) {
579 return Err(CreateStreamError::new(
580 CreateStreamErrorKind::InvalidStreamName,
581 ));
582 }
583 if let Some(ref mut mirror) = config.mirror {
584 if let Some(ref mut domain) = mirror.domain {
585 if mirror.external.is_some() {
586 return Err(CreateStreamError::new(
587 CreateStreamErrorKind::DomainAndExternalSet,
588 ));
589 }
590 mirror.external = Some(External {
591 api_prefix: format!("$JS.{domain}.API"),
592 delivery_prefix: None,
593 })
594 }
595 }
596
597 if let Some(ref mut sources) = config.sources {
598 for source in sources {
599 if let Some(ref mut domain) = source.domain {
600 if source.external.is_some() {
601 return Err(CreateStreamError::new(
602 CreateStreamErrorKind::DomainAndExternalSet,
603 ));
604 }
605 source.external = Some(External {
606 api_prefix: format!("$JS.{domain}.API"),
607 delivery_prefix: None,
608 })
609 }
610 }
611 }
612 let subject = format!("STREAM.CREATE.{}", config.name);
613 let response: Response<Info> = self.request(subject, &config).await?;
614
615 match response {
616 Response::Err { error } => Err(error.into()),
617 Response::Ok(info) => Ok(Stream {
618 context: self.clone(),
619 info,
620 name: config.name,
621 }),
622 }
623 }
624
625 pub async fn get_stream_no_info<T: AsRef<str>>(
645 &self,
646 stream: T,
647 ) -> Result<Stream<()>, GetStreamError> {
648 let stream = stream.as_ref();
649 if stream.is_empty() {
650 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
651 }
652
653 if !is_valid_name(stream) {
654 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
655 }
656
657 Ok(Stream {
658 context: self.clone(),
659 info: (),
660 name: stream.to_string(),
661 })
662 }
663
664 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
680 let stream = stream.as_ref();
681 if stream.is_empty() {
682 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
683 }
684
685 if !is_valid_name(stream) {
686 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
687 }
688
689 let subject = format!("STREAM.INFO.{stream}");
690 let request: Response<Info> = self
691 .request(subject, &())
692 .await
693 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
694 match request {
695 Response::Err { error } => {
696 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
697 }
698 Response::Ok(info) => Ok(Stream {
699 context: self.clone(),
700 info,
701 name: stream.to_string(),
702 }),
703 }
704 }
705
706 pub async fn get_or_create_stream<S>(
730 &self,
731 stream_config: S,
732 ) -> Result<Stream, CreateStreamError>
733 where
734 S: Into<Config>,
735 {
736 let config: Config = stream_config.into();
737
738 if config.name.is_empty() {
739 return Err(CreateStreamError::new(
740 CreateStreamErrorKind::EmptyStreamName,
741 ));
742 }
743
744 if !is_valid_name(config.name.as_str()) {
745 return Err(CreateStreamError::new(
746 CreateStreamErrorKind::InvalidStreamName,
747 ));
748 }
749 let subject = format!("STREAM.INFO.{}", config.name);
750
751 let request: Response<Info> = self.request(subject, &()).await?;
752 match request {
753 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
754 Response::Err { error } => Err(error.into()),
755 Response::Ok(info) => Ok(Stream {
756 context: self.clone(),
757 info,
758 name: config.name,
759 }),
760 }
761 }
762
763 pub async fn delete_stream<T: AsRef<str>>(
779 &self,
780 stream: T,
781 ) -> Result<DeleteStatus, DeleteStreamError> {
782 let stream = stream.as_ref();
783 if stream.is_empty() {
784 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
785 }
786
787 if !is_valid_name(stream) {
788 return Err(DeleteStreamError::new(
789 DeleteStreamErrorKind::InvalidStreamName,
790 ));
791 }
792
793 let subject = format!("STREAM.DELETE.{stream}");
794 match self
795 .request(subject, &json!({}))
796 .await
797 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
798 {
799 Response::Err { error } => Err(DeleteStreamError::new(
800 DeleteStreamErrorKind::JetStream(error),
801 )),
802 Response::Ok(delete_response) => Ok(delete_response),
803 }
804 }
805
806 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
831 where
832 S: Borrow<Config>,
833 {
834 let config = config.borrow();
835
836 if config.name.is_empty() {
837 return Err(CreateStreamError::new(
838 CreateStreamErrorKind::EmptyStreamName,
839 ));
840 }
841
842 if !is_valid_name(config.name.as_str()) {
843 return Err(CreateStreamError::new(
844 CreateStreamErrorKind::InvalidStreamName,
845 ));
846 }
847
848 let subject = format!("STREAM.UPDATE.{}", config.name);
849 match self.request(subject, config).await? {
850 Response::Err { error } => Err(error.into()),
851 Response::Ok(info) => Ok(info),
852 }
853 }
854
855 pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
879 match self.update_stream(config.clone()).await {
880 Ok(stream) => Ok(stream),
881 Err(err) => match err.kind() {
882 CreateStreamErrorKind::NotFound => {
883 let stream = self
884 .create_stream(config)
885 .await
886 .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
887 Ok(stream.info)
888 }
889 _ => Err(err),
890 },
891 }
892 }
893
894 pub async fn stream_by_subject<T: Into<String>>(
909 &self,
910 subject: T,
911 ) -> Result<String, GetStreamByNameError> {
912 let subject = subject.into();
913 if !is_valid_subject(subject.as_str()) {
914 return Err(GetStreamByNameError::new(
915 GetStreamByNameErrorKind::InvalidSubject,
916 ));
917 }
918 let mut names = StreamNames {
919 context: self.clone(),
920 offset: 0,
921 page_request: None,
922 streams: Vec::new(),
923 subject: Some(subject),
924 done: false,
925 };
926 match names.next().await {
927 Some(name) => match name {
928 Ok(name) => Ok(name),
929 Err(err) => Err(GetStreamByNameError::with_source(
930 GetStreamByNameErrorKind::Request,
931 err,
932 )),
933 },
934 None => Err(GetStreamByNameError::new(
935 GetStreamByNameErrorKind::NotFound,
936 )),
937 }
938 }
939
    /// Returns a fresh `StreamNames` listing bound to a clone of this
    /// context, starting at offset 0 with no subject filter.
    pub fn stream_names(&self) -> StreamNames {
        StreamNames {
            context: self.clone(),
            offset: 0,
            page_request: None,
            streams: Vec::new(),
            subject: None,
            done: false,
        }
    }
967
    /// Returns a fresh `Streams` listing bound to a clone of this context,
    /// starting at offset 0.
    pub fn streams(&self) -> Streams {
        Streams {
            context: self.clone(),
            offset: 0,
            page_request: None,
            streams: Vec::new(),
            done: false,
        }
    }
994 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
1008 let bucket: String = bucket.into();
1009 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
1010 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1011 }
1012
1013 let stream_name = format!("KV_{}", &bucket);
1014 let stream = self
1015 .get_stream(stream_name.clone())
1016 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
1017 .await?;
1018
1019 if stream.info.config.max_messages_per_subject < 1 {
1020 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1021 }
1022 let mut store = Store {
1023 prefix: format!("$KV.{}.", &bucket),
1024 name: bucket,
1025 stream_name,
1026 stream: stream.clone(),
1027 put_prefix: None,
1028 use_jetstream_prefix: self.prefix != "$JS.API",
1029 };
1030 if let Some(ref mirror) = stream.info.config.mirror {
1031 let bucket = mirror.name.trim_start_matches("KV_");
1032 if let Some(ref external) = mirror.external {
1033 if !external.api_prefix.is_empty() {
1034 store.use_jetstream_prefix = false;
1035 store.prefix = format!("$KV.{bucket}.");
1036 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
1037 } else {
1038 store.put_prefix = Some(format!("$KV.{bucket}."));
1039 }
1040 }
1041 };
1042
1043 Ok(store)
1044 }
1045
1046 pub async fn create_key_value(
1066 &self,
1067 config: crate::jetstream::kv::Config,
1068 ) -> Result<Store, CreateKeyValueError> {
1069 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1070 return Err(CreateKeyValueError::new(
1071 CreateKeyValueErrorKind::InvalidStoreName,
1072 ));
1073 }
1074 let info = self.query_account().await.map_err(|err| {
1075 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1076 })?;
1077
1078 let bucket_name = config.bucket.clone();
1079 let stream_config = kv_to_stream_config(config, info)?;
1080
1081 let stream = self.create_stream(stream_config).await.map_err(|err| {
1082 if err.kind() == CreateStreamErrorKind::TimedOut {
1083 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1084 } else {
1085 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1086 }
1087 })?;
1088
1089 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1090 }
1091
1092 pub async fn update_key_value(
1112 &self,
1113 config: crate::jetstream::kv::Config,
1114 ) -> Result<Store, UpdateKeyValueError> {
1115 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1116 return Err(UpdateKeyValueError::new(
1117 UpdateKeyValueErrorKind::InvalidStoreName,
1118 ));
1119 }
1120
1121 let stream_name = format!("KV_{}", config.bucket);
1122 let bucket_name = config.bucket.clone();
1123
1124 let account = self.query_account().await.map_err(|err| {
1125 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1126 })?;
1127 let stream = self
1128 .update_stream(kv_to_stream_config(config, account)?)
1129 .await
1130 .map_err(|err| match err.kind() {
1131 UpdateStreamErrorKind::NotFound => {
1132 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1133 }
1134 _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1135 })?;
1136
1137 let stream = Stream {
1138 context: self.clone(),
1139 info: stream,
1140 name: stream_name,
1141 };
1142
1143 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1144 }
1145
1146 pub async fn create_or_update_key_value(
1166 &self,
1167 config: crate::jetstream::kv::Config,
1168 ) -> Result<Store, CreateKeyValueError> {
1169 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1170 return Err(CreateKeyValueError::new(
1171 CreateKeyValueErrorKind::InvalidStoreName,
1172 ));
1173 }
1174
1175 let bucket_name = config.bucket.clone();
1176 let stream_name = format!("KV_{}", config.bucket);
1177
1178 let account = self.query_account().await.map_err(|err| {
1179 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1180 })?;
1181 let stream = self
1182 .create_or_update_stream(kv_to_stream_config(config, account)?)
1183 .await
1184 .map_err(|err| {
1185 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1186 })?;
1187
1188 let stream = Stream {
1189 context: self.clone(),
1190 info: stream,
1191 name: stream_name,
1192 };
1193
1194 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1195 }
1196
1197 pub async fn delete_key_value<T: AsRef<str>>(
1217 &self,
1218 bucket: T,
1219 ) -> Result<DeleteStatus, KeyValueError> {
1220 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1221 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1222 }
1223
1224 let stream_name = format!("KV_{}", bucket.as_ref());
1225 self.delete_stream(stream_name)
1226 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1227 .await
1228 }
1229
1230 pub async fn get_consumer_from_stream<T, C, S>(
1267 &self,
1268 consumer: C,
1269 stream: S,
1270 ) -> Result<Consumer<T>, ConsumerError>
1271 where
1272 T: FromConsumer + IntoConsumerConfig,
1273 S: AsRef<str>,
1274 C: AsRef<str>,
1275 {
1276 if !is_valid_name(stream.as_ref()) {
1277 return Err(ConsumerError::with_source(
1278 ConsumerErrorKind::InvalidName,
1279 "invalid stream",
1280 ));
1281 }
1282
1283 if !is_valid_name(consumer.as_ref()) {
1284 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1285 }
1286
1287 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1288
1289 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1290 Response::Ok(info) => info,
1291 Response::Err { error } => return Err(error.into()),
1292 };
1293
1294 Ok(Consumer::new(
1295 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1296 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1297 })?,
1298 info,
1299 self.clone(),
1300 ))
1301 }
1302
1303 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1326 &self,
1327 consumer: C,
1328 stream: S,
1329 ) -> Result<DeleteStatus, ConsumerError> {
1330 if !is_valid_name(consumer.as_ref()) {
1331 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1332 }
1333
1334 if !is_valid_name(stream.as_ref()) {
1335 return Err(ConsumerError::with_source(
1336 ConsumerErrorKind::Other,
1337 "invalid stream name",
1338 ));
1339 }
1340
1341 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1342
1343 match self.request(subject, &json!({})).await? {
1344 Response::Ok(delete_status) => Ok(delete_status),
1345 Response::Err { error } => Err(error.into()),
1346 }
1347 }
1348
    /// Creates a consumer on `stream` from `config`, or updates it if it
    /// already exists (sent with the `CreateOrUpdate` action).
    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
        &self,
        config: C,
        stream: S,
    ) -> Result<Consumer<C>, ConsumerError> {
        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
            .await
    }
1382
    /// Sends `config` for `stream` with the `Update` action.
    ///
    /// Only compiled with the `server_2_10` feature. The underlying
    /// `ConsumerError` is converted into `ConsumerUpdateError`.
    #[cfg(feature = "server_2_10")]
    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
        &self,
        config: C,
        stream: S,
    ) -> Result<Consumer<C>, ConsumerUpdateError> {
        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
            .await
            .map_err(|err| err.into())
    }
1419
    /// Sends `config` for `stream` with the `Create` action.
    ///
    /// Only compiled with the `server_2_10` feature. The underlying
    /// `ConsumerError` is converted into `ConsumerCreateStrictError`.
    #[cfg(feature = "server_2_10")]
    pub async fn create_consumer_strict_on_stream<
        C: IntoConsumerConfig + FromConsumer,
        S: AsRef<str>,
    >(
        &self,
        config: C,
        stream: S,
    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
            .await
            .map_err(|err| err.into())
    }
1459
1460 async fn create_consumer_on_stream_action<
1461 C: IntoConsumerConfig + FromConsumer,
1462 S: AsRef<str>,
1463 >(
1464 &self,
1465 config: C,
1466 stream: S,
1467 action: ConsumerAction,
1468 ) -> Result<Consumer<C>, ConsumerError> {
1469 let config = config.into_consumer_config();
1470
1471 let subject = {
1472 let filter = if config.filter_subject.is_empty() {
1473 "".to_string()
1474 } else {
1475 format!(".{}", config.filter_subject)
1476 };
1477 config
1478 .name
1479 .as_ref()
1480 .or(config.durable_name.as_ref())
1481 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1482 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1483 };
1484
1485 match self
1486 .request(
1487 subject,
1488 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1489 )
1490 .await?
1491 {
1492 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1493 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1494 FromConsumer::try_from_consumer_config(info.clone().config)
1495 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1496 info,
1497 self.clone(),
1498 )),
1499 }
1500 }
1501
1502 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1522 where
1523 S: ToSubject,
1524 T: ?Sized + Serialize,
1525 V: DeserializeOwned,
1526 {
1527 let subject = subject.to_subject();
1528 let request = serde_json::to_vec(&payload)
1529 .map(Bytes::from)
1530 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1531
1532 debug!("JetStream request sent: {:?}", request);
1533
1534 let message = self
1535 .client
1536 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1537 .await;
1538 let message = message?;
1539 debug!(
1540 "JetStream request response: {:?}",
1541 from_utf8(&message.payload)
1542 );
1543 let response = serde_json::from_slice(message.payload.as_ref())
1544 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1545
1546 Ok(response)
1547 }
1548
1549 pub async fn send_request<S: ToSubject>(
1558 &self,
1559 subject: S,
1560 request: crate::client::Request,
1561 ) -> Result<(), crate::PublishError> {
1562 let prefixed_subject = format!("{}.{}", self.prefix, subject.to_subject());
1563 let inbox = request.inbox.unwrap_or_else(|| self.client.new_inbox());
1564 let payload = request.payload.unwrap_or_default();
1565
1566 match request.headers {
1567 Some(headers) => {
1568 self.client
1569 .publish_with_reply_and_headers(prefixed_subject, inbox, headers, payload)
1570 .await
1571 }
1572 None => {
1573 self.client
1574 .publish_with_reply(prefixed_subject, inbox, payload)
1575 .await
1576 }
1577 }
1578 }
1579
1580 pub async fn create_object_store(
1599 &self,
1600 config: super::object_store::Config,
1601 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1602 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1603 return Err(CreateObjectStoreError::new(
1604 CreateKeyValueErrorKind::InvalidStoreName,
1605 ));
1606 }
1607
1608 let bucket_name = config.bucket.clone();
1609 let stream_name = format!("OBJ_{bucket_name}");
1610 let chunk_subject = format!("$O.{bucket_name}.C.>");
1611 let meta_subject = format!("$O.{bucket_name}.M.>");
1612
1613 let stream = self
1614 .create_stream(super::stream::Config {
1615 name: stream_name,
1616 description: config.description.clone(),
1617 subjects: vec![chunk_subject, meta_subject],
1618 max_age: config.max_age,
1619 max_bytes: config.max_bytes,
1620 storage: config.storage,
1621 num_replicas: config.num_replicas,
1622 discard: DiscardPolicy::New,
1623 allow_rollup: true,
1624 allow_direct: true,
1625 #[cfg(feature = "server_2_10")]
1626 compression: if config.compression {
1627 Some(Compression::S2)
1628 } else {
1629 None
1630 },
1631 placement: config.placement,
1632 ..Default::default()
1633 })
1634 .await
1635 .map_err(|err| {
1636 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1637 })?;
1638
1639 Ok(ObjectStore {
1640 name: bucket_name,
1641 stream,
1642 })
1643 }
1644
1645 pub async fn get_object_store<T: AsRef<str>>(
1659 &self,
1660 bucket_name: T,
1661 ) -> Result<ObjectStore, ObjectStoreError> {
1662 let bucket_name = bucket_name.as_ref();
1663 if !is_valid_bucket_name(bucket_name) {
1664 return Err(ObjectStoreError::new(
1665 ObjectStoreErrorKind::InvalidBucketName,
1666 ));
1667 }
1668 let stream_name = format!("OBJ_{bucket_name}");
1669 let stream = self
1670 .get_stream(stream_name)
1671 .await
1672 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1673
1674 Ok(ObjectStore {
1675 name: bucket_name.to_string(),
1676 stream,
1677 })
1678 }
1679
1680 pub async fn delete_object_store<T: AsRef<str>>(
1694 &self,
1695 bucket_name: T,
1696 ) -> Result<(), DeleteObjectStore> {
1697 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1698 self.delete_stream(stream_name)
1699 .await
1700 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1701 Ok(())
1702 }
1703}
1704
/// Core-client request capability, forwarded to the wrapped client.
impl crate::client::traits::Requester for Context {
    /// Forwards to the client's `send_request`. The JetStream API prefix is
    /// not applied on this path.
    fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: crate::Request,
    ) -> impl Future<Output = Result<Message, crate::RequestError>> {
        self.client.send_request(subject, request)
    }
}
1714
/// Core-client publish capability, forwarded to the wrapped client.
impl crate::client::traits::Publisher for Context {
    /// Forwards to the client's `publish_with_reply`.
    fn publish_with_reply<S: ToSubject, R: ToSubject>(
        &self,
        subject: S,
        reply: R,
        payload: Bytes,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        self.client.publish_with_reply(subject, reply, payload)
    }

    /// Forwards to the client's `publish_message`.
    fn publish_message(
        &self,
        msg: crate::message::OutboundMessage,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        self.client.publish_message(msg)
    }
}
1732
/// Exposes the context's client through the `ClientProvider` trait.
impl traits::ClientProvider for Context {
    /// Delegates to the `client` method called on `self`.
    fn client(&self) -> crate::Client {
        self.client()
    }
}
1738
/// Exposes JetStream API requests through the `Requester` trait.
impl traits::Requester for Context {
    /// Delegates to the `request` method called on `self`.
    fn request<S, T, V>(
        &self,
        subject: S,
        payload: &T,
    ) -> impl Future<Output = Result<V, RequestError>>
    where
        S: ToSubject,
        T: ?Sized + Serialize,
        V: DeserializeOwned,
    {
        self.request(subject, payload)
    }
}
1753
/// Exposes the context's timeout through the `TimeoutProvider` trait.
impl traits::TimeoutProvider for Context {
    /// Returns the timeout currently stored on the context.
    fn timeout(&self) -> Duration {
        self.timeout
    }
}
1759
/// Implements the JetStream `RequestSender` trait for [`Context`].
impl traits::RequestSender for Context {
    /// Sends `request` on `subject`; the returned future resolves to `()` on success
    /// or to a core `PublishError`.
    // NOTE(review): `self.send_request` presumably resolves to an inherent method on
    // `Context`; confirm, since a trait-resolved call here would recurse.
    fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: crate::client::Request,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        self.send_request(subject, request)
    }
}
1769
/// Implements the JetStream `Publisher` trait for [`Context`].
///
/// Both methods resolve to a [`PublishAckFuture`] on success.
impl traits::Publisher for Context {
    /// Publishes `payload` on `subject` by delegating to `self.publish`.
    // NOTE(review): presumably the inherent `Context::publish`; confirm it is not
    // resolved back to this trait method.
    fn publish<S: ToSubject>(
        &self,
        subject: S,
        payload: Bytes,
    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
        self.publish(subject, payload)
    }

    /// Publishes an outbound message: its subject is passed to `send_publish` and its
    /// payload and headers are repacked into a [`PublishMessage`].
    fn publish_message(
        &self,
        message: jetstream::message::OutboundMessage,
    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
        self.send_publish(
            message.subject,
            PublishMessage {
                payload: message.payload,
                headers: message.headers,
            },
        )
    }
}
1792
/// Classifies why a JetStream publish did not end in a successful acknowledgement.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream is bound to the published subject.
    StreamNotFound,
    /// The server rejected the publish because of the expected last message id.
    WrongLastMessageId,
    /// The server rejected the publish because of the expected last sequence.
    WrongLastSequence,
    /// The acknowledgement did not arrive in time.
    TimedOut,
    /// The channel that should have delivered the acknowledgement was closed.
    BrokenPipe,
    /// The limit of pending acknowledgements was reached.
    MaxAckPending,
    /// Any other failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::Other => "publish failed",
        })
    }
}
1817
/// Error returned by JetStream publish operations; its kind is a [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1819
/// Handle to the pending acknowledgement of a published message.
///
/// Awaiting it (through `IntoFuture`) waits for the acknowledgement for at most
/// `timeout`. If it is dropped while still holding both its receiver and its permit,
/// the pair is sent over `tx` instead of being discarded.
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Upper bound on how long the acknowledgement is awaited.
    timeout: Duration,
    /// Receiver of the acknowledgement message; `None` once it has been taken.
    subscription: Option<oneshot::Receiver<Message>>,
    /// Semaphore permit tied to this publish; `None` once it has been taken.
    permit: Option<OwnedSemaphorePermit>,
    /// Channel that receives the receiver/permit pair when this handle is dropped
    /// without having been awaited.
    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
}
1827
1828impl Drop for PublishAckFuture {
1829 fn drop(&mut self) {
1830 if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1831 if let Err(err) = self.tx.try_send((sub, permit)) {
1832 tracing::warn!("failed to pass future permit to the acker: {}", err);
1833 }
1834 }
1835 }
1836}
1837
1838impl PublishAckFuture {
1839 async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1840 let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1841 .await
1842 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1843 next.map_or_else(
1844 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1845 |m| {
1846 if m.status == Some(StatusCode::NO_RESPONDERS) {
1847 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1848 }
1849 let response = serde_json::from_slice(m.payload.as_ref())
1850 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1851 match response {
1852 Response::Err { error } => match error.error_code() {
1853 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1854 PublishErrorKind::WrongLastMessageId,
1855 error,
1856 )),
1857 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1858 PublishErrorKind::WrongLastSequence,
1859 error,
1860 )),
1861 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1862 },
1863 Response::Ok(publish_ack) => Ok(publish_ack),
1864 }
1865 },
1866 )
1867 }
1868}
1869impl IntoFuture for PublishAckFuture {
1870 type Output = Result<PublishAck, PublishError>;
1871
1872 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1873
1874 fn into_future(self) -> Self::IntoFuture {
1875 Box::pin(std::future::IntoFuture::into_future(
1876 self.next_with_timeout(),
1877 ))
1878 }
1879}
1880
/// One deserialized page of a stream-names listing.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total reported by the server; compared with the running offset to detect the
    /// last page.
    total: usize,
    /// Stream names contained in this page, if any were returned.
    streams: Option<Vec<String>>,
}
1886
/// One deserialized page of a stream-info listing.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total reported by the server; compared with the running offset to detect the
    /// last page.
    total: usize,
    /// Stream infos contained in this page, if any were returned.
    streams: Option<Vec<super::stream::Info>>,
}
1892
/// Boxed in-flight request that resolves to one [`StreamPage`].
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1894
/// Asynchronous stream of stream names, fetched from the server page by page.
pub struct StreamNames {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of names received so far; sent as the `offset` of the next request.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Optional value sent as the `subject` field of each request.
    subject: Option<String>,
    /// Names of the current page that have not been yielded yet.
    streams: Vec<String>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
1903
1904impl futures_util::Stream for StreamNames {
1905 type Item = Result<String, StreamsError>;
1906
1907 fn poll_next(
1908 mut self: Pin<&mut Self>,
1909 cx: &mut std::task::Context<'_>,
1910 ) -> std::task::Poll<Option<Self::Item>> {
1911 match self.page_request.as_mut() {
1912 Some(page) => match page.try_poll_unpin(cx) {
1913 std::task::Poll::Ready(page) => {
1914 self.page_request = None;
1915 let page = page
1916 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1917 if let Some(streams) = page.streams {
1918 self.offset += streams.len();
1919 self.streams = streams;
1920 if self.offset >= page.total {
1921 self.done = true;
1922 }
1923 match self.streams.pop() {
1924 Some(stream) => Poll::Ready(Some(Ok(stream))),
1925 None => Poll::Ready(None),
1926 }
1927 } else {
1928 Poll::Ready(None)
1929 }
1930 }
1931 std::task::Poll::Pending => std::task::Poll::Pending,
1932 },
1933 None => {
1934 if let Some(stream) = self.streams.pop() {
1935 Poll::Ready(Some(Ok(stream)))
1936 } else {
1937 if self.done {
1938 return Poll::Ready(None);
1939 }
1940 let context = self.context.clone();
1941 let offset = self.offset;
1942 let subject = self.subject.clone();
1943 self.page_request = Some(Box::pin(async move {
1944 match context
1945 .request(
1946 "STREAM.NAMES",
1947 &json!({
1948 "offset": offset,
1949 "subject": subject
1950 }),
1951 )
1952 .await?
1953 {
1954 Response::Err { error } => {
1955 Err(RequestError::with_source(RequestErrorKind::Other, error))
1956 }
1957 Response::Ok(page) => Ok(page),
1958 }
1959 }));
1960 self.poll_next(cx)
1961 }
1962 }
1963 }
1964 }
1965}
1966
/// Boxed in-flight request that resolves to one [`StreamInfoPage`].
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

/// Error kind used by the stream listings; an alias of [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error yielded by the stream listings; an alias of [`RequestError`].
pub type StreamsError = RequestError;
1971
/// Asynchronous stream of stream infos, fetched from the server page by page.
pub struct Streams {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of infos received so far; sent as the `offset` of the next request.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    streams: Vec<super::stream::Info>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
1979
1980impl futures_util::Stream for Streams {
1981 type Item = Result<super::stream::Info, StreamsError>;
1982
1983 fn poll_next(
1984 mut self: Pin<&mut Self>,
1985 cx: &mut std::task::Context<'_>,
1986 ) -> std::task::Poll<Option<Self::Item>> {
1987 match self.page_request.as_mut() {
1988 Some(page) => match page.try_poll_unpin(cx) {
1989 std::task::Poll::Ready(page) => {
1990 self.page_request = None;
1991 let page = page
1992 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1993 if let Some(streams) = page.streams {
1994 self.offset += streams.len();
1995 self.streams = streams;
1996 if self.offset >= page.total {
1997 self.done = true;
1998 }
1999 match self.streams.pop() {
2000 Some(stream) => Poll::Ready(Some(Ok(stream))),
2001 None => Poll::Ready(None),
2002 }
2003 } else {
2004 Poll::Ready(None)
2005 }
2006 }
2007 std::task::Poll::Pending => std::task::Poll::Pending,
2008 },
2009 None => {
2010 if let Some(stream) = self.streams.pop() {
2011 Poll::Ready(Some(Ok(stream)))
2012 } else {
2013 if self.done {
2014 return Poll::Ready(None);
2015 }
2016 let context = self.context.clone();
2017 let offset = self.offset;
2018 self.page_request = Some(Box::pin(async move {
2019 match context
2020 .request(
2021 "STREAM.LIST",
2022 &json!({
2023 "offset": offset,
2024 }),
2025 )
2026 .await?
2027 {
2028 Response::Err { error } => {
2029 Err(RequestError::with_source(RequestErrorKind::Other, error))
2030 }
2031 Response::Ok(page) => Ok(page),
2032 }
2033 }));
2034 self.poll_next(cx)
2035 }
2036 }
2037 }
2038 }
2039}
2040
/// Legacy name for [`super::message::PublishMessage`], kept as a deprecated alias.
#[deprecated(
    note = "use jetstream::message::PublishMessage instead",
    since = "0.44.0"
)]
pub type Publish = super::message::PublishMessage;
2048
/// Classifies failures of a JetStream API request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nothing answered the request.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        })
    }
}
2065
/// Error returned by JetStream API requests; its kind is a [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
2067
2068impl From<crate::RequestError> for RequestError {
2069 fn from(error: crate::RequestError) -> Self {
2070 match error.kind() {
2071 crate::RequestErrorKind::TimedOut => {
2072 RequestError::with_source(RequestErrorKind::TimedOut, error)
2073 }
2074 crate::RequestErrorKind::NoResponders => {
2075 RequestError::new(RequestErrorKind::NoResponders)
2076 }
2077 crate::RequestErrorKind::Other => {
2078 RequestError::with_source(RequestErrorKind::Other, error)
2079 }
2080 }
2081 }
2082}
2083
2084impl From<super::errors::Error> for RequestError {
2085 fn from(err: super::errors::Error) -> Self {
2086 RequestError::with_source(RequestErrorKind::Other, err)
2087 }
2088}
2089
/// Error returned when fetching consumer info; its kind is a [`ConsumerInfoErrorKind`].
pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
2091
/// Classifies failures of a consumer-info lookup.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerInfoErrorKind {
    /// The consumer name is invalid.
    InvalidName,
    /// The server reported the consumer as offline.
    Offline,
    /// The server reported that the consumer does not exist.
    NotFound,
    /// The server reported that the stream does not exist.
    StreamNotFound,
    /// The underlying request failed for a reason other than timeout or no responders.
    Request,
    /// Any other JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// The underlying request timed out.
    TimedOut,
    /// Nothing answered the underlying request.
    NoResponders,
}
2103
2104impl Display for ConsumerInfoErrorKind {
2105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2106 match self {
2107 Self::InvalidName => write!(f, "invalid consumer name"),
2108 Self::Offline => write!(f, "consumer is offline"),
2109 Self::NotFound => write!(f, "consumer not found"),
2110 Self::StreamNotFound => write!(f, "stream not found"),
2111 Self::Request => write!(f, "request error"),
2112 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2113 Self::TimedOut => write!(f, "timed out"),
2114 Self::NoResponders => write!(f, "no responders"),
2115 }
2116 }
2117}
2118
2119impl From<super::errors::Error> for ConsumerInfoError {
2120 fn from(error: super::errors::Error) -> Self {
2121 match error.error_code() {
2122 ErrorCode::CONSUMER_NOT_FOUND => {
2123 ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2124 }
2125 ErrorCode::STREAM_NOT_FOUND => {
2126 ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2127 }
2128 ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2129 _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2130 }
2131 }
2132}
2133
2134impl From<RequestError> for ConsumerInfoError {
2135 fn from(error: RequestError) -> Self {
2136 match error.kind() {
2137 RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2138 RequestErrorKind::Other => {
2139 ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2140 }
2141 RequestErrorKind::NoResponders => {
2142 ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2143 }
2144 }
2145 }
2146}
2147
/// Classifies failures of creating (or, via alias, updating) a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum CreateStreamErrorKind {
    /// The stream name is empty.
    EmptyStreamName,
    /// The stream name contains characters that are not allowed.
    InvalidStreamName,
    /// Both a domain and an external configuration were set.
    DomainAndExternalSet,
    /// Nothing answered the request.
    JetStreamUnavailable,
    /// Any other JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// The request timed out.
    TimedOut,
    /// The request failed for another reason.
    Response,
    /// The server reported that the stream does not exist.
    NotFound,
    /// The server response could not be parsed.
    ResponseParse,
}
2160
2161impl Display for CreateStreamErrorKind {
2162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2163 match self {
2164 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2165 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2166 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2167 Self::NotFound => write!(f, "stream not found"),
2168 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2169 Self::TimedOut => write!(f, "jetstream request timed out"),
2170 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2171 Self::ResponseParse => write!(f, "failed to parse server response"),
2172 Self::Response => write!(f, "response error"),
2173 }
2174 }
2175}
2176
/// Error returned when creating a stream; its kind is a [`CreateStreamErrorKind`].
pub type CreateStreamError = Error<CreateStreamErrorKind>;
2178
2179impl From<super::errors::Error> for CreateStreamError {
2180 fn from(error: super::errors::Error) -> Self {
2181 match error.kind() {
2182 super::errors::ErrorCode::STREAM_NOT_FOUND => {
2183 CreateStreamError::new(CreateStreamErrorKind::NotFound)
2184 }
2185 _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2186 }
2187 }
2188}
2189
2190impl From<RequestError> for CreateStreamError {
2191 fn from(error: RequestError) -> Self {
2192 match error.kind() {
2193 RequestErrorKind::NoResponders => {
2194 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2195 }
2196 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2197 RequestErrorKind::Other => {
2198 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2199 }
2200 }
2201 }
2202}
2203
/// Classifies failures of looking up (or, via alias, deleting) a stream by name.
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamErrorKind {
    /// The stream name is empty.
    EmptyName,
    /// The underlying request failed.
    Request,
    /// The stream name is invalid.
    InvalidStreamName,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
}
2211
2212impl Display for GetStreamErrorKind {
2213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2214 match self {
2215 Self::EmptyName => write!(f, "empty name cannot be empty"),
2216 Self::Request => write!(f, "request error"),
2217 Self::InvalidStreamName => write!(f, "invalid stream name"),
2218 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2219 }
2220 }
2221}
2222
/// Classifies failures of resolving a stream from a subject.
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamByNameErrorKind {
    /// The underlying request failed.
    Request,
    /// No stream was found.
    NotFound,
    /// The subject is invalid.
    InvalidSubject,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
}
2230
2231impl Display for GetStreamByNameErrorKind {
2232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2233 match self {
2234 Self::Request => write!(f, "request error"),
2235 Self::NotFound => write!(f, "stream not found"),
2236 Self::InvalidSubject => write!(f, "invalid subject"),
2237 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2238 }
2239 }
2240}
2241
/// Error returned when looking up a stream by name.
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error returned when resolving a stream from a subject.
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Updating a stream reports the same errors as creating one.
pub type UpdateStreamError = CreateStreamError;
/// Kind of [`UpdateStreamError`]; an alias of [`CreateStreamErrorKind`].
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Deleting a stream reports the same errors as looking one up.
pub type DeleteStreamError = GetStreamError;
/// Kind of [`DeleteStreamError`]; an alias of [`GetStreamErrorKind`].
pub type DeleteStreamErrorKind = GetStreamErrorKind;
2249
/// Classifies failures of opening a key-value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        })
    }
}
2266
/// Error returned when opening a key-value store; its kind is a [`KeyValueErrorKind`].
pub type KeyValueError = Error<KeyValueErrorKind>;
2268
/// Classifies failures of creating a key-value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history exceeds the allowed maximum.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// The bucket could not be created.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        })
    }
}
2293
/// Classifies failures of updating a key-value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history exceeds the allowed maximum.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// The bucket could not be updated.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket to update does not exist.
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // Previously rendered the copy-pasted "bucket creation failed".
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error returned when creating a key-value store.
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error returned when updating a key-value store.
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Creating an object store reports the same errors as creating a key-value store.
pub type CreateObjectStoreError = CreateKeyValueError;
/// Kind of [`CreateObjectStoreError`]; an alias of [`CreateKeyValueErrorKind`].
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2325
/// Classifies failures of opening or deleting an object store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is invalid.
    InvalidBucketName,
    /// The operation on the backing stream failed.
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        })
    }
}
2340
/// Error returned by object store lookups; its kind is an [`ObjectStoreErrorKind`].
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Error returned when deleting an object store; an alias of [`ObjectStoreError`].
pub type DeleteObjectStore = ObjectStoreError;
/// Kind of [`DeleteObjectStore`]; an alias of [`ObjectStoreErrorKind`].
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2345
/// Classifies failures of querying account information.
#[derive(Clone, Debug, PartialEq)]
pub enum AccountErrorKind {
    /// The request timed out.
    TimedOut,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// Nothing answered the request.
    JetStreamUnavailable,
    /// Any other failure.
    Other,
}
2353
2354impl Display for AccountErrorKind {
2355 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2356 match self {
2357 Self::TimedOut => write!(f, "timed out"),
2358 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2359 Self::Other => write!(f, "error"),
2360 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2361 }
2362 }
2363}
2364
/// Error returned when querying account information; its kind is an [`AccountErrorKind`].
pub type AccountError = Error<AccountErrorKind>;
2366
2367impl From<RequestError> for AccountError {
2368 fn from(err: RequestError) -> Self {
2369 match err.kind {
2370 RequestErrorKind::NoResponders => {
2371 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2372 }
2373 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2374 RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2375 }
2376 }
2377}
2378
/// Action selector serialized into consumer requests.
///
/// Each variant serializes to the string given in its `serde(rename)` attribute.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2390
2391fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2393 let mut store = Store {
2394 prefix: format!("$KV.{}.", bucket.as_str()),
2395 name: bucket,
2396 stream: stream.clone(),
2397 stream_name: stream.info.config.name.clone(),
2398 put_prefix: None,
2399 use_jetstream_prefix: prefix != "$JS.API",
2400 };
2401 if let Some(ref mirror) = stream.info.config.mirror {
2402 let bucket = mirror.name.trim_start_matches("KV_");
2403 if let Some(ref external) = mirror.external {
2404 if !external.api_prefix.is_empty() {
2405 store.use_jetstream_prefix = false;
2406 store.prefix = format!("$KV.{bucket}.");
2407 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2408 } else {
2409 store.put_prefix = Some(format!("$KV.{bucket}."));
2410 }
2411 }
2412 };
2413 store
2414}
2415
/// Reasons why a key-value configuration cannot be turned into a stream configuration.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but are not supported. Only constructed when the
    /// `server_2_11` feature is enabled, hence the `dead_code` allowance.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2421
2422impl From<KvToStreamConfigError> for CreateKeyValueError {
2423 fn from(err: KvToStreamConfigError) -> Self {
2424 match err {
2425 KvToStreamConfigError::TooLongHistory => {
2426 CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2427 }
2428 KvToStreamConfigError::LimitMarkersNotSupported => {
2429 CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2430 }
2431 }
2432 }
2433}
2434
2435impl From<KvToStreamConfigError> for UpdateKeyValueError {
2436 fn from(err: KvToStreamConfigError) -> Self {
2437 match err {
2438 KvToStreamConfigError::TooLongHistory => {
2439 UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2440 }
2441 KvToStreamConfigError::LimitMarkersNotSupported => {
2442 UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2443 }
2444 }
2445 }
2446}
2447
2448fn kv_to_stream_config(
2450 config: kv::Config,
2451 _account: Account,
2452) -> Result<super::stream::Config, KvToStreamConfigError> {
2453 let history = if config.history > 0 {
2454 if config.history > MAX_HISTORY {
2455 return Err(KvToStreamConfigError::TooLongHistory);
2456 }
2457 config.history
2458 } else {
2459 1
2460 };
2461
2462 let num_replicas = if config.num_replicas == 0 {
2463 1
2464 } else {
2465 config.num_replicas
2466 };
2467
2468 #[cfg(feature = "server_2_11")]
2469 let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2470
2471 #[cfg(feature = "server_2_11")]
2472 if let Some(duration) = config.limit_markers {
2473 if _account.requests.level < 1 {
2474 return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2475 }
2476 allow_message_ttl = true;
2477 subject_delete_marker_ttl = Some(duration);
2478 }
2479
2480 let mut mirror = config.mirror.clone();
2481 let mut sources = config.sources.clone();
2482 let mut mirror_direct = config.mirror_direct;
2483
2484 let mut subjects = Vec::new();
2485 if let Some(ref mut mirror) = mirror {
2486 if !mirror.name.starts_with("KV_") {
2487 mirror.name = format!("KV_{}", mirror.name);
2488 }
2489 mirror_direct = true;
2490 } else if let Some(ref mut sources) = sources {
2491 for source in sources {
2492 if !source.name.starts_with("KV_") {
2493 source.name = format!("KV_{}", source.name);
2494 }
2495 }
2496 } else {
2497 subjects = vec![format!("$KV.{}.>", config.bucket)];
2498 }
2499
2500 Ok(stream::Config {
2501 name: format!("KV_{}", config.bucket),
2502 description: Some(config.description),
2503 subjects,
2504 max_messages_per_subject: history,
2505 max_bytes: config.max_bytes,
2506 max_age: config.max_age,
2507 max_message_size: config.max_value_size,
2508 storage: config.storage,
2509 republish: config.republish,
2510 allow_rollup: true,
2511 deny_delete: true,
2512 deny_purge: false,
2513 allow_direct: true,
2514 sources,
2515 mirror,
2516 num_replicas,
2517 discard: stream::DiscardPolicy::New,
2518 mirror_direct,
2519 #[cfg(feature = "server_2_10")]
2520 compression: if config.compression {
2521 Some(stream::Compression::S2)
2522 } else {
2523 None
2524 },
2525 placement: config.placement,
2526 #[cfg(feature = "server_2_11")]
2527 allow_message_ttl,
2528 #[cfg(feature = "server_2_11")]
2529 subject_delete_marker_ttl,
2530 ..Default::default()
2531 })
2532}