1use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23 header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures_util::future::BoxFuture;
27use futures_util::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Debug;
33use std::fmt::Display;
34use std::future::IntoFuture;
35use std::pin::Pin;
36use std::str::from_utf8;
37use std::sync::Arc;
38use std::task::Poll;
39use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
40use tokio::time::Duration;
41use tokio_stream::wrappers::ReceiverStream;
42use tracing::debug;
43
44use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
45use super::errors::ErrorCode;
46use super::kv::{Store, MAX_HISTORY};
47use super::object_store::{is_valid_bucket_name, ObjectStore};
48use super::stream::{
49 self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
50 Stream,
51};
52#[cfg(feature = "server_2_10")]
53use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
54use super::{is_valid_name, kv};
55
56#[derive(Debug, Clone)]
58pub struct Context {
59 pub(crate) client: Client,
60 pub(crate) prefix: String,
61 pub(crate) timeout: Duration,
62 pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
63 pub(crate) acker_task: Arc<tokio::task::JoinHandle<()>>,
64 pub(crate) ack_sender:
65 tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
66 pub(crate) backpressure_on_inflight: bool,
67 pub(crate) semaphore_capacity: usize,
68}
69
70fn spawn_acker(
71 rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
72 ack_timeout: Duration,
73 concurrency: Option<usize>,
74) -> tokio::task::JoinHandle<()> {
75 tokio::spawn(async move {
76 rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
77 tokio::time::timeout(ack_timeout, subscription).await.ok();
78 drop(permit);
79 })
80 .await;
81 })
82}
83
84impl Drop for Context {
85 fn drop(&mut self) {
86 self.acker_task.abort();
87 }
88}
89
90use std::marker::PhantomData;
91
/// Type-state marker carried by builders on which `api_prefix` and `domain` are available.
#[derive(Default, Debug)]
pub struct Yes;

/// Type-state marker carried by builders returned from `api_prefix` and `domain`.
#[derive(Default, Debug)]
pub struct No;

/// Trait implemented by the type-state markers accepted by `ContextBuilder`.
pub trait ToAssign: Debug {}

impl ToAssign for No {}
impl ToAssign for Yes {}
101
102pub struct ContextBuilder<PREFIX: ToAssign> {
121 prefix: String,
122 timeout: Duration,
123 semaphore_capacity: usize,
124 ack_timeout: Duration,
125 backpressure_on_inflight: bool,
126 concurrency_limit: Option<usize>,
127 _phantom: PhantomData<PREFIX>,
128}
129
130impl Default for ContextBuilder<Yes> {
131 fn default() -> Self {
132 ContextBuilder {
133 prefix: "$JS.API".to_string(),
134 timeout: Duration::from_secs(5),
135 semaphore_capacity: 5_000,
136 ack_timeout: Duration::from_secs(30),
137 backpressure_on_inflight: true,
138 concurrency_limit: None,
139 _phantom: PhantomData {},
140 }
141 }
142}
143
144impl ContextBuilder<Yes> {
145 pub fn new() -> ContextBuilder<Yes> {
147 ContextBuilder::default()
148 }
149}
150
151impl ContextBuilder<Yes> {
152 pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
154 ContextBuilder {
155 prefix: prefix.into(),
156 timeout: self.timeout,
157 semaphore_capacity: self.semaphore_capacity,
158 ack_timeout: self.ack_timeout,
159 backpressure_on_inflight: self.backpressure_on_inflight,
160 concurrency_limit: self.concurrency_limit,
161 _phantom: PhantomData,
162 }
163 }
164
165 pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
168 ContextBuilder {
169 prefix: format!("$JS.{}.API", domain.into()),
170 timeout: self.timeout,
171 semaphore_capacity: self.semaphore_capacity,
172 ack_timeout: self.ack_timeout,
173 backpressure_on_inflight: self.backpressure_on_inflight,
174 concurrency_limit: self.concurrency_limit,
175 _phantom: PhantomData,
176 }
177 }
178}
179
180impl<PREFIX> ContextBuilder<PREFIX>
181where
182 PREFIX: ToAssign,
183{
184 pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
186 where
187 Yes: ToAssign,
188 {
189 ContextBuilder {
190 prefix: self.prefix,
191 timeout,
192 semaphore_capacity: self.semaphore_capacity,
193 ack_timeout: self.ack_timeout,
194 backpressure_on_inflight: self.backpressure_on_inflight,
195 concurrency_limit: self.concurrency_limit,
196 _phantom: PhantomData,
197 }
198 }
199
200 pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
203 where
204 Yes: ToAssign,
205 {
206 ContextBuilder {
207 prefix: self.prefix,
208 timeout: self.timeout,
209 semaphore_capacity: self.semaphore_capacity,
210 ack_timeout,
211 backpressure_on_inflight: self.backpressure_on_inflight,
212 concurrency_limit: self.concurrency_limit,
213 _phantom: PhantomData,
214 }
215 }
216
217 pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
220 where
221 Yes: ToAssign,
222 {
223 ContextBuilder {
224 prefix: self.prefix,
225 timeout: self.timeout,
226 semaphore_capacity: capacity,
227 ack_timeout: self.ack_timeout,
228 backpressure_on_inflight: self.backpressure_on_inflight,
229 concurrency_limit: self.concurrency_limit,
230 _phantom: PhantomData,
231 }
232 }
233
234 pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
239 where
240 Yes: ToAssign,
241 {
242 ContextBuilder {
243 prefix: self.prefix,
244 timeout: self.timeout,
245 semaphore_capacity: self.semaphore_capacity,
246 ack_timeout: self.ack_timeout,
247 backpressure_on_inflight: enabled,
248 concurrency_limit: self.concurrency_limit,
249 _phantom: PhantomData,
250 }
251 }
252
253 pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
256 where
257 Yes: ToAssign,
258 {
259 ContextBuilder {
260 prefix: self.prefix,
261 timeout: self.timeout,
262 semaphore_capacity: self.semaphore_capacity,
263 ack_timeout: self.ack_timeout,
264 backpressure_on_inflight: self.backpressure_on_inflight,
265 concurrency_limit: limit,
266 _phantom: PhantomData,
267 }
268 }
269
270 pub fn build(self, client: Client) -> Context {
272 let acker_channel_capacity = self.semaphore_capacity;
273 let (tx, rx) = tokio::sync::mpsc::channel::<(
274 oneshot::Receiver<Message>,
275 OwnedSemaphorePermit,
276 )>(acker_channel_capacity);
277 let stream = ReceiverStream::new(rx);
278 let acker_task = Arc::new(spawn_acker(
279 stream,
280 self.ack_timeout,
281 self.concurrency_limit,
282 ));
283 Context {
284 client,
285 prefix: self.prefix,
286 timeout: self.timeout,
287 max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
288 acker_task,
289 ack_sender: tx,
290 backpressure_on_inflight: self.backpressure_on_inflight,
291 semaphore_capacity: self.semaphore_capacity,
292 }
293 }
294}
295
296impl Context {
297 pub(crate) fn new(client: Client) -> Context {
298 ContextBuilder::default().build(client)
299 }
300
301 pub fn set_timeout(&mut self, timeout: Duration) {
303 self.timeout = timeout
304 }
305
306 pub async fn wait_for_acks(&self) {
312 self.max_ack_semaphore
313 .acquire_many(self.semaphore_capacity as u32)
314 .await
315 .ok();
316 }
317
318 pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
320 ContextBuilder::new()
321 .api_prefix(prefix.to_string())
322 .build(client)
323 }
324
325 pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
327 ContextBuilder::new().domain(domain.as_ref()).build(client)
328 }
329
330 pub async fn publish<S: ToSubject>(
371 &self,
372 subject: S,
373 payload: Bytes,
374 ) -> Result<PublishAckFuture, PublishError> {
375 self.send_publish(subject, Publish::build().payload(payload))
376 .await
377 }
378
379 pub async fn publish_with_headers<S: ToSubject>(
401 &self,
402 subject: S,
403 headers: crate::header::HeaderMap,
404 payload: Bytes,
405 ) -> Result<PublishAckFuture, PublishError> {
406 self.send_publish(subject, Publish::build().payload(payload).headers(headers))
407 .await
408 }
409
410 pub async fn send_publish<S: ToSubject>(
433 &self,
434 subject: S,
435 publish: Publish,
436 ) -> Result<PublishAckFuture, PublishError> {
437 let permit = if self.backpressure_on_inflight {
438 self.max_ack_semaphore
440 .clone()
441 .acquire_owned()
442 .await
443 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
444 } else {
445 self.max_ack_semaphore
447 .clone()
448 .try_acquire_owned()
449 .map_err(|err| match err {
450 TryAcquireError::NoPermits => {
451 PublishError::new(PublishErrorKind::MaxAckPending)
452 }
453 _ => PublishError::with_source(PublishErrorKind::Other, err),
454 })?
455 };
456 let subject = subject.to_subject();
457 let (sender, receiver) = oneshot::channel();
458
459 let respond = self.client.new_inbox().into();
460
461 let send_fut = self
462 .client
463 .sender
464 .send(Command::Request {
465 subject,
466 payload: publish.payload,
467 respond,
468 headers: publish.headers,
469 sender,
470 })
471 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
472
473 tokio::time::timeout(self.timeout, send_fut)
474 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
475 .await??;
476
477 Ok(PublishAckFuture {
478 timeout: self.timeout,
479 subscription: Some(receiver),
480 permit: Some(permit),
481 tx: self.ack_sender.clone(),
482 })
483 }
484
485 pub async fn query_account(&self) -> Result<Account, AccountError> {
487 let response: Response<Account> = self.request("INFO", b"").await?;
488
489 match response {
490 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
491 Response::Ok(account) => Ok(account),
492 }
493 }
494
495 pub async fn create_stream<S>(
520 &self,
521 stream_config: S,
522 ) -> Result<Stream<Info>, CreateStreamError>
523 where
524 Config: From<S>,
525 {
526 let mut config: Config = stream_config.into();
527 if config.name.is_empty() {
528 return Err(CreateStreamError::new(
529 CreateStreamErrorKind::EmptyStreamName,
530 ));
531 }
532 if !is_valid_name(config.name.as_str()) {
533 return Err(CreateStreamError::new(
534 CreateStreamErrorKind::InvalidStreamName,
535 ));
536 }
537 if let Some(ref mut mirror) = config.mirror {
538 if let Some(ref mut domain) = mirror.domain {
539 if mirror.external.is_some() {
540 return Err(CreateStreamError::new(
541 CreateStreamErrorKind::DomainAndExternalSet,
542 ));
543 }
544 mirror.external = Some(External {
545 api_prefix: format!("$JS.{domain}.API"),
546 delivery_prefix: None,
547 })
548 }
549 }
550
551 if let Some(ref mut sources) = config.sources {
552 for source in sources {
553 if let Some(ref mut domain) = source.domain {
554 if source.external.is_some() {
555 return Err(CreateStreamError::new(
556 CreateStreamErrorKind::DomainAndExternalSet,
557 ));
558 }
559 source.external = Some(External {
560 api_prefix: format!("$JS.{domain}.API"),
561 delivery_prefix: None,
562 })
563 }
564 }
565 }
566 let subject = format!("STREAM.CREATE.{}", config.name);
567 let response: Response<Info> = self.request(subject, &config).await?;
568
569 match response {
570 Response::Err { error } => Err(error.into()),
571 Response::Ok(info) => Ok(Stream {
572 context: self.clone(),
573 info,
574 name: config.name,
575 }),
576 }
577 }
578
579 pub async fn get_stream_no_info<T: AsRef<str>>(
599 &self,
600 stream: T,
601 ) -> Result<Stream<()>, GetStreamError> {
602 let stream = stream.as_ref();
603 if stream.is_empty() {
604 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
605 }
606
607 if !is_valid_name(stream) {
608 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
609 }
610
611 Ok(Stream {
612 context: self.clone(),
613 info: (),
614 name: stream.to_string(),
615 })
616 }
617
618 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
634 let stream = stream.as_ref();
635 if stream.is_empty() {
636 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
637 }
638
639 if !is_valid_name(stream) {
640 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
641 }
642
643 let subject = format!("STREAM.INFO.{stream}");
644 let request: Response<Info> = self
645 .request(subject, &())
646 .await
647 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
648 match request {
649 Response::Err { error } => {
650 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
651 }
652 Response::Ok(info) => Ok(Stream {
653 context: self.clone(),
654 info,
655 name: stream.to_string(),
656 }),
657 }
658 }
659
660 pub async fn get_or_create_stream<S>(
684 &self,
685 stream_config: S,
686 ) -> Result<Stream, CreateStreamError>
687 where
688 S: Into<Config>,
689 {
690 let config: Config = stream_config.into();
691
692 if config.name.is_empty() {
693 return Err(CreateStreamError::new(
694 CreateStreamErrorKind::EmptyStreamName,
695 ));
696 }
697
698 if !is_valid_name(config.name.as_str()) {
699 return Err(CreateStreamError::new(
700 CreateStreamErrorKind::InvalidStreamName,
701 ));
702 }
703 let subject = format!("STREAM.INFO.{}", config.name);
704
705 let request: Response<Info> = self.request(subject, &()).await?;
706 match request {
707 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
708 Response::Err { error } => Err(error.into()),
709 Response::Ok(info) => Ok(Stream {
710 context: self.clone(),
711 info,
712 name: config.name,
713 }),
714 }
715 }
716
717 pub async fn delete_stream<T: AsRef<str>>(
733 &self,
734 stream: T,
735 ) -> Result<DeleteStatus, DeleteStreamError> {
736 let stream = stream.as_ref();
737 if stream.is_empty() {
738 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
739 }
740
741 if !is_valid_name(stream) {
742 return Err(DeleteStreamError::new(
743 DeleteStreamErrorKind::InvalidStreamName,
744 ));
745 }
746
747 let subject = format!("STREAM.DELETE.{stream}");
748 match self
749 .request(subject, &json!({}))
750 .await
751 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
752 {
753 Response::Err { error } => Err(DeleteStreamError::new(
754 DeleteStreamErrorKind::JetStream(error),
755 )),
756 Response::Ok(delete_response) => Ok(delete_response),
757 }
758 }
759
760 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
785 where
786 S: Borrow<Config>,
787 {
788 let config = config.borrow();
789
790 if config.name.is_empty() {
791 return Err(CreateStreamError::new(
792 CreateStreamErrorKind::EmptyStreamName,
793 ));
794 }
795
796 if !is_valid_name(config.name.as_str()) {
797 return Err(CreateStreamError::new(
798 CreateStreamErrorKind::InvalidStreamName,
799 ));
800 }
801
802 let subject = format!("STREAM.UPDATE.{}", config.name);
803 match self.request(subject, config).await? {
804 Response::Err { error } => Err(error.into()),
805 Response::Ok(info) => Ok(info),
806 }
807 }
808
809 pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
833 match self.update_stream(config.clone()).await {
834 Ok(stream) => Ok(stream),
835 Err(err) => match err.kind() {
836 CreateStreamErrorKind::NotFound => {
837 let stream = self
838 .create_stream(config)
839 .await
840 .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
841 Ok(stream.info)
842 }
843 _ => Err(err),
844 },
845 }
846 }
847
848 pub async fn stream_by_subject<T: Into<String>>(
863 &self,
864 subject: T,
865 ) -> Result<String, GetStreamByNameError> {
866 let subject = subject.into();
867 if !is_valid_subject(subject.as_str()) {
868 return Err(GetStreamByNameError::new(
869 GetStreamByNameErrorKind::InvalidSubject,
870 ));
871 }
872 let mut names = StreamNames {
873 context: self.clone(),
874 offset: 0,
875 page_request: None,
876 streams: Vec::new(),
877 subject: Some(subject),
878 done: false,
879 };
880 match names.next().await {
881 Some(name) => match name {
882 Ok(name) => Ok(name),
883 Err(err) => Err(GetStreamByNameError::with_source(
884 GetStreamByNameErrorKind::Request,
885 err,
886 )),
887 },
888 None => Err(GetStreamByNameError::new(
889 GetStreamByNameErrorKind::NotFound,
890 )),
891 }
892 }
893
894 pub fn stream_names(&self) -> StreamNames {
912 StreamNames {
913 context: self.clone(),
914 offset: 0,
915 page_request: None,
916 streams: Vec::new(),
917 subject: None,
918 done: false,
919 }
920 }
921
922 pub fn streams(&self) -> Streams {
940 Streams {
941 context: self.clone(),
942 offset: 0,
943 page_request: None,
944 streams: Vec::new(),
945 done: false,
946 }
947 }
948 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
962 let bucket: String = bucket.into();
963 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
964 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
965 }
966
967 let stream_name = format!("KV_{}", &bucket);
968 let stream = self
969 .get_stream(stream_name.clone())
970 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
971 .await?;
972
973 if stream.info.config.max_messages_per_subject < 1 {
974 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
975 }
976 let mut store = Store {
977 prefix: format!("$KV.{}.", &bucket),
978 name: bucket,
979 stream_name,
980 stream: stream.clone(),
981 put_prefix: None,
982 use_jetstream_prefix: self.prefix != "$JS.API",
983 };
984 if let Some(ref mirror) = stream.info.config.mirror {
985 let bucket = mirror.name.trim_start_matches("KV_");
986 if let Some(ref external) = mirror.external {
987 if !external.api_prefix.is_empty() {
988 store.use_jetstream_prefix = false;
989 store.prefix = format!("$KV.{bucket}.");
990 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
991 } else {
992 store.put_prefix = Some(format!("$KV.{bucket}."));
993 }
994 }
995 };
996
997 Ok(store)
998 }
999
1000 pub async fn create_key_value(
1020 &self,
1021 config: crate::jetstream::kv::Config,
1022 ) -> Result<Store, CreateKeyValueError> {
1023 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1024 return Err(CreateKeyValueError::new(
1025 CreateKeyValueErrorKind::InvalidStoreName,
1026 ));
1027 }
1028 let info = self.query_account().await.map_err(|err| {
1029 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1030 })?;
1031
1032 let bucket_name = config.bucket.clone();
1033 let stream_config = kv_to_stream_config(config, info)?;
1034
1035 let stream = self.create_stream(stream_config).await.map_err(|err| {
1036 if err.kind() == CreateStreamErrorKind::TimedOut {
1037 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1038 } else {
1039 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1040 }
1041 })?;
1042
1043 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1044 }
1045
1046 pub async fn update_key_value(
1066 &self,
1067 config: crate::jetstream::kv::Config,
1068 ) -> Result<Store, UpdateKeyValueError> {
1069 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1070 return Err(UpdateKeyValueError::new(
1071 UpdateKeyValueErrorKind::InvalidStoreName,
1072 ));
1073 }
1074
1075 let stream_name = format!("KV_{}", config.bucket);
1076 let bucket_name = config.bucket.clone();
1077
1078 let account = self.query_account().await.map_err(|err| {
1079 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1080 })?;
1081 let stream = self
1082 .update_stream(kv_to_stream_config(config, account)?)
1083 .await
1084 .map_err(|err| match err.kind() {
1085 UpdateStreamErrorKind::NotFound => {
1086 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1087 }
1088 _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1089 })?;
1090
1091 let stream = Stream {
1092 context: self.clone(),
1093 info: stream,
1094 name: stream_name,
1095 };
1096
1097 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1098 }
1099
1100 pub async fn create_or_update_key_value(
1120 &self,
1121 config: crate::jetstream::kv::Config,
1122 ) -> Result<Store, CreateKeyValueError> {
1123 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1124 return Err(CreateKeyValueError::new(
1125 CreateKeyValueErrorKind::InvalidStoreName,
1126 ));
1127 }
1128
1129 let bucket_name = config.bucket.clone();
1130 let stream_name = format!("KV_{}", config.bucket);
1131
1132 let account = self.query_account().await.map_err(|err| {
1133 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1134 })?;
1135 let stream = self
1136 .create_or_update_stream(kv_to_stream_config(config, account)?)
1137 .await
1138 .map_err(|err| {
1139 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1140 })?;
1141
1142 let stream = Stream {
1143 context: self.clone(),
1144 info: stream,
1145 name: stream_name,
1146 };
1147
1148 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1149 }
1150
1151 pub async fn delete_key_value<T: AsRef<str>>(
1171 &self,
1172 bucket: T,
1173 ) -> Result<DeleteStatus, KeyValueError> {
1174 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1175 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1176 }
1177
1178 let stream_name = format!("KV_{}", bucket.as_ref());
1179 self.delete_stream(stream_name)
1180 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1181 .await
1182 }
1183
1184 pub async fn get_consumer_from_stream<T, C, S>(
1221 &self,
1222 consumer: C,
1223 stream: S,
1224 ) -> Result<Consumer<T>, ConsumerError>
1225 where
1226 T: FromConsumer + IntoConsumerConfig,
1227 S: AsRef<str>,
1228 C: AsRef<str>,
1229 {
1230 if !is_valid_name(stream.as_ref()) {
1231 return Err(ConsumerError::with_source(
1232 ConsumerErrorKind::InvalidName,
1233 "invalid stream",
1234 ));
1235 }
1236
1237 if !is_valid_name(consumer.as_ref()) {
1238 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1239 }
1240
1241 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1242
1243 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1244 Response::Ok(info) => info,
1245 Response::Err { error } => return Err(error.into()),
1246 };
1247
1248 Ok(Consumer::new(
1249 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1250 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1251 })?,
1252 info,
1253 self.clone(),
1254 ))
1255 }
1256
1257 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1280 &self,
1281 consumer: C,
1282 stream: S,
1283 ) -> Result<DeleteStatus, ConsumerError> {
1284 if !is_valid_name(consumer.as_ref()) {
1285 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1286 }
1287
1288 if !is_valid_name(stream.as_ref()) {
1289 return Err(ConsumerError::with_source(
1290 ConsumerErrorKind::Other,
1291 "invalid stream name",
1292 ));
1293 }
1294
1295 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1296
1297 match self.request(subject, &json!({})).await? {
1298 Response::Ok(delete_status) => Ok(delete_status),
1299 Response::Err { error } => Err(error.into()),
1300 }
1301 }
1302
1303 pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1329 &self,
1330 config: C,
1331 stream: S,
1332 ) -> Result<Consumer<C>, ConsumerError> {
1333 self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1334 .await
1335 }
1336
/// Updates a consumer on `stream` from `config`.
///
/// Delegates to `create_consumer_on_stream_action` with
/// `ConsumerAction::Update` and converts the error into
/// `ConsumerUpdateError`.
#[cfg(feature = "server_2_10")]
pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
    &self,
    config: C,
    stream: S,
) -> Result<Consumer<C>, ConsumerUpdateError> {
    let outcome = self
        .create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
        .await;
    outcome.map_err(Into::into)
}
1373
/// Creates a consumer on `stream` from `config` using
/// `ConsumerAction::Create`.
///
/// Delegates to `create_consumer_on_stream_action` and converts the error
/// into `ConsumerCreateStrictError`.
#[cfg(feature = "server_2_10")]
pub async fn create_consumer_strict_on_stream<
    C: IntoConsumerConfig + FromConsumer,
    S: AsRef<str>,
>(
    &self,
    config: C,
    stream: S,
) -> Result<Consumer<C>, ConsumerCreateStrictError> {
    let outcome = self
        .create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
        .await;
    outcome.map_err(Into::into)
}
1413
1414 async fn create_consumer_on_stream_action<
1415 C: IntoConsumerConfig + FromConsumer,
1416 S: AsRef<str>,
1417 >(
1418 &self,
1419 config: C,
1420 stream: S,
1421 action: ConsumerAction,
1422 ) -> Result<Consumer<C>, ConsumerError> {
1423 let config = config.into_consumer_config();
1424
1425 let subject = {
1426 let filter = if config.filter_subject.is_empty() {
1427 "".to_string()
1428 } else {
1429 format!(".{}", config.filter_subject)
1430 };
1431 config
1432 .name
1433 .as_ref()
1434 .or(config.durable_name.as_ref())
1435 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1436 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1437 };
1438
1439 match self
1440 .request(
1441 subject,
1442 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1443 )
1444 .await?
1445 {
1446 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1447 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1448 FromConsumer::try_from_consumer_config(info.clone().config)
1449 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1450 info,
1451 self.clone(),
1452 )),
1453 }
1454 }
1455
1456 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1476 where
1477 S: ToSubject,
1478 T: ?Sized + Serialize,
1479 V: DeserializeOwned,
1480 {
1481 let subject = subject.to_subject();
1482 let request = serde_json::to_vec(&payload)
1483 .map(Bytes::from)
1484 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1485
1486 debug!("JetStream request sent: {:?}", request);
1487
1488 let message = self
1489 .client
1490 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1491 .await;
1492 let message = message?;
1493 debug!(
1494 "JetStream request response: {:?}",
1495 from_utf8(&message.payload)
1496 );
1497 let response = serde_json::from_slice(message.payload.as_ref())
1498 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1499
1500 Ok(response)
1501 }
1502
1503 pub async fn create_object_store(
1522 &self,
1523 config: super::object_store::Config,
1524 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1525 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1526 return Err(CreateObjectStoreError::new(
1527 CreateKeyValueErrorKind::InvalidStoreName,
1528 ));
1529 }
1530
1531 let bucket_name = config.bucket.clone();
1532 let stream_name = format!("OBJ_{bucket_name}");
1533 let chunk_subject = format!("$O.{bucket_name}.C.>");
1534 let meta_subject = format!("$O.{bucket_name}.M.>");
1535
1536 let stream = self
1537 .create_stream(super::stream::Config {
1538 name: stream_name,
1539 description: config.description.clone(),
1540 subjects: vec![chunk_subject, meta_subject],
1541 max_age: config.max_age,
1542 max_bytes: config.max_bytes,
1543 storage: config.storage,
1544 num_replicas: config.num_replicas,
1545 discard: DiscardPolicy::New,
1546 allow_rollup: true,
1547 allow_direct: true,
1548 #[cfg(feature = "server_2_10")]
1549 compression: if config.compression {
1550 Some(Compression::S2)
1551 } else {
1552 None
1553 },
1554 placement: config.placement,
1555 ..Default::default()
1556 })
1557 .await
1558 .map_err(|err| {
1559 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1560 })?;
1561
1562 Ok(ObjectStore {
1563 name: bucket_name,
1564 stream,
1565 })
1566 }
1567
1568 pub async fn get_object_store<T: AsRef<str>>(
1582 &self,
1583 bucket_name: T,
1584 ) -> Result<ObjectStore, ObjectStoreError> {
1585 let bucket_name = bucket_name.as_ref();
1586 if !is_valid_bucket_name(bucket_name) {
1587 return Err(ObjectStoreError::new(
1588 ObjectStoreErrorKind::InvalidBucketName,
1589 ));
1590 }
1591 let stream_name = format!("OBJ_{bucket_name}");
1592 let stream = self
1593 .get_stream(stream_name)
1594 .await
1595 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1596
1597 Ok(ObjectStore {
1598 name: bucket_name.to_string(),
1599 stream,
1600 })
1601 }
1602
1603 pub async fn delete_object_store<T: AsRef<str>>(
1617 &self,
1618 bucket_name: T,
1619 ) -> Result<(), DeleteObjectStore> {
1620 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1621 self.delete_stream(stream_name)
1622 .await
1623 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1624 Ok(())
1625 }
1626}
1627
/// Reasons for which publishing to JetStream can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the subject.
    StreamNotFound,
    /// The server rejected the publish because of the last message id.
    WrongLastMessageId,
    /// The server rejected the publish because of the last sequence.
    WrongLastSequence,
    /// The operation did not finish within the timeout.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// No ack permit was available.
    MaxAckPending,
    /// Any other failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::Other => "publish failed",
        };
        f.write_str(message)
    }
}
1652
/// Error returned by the publish operations; its kind is a [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1654
/// Handle returned by the publish methods; awaiting it yields the publish
/// acknowledgement.
///
/// When it is dropped without having been awaited, its receiver and permit
/// are handed to the acker task through `tx`.
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Maximum time to wait for the acknowledgement.
    timeout: Duration,
    /// Receiver of the acknowledgement message; `None` once it has been taken.
    subscription: Option<oneshot::Receiver<Message>>,
    /// Permit of the ack semaphore held for this publish.
    permit: Option<OwnedSemaphorePermit>,
    /// Sender half of the acker channel.
    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
}
1662
1663impl Drop for PublishAckFuture {
1664 fn drop(&mut self) {
1665 if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1666 if let Err(err) = self.tx.try_send((sub, permit)) {
1667 tracing::warn!("failed to pass future permit to the acker: {}", err);
1668 }
1669 }
1670 }
1671}
1672
1673impl PublishAckFuture {
1674 async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1675 let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1676 .await
1677 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1678 next.map_or_else(
1679 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1680 |m| {
1681 if m.status == Some(StatusCode::NO_RESPONDERS) {
1682 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1683 }
1684 let response = serde_json::from_slice(m.payload.as_ref())
1685 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1686 match response {
1687 Response::Err { error } => match error.error_code() {
1688 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1689 PublishErrorKind::WrongLastMessageId,
1690 error,
1691 )),
1692 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1693 PublishErrorKind::WrongLastSequence,
1694 error,
1695 )),
1696 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1697 },
1698 Response::Ok(publish_ack) => Ok(publish_ack),
1699 }
1700 },
1701 )
1702 }
1703}
1704impl IntoFuture for PublishAckFuture {
1705 type Output = Result<PublishAck, PublishError>;
1706
1707 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1708
1709 fn into_future(self) -> Self::IntoFuture {
1710 Box::pin(std::future::IntoFuture::into_future(
1711 self.next_with_timeout(),
1712 ))
1713 }
1714}
1715
/// One deserialized page of a stream-names listing.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total reported for the whole listing; compared against the running
    /// offset to detect the last page.
    total: usize,
    /// Stream names carried by this page, if any.
    streams: Option<Vec<String>>,
}
1721
/// One deserialized page of a stream-info listing.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total reported for the whole listing; compared against the running
    /// offset to detect the last page.
    total: usize,
    /// Stream infos carried by this page, if any.
    streams: Option<Vec<super::stream::Info>>,
}
1727
/// In-flight request for a single page of stream names.
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;

/// Stream of stream names, fetched from the server one page at a time.
pub struct StreamNames {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of names received so far; sent as the `offset` of the next request.
    offset: usize,
    /// Page request currently being polled, if any.
    page_request: Option<PageRequest>,
    /// Value sent as the `subject` field of every page request.
    subject: Option<String>,
    /// Names of the current page that have not been yielded yet.
    streams: Vec<String>,
    /// Set once the received count reaches the total reported by the server.
    done: bool,
}
1738
1739impl futures_util::Stream for StreamNames {
1740 type Item = Result<String, StreamsError>;
1741
1742 fn poll_next(
1743 mut self: Pin<&mut Self>,
1744 cx: &mut std::task::Context<'_>,
1745 ) -> std::task::Poll<Option<Self::Item>> {
1746 match self.page_request.as_mut() {
1747 Some(page) => match page.try_poll_unpin(cx) {
1748 std::task::Poll::Ready(page) => {
1749 self.page_request = None;
1750 let page = page
1751 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1752 if let Some(streams) = page.streams {
1753 self.offset += streams.len();
1754 self.streams = streams;
1755 if self.offset >= page.total {
1756 self.done = true;
1757 }
1758 match self.streams.pop() {
1759 Some(stream) => Poll::Ready(Some(Ok(stream))),
1760 None => Poll::Ready(None),
1761 }
1762 } else {
1763 Poll::Ready(None)
1764 }
1765 }
1766 std::task::Poll::Pending => std::task::Poll::Pending,
1767 },
1768 None => {
1769 if let Some(stream) = self.streams.pop() {
1770 Poll::Ready(Some(Ok(stream)))
1771 } else {
1772 if self.done {
1773 return Poll::Ready(None);
1774 }
1775 let context = self.context.clone();
1776 let offset = self.offset;
1777 let subject = self.subject.clone();
1778 self.page_request = Some(Box::pin(async move {
1779 match context
1780 .request(
1781 "STREAM.NAMES",
1782 &json!({
1783 "offset": offset,
1784 "subject": subject
1785 }),
1786 )
1787 .await?
1788 {
1789 Response::Err { error } => {
1790 Err(RequestError::with_source(RequestErrorKind::Other, error))
1791 }
1792 Response::Ok(page) => Ok(page),
1793 }
1794 }));
1795 self.poll_next(cx)
1796 }
1797 }
1798 }
1799 }
1800}
1801
/// In-flight request for a single page of stream infos.
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

/// Error kind reported by the stream listings; an alias of [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error reported by the stream listings; an alias of [`RequestError`].
pub type StreamsError = RequestError;

/// Stream of stream infos, fetched from the server one page at a time.
pub struct Streams {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of infos received so far; sent as the `offset` of the next request.
    offset: usize,
    /// Page request currently being polled, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    streams: Vec<super::stream::Info>,
    /// Set once the received count reaches the total reported by the server.
    done: bool,
}
1814
1815impl futures_util::Stream for Streams {
1816 type Item = Result<super::stream::Info, StreamsError>;
1817
1818 fn poll_next(
1819 mut self: Pin<&mut Self>,
1820 cx: &mut std::task::Context<'_>,
1821 ) -> std::task::Poll<Option<Self::Item>> {
1822 match self.page_request.as_mut() {
1823 Some(page) => match page.try_poll_unpin(cx) {
1824 std::task::Poll::Ready(page) => {
1825 self.page_request = None;
1826 let page = page
1827 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1828 if let Some(streams) = page.streams {
1829 self.offset += streams.len();
1830 self.streams = streams;
1831 if self.offset >= page.total {
1832 self.done = true;
1833 }
1834 match self.streams.pop() {
1835 Some(stream) => Poll::Ready(Some(Ok(stream))),
1836 None => Poll::Ready(None),
1837 }
1838 } else {
1839 Poll::Ready(None)
1840 }
1841 }
1842 std::task::Poll::Pending => std::task::Poll::Pending,
1843 },
1844 None => {
1845 if let Some(stream) = self.streams.pop() {
1846 Poll::Ready(Some(Ok(stream)))
1847 } else {
1848 if self.done {
1849 return Poll::Ready(None);
1850 }
1851 let context = self.context.clone();
1852 let offset = self.offset;
1853 self.page_request = Some(Box::pin(async move {
1854 match context
1855 .request(
1856 "STREAM.LIST",
1857 &json!({
1858 "offset": offset,
1859 }),
1860 )
1861 .await?
1862 {
1863 Response::Err { error } => {
1864 Err(RequestError::with_source(RequestErrorKind::Other, error))
1865 }
1866 Response::Ok(page) => Ok(page),
1867 }
1868 }));
1869 self.poll_next(cx)
1870 }
1871 }
1872 }
1873 }
1874}
/// Builder for a message to publish: a payload plus optional headers.
#[derive(Default, Clone, Debug)]
pub struct Publish {
    /// Message body; empty by default.
    payload: Bytes,
    /// Message headers; `None` until a header is set.
    headers: Option<header::HeaderMap>,
}
1881impl Publish {
1882 pub fn build() -> Self {
1884 Default::default()
1885 }
1886
1887 pub fn payload(mut self, payload: Bytes) -> Self {
1889 self.payload = payload;
1890 self
1891 }
1892 pub fn headers(mut self, headers: HeaderMap) -> Self {
1894 self.headers = Some(headers);
1895 self
1896 }
1897 pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1899 self.headers
1900 .get_or_insert(header::HeaderMap::new())
1901 .insert(name, value);
1902 self
1903 }
1904 pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1906 self.header(header::NATS_MESSAGE_ID, id.as_ref())
1907 }
1908 pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1911 self.header(
1912 header::NATS_EXPECTED_LAST_MESSAGE_ID,
1913 last_message_id.as_ref(),
1914 )
1915 }
1916 pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1919 self.header(
1920 header::NATS_EXPECTED_LAST_SEQUENCE,
1921 HeaderValue::from(last_sequence),
1922 )
1923 }
1924 pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1927 self.header(
1928 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1929 HeaderValue::from(subject_sequence),
1930 )
1931 }
1932 pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1935 self.header(
1936 header::NATS_EXPECTED_STREAM,
1937 HeaderValue::from(stream.as_ref()),
1938 )
1939 }
1940
1941 #[cfg(feature = "server_2_11")]
1942 pub fn ttl(self, ttl: Duration) -> Self {
1945 self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
1946 }
1947}
1948
/// Failure categories of a JetStream API request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nothing was listening on the request subject.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable message of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(text)
    }
}
1965
1966pub type RequestError = Error<RequestErrorKind>;
1967
1968impl From<crate::RequestError> for RequestError {
1969 fn from(error: crate::RequestError) -> Self {
1970 match error.kind() {
1971 crate::RequestErrorKind::TimedOut => {
1972 RequestError::with_source(RequestErrorKind::TimedOut, error)
1973 }
1974 crate::RequestErrorKind::NoResponders => {
1975 RequestError::new(RequestErrorKind::NoResponders)
1976 }
1977 crate::RequestErrorKind::Other => {
1978 RequestError::with_source(RequestErrorKind::Other, error)
1979 }
1980 }
1981 }
1982}
1983
1984impl From<super::errors::Error> for RequestError {
1985 fn from(err: super::errors::Error) -> Self {
1986 RequestError::with_source(RequestErrorKind::Other, err)
1987 }
1988}
1989
1990pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
1991
1992#[derive(Clone, Debug, PartialEq)]
1993pub enum ConsumerInfoErrorKind {
1994 InvalidName,
1995 Offline,
1996 NotFound,
1997 StreamNotFound,
1998 Request,
1999 JetStream(super::errors::Error),
2000 TimedOut,
2001 NoResponders,
2002}
2003
2004impl Display for ConsumerInfoErrorKind {
2005 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2006 match self {
2007 Self::InvalidName => write!(f, "invalid consumer name"),
2008 Self::Offline => write!(f, "consumer is offline"),
2009 Self::NotFound => write!(f, "consumer not found"),
2010 Self::StreamNotFound => write!(f, "stream not found"),
2011 Self::Request => write!(f, "request error"),
2012 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2013 Self::TimedOut => write!(f, "timed out"),
2014 Self::NoResponders => write!(f, "no responders"),
2015 }
2016 }
2017}
2018
2019impl From<super::errors::Error> for ConsumerInfoError {
2020 fn from(error: super::errors::Error) -> Self {
2021 match error.error_code() {
2022 ErrorCode::CONSUMER_NOT_FOUND => {
2023 ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2024 }
2025 ErrorCode::STREAM_NOT_FOUND => {
2026 ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2027 }
2028 ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2029 _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2030 }
2031 }
2032}
2033
2034impl From<RequestError> for ConsumerInfoError {
2035 fn from(error: RequestError) -> Self {
2036 match error.kind() {
2037 RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2038 RequestErrorKind::Other => {
2039 ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2040 }
2041 RequestErrorKind::NoResponders => {
2042 ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2043 }
2044 }
2045 }
2046}
2047
2048#[derive(Clone, Debug, PartialEq)]
2049pub enum CreateStreamErrorKind {
2050 EmptyStreamName,
2051 InvalidStreamName,
2052 DomainAndExternalSet,
2053 JetStreamUnavailable,
2054 JetStream(super::errors::Error),
2055 TimedOut,
2056 Response,
2057 NotFound,
2058 ResponseParse,
2059}
2060
2061impl Display for CreateStreamErrorKind {
2062 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2063 match self {
2064 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2065 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2066 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2067 Self::NotFound => write!(f, "stream not found"),
2068 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2069 Self::TimedOut => write!(f, "jetstream request timed out"),
2070 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2071 Self::ResponseParse => write!(f, "failed to parse server response"),
2072 Self::Response => write!(f, "response error"),
2073 }
2074 }
2075}
2076
2077pub type CreateStreamError = Error<CreateStreamErrorKind>;
2078
2079impl From<super::errors::Error> for CreateStreamError {
2080 fn from(error: super::errors::Error) -> Self {
2081 match error.kind() {
2082 super::errors::ErrorCode::STREAM_NOT_FOUND => {
2083 CreateStreamError::new(CreateStreamErrorKind::NotFound)
2084 }
2085 _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2086 }
2087 }
2088}
2089
2090impl From<RequestError> for CreateStreamError {
2091 fn from(error: RequestError) -> Self {
2092 match error.kind() {
2093 RequestErrorKind::NoResponders => {
2094 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2095 }
2096 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2097 RequestErrorKind::Other => {
2098 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2099 }
2100 }
2101 }
2102}
2103
2104#[derive(Clone, Debug, PartialEq)]
2105pub enum GetStreamErrorKind {
2106 EmptyName,
2107 Request,
2108 InvalidStreamName,
2109 JetStream(super::errors::Error),
2110}
2111
2112impl Display for GetStreamErrorKind {
2113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2114 match self {
2115 Self::EmptyName => write!(f, "empty name cannot be empty"),
2116 Self::Request => write!(f, "request error"),
2117 Self::InvalidStreamName => write!(f, "invalid stream name"),
2118 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2119 }
2120 }
2121}
2122
2123#[derive(Clone, Debug, PartialEq)]
2124pub enum GetStreamByNameErrorKind {
2125 Request,
2126 NotFound,
2127 InvalidSubject,
2128 JetStream(super::errors::Error),
2129}
2130
2131impl Display for GetStreamByNameErrorKind {
2132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2133 match self {
2134 Self::Request => write!(f, "request error"),
2135 Self::NotFound => write!(f, "stream not found"),
2136 Self::InvalidSubject => write!(f, "invalid subject"),
2137 Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2138 }
2139 }
2140}
2141
/// Error built on [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error built on [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Stream updates share the error type of stream creation.
pub type UpdateStreamError = CreateStreamError;
/// Stream updates share the error kinds of stream creation.
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Stream deletion shares the error type of [`GetStreamError`].
pub type DeleteStreamError = GetStreamError;
/// Stream deletion shares the error kinds of [`GetStreamErrorKind`].
pub type DeleteStreamErrorKind = GetStreamErrorKind;
2149
/// Failure categories reported through `KeyValueError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is not valid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable message of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(text)
    }
}
2166
/// Error built on [`KeyValueErrorKind`].
pub type KeyValueError = Error<KeyValueErrorKind>;
2168
/// Failure categories of Key Value bucket creation.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is not valid.
    InvalidStoreName,
    /// The requested history exceeds the allowed maximum.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable message of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(text)
    }
}
2193
/// Failure categories of Key Value bucket updates.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is not valid.
    InvalidStoreName,
    /// The requested history exceeds the allowed maximum.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed, human-readable message of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // Previously reported "bucket creation failed", copied from the
            // create variant; this kind describes a failed update.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error built on [`CreateKeyValueErrorKind`].
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error built on [`UpdateKeyValueErrorKind`].
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Object Store creation shares the error type of Key Value bucket creation.
pub type CreateObjectStoreError = CreateKeyValueError;
/// Object Store creation shares the error kinds of Key Value bucket creation.
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2225
/// Failure categories reported through `ObjectStoreError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is not valid.
    InvalidBucketName,
    /// The Object Store could not be fetched.
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable message of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(text)
    }
}
2240
/// Error built on [`ObjectStoreErrorKind`].
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Alias of [`ObjectStoreError`] for Object Store deletion.
pub type DeleteObjectStore = ObjectStoreError;
/// Alias of [`ObjectStoreErrorKind`] for Object Store deletion.
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2245
2246#[derive(Clone, Debug, PartialEq)]
2247pub enum AccountErrorKind {
2248 TimedOut,
2249 JetStream(super::errors::Error),
2250 JetStreamUnavailable,
2251 Other,
2252}
2253
2254impl Display for AccountErrorKind {
2255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2256 match self {
2257 Self::TimedOut => write!(f, "timed out"),
2258 Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2259 Self::Other => write!(f, "error"),
2260 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2261 }
2262 }
2263}
2264
2265pub type AccountError = Error<AccountErrorKind>;
2266
2267impl From<RequestError> for AccountError {
2268 fn from(err: RequestError) -> Self {
2269 match err.kind {
2270 RequestErrorKind::NoResponders => {
2271 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2272 }
2273 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2274 RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2275 }
2276 }
2277}
2278
/// Action tag serialized into consumer requests.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2290
2291fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2293 let mut store = Store {
2294 prefix: format!("$KV.{}.", bucket.as_str()),
2295 name: bucket,
2296 stream: stream.clone(),
2297 stream_name: stream.info.config.name.clone(),
2298 put_prefix: None,
2299 use_jetstream_prefix: prefix != "$JS.API",
2300 };
2301 if let Some(ref mirror) = stream.info.config.mirror {
2302 let bucket = mirror.name.trim_start_matches("KV_");
2303 if let Some(ref external) = mirror.external {
2304 if !external.api_prefix.is_empty() {
2305 store.use_jetstream_prefix = false;
2306 store.prefix = format!("$KV.{bucket}.");
2307 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2308 } else {
2309 store.put_prefix = Some(format!("$KV.{bucket}."));
2310 }
2311 }
2312 };
2313 store
2314}
2315
/// Reasons why a Key Value config cannot be turned into a stream config.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but are not supported.
    // Only constructed under the `server_2_11` feature, hence the allow.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2321
2322impl From<KvToStreamConfigError> for CreateKeyValueError {
2323 fn from(err: KvToStreamConfigError) -> Self {
2324 match err {
2325 KvToStreamConfigError::TooLongHistory => {
2326 CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2327 }
2328 KvToStreamConfigError::LimitMarkersNotSupported => {
2329 CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2330 }
2331 }
2332 }
2333}
2334
2335impl From<KvToStreamConfigError> for UpdateKeyValueError {
2336 fn from(err: KvToStreamConfigError) -> Self {
2337 match err {
2338 KvToStreamConfigError::TooLongHistory => {
2339 UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2340 }
2341 KvToStreamConfigError::LimitMarkersNotSupported => {
2342 UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2343 }
2344 }
2345 }
2346}
2347
2348fn kv_to_stream_config(
2350 config: kv::Config,
2351 _account: Account,
2352) -> Result<super::stream::Config, KvToStreamConfigError> {
2353 let history = if config.history > 0 {
2354 if config.history > MAX_HISTORY {
2355 return Err(KvToStreamConfigError::TooLongHistory);
2356 }
2357 config.history
2358 } else {
2359 1
2360 };
2361
2362 let num_replicas = if config.num_replicas == 0 {
2363 1
2364 } else {
2365 config.num_replicas
2366 };
2367
2368 #[cfg(feature = "server_2_11")]
2369 let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2370
2371 #[cfg(feature = "server_2_11")]
2372 if let Some(duration) = config.limit_markers {
2373 if _account.requests.level < 1 {
2374 return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2375 }
2376 allow_message_ttl = true;
2377 subject_delete_marker_ttl = Some(duration);
2378 }
2379
2380 let mut mirror = config.mirror.clone();
2381 let mut sources = config.sources.clone();
2382 let mut mirror_direct = config.mirror_direct;
2383
2384 let mut subjects = Vec::new();
2385 if let Some(ref mut mirror) = mirror {
2386 if !mirror.name.starts_with("KV_") {
2387 mirror.name = format!("KV_{}", mirror.name);
2388 }
2389 mirror_direct = true;
2390 } else if let Some(ref mut sources) = sources {
2391 for source in sources {
2392 if !source.name.starts_with("KV_") {
2393 source.name = format!("KV_{}", source.name);
2394 }
2395 }
2396 } else {
2397 subjects = vec![format!("$KV.{}.>", config.bucket)];
2398 }
2399
2400 Ok(stream::Config {
2401 name: format!("KV_{}", config.bucket),
2402 description: Some(config.description),
2403 subjects,
2404 max_messages_per_subject: history,
2405 max_bytes: config.max_bytes,
2406 max_age: config.max_age,
2407 max_message_size: config.max_value_size,
2408 storage: config.storage,
2409 republish: config.republish,
2410 allow_rollup: true,
2411 deny_delete: true,
2412 deny_purge: false,
2413 allow_direct: true,
2414 sources,
2415 mirror,
2416 num_replicas,
2417 discard: stream::DiscardPolicy::New,
2418 mirror_direct,
2419 #[cfg(feature = "server_2_10")]
2420 compression: if config.compression {
2421 Some(stream::Compression::S2)
2422 } else {
2423 None
2424 },
2425 placement: config.placement,
2426 #[cfg(feature = "server_2_11")]
2427 allow_message_ttl,
2428 #[cfg(feature = "server_2_11")]
2429 subject_delete_marker_ttl,
2430 ..Default::default()
2431 })
2432}