1use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23 header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures_util::future::BoxFuture;
27use futures_util::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Debug;
33use std::fmt::Display;
34use std::future::IntoFuture;
35use std::pin::Pin;
36use std::str::from_utf8;
37use std::sync::Arc;
38use std::task::Poll;
39use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
40use tokio::time::Duration;
41use tokio_stream::wrappers::ReceiverStream;
42use tracing::debug;
43
44use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
45use super::errors::ErrorCode;
46use super::kv::{Store, MAX_HISTORY};
47use super::object_store::{is_valid_bucket_name, ObjectStore};
48use super::stream::{
49 self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
50 Stream,
51};
52#[cfg(feature = "server_2_10")]
53use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
54use super::{is_valid_name, kv};
55
/// Handle used to talk to the JetStream API through a [`Client`].
///
/// Besides the client it carries the API subject prefix, the request timeout
/// and the state that bounds how many publishes may have an acknowledgement
/// still outstanding.
#[derive(Debug, Clone)]
pub struct Context {
    /// Client used to send API requests and publishes.
    pub(crate) client: Client,
    /// Subject prefix; API request subjects are built as `<prefix>.<subject>`.
    pub(crate) prefix: String,
    /// Timeout applied when enqueueing a publish and when awaiting its acknowledgement.
    pub(crate) timeout: Duration,
    /// Semaphore from which every publish takes one permit.
    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
    /// Channel over which a dropped, un-awaited `PublishAckFuture` hands its
    /// pending acknowledgement and permit to the background acker task.
    pub(crate) ack_sender:
        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
    /// When `true`, publishing waits for a free permit; when `false`, it fails
    /// immediately if no permit is available.
    pub(crate) backpressure_on_inflight: bool,
    /// Number of permits `max_ack_semaphore` was created with.
    pub(crate) semaphore_capacity: usize,
}
68
69fn spawn_acker(
70 rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
71 ack_timeout: Duration,
72 concurrency: Option<usize>,
73) -> tokio::task::JoinHandle<()> {
74 tokio::spawn(async move {
75 rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
76 tokio::time::timeout(ack_timeout, subscription).await.ok();
77 drop(permit);
78 })
79 .await;
80 debug!("Acker task exited");
81 })
82}
83
84use std::marker::PhantomData;
85
/// Type-level marker: the API prefix of a [`ContextBuilder`] may still be assigned.
#[derive(Debug, Default)]
pub struct Yes;
/// Type-level marker: the API prefix of a [`ContextBuilder`] has been assigned.
#[derive(Debug, Default)]
pub struct No;

/// Marker trait for the type-state parameter of [`ContextBuilder`].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}

/// Builder collecting the settings for a `Context`.
///
/// The `PREFIX` parameter is a type-state marker: `api_prefix` and `domain`
/// exist only on `ContextBuilder<Yes>` and return `ContextBuilder<No>`.
pub struct ContextBuilder<PREFIX: ToAssign> {
    /// API subject prefix.
    prefix: String,
    /// Request timeout.
    timeout: Duration,
    /// Capacity of the ack semaphore and of the acker channel.
    semaphore_capacity: usize,
    /// How long the background acker waits for a single acknowledgement.
    ack_timeout: Duration,
    /// Whether publishing waits for a permit instead of failing.
    backpressure_on_inflight: bool,
    /// Concurrency limit handed to the background acker.
    concurrency_limit: Option<usize>,
    _phantom: PhantomData<PREFIX>,
}

impl Default for ContextBuilder<Yes> {
    /// Defaults: prefix `$JS.API`, 5 s timeout, 5000 permits, 30 s ack timeout,
    /// backpressure enabled, no concurrency limit.
    fn default() -> Self {
        Self {
            prefix: String::from("$JS.API"),
            timeout: Duration::from_secs(5),
            semaphore_capacity: 5_000,
            ack_timeout: Duration::from_secs(30),
            backpressure_on_inflight: true,
            concurrency_limit: None,
            _phantom: PhantomData,
        }
    }
}

impl ContextBuilder<Yes> {
    /// Creates a builder holding the default settings.
    pub fn new() -> ContextBuilder<Yes> {
        Self::default()
    }
}

impl ContextBuilder<Yes> {
    /// Replaces the API prefix with `prefix`, keeping every other setting.
    pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
        let Self {
            timeout,
            semaphore_capacity,
            ack_timeout,
            backpressure_on_inflight,
            concurrency_limit,
            ..
        } = self;
        ContextBuilder {
            prefix: prefix.into(),
            timeout,
            semaphore_capacity,
            ack_timeout,
            backpressure_on_inflight,
            concurrency_limit,
            _phantom: PhantomData,
        }
    }

    /// Sets the API prefix to `$JS.<domain>.API`, keeping every other setting.
    pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
        let domain: String = domain.into();
        self.api_prefix(format!("$JS.{}.API", domain))
    }
}
173
174impl<PREFIX> ContextBuilder<PREFIX>
175where
176 PREFIX: ToAssign,
177{
178 pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
180 where
181 Yes: ToAssign,
182 {
183 ContextBuilder {
184 prefix: self.prefix,
185 timeout,
186 semaphore_capacity: self.semaphore_capacity,
187 ack_timeout: self.ack_timeout,
188 backpressure_on_inflight: self.backpressure_on_inflight,
189 concurrency_limit: self.concurrency_limit,
190 _phantom: PhantomData,
191 }
192 }
193
194 pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
197 where
198 Yes: ToAssign,
199 {
200 ContextBuilder {
201 prefix: self.prefix,
202 timeout: self.timeout,
203 semaphore_capacity: self.semaphore_capacity,
204 ack_timeout,
205 backpressure_on_inflight: self.backpressure_on_inflight,
206 concurrency_limit: self.concurrency_limit,
207 _phantom: PhantomData,
208 }
209 }
210
211 pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
214 where
215 Yes: ToAssign,
216 {
217 ContextBuilder {
218 prefix: self.prefix,
219 timeout: self.timeout,
220 semaphore_capacity: capacity,
221 ack_timeout: self.ack_timeout,
222 backpressure_on_inflight: self.backpressure_on_inflight,
223 concurrency_limit: self.concurrency_limit,
224 _phantom: PhantomData,
225 }
226 }
227
228 pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
233 where
234 Yes: ToAssign,
235 {
236 ContextBuilder {
237 prefix: self.prefix,
238 timeout: self.timeout,
239 semaphore_capacity: self.semaphore_capacity,
240 ack_timeout: self.ack_timeout,
241 backpressure_on_inflight: enabled,
242 concurrency_limit: self.concurrency_limit,
243 _phantom: PhantomData,
244 }
245 }
246
247 pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
250 where
251 Yes: ToAssign,
252 {
253 ContextBuilder {
254 prefix: self.prefix,
255 timeout: self.timeout,
256 semaphore_capacity: self.semaphore_capacity,
257 ack_timeout: self.ack_timeout,
258 backpressure_on_inflight: self.backpressure_on_inflight,
259 concurrency_limit: limit,
260 _phantom: PhantomData,
261 }
262 }
263
264 pub fn build(self, client: Client) -> Context {
266 let (tx, rx) = tokio::sync::mpsc::channel::<(
267 oneshot::Receiver<Message>,
268 OwnedSemaphorePermit,
269 )>(self.semaphore_capacity);
270 let stream = ReceiverStream::new(rx);
271 spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
272 Context {
273 client,
274 prefix: self.prefix,
275 timeout: self.timeout,
276 max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
277 ack_sender: tx,
278 backpressure_on_inflight: self.backpressure_on_inflight,
279 semaphore_capacity: self.semaphore_capacity,
280 }
281 }
282}
283
284impl Context {
285 pub(crate) fn new(client: Client) -> Context {
286 ContextBuilder::default().build(client)
287 }
288
289 pub fn set_timeout(&mut self, timeout: Duration) {
291 self.timeout = timeout
292 }
293
294 pub async fn wait_for_acks(&self) {
300 self.max_ack_semaphore
301 .acquire_many(self.semaphore_capacity as u32)
302 .await
303 .ok();
304 }
305
306 pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
308 ContextBuilder::new()
309 .api_prefix(prefix.to_string())
310 .build(client)
311 }
312
313 pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
315 ContextBuilder::new().domain(domain.as_ref()).build(client)
316 }
317
318 pub async fn publish<S: ToSubject>(
359 &self,
360 subject: S,
361 payload: Bytes,
362 ) -> Result<PublishAckFuture, PublishError> {
363 self.send_publish(subject, Publish::build().payload(payload))
364 .await
365 }
366
367 pub async fn publish_with_headers<S: ToSubject>(
389 &self,
390 subject: S,
391 headers: crate::header::HeaderMap,
392 payload: Bytes,
393 ) -> Result<PublishAckFuture, PublishError> {
394 self.send_publish(subject, Publish::build().payload(payload).headers(headers))
395 .await
396 }
397
398 pub async fn send_publish<S: ToSubject>(
421 &self,
422 subject: S,
423 publish: Publish,
424 ) -> Result<PublishAckFuture, PublishError> {
425 let permit = if self.backpressure_on_inflight {
426 self.max_ack_semaphore
428 .clone()
429 .acquire_owned()
430 .await
431 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
432 } else {
433 self.max_ack_semaphore
435 .clone()
436 .try_acquire_owned()
437 .map_err(|err| match err {
438 TryAcquireError::NoPermits => {
439 PublishError::new(PublishErrorKind::MaxAckPending)
440 }
441 _ => PublishError::with_source(PublishErrorKind::Other, err),
442 })?
443 };
444 let subject = subject.to_subject();
445 let (sender, receiver) = oneshot::channel();
446
447 let respond = self.client.new_inbox().into();
448
449 let send_fut = self
450 .client
451 .sender
452 .send(Command::Request {
453 subject,
454 payload: publish.payload,
455 respond,
456 headers: publish.headers,
457 sender,
458 })
459 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
460
461 tokio::time::timeout(self.timeout, send_fut)
462 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
463 .await??;
464
465 Ok(PublishAckFuture {
466 timeout: self.timeout,
467 subscription: Some(receiver),
468 permit: Some(permit),
469 tx: self.ack_sender.clone(),
470 })
471 }
472
473 pub async fn query_account(&self) -> Result<Account, AccountError> {
475 let response: Response<Account> = self.request("INFO", b"").await?;
476
477 match response {
478 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
479 Response::Ok(account) => Ok(account),
480 }
481 }
482
483 pub async fn create_stream<S>(
508 &self,
509 stream_config: S,
510 ) -> Result<Stream<Info>, CreateStreamError>
511 where
512 Config: From<S>,
513 {
514 let mut config: Config = stream_config.into();
515 if config.name.is_empty() {
516 return Err(CreateStreamError::new(
517 CreateStreamErrorKind::EmptyStreamName,
518 ));
519 }
520 if !is_valid_name(config.name.as_str()) {
521 return Err(CreateStreamError::new(
522 CreateStreamErrorKind::InvalidStreamName,
523 ));
524 }
525 if let Some(ref mut mirror) = config.mirror {
526 if let Some(ref mut domain) = mirror.domain {
527 if mirror.external.is_some() {
528 return Err(CreateStreamError::new(
529 CreateStreamErrorKind::DomainAndExternalSet,
530 ));
531 }
532 mirror.external = Some(External {
533 api_prefix: format!("$JS.{domain}.API"),
534 delivery_prefix: None,
535 })
536 }
537 }
538
539 if let Some(ref mut sources) = config.sources {
540 for source in sources {
541 if let Some(ref mut domain) = source.domain {
542 if source.external.is_some() {
543 return Err(CreateStreamError::new(
544 CreateStreamErrorKind::DomainAndExternalSet,
545 ));
546 }
547 source.external = Some(External {
548 api_prefix: format!("$JS.{domain}.API"),
549 delivery_prefix: None,
550 })
551 }
552 }
553 }
554 let subject = format!("STREAM.CREATE.{}", config.name);
555 let response: Response<Info> = self.request(subject, &config).await?;
556
557 match response {
558 Response::Err { error } => Err(error.into()),
559 Response::Ok(info) => Ok(Stream {
560 context: self.clone(),
561 info,
562 name: config.name,
563 }),
564 }
565 }
566
567 pub async fn get_stream_no_info<T: AsRef<str>>(
587 &self,
588 stream: T,
589 ) -> Result<Stream<()>, GetStreamError> {
590 let stream = stream.as_ref();
591 if stream.is_empty() {
592 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
593 }
594
595 if !is_valid_name(stream) {
596 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
597 }
598
599 Ok(Stream {
600 context: self.clone(),
601 info: (),
602 name: stream.to_string(),
603 })
604 }
605
606 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
622 let stream = stream.as_ref();
623 if stream.is_empty() {
624 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
625 }
626
627 if !is_valid_name(stream) {
628 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
629 }
630
631 let subject = format!("STREAM.INFO.{stream}");
632 let request: Response<Info> = self
633 .request(subject, &())
634 .await
635 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
636 match request {
637 Response::Err { error } => {
638 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
639 }
640 Response::Ok(info) => Ok(Stream {
641 context: self.clone(),
642 info,
643 name: stream.to_string(),
644 }),
645 }
646 }
647
648 pub async fn get_or_create_stream<S>(
672 &self,
673 stream_config: S,
674 ) -> Result<Stream, CreateStreamError>
675 where
676 S: Into<Config>,
677 {
678 let config: Config = stream_config.into();
679
680 if config.name.is_empty() {
681 return Err(CreateStreamError::new(
682 CreateStreamErrorKind::EmptyStreamName,
683 ));
684 }
685
686 if !is_valid_name(config.name.as_str()) {
687 return Err(CreateStreamError::new(
688 CreateStreamErrorKind::InvalidStreamName,
689 ));
690 }
691 let subject = format!("STREAM.INFO.{}", config.name);
692
693 let request: Response<Info> = self.request(subject, &()).await?;
694 match request {
695 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
696 Response::Err { error } => Err(error.into()),
697 Response::Ok(info) => Ok(Stream {
698 context: self.clone(),
699 info,
700 name: config.name,
701 }),
702 }
703 }
704
705 pub async fn delete_stream<T: AsRef<str>>(
721 &self,
722 stream: T,
723 ) -> Result<DeleteStatus, DeleteStreamError> {
724 let stream = stream.as_ref();
725 if stream.is_empty() {
726 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
727 }
728
729 if !is_valid_name(stream) {
730 return Err(DeleteStreamError::new(
731 DeleteStreamErrorKind::InvalidStreamName,
732 ));
733 }
734
735 let subject = format!("STREAM.DELETE.{stream}");
736 match self
737 .request(subject, &json!({}))
738 .await
739 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
740 {
741 Response::Err { error } => Err(DeleteStreamError::new(
742 DeleteStreamErrorKind::JetStream(error),
743 )),
744 Response::Ok(delete_response) => Ok(delete_response),
745 }
746 }
747
748 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
773 where
774 S: Borrow<Config>,
775 {
776 let config = config.borrow();
777
778 if config.name.is_empty() {
779 return Err(CreateStreamError::new(
780 CreateStreamErrorKind::EmptyStreamName,
781 ));
782 }
783
784 if !is_valid_name(config.name.as_str()) {
785 return Err(CreateStreamError::new(
786 CreateStreamErrorKind::InvalidStreamName,
787 ));
788 }
789
790 let subject = format!("STREAM.UPDATE.{}", config.name);
791 match self.request(subject, config).await? {
792 Response::Err { error } => Err(error.into()),
793 Response::Ok(info) => Ok(info),
794 }
795 }
796
797 pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
821 match self.update_stream(config.clone()).await {
822 Ok(stream) => Ok(stream),
823 Err(err) => match err.kind() {
824 CreateStreamErrorKind::NotFound => {
825 let stream = self
826 .create_stream(config)
827 .await
828 .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
829 Ok(stream.info)
830 }
831 _ => Err(err),
832 },
833 }
834 }
835
836 pub async fn stream_by_subject<T: Into<String>>(
851 &self,
852 subject: T,
853 ) -> Result<String, GetStreamByNameError> {
854 let subject = subject.into();
855 if !is_valid_subject(subject.as_str()) {
856 return Err(GetStreamByNameError::new(
857 GetStreamByNameErrorKind::InvalidSubject,
858 ));
859 }
860 let mut names = StreamNames {
861 context: self.clone(),
862 offset: 0,
863 page_request: None,
864 streams: Vec::new(),
865 subject: Some(subject),
866 done: false,
867 };
868 match names.next().await {
869 Some(name) => match name {
870 Ok(name) => Ok(name),
871 Err(err) => Err(GetStreamByNameError::with_source(
872 GetStreamByNameErrorKind::Request,
873 err,
874 )),
875 },
876 None => Err(GetStreamByNameError::new(
877 GetStreamByNameErrorKind::NotFound,
878 )),
879 }
880 }
881
882 pub fn stream_names(&self) -> StreamNames {
900 StreamNames {
901 context: self.clone(),
902 offset: 0,
903 page_request: None,
904 streams: Vec::new(),
905 subject: None,
906 done: false,
907 }
908 }
909
910 pub fn streams(&self) -> Streams {
928 Streams {
929 context: self.clone(),
930 offset: 0,
931 page_request: None,
932 streams: Vec::new(),
933 done: false,
934 }
935 }
936 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
950 let bucket: String = bucket.into();
951 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
952 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
953 }
954
955 let stream_name = format!("KV_{}", &bucket);
956 let stream = self
957 .get_stream(stream_name.clone())
958 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
959 .await?;
960
961 if stream.info.config.max_messages_per_subject < 1 {
962 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
963 }
964 let mut store = Store {
965 prefix: format!("$KV.{}.", &bucket),
966 name: bucket,
967 stream_name,
968 stream: stream.clone(),
969 put_prefix: None,
970 use_jetstream_prefix: self.prefix != "$JS.API",
971 };
972 if let Some(ref mirror) = stream.info.config.mirror {
973 let bucket = mirror.name.trim_start_matches("KV_");
974 if let Some(ref external) = mirror.external {
975 if !external.api_prefix.is_empty() {
976 store.use_jetstream_prefix = false;
977 store.prefix = format!("$KV.{bucket}.");
978 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
979 } else {
980 store.put_prefix = Some(format!("$KV.{bucket}."));
981 }
982 }
983 };
984
985 Ok(store)
986 }
987
988 pub async fn create_key_value(
1008 &self,
1009 config: crate::jetstream::kv::Config,
1010 ) -> Result<Store, CreateKeyValueError> {
1011 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1012 return Err(CreateKeyValueError::new(
1013 CreateKeyValueErrorKind::InvalidStoreName,
1014 ));
1015 }
1016 let info = self.query_account().await.map_err(|err| {
1017 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1018 })?;
1019
1020 let bucket_name = config.bucket.clone();
1021 let stream_config = kv_to_stream_config(config, info)?;
1022
1023 let stream = self.create_stream(stream_config).await.map_err(|err| {
1024 if err.kind() == CreateStreamErrorKind::TimedOut {
1025 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1026 } else {
1027 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1028 }
1029 })?;
1030
1031 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1032 }
1033
1034 pub async fn update_key_value(
1054 &self,
1055 config: crate::jetstream::kv::Config,
1056 ) -> Result<Store, UpdateKeyValueError> {
1057 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1058 return Err(UpdateKeyValueError::new(
1059 UpdateKeyValueErrorKind::InvalidStoreName,
1060 ));
1061 }
1062
1063 let stream_name = format!("KV_{}", config.bucket);
1064 let bucket_name = config.bucket.clone();
1065
1066 let account = self.query_account().await.map_err(|err| {
1067 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1068 })?;
1069 let stream = self
1070 .update_stream(kv_to_stream_config(config, account)?)
1071 .await
1072 .map_err(|err| match err.kind() {
1073 UpdateStreamErrorKind::NotFound => {
1074 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1075 }
1076 _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1077 })?;
1078
1079 let stream = Stream {
1080 context: self.clone(),
1081 info: stream,
1082 name: stream_name,
1083 };
1084
1085 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1086 }
1087
1088 pub async fn create_or_update_key_value(
1108 &self,
1109 config: crate::jetstream::kv::Config,
1110 ) -> Result<Store, CreateKeyValueError> {
1111 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1112 return Err(CreateKeyValueError::new(
1113 CreateKeyValueErrorKind::InvalidStoreName,
1114 ));
1115 }
1116
1117 let bucket_name = config.bucket.clone();
1118 let stream_name = format!("KV_{}", config.bucket);
1119
1120 let account = self.query_account().await.map_err(|err| {
1121 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1122 })?;
1123 let stream = self
1124 .create_or_update_stream(kv_to_stream_config(config, account)?)
1125 .await
1126 .map_err(|err| {
1127 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1128 })?;
1129
1130 let stream = Stream {
1131 context: self.clone(),
1132 info: stream,
1133 name: stream_name,
1134 };
1135
1136 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1137 }
1138
1139 pub async fn delete_key_value<T: AsRef<str>>(
1159 &self,
1160 bucket: T,
1161 ) -> Result<DeleteStatus, KeyValueError> {
1162 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1163 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1164 }
1165
1166 let stream_name = format!("KV_{}", bucket.as_ref());
1167 self.delete_stream(stream_name)
1168 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1169 .await
1170 }
1171
1172 pub async fn get_consumer_from_stream<T, C, S>(
1209 &self,
1210 consumer: C,
1211 stream: S,
1212 ) -> Result<Consumer<T>, ConsumerError>
1213 where
1214 T: FromConsumer + IntoConsumerConfig,
1215 S: AsRef<str>,
1216 C: AsRef<str>,
1217 {
1218 if !is_valid_name(stream.as_ref()) {
1219 return Err(ConsumerError::with_source(
1220 ConsumerErrorKind::InvalidName,
1221 "invalid stream",
1222 ));
1223 }
1224
1225 if !is_valid_name(consumer.as_ref()) {
1226 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1227 }
1228
1229 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1230
1231 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1232 Response::Ok(info) => info,
1233 Response::Err { error } => return Err(error.into()),
1234 };
1235
1236 Ok(Consumer::new(
1237 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1238 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1239 })?,
1240 info,
1241 self.clone(),
1242 ))
1243 }
1244
1245 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1268 &self,
1269 consumer: C,
1270 stream: S,
1271 ) -> Result<DeleteStatus, ConsumerError> {
1272 if !is_valid_name(consumer.as_ref()) {
1273 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1274 }
1275
1276 if !is_valid_name(stream.as_ref()) {
1277 return Err(ConsumerError::with_source(
1278 ConsumerErrorKind::Other,
1279 "invalid stream name",
1280 ));
1281 }
1282
1283 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1284
1285 match self.request(subject, &json!({})).await? {
1286 Response::Ok(delete_status) => Ok(delete_status),
1287 Response::Err { error } => Err(error.into()),
1288 }
1289 }
1290
1291 pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1317 &self,
1318 config: C,
1319 stream: S,
1320 ) -> Result<Consumer<C>, ConsumerError> {
1321 self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1322 .await
1323 }
1324
1325 #[cfg(feature = "server_2_10")]
1352 pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1353 &self,
1354 config: C,
1355 stream: S,
1356 ) -> Result<Consumer<C>, ConsumerUpdateError> {
1357 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1358 .await
1359 .map_err(|err| err.into())
1360 }
1361
1362 #[cfg(feature = "server_2_10")]
1389 pub async fn create_consumer_strict_on_stream<
1390 C: IntoConsumerConfig + FromConsumer,
1391 S: AsRef<str>,
1392 >(
1393 &self,
1394 config: C,
1395 stream: S,
1396 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1397 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1398 .await
1399 .map_err(|err| err.into())
1400 }
1401
1402 async fn create_consumer_on_stream_action<
1403 C: IntoConsumerConfig + FromConsumer,
1404 S: AsRef<str>,
1405 >(
1406 &self,
1407 config: C,
1408 stream: S,
1409 action: ConsumerAction,
1410 ) -> Result<Consumer<C>, ConsumerError> {
1411 let config = config.into_consumer_config();
1412
1413 let subject = {
1414 let filter = if config.filter_subject.is_empty() {
1415 "".to_string()
1416 } else {
1417 format!(".{}", config.filter_subject)
1418 };
1419 config
1420 .name
1421 .as_ref()
1422 .or(config.durable_name.as_ref())
1423 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1424 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1425 };
1426
1427 match self
1428 .request(
1429 subject,
1430 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1431 )
1432 .await?
1433 {
1434 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1435 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1436 FromConsumer::try_from_consumer_config(info.clone().config)
1437 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1438 info,
1439 self.clone(),
1440 )),
1441 }
1442 }
1443
1444 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1464 where
1465 S: ToSubject,
1466 T: ?Sized + Serialize,
1467 V: DeserializeOwned,
1468 {
1469 let subject = subject.to_subject();
1470 let request = serde_json::to_vec(&payload)
1471 .map(Bytes::from)
1472 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1473
1474 debug!("JetStream request sent: {:?}", request);
1475
1476 let message = self
1477 .client
1478 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1479 .await;
1480 let message = message?;
1481 debug!(
1482 "JetStream request response: {:?}",
1483 from_utf8(&message.payload)
1484 );
1485 let response = serde_json::from_slice(message.payload.as_ref())
1486 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1487
1488 Ok(response)
1489 }
1490
1491 pub async fn create_object_store(
1510 &self,
1511 config: super::object_store::Config,
1512 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1513 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1514 return Err(CreateObjectStoreError::new(
1515 CreateKeyValueErrorKind::InvalidStoreName,
1516 ));
1517 }
1518
1519 let bucket_name = config.bucket.clone();
1520 let stream_name = format!("OBJ_{bucket_name}");
1521 let chunk_subject = format!("$O.{bucket_name}.C.>");
1522 let meta_subject = format!("$O.{bucket_name}.M.>");
1523
1524 let stream = self
1525 .create_stream(super::stream::Config {
1526 name: stream_name,
1527 description: config.description.clone(),
1528 subjects: vec![chunk_subject, meta_subject],
1529 max_age: config.max_age,
1530 max_bytes: config.max_bytes,
1531 storage: config.storage,
1532 num_replicas: config.num_replicas,
1533 discard: DiscardPolicy::New,
1534 allow_rollup: true,
1535 allow_direct: true,
1536 #[cfg(feature = "server_2_10")]
1537 compression: if config.compression {
1538 Some(Compression::S2)
1539 } else {
1540 None
1541 },
1542 placement: config.placement,
1543 ..Default::default()
1544 })
1545 .await
1546 .map_err(|err| {
1547 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1548 })?;
1549
1550 Ok(ObjectStore {
1551 name: bucket_name,
1552 stream,
1553 })
1554 }
1555
1556 pub async fn get_object_store<T: AsRef<str>>(
1570 &self,
1571 bucket_name: T,
1572 ) -> Result<ObjectStore, ObjectStoreError> {
1573 let bucket_name = bucket_name.as_ref();
1574 if !is_valid_bucket_name(bucket_name) {
1575 return Err(ObjectStoreError::new(
1576 ObjectStoreErrorKind::InvalidBucketName,
1577 ));
1578 }
1579 let stream_name = format!("OBJ_{bucket_name}");
1580 let stream = self
1581 .get_stream(stream_name)
1582 .await
1583 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1584
1585 Ok(ObjectStore {
1586 name: bucket_name.to_string(),
1587 stream,
1588 })
1589 }
1590
1591 pub async fn delete_object_store<T: AsRef<str>>(
1605 &self,
1606 bucket_name: T,
1607 ) -> Result<(), DeleteObjectStore> {
1608 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1609 self.delete_stream(stream_name)
1610 .await
1611 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1612 Ok(())
1613 }
1614}
1615
/// Kinds of failure reported when publishing or awaiting a publish
/// acknowledgement.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the subject.
    StreamNotFound,
    /// The server reported a wrong last message id.
    WrongLastMessageId,
    /// The server reported a wrong last sequence.
    WrongLastSequence,
    /// The operation did not finish within the timeout.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// No ack permit was available.
    MaxAckPending,
    /// Any other failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable message of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::Other => "publish failed",
        };
        f.write_str(message)
    }
}
1640
/// Error type of publish operations; its kind is a [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1642
/// Pending acknowledgement of a publish.
///
/// Awaiting it (through `IntoFuture`) yields the publish acknowledgement or an
/// error. If it is dropped while still holding its receiver and permit, both
/// are handed to the background acker through `tx`.
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Maximum time to wait for the acknowledgement when awaited.
    timeout: Duration,
    /// Receiver of the acknowledgement message; `None` once taken.
    subscription: Option<oneshot::Receiver<Message>>,
    /// Ack permit held for this publish; `None` once taken.
    permit: Option<OwnedSemaphorePermit>,
    /// Channel to the background acker.
    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
}
1650
1651impl Drop for PublishAckFuture {
1652 fn drop(&mut self) {
1653 if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1654 if let Err(err) = self.tx.try_send((sub, permit)) {
1655 tracing::warn!("failed to pass future permit to the acker: {}", err);
1656 }
1657 }
1658 }
1659}
1660
1661impl PublishAckFuture {
1662 async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1663 let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1664 .await
1665 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1666 next.map_or_else(
1667 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1668 |m| {
1669 if m.status == Some(StatusCode::NO_RESPONDERS) {
1670 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1671 }
1672 let response = serde_json::from_slice(m.payload.as_ref())
1673 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1674 match response {
1675 Response::Err { error } => match error.error_code() {
1676 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1677 PublishErrorKind::WrongLastMessageId,
1678 error,
1679 )),
1680 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1681 PublishErrorKind::WrongLastSequence,
1682 error,
1683 )),
1684 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1685 },
1686 Response::Ok(publish_ack) => Ok(publish_ack),
1687 }
1688 },
1689 )
1690 }
1691}
1692impl IntoFuture for PublishAckFuture {
1693 type Output = Result<PublishAck, PublishError>;
1694
1695 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1696
1697 fn into_future(self) -> Self::IntoFuture {
1698 Box::pin(std::future::IntoFuture::into_future(
1699 self.next_with_timeout(),
1700 ))
1701 }
1702}
1703
/// One deserialized page of a stream-names listing.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total reported by the server; compared against the running offset to
    /// detect the last page.
    total: usize,
    /// Stream names in this page, if any were returned.
    streams: Option<Vec<String>>,
}
1709
/// One deserialized page of a stream-info listing.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total reported by the server.
    total: usize,
    /// Stream infos in this page, if any were returned.
    streams: Option<Vec<super::stream::Info>>,
}
1715
1716type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1717
1718pub struct StreamNames {
1719 context: Context,
1720 offset: usize,
1721 page_request: Option<PageRequest>,
1722 subject: Option<String>,
1723 streams: Vec<String>,
1724 done: bool,
1725}
1726
1727impl futures_util::Stream for StreamNames {
1728 type Item = Result<String, StreamsError>;
1729
1730 fn poll_next(
1731 mut self: Pin<&mut Self>,
1732 cx: &mut std::task::Context<'_>,
1733 ) -> std::task::Poll<Option<Self::Item>> {
1734 match self.page_request.as_mut() {
1735 Some(page) => match page.try_poll_unpin(cx) {
1736 std::task::Poll::Ready(page) => {
1737 self.page_request = None;
1738 let page = page
1739 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1740 if let Some(streams) = page.streams {
1741 self.offset += streams.len();
1742 self.streams = streams;
1743 if self.offset >= page.total {
1744 self.done = true;
1745 }
1746 match self.streams.pop() {
1747 Some(stream) => Poll::Ready(Some(Ok(stream))),
1748 None => Poll::Ready(None),
1749 }
1750 } else {
1751 Poll::Ready(None)
1752 }
1753 }
1754 std::task::Poll::Pending => std::task::Poll::Pending,
1755 },
1756 None => {
1757 if let Some(stream) = self.streams.pop() {
1758 Poll::Ready(Some(Ok(stream)))
1759 } else {
1760 if self.done {
1761 return Poll::Ready(None);
1762 }
1763 let context = self.context.clone();
1764 let offset = self.offset;
1765 let subject = self.subject.clone();
1766 self.page_request = Some(Box::pin(async move {
1767 match context
1768 .request(
1769 "STREAM.NAMES",
1770 &json!({
1771 "offset": offset,
1772 "subject": subject
1773 }),
1774 )
1775 .await?
1776 {
1777 Response::Err { error } => {
1778 Err(RequestError::with_source(RequestErrorKind::Other, error))
1779 }
1780 Response::Ok(page) => Ok(page),
1781 }
1782 }));
1783 self.poll_next(cx)
1784 }
1785 }
1786 }
1787 }
1788}
1789
1790type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1791
1792pub type StreamsErrorKind = RequestErrorKind;
1793pub type StreamsError = RequestError;
1794
1795pub struct Streams {
1796 context: Context,
1797 offset: usize,
1798 page_request: Option<PageInfoRequest>,
1799 streams: Vec<super::stream::Info>,
1800 done: bool,
1801}
1802
1803impl futures_util::Stream for Streams {
1804 type Item = Result<super::stream::Info, StreamsError>;
1805
1806 fn poll_next(
1807 mut self: Pin<&mut Self>,
1808 cx: &mut std::task::Context<'_>,
1809 ) -> std::task::Poll<Option<Self::Item>> {
1810 match self.page_request.as_mut() {
1811 Some(page) => match page.try_poll_unpin(cx) {
1812 std::task::Poll::Ready(page) => {
1813 self.page_request = None;
1814 let page = page
1815 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1816 if let Some(streams) = page.streams {
1817 self.offset += streams.len();
1818 self.streams = streams;
1819 if self.offset >= page.total {
1820 self.done = true;
1821 }
1822 match self.streams.pop() {
1823 Some(stream) => Poll::Ready(Some(Ok(stream))),
1824 None => Poll::Ready(None),
1825 }
1826 } else {
1827 Poll::Ready(None)
1828 }
1829 }
1830 std::task::Poll::Pending => std::task::Poll::Pending,
1831 },
1832 None => {
1833 if let Some(stream) = self.streams.pop() {
1834 Poll::Ready(Some(Ok(stream)))
1835 } else {
1836 if self.done {
1837 return Poll::Ready(None);
1838 }
1839 let context = self.context.clone();
1840 let offset = self.offset;
1841 self.page_request = Some(Box::pin(async move {
1842 match context
1843 .request(
1844 "STREAM.LIST",
1845 &json!({
1846 "offset": offset,
1847 }),
1848 )
1849 .await?
1850 {
1851 Response::Err { error } => {
1852 Err(RequestError::with_source(RequestErrorKind::Other, error))
1853 }
1854 Response::Ok(page) => Ok(page),
1855 }
1856 }));
1857 self.poll_next(cx)
1858 }
1859 }
1860 }
1861 }
1862}
1863#[derive(Default, Clone, Debug)]
1865pub struct Publish {
1866 payload: Bytes,
1867 headers: Option<header::HeaderMap>,
1868}
1869impl Publish {
1870 pub fn build() -> Self {
1872 Default::default()
1873 }
1874
1875 pub fn payload(mut self, payload: Bytes) -> Self {
1877 self.payload = payload;
1878 self
1879 }
1880 pub fn headers(mut self, headers: HeaderMap) -> Self {
1882 self.headers = Some(headers);
1883 self
1884 }
1885 pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1887 self.headers
1888 .get_or_insert(header::HeaderMap::new())
1889 .insert(name, value);
1890 self
1891 }
1892 pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1894 self.header(header::NATS_MESSAGE_ID, id.as_ref())
1895 }
1896 pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1899 self.header(
1900 header::NATS_EXPECTED_LAST_MESSAGE_ID,
1901 last_message_id.as_ref(),
1902 )
1903 }
1904 pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1907 self.header(
1908 header::NATS_EXPECTED_LAST_SEQUENCE,
1909 HeaderValue::from(last_sequence),
1910 )
1911 }
1912 pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1915 self.header(
1916 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1917 HeaderValue::from(subject_sequence),
1918 )
1919 }
1920 pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1923 self.header(
1924 header::NATS_EXPECTED_STREAM,
1925 HeaderValue::from(stream.as_ref()),
1926 )
1927 }
1928
1929 #[cfg(feature = "server_2_11")]
1930 pub fn ttl(self, ttl: Duration) -> Self {
1933 self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
1934 }
1935}
1936
/// Failure categories of a JetStream API request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    NoResponders,
    TimedOut,
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1953
1954pub type RequestError = Error<RequestErrorKind>;
1955
1956impl From<crate::RequestError> for RequestError {
1957 fn from(error: crate::RequestError) -> Self {
1958 match error.kind() {
1959 crate::RequestErrorKind::TimedOut => {
1960 RequestError::with_source(RequestErrorKind::TimedOut, error)
1961 }
1962 crate::RequestErrorKind::NoResponders => {
1963 RequestError::new(RequestErrorKind::NoResponders)
1964 }
1965 crate::RequestErrorKind::Other => {
1966 RequestError::with_source(RequestErrorKind::Other, error)
1967 }
1968 }
1969 }
1970}
1971
1972impl From<super::errors::Error> for RequestError {
1973 fn from(err: super::errors::Error) -> Self {
1974 RequestError::with_source(RequestErrorKind::Other, err)
1975 }
1976}
1977
1978pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
1979
1980#[derive(Clone, Debug, PartialEq)]
1981pub enum ConsumerInfoErrorKind {
1982 InvalidName,
1983 Offline,
1984 NotFound,
1985 StreamNotFound,
1986 Request,
1987 JetStream(super::errors::Error),
1988 TimedOut,
1989 NoResponders,
1990}
1991
1992impl Display for ConsumerInfoErrorKind {
1993 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1994 match self {
1995 Self::InvalidName => write!(f, "invalid consumer name"),
1996 Self::Offline => write!(f, "consumer is offline"),
1997 Self::NotFound => write!(f, "consumer not found"),
1998 Self::StreamNotFound => write!(f, "stream not found"),
1999 Self::Request => write!(f, "request error"),
2000 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2001 Self::TimedOut => write!(f, "timed out"),
2002 Self::NoResponders => write!(f, "no responders"),
2003 }
2004 }
2005}
2006
2007impl From<super::errors::Error> for ConsumerInfoError {
2008 fn from(error: super::errors::Error) -> Self {
2009 match error.error_code() {
2010 ErrorCode::CONSUMER_NOT_FOUND => {
2011 ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2012 }
2013 ErrorCode::STREAM_NOT_FOUND => {
2014 ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2015 }
2016 ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2017 _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2018 }
2019 }
2020}
2021
2022impl From<RequestError> for ConsumerInfoError {
2023 fn from(error: RequestError) -> Self {
2024 match error.kind() {
2025 RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2026 RequestErrorKind::Other => {
2027 ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2028 }
2029 RequestErrorKind::NoResponders => {
2030 ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2031 }
2032 }
2033 }
2034}
2035
2036#[derive(Clone, Debug, PartialEq)]
2037pub enum CreateStreamErrorKind {
2038 EmptyStreamName,
2039 InvalidStreamName,
2040 DomainAndExternalSet,
2041 JetStreamUnavailable,
2042 JetStream(super::errors::Error),
2043 TimedOut,
2044 Response,
2045 NotFound,
2046 ResponseParse,
2047}
2048
2049impl Display for CreateStreamErrorKind {
2050 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2051 match self {
2052 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2053 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2054 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2055 Self::NotFound => write!(f, "stream not found"),
2056 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2057 Self::TimedOut => write!(f, "jetstream request timed out"),
2058 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2059 Self::ResponseParse => write!(f, "failed to parse server response"),
2060 Self::Response => write!(f, "response error"),
2061 }
2062 }
2063}
2064
2065pub type CreateStreamError = Error<CreateStreamErrorKind>;
2066
2067impl From<super::errors::Error> for CreateStreamError {
2068 fn from(error: super::errors::Error) -> Self {
2069 match error.kind() {
2070 super::errors::ErrorCode::STREAM_NOT_FOUND => {
2071 CreateStreamError::new(CreateStreamErrorKind::NotFound)
2072 }
2073 _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2074 }
2075 }
2076}
2077
2078impl From<RequestError> for CreateStreamError {
2079 fn from(error: RequestError) -> Self {
2080 match error.kind() {
2081 RequestErrorKind::NoResponders => {
2082 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2083 }
2084 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2085 RequestErrorKind::Other => {
2086 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2087 }
2088 }
2089 }
2090}
2091
2092#[derive(Clone, Debug, PartialEq)]
2093pub enum GetStreamErrorKind {
2094 EmptyName,
2095 Request,
2096 InvalidStreamName,
2097 JetStream(super::errors::Error),
2098}
2099
2100impl Display for GetStreamErrorKind {
2101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2102 match self {
2103 Self::EmptyName => write!(f, "empty name cannot be empty"),
2104 Self::Request => write!(f, "request error"),
2105 Self::InvalidStreamName => write!(f, "invalid stream name"),
2106 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2107 }
2108 }
2109}
2110
2111#[derive(Clone, Debug, PartialEq)]
2112pub enum GetStreamByNameErrorKind {
2113 Request,
2114 NotFound,
2115 InvalidSubject,
2116 JetStream(super::errors::Error),
2117}
2118
2119impl Display for GetStreamByNameErrorKind {
2120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2121 match self {
2122 Self::Request => write!(f, "request error"),
2123 Self::NotFound => write!(f, "stream not found"),
2124 Self::InvalidSubject => write!(f, "invalid subject"),
2125 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2126 }
2127 }
2128}
2129
/// Error classified by [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error classified by [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Stream updates share the error type of stream creation.
pub type UpdateStreamError = CreateStreamError;
/// Stream updates share the error kinds of stream creation.
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Stream deletion shares the error type of stream lookup.
pub type DeleteStreamError = GetStreamError;
/// Stream deletion shares the error kinds of stream lookup.
pub type DeleteStreamErrorKind = GetStreamErrorKind;
2137
/// Failure categories for accessing a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    InvalidStoreName,
    GetBucket,
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(description)
    }
}
2154
/// Error classified by [`KeyValueErrorKind`].
pub type KeyValueError = Error<KeyValueErrorKind>;
2156
/// Failure categories for creating a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    InvalidStoreName,
    TooLongHistory,
    JetStream,
    BucketCreate,
    TimedOut,
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(description)
    }
}
2181
/// Failure categories for updating a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    InvalidStoreName,
    TooLongHistory,
    JetStream,
    BucketUpdate,
    TimedOut,
    LimitMarkersNotSupported,
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // This is the update path; the message used to be the copy-pasted
            // "bucket creation failed" from the create error.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error classified by [`CreateKeyValueErrorKind`].
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error classified by [`UpdateKeyValueErrorKind`].
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Object store creation shares the error type of Key Value store creation.
pub type CreateObjectStoreError = CreateKeyValueError;
/// Object store creation shares the error kinds of Key Value store creation.
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2213
/// Failure categories for accessing an Object Store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    InvalidBucketName,
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(description)
    }
}
2228
/// Error classified by [`ObjectStoreErrorKind`].
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Alias of [`ObjectStoreError`].
pub type DeleteObjectStore = ObjectStoreError;
/// Alias of [`ObjectStoreErrorKind`].
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2233
2234#[derive(Clone, Debug, PartialEq)]
2235pub enum AccountErrorKind {
2236 TimedOut,
2237 JetStream(super::errors::Error),
2238 JetStreamUnavailable,
2239 Other,
2240}
2241
2242impl Display for AccountErrorKind {
2243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2244 match self {
2245 Self::TimedOut => write!(f, "timed out"),
2246 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2247 Self::Other => write!(f, "error"),
2248 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2249 }
2250 }
2251}
2252
2253pub type AccountError = Error<AccountErrorKind>;
2254
2255impl From<RequestError> for AccountError {
2256 fn from(err: RequestError) -> Self {
2257 match err.kind {
2258 RequestErrorKind::NoResponders => {
2259 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2260 }
2261 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2262 RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2263 }
2264 }
2265}
2266
/// Action attached to a consumer request; each variant serializes to the string given
/// in its `rename` attribute.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2278
2279fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2281 let mut store = Store {
2282 prefix: format!("$KV.{}.", bucket.as_str()),
2283 name: bucket,
2284 stream: stream.clone(),
2285 stream_name: stream.info.config.name.clone(),
2286 put_prefix: None,
2287 use_jetstream_prefix: prefix != "$JS.API",
2288 };
2289 if let Some(ref mirror) = stream.info.config.mirror {
2290 let bucket = mirror.name.trim_start_matches("KV_");
2291 if let Some(ref external) = mirror.external {
2292 if !external.api_prefix.is_empty() {
2293 store.use_jetstream_prefix = false;
2294 store.prefix = format!("$KV.{bucket}.");
2295 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2296 } else {
2297 store.put_prefix = Some(format!("$KV.{bucket}."));
2298 }
2299 }
2300 };
2301 store
2302}
2303
/// Reasons why a Key Value configuration cannot be turned into a stream configuration.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    // Only constructed when the `server_2_11` feature is enabled, hence the allow.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2309
2310impl From<KvToStreamConfigError> for CreateKeyValueError {
2311 fn from(err: KvToStreamConfigError) -> Self {
2312 match err {
2313 KvToStreamConfigError::TooLongHistory => {
2314 CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2315 }
2316 KvToStreamConfigError::LimitMarkersNotSupported => {
2317 CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2318 }
2319 }
2320 }
2321}
2322
2323impl From<KvToStreamConfigError> for UpdateKeyValueError {
2324 fn from(err: KvToStreamConfigError) -> Self {
2325 match err {
2326 KvToStreamConfigError::TooLongHistory => {
2327 UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2328 }
2329 KvToStreamConfigError::LimitMarkersNotSupported => {
2330 UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2331 }
2332 }
2333 }
2334}
2335
2336fn kv_to_stream_config(
2338 config: kv::Config,
2339 _account: Account,
2340) -> Result<super::stream::Config, KvToStreamConfigError> {
2341 let history = if config.history > 0 {
2342 if config.history > MAX_HISTORY {
2343 return Err(KvToStreamConfigError::TooLongHistory);
2344 }
2345 config.history
2346 } else {
2347 1
2348 };
2349
2350 let num_replicas = if config.num_replicas == 0 {
2351 1
2352 } else {
2353 config.num_replicas
2354 };
2355
2356 #[cfg(feature = "server_2_11")]
2357 let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2358
2359 #[cfg(feature = "server_2_11")]
2360 if let Some(duration) = config.limit_markers {
2361 if _account.requests.level < 1 {
2362 return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2363 }
2364 allow_message_ttl = true;
2365 subject_delete_marker_ttl = Some(duration);
2366 }
2367
2368 let mut mirror = config.mirror.clone();
2369 let mut sources = config.sources.clone();
2370 let mut mirror_direct = config.mirror_direct;
2371
2372 let mut subjects = Vec::new();
2373 if let Some(ref mut mirror) = mirror {
2374 if !mirror.name.starts_with("KV_") {
2375 mirror.name = format!("KV_{}", mirror.name);
2376 }
2377 mirror_direct = true;
2378 } else if let Some(ref mut sources) = sources {
2379 for source in sources {
2380 if !source.name.starts_with("KV_") {
2381 source.name = format!("KV_{}", source.name);
2382 }
2383 }
2384 } else {
2385 subjects = vec![format!("$KV.{}.>", config.bucket)];
2386 }
2387
2388 Ok(stream::Config {
2389 name: format!("KV_{}", config.bucket),
2390 description: Some(config.description),
2391 subjects,
2392 max_messages_per_subject: history,
2393 max_bytes: config.max_bytes,
2394 max_age: config.max_age,
2395 max_message_size: config.max_value_size,
2396 storage: config.storage,
2397 republish: config.republish,
2398 allow_rollup: true,
2399 deny_delete: true,
2400 deny_purge: false,
2401 allow_direct: true,
2402 sources,
2403 mirror,
2404 num_replicas,
2405 discard: stream::DiscardPolicy::New,
2406 mirror_direct,
2407 #[cfg(feature = "server_2_10")]
2408 compression: if config.compression {
2409 Some(stream::Compression::S2)
2410 } else {
2411 None
2412 },
2413 placement: config.placement,
2414 #[cfg(feature = "server_2_11")]
2415 allow_message_ttl,
2416 #[cfg(feature = "server_2_11")]
2417 subject_delete_marker_ttl,
2418 ..Default::default()
2419 })
2420}