1use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23 header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use futures::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Display;
33use std::future::IntoFuture;
34use std::pin::Pin;
35use std::str::from_utf8;
36use std::task::Poll;
37use std::time::Duration;
38use tokio::sync::oneshot;
39use tracing::debug;
40
41use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
42use super::errors::ErrorCode;
43use super::kv::{Store, MAX_HISTORY};
44use super::object_store::{is_valid_bucket_name, ObjectStore};
45use super::stream::{
46 self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
47 Stream,
48};
49#[cfg(feature = "server_2_10")]
50use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
51use super::{is_valid_name, kv};
52
/// Handle for issuing JetStream API calls through a NATS [`Client`].
///
/// The context is cloned into the streams, consumers and paginated listings
/// that its methods return.
#[derive(Debug, Clone)]
pub struct Context {
    /// Client used to send every API request and publish.
    pub(crate) client: Client,
    /// Subject prefix put in front of every API request subject
    /// (`$JS.API` unless a custom prefix or a domain was supplied).
    pub(crate) prefix: String,
    /// Timeout used when handing a publish to the client and when waiting
    /// for its acknowledgement.
    pub(crate) timeout: Duration,
}
60
61impl Context {
62 pub(crate) fn new(client: Client) -> Context {
63 Context {
64 client,
65 prefix: "$JS.API".to_string(),
66 timeout: Duration::from_secs(5),
67 }
68 }
69
70 pub fn set_timeout(&mut self, timeout: Duration) {
71 self.timeout = timeout
72 }
73
74 pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
75 Context {
76 client,
77 prefix: prefix.to_string(),
78 timeout: Duration::from_secs(5),
79 }
80 }
81
82 pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
83 Context {
84 client,
85 prefix: format!("$JS.{}.API", domain.as_ref()),
86 timeout: Duration::from_secs(5),
87 }
88 }
89
90 pub async fn publish<S: ToSubject>(
131 &self,
132 subject: S,
133 payload: Bytes,
134 ) -> Result<PublishAckFuture, PublishError> {
135 self.send_publish(subject, Publish::build().payload(payload))
136 .await
137 }
138
139 pub async fn publish_with_headers<S: ToSubject>(
161 &self,
162 subject: S,
163 headers: crate::header::HeaderMap,
164 payload: Bytes,
165 ) -> Result<PublishAckFuture, PublishError> {
166 self.send_publish(subject, Publish::build().payload(payload).headers(headers))
167 .await
168 }
169
170 pub async fn send_publish<S: ToSubject>(
193 &self,
194 subject: S,
195 publish: Publish,
196 ) -> Result<PublishAckFuture, PublishError> {
197 let subject = subject.to_subject();
198 let (sender, receiver) = oneshot::channel();
199
200 let respond = self.client.new_inbox().into();
201
202 let send_fut = self
203 .client
204 .sender
205 .send(Command::Request {
206 subject,
207 payload: publish.payload,
208 respond,
209 headers: publish.headers,
210 sender,
211 })
212 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
213
214 tokio::time::timeout(self.timeout, send_fut)
215 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
216 .await??;
217
218 Ok(PublishAckFuture {
219 timeout: self.timeout,
220 subscription: receiver,
221 })
222 }
223
224 pub async fn query_account(&self) -> Result<Account, AccountError> {
226 let response: Response<Account> = self.request("INFO", b"").await?;
227
228 match response {
229 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
230 Response::Ok(account) => Ok(account),
231 }
232 }
233
234 pub async fn create_stream<S>(
259 &self,
260 stream_config: S,
261 ) -> Result<Stream<Info>, CreateStreamError>
262 where
263 Config: From<S>,
264 {
265 let mut config: Config = stream_config.into();
266 if config.name.is_empty() {
267 return Err(CreateStreamError::new(
268 CreateStreamErrorKind::EmptyStreamName,
269 ));
270 }
271 if !is_valid_name(config.name.as_str()) {
272 return Err(CreateStreamError::new(
273 CreateStreamErrorKind::InvalidStreamName,
274 ));
275 }
276 if let Some(ref mut mirror) = config.mirror {
277 if let Some(ref mut domain) = mirror.domain {
278 if mirror.external.is_some() {
279 return Err(CreateStreamError::new(
280 CreateStreamErrorKind::DomainAndExternalSet,
281 ));
282 }
283 mirror.external = Some(External {
284 api_prefix: format!("$JS.{domain}.API"),
285 delivery_prefix: None,
286 })
287 }
288 }
289
290 if let Some(ref mut sources) = config.sources {
291 for source in sources {
292 if let Some(ref mut domain) = source.domain {
293 if source.external.is_some() {
294 return Err(CreateStreamError::new(
295 CreateStreamErrorKind::DomainAndExternalSet,
296 ));
297 }
298 source.external = Some(External {
299 api_prefix: format!("$JS.{domain}.API"),
300 delivery_prefix: None,
301 })
302 }
303 }
304 }
305 let subject = format!("STREAM.CREATE.{}", config.name);
306 let response: Response<Info> = self.request(subject, &config).await?;
307
308 match response {
309 Response::Err { error } => Err(error.into()),
310 Response::Ok(info) => Ok(Stream {
311 context: self.clone(),
312 info,
313 name: config.name,
314 }),
315 }
316 }
317
318 pub async fn get_stream_no_info<T: AsRef<str>>(
338 &self,
339 stream: T,
340 ) -> Result<Stream<()>, GetStreamError> {
341 let stream = stream.as_ref();
342 if stream.is_empty() {
343 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
344 }
345
346 if !is_valid_name(stream) {
347 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
348 }
349
350 Ok(Stream {
351 context: self.clone(),
352 info: (),
353 name: stream.to_string(),
354 })
355 }
356
357 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
373 let stream = stream.as_ref();
374 if stream.is_empty() {
375 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
376 }
377
378 if !is_valid_name(stream) {
379 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
380 }
381
382 let subject = format!("STREAM.INFO.{stream}");
383 let request: Response<Info> = self
384 .request(subject, &())
385 .await
386 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
387 match request {
388 Response::Err { error } => {
389 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
390 }
391 Response::Ok(info) => Ok(Stream {
392 context: self.clone(),
393 info,
394 name: stream.to_string(),
395 }),
396 }
397 }
398
399 pub async fn get_or_create_stream<S>(
423 &self,
424 stream_config: S,
425 ) -> Result<Stream, CreateStreamError>
426 where
427 S: Into<Config>,
428 {
429 let config: Config = stream_config.into();
430
431 if config.name.is_empty() {
432 return Err(CreateStreamError::new(
433 CreateStreamErrorKind::EmptyStreamName,
434 ));
435 }
436
437 if !is_valid_name(config.name.as_str()) {
438 return Err(CreateStreamError::new(
439 CreateStreamErrorKind::InvalidStreamName,
440 ));
441 }
442 let subject = format!("STREAM.INFO.{}", config.name);
443
444 let request: Response<Info> = self.request(subject, &()).await?;
445 match request {
446 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
447 Response::Err { error } => Err(error.into()),
448 Response::Ok(info) => Ok(Stream {
449 context: self.clone(),
450 info,
451 name: config.name,
452 }),
453 }
454 }
455
456 pub async fn delete_stream<T: AsRef<str>>(
472 &self,
473 stream: T,
474 ) -> Result<DeleteStatus, DeleteStreamError> {
475 let stream = stream.as_ref();
476 if stream.is_empty() {
477 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
478 }
479
480 if !is_valid_name(stream) {
481 return Err(DeleteStreamError::new(
482 DeleteStreamErrorKind::InvalidStreamName,
483 ));
484 }
485
486 let subject = format!("STREAM.DELETE.{stream}");
487 match self
488 .request(subject, &json!({}))
489 .await
490 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
491 {
492 Response::Err { error } => Err(DeleteStreamError::new(
493 DeleteStreamErrorKind::JetStream(error),
494 )),
495 Response::Ok(delete_response) => Ok(delete_response),
496 }
497 }
498
499 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
524 where
525 S: Borrow<Config>,
526 {
527 let config = config.borrow();
528
529 if config.name.is_empty() {
530 return Err(CreateStreamError::new(
531 CreateStreamErrorKind::EmptyStreamName,
532 ));
533 }
534
535 if !is_valid_name(config.name.as_str()) {
536 return Err(CreateStreamError::new(
537 CreateStreamErrorKind::InvalidStreamName,
538 ));
539 }
540
541 let subject = format!("STREAM.UPDATE.{}", config.name);
542 match self.request(subject, config).await? {
543 Response::Err { error } => Err(error.into()),
544 Response::Ok(info) => Ok(info),
545 }
546 }
547
548 pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
572 match self.update_stream(config.clone()).await {
573 Ok(stream) => Ok(stream),
574 Err(err) => match err.kind() {
575 CreateStreamErrorKind::NotFound => {
576 let stream = self
577 .create_stream(config)
578 .await
579 .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
580 Ok(stream.info)
581 }
582 _ => Err(err),
583 },
584 }
585 }
586
587 pub async fn stream_by_subject<T: Into<String>>(
602 &self,
603 subject: T,
604 ) -> Result<String, GetStreamByNameError> {
605 let subject = subject.into();
606 if !is_valid_subject(subject.as_str()) {
607 return Err(GetStreamByNameError::new(
608 GetStreamByNameErrorKind::InvalidSubject,
609 ));
610 }
611 let mut names = StreamNames {
612 context: self.clone(),
613 offset: 0,
614 page_request: None,
615 streams: Vec::new(),
616 subject: Some(subject),
617 done: false,
618 };
619 match names.next().await {
620 Some(name) => match name {
621 Ok(name) => Ok(name),
622 Err(err) => Err(GetStreamByNameError::with_source(
623 GetStreamByNameErrorKind::Request,
624 err,
625 )),
626 },
627 None => Err(GetStreamByNameError::new(
628 GetStreamByNameErrorKind::NotFound,
629 )),
630 }
631 }
632
633 pub fn stream_names(&self) -> StreamNames {
651 StreamNames {
652 context: self.clone(),
653 offset: 0,
654 page_request: None,
655 streams: Vec::new(),
656 subject: None,
657 done: false,
658 }
659 }
660
661 pub fn streams(&self) -> Streams {
679 Streams {
680 context: self.clone(),
681 offset: 0,
682 page_request: None,
683 streams: Vec::new(),
684 done: false,
685 }
686 }
687 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
701 let bucket: String = bucket.into();
702 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
703 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
704 }
705
706 let stream_name = format!("KV_{}", &bucket);
707 let stream = self
708 .get_stream(stream_name.clone())
709 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
710 .await?;
711
712 if stream.info.config.max_messages_per_subject < 1 {
713 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
714 }
715 let mut store = Store {
716 prefix: format!("$KV.{}.", &bucket),
717 name: bucket,
718 stream_name,
719 stream: stream.clone(),
720 put_prefix: None,
721 use_jetstream_prefix: self.prefix != "$JS.API",
722 };
723 if let Some(ref mirror) = stream.info.config.mirror {
724 let bucket = mirror.name.trim_start_matches("KV_");
725 if let Some(ref external) = mirror.external {
726 if !external.api_prefix.is_empty() {
727 store.use_jetstream_prefix = false;
728 store.prefix = format!("$KV.{bucket}.");
729 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
730 } else {
731 store.put_prefix = Some(format!("$KV.{bucket}."));
732 }
733 }
734 };
735
736 Ok(store)
737 }
738
739 pub async fn create_key_value(
759 &self,
760 config: crate::jetstream::kv::Config,
761 ) -> Result<Store, CreateKeyValueError> {
762 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
763 return Err(CreateKeyValueError::new(
764 CreateKeyValueErrorKind::InvalidStoreName,
765 ));
766 }
767 let info = self.query_account().await.map_err(|err| {
768 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
769 })?;
770
771 let bucket_name = config.bucket.clone();
772 let stream_config = kv_to_stream_config(config, info)?;
773
774 let stream = self.create_stream(stream_config).await.map_err(|err| {
775 if err.kind() == CreateStreamErrorKind::TimedOut {
776 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
777 } else {
778 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
779 }
780 })?;
781
782 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
783 }
784
785 pub async fn update_key_value(
805 &self,
806 config: crate::jetstream::kv::Config,
807 ) -> Result<Store, UpdateKeyValueError> {
808 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
809 return Err(UpdateKeyValueError::new(
810 UpdateKeyValueErrorKind::InvalidStoreName,
811 ));
812 }
813
814 let stream_name = format!("KV_{}", config.bucket);
815 let bucket_name = config.bucket.clone();
816
817 let account = self.query_account().await.map_err(|err| {
818 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
819 })?;
820 let stream = self
821 .update_stream(kv_to_stream_config(config, account)?)
822 .await
823 .map_err(|err| match err.kind() {
824 UpdateStreamErrorKind::NotFound => {
825 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
826 }
827 _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
828 })?;
829
830 let stream = Stream {
831 context: self.clone(),
832 info: stream,
833 name: stream_name,
834 };
835
836 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
837 }
838
839 pub async fn create_or_update_key_value(
859 &self,
860 config: crate::jetstream::kv::Config,
861 ) -> Result<Store, CreateKeyValueError> {
862 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
863 return Err(CreateKeyValueError::new(
864 CreateKeyValueErrorKind::InvalidStoreName,
865 ));
866 }
867
868 let bucket_name = config.bucket.clone();
869 let stream_name = format!("KV_{}", config.bucket);
870
871 let account = self.query_account().await.map_err(|err| {
872 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
873 })?;
874 let stream = self
875 .create_or_update_stream(kv_to_stream_config(config, account)?)
876 .await
877 .map_err(|err| {
878 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
879 })?;
880
881 let stream = Stream {
882 context: self.clone(),
883 info: stream,
884 name: stream_name,
885 };
886
887 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
888 }
889
890 pub async fn delete_key_value<T: AsRef<str>>(
910 &self,
911 bucket: T,
912 ) -> Result<DeleteStatus, KeyValueError> {
913 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
914 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
915 }
916
917 let stream_name = format!("KV_{}", bucket.as_ref());
918 self.delete_stream(stream_name)
919 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
920 .await
921 }
922
923 pub async fn get_consumer_from_stream<T, C, S>(
960 &self,
961 consumer: C,
962 stream: S,
963 ) -> Result<Consumer<T>, ConsumerError>
964 where
965 T: FromConsumer + IntoConsumerConfig,
966 S: AsRef<str>,
967 C: AsRef<str>,
968 {
969 if !is_valid_name(stream.as_ref()) {
970 return Err(ConsumerError::with_source(
971 ConsumerErrorKind::InvalidName,
972 "invalid stream",
973 ));
974 }
975
976 if !is_valid_name(consumer.as_ref()) {
977 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
978 }
979
980 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
981
982 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
983 Response::Ok(info) => info,
984 Response::Err { error } => return Err(error.into()),
985 };
986
987 Ok(Consumer::new(
988 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
989 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
990 })?,
991 info,
992 self.clone(),
993 ))
994 }
995
996 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1019 &self,
1020 consumer: C,
1021 stream: S,
1022 ) -> Result<DeleteStatus, ConsumerError> {
1023 if !is_valid_name(consumer.as_ref()) {
1024 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1025 }
1026
1027 if !is_valid_name(stream.as_ref()) {
1028 return Err(ConsumerError::with_source(
1029 ConsumerErrorKind::Other,
1030 "invalid stream name",
1031 ));
1032 }
1033
1034 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1035
1036 match self.request(subject, &json!({})).await? {
1037 Response::Ok(delete_status) => Ok(delete_status),
1038 Response::Err { error } => Err(error.into()),
1039 }
1040 }
1041
1042 pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1068 &self,
1069 config: C,
1070 stream: S,
1071 ) -> Result<Consumer<C>, ConsumerError> {
1072 self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1073 .await
1074 }
1075
1076 #[cfg(feature = "server_2_10")]
1103 pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1104 &self,
1105 config: C,
1106 stream: S,
1107 ) -> Result<Consumer<C>, ConsumerUpdateError> {
1108 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1109 .await
1110 .map_err(|err| err.into())
1111 }
1112
1113 #[cfg(feature = "server_2_10")]
1140 pub async fn create_consumer_strict_on_stream<
1141 C: IntoConsumerConfig + FromConsumer,
1142 S: AsRef<str>,
1143 >(
1144 &self,
1145 config: C,
1146 stream: S,
1147 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1148 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1149 .await
1150 .map_err(|err| err.into())
1151 }
1152
1153 async fn create_consumer_on_stream_action<
1154 C: IntoConsumerConfig + FromConsumer,
1155 S: AsRef<str>,
1156 >(
1157 &self,
1158 config: C,
1159 stream: S,
1160 action: ConsumerAction,
1161 ) -> Result<Consumer<C>, ConsumerError> {
1162 let config = config.into_consumer_config();
1163
1164 let subject = {
1165 let filter = if config.filter_subject.is_empty() {
1166 "".to_string()
1167 } else {
1168 format!(".{}", config.filter_subject)
1169 };
1170 config
1171 .name
1172 .as_ref()
1173 .or(config.durable_name.as_ref())
1174 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1175 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1176 };
1177
1178 match self
1179 .request(
1180 subject,
1181 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1182 )
1183 .await?
1184 {
1185 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1186 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1187 FromConsumer::try_from_consumer_config(info.clone().config)
1188 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1189 info,
1190 self.clone(),
1191 )),
1192 }
1193 }
1194
1195 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1215 where
1216 S: ToSubject,
1217 T: ?Sized + Serialize,
1218 V: DeserializeOwned,
1219 {
1220 let subject = subject.to_subject();
1221 let request = serde_json::to_vec(&payload)
1222 .map(Bytes::from)
1223 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1224
1225 debug!("JetStream request sent: {:?}", request);
1226
1227 let message = self
1228 .client
1229 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1230 .await;
1231 let message = message?;
1232 debug!(
1233 "JetStream request response: {:?}",
1234 from_utf8(&message.payload)
1235 );
1236 let response = serde_json::from_slice(message.payload.as_ref())
1237 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1238
1239 Ok(response)
1240 }
1241
1242 pub async fn create_object_store(
1261 &self,
1262 config: super::object_store::Config,
1263 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1264 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1265 return Err(CreateObjectStoreError::new(
1266 CreateKeyValueErrorKind::InvalidStoreName,
1267 ));
1268 }
1269
1270 let bucket_name = config.bucket.clone();
1271 let stream_name = format!("OBJ_{bucket_name}");
1272 let chunk_subject = format!("$O.{bucket_name}.C.>");
1273 let meta_subject = format!("$O.{bucket_name}.M.>");
1274
1275 let stream = self
1276 .create_stream(super::stream::Config {
1277 name: stream_name,
1278 description: config.description.clone(),
1279 subjects: vec![chunk_subject, meta_subject],
1280 max_age: config.max_age,
1281 max_bytes: config.max_bytes,
1282 storage: config.storage,
1283 num_replicas: config.num_replicas,
1284 discard: DiscardPolicy::New,
1285 allow_rollup: true,
1286 allow_direct: true,
1287 #[cfg(feature = "server_2_10")]
1288 compression: if config.compression {
1289 Some(Compression::S2)
1290 } else {
1291 None
1292 },
1293 placement: config.placement,
1294 ..Default::default()
1295 })
1296 .await
1297 .map_err(|err| {
1298 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1299 })?;
1300
1301 Ok(ObjectStore {
1302 name: bucket_name,
1303 stream,
1304 })
1305 }
1306
1307 pub async fn get_object_store<T: AsRef<str>>(
1321 &self,
1322 bucket_name: T,
1323 ) -> Result<ObjectStore, ObjectStoreError> {
1324 let bucket_name = bucket_name.as_ref();
1325 if !is_valid_bucket_name(bucket_name) {
1326 return Err(ObjectStoreError::new(
1327 ObjectStoreErrorKind::InvalidBucketName,
1328 ));
1329 }
1330 let stream_name = format!("OBJ_{bucket_name}");
1331 let stream = self
1332 .get_stream(stream_name)
1333 .await
1334 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1335
1336 Ok(ObjectStore {
1337 name: bucket_name.to_string(),
1338 stream,
1339 })
1340 }
1341
1342 pub async fn delete_object_store<T: AsRef<str>>(
1356 &self,
1357 bucket_name: T,
1358 ) -> Result<(), DeleteObjectStore> {
1359 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1360 self.delete_stream(stream_name)
1361 .await
1362 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1363 Ok(())
1364 }
1365}
1366
/// Reasons a JetStream publish can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The publish got a "no responders" status.
    StreamNotFound,
    /// The server rejected the publish with the "wrong last message id" error code.
    WrongLastMessageId,
    /// The server rejected the publish with the "wrong last sequence" error code.
    WrongLastSequence,
    /// Sending the message or waiting for its acknowledgement timed out.
    TimedOut,
    /// The acknowledgement channel was closed before a reply arrived.
    BrokenPipe,
    /// Any other failure.
    Other,
}

impl Display for PublishErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1389
/// Error returned by the publish operations, tagged with a [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1391
/// Pending acknowledgement of a publish.
///
/// Awaiting it (through its `IntoFuture` implementation) waits for the reply
/// and decodes it into a [`PublishAck`].
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Maximum time to wait for the reply.
    timeout: Duration,
    /// Receiving half of the one-shot channel the reply is delivered on.
    subscription: oneshot::Receiver<Message>,
}
1397
1398impl PublishAckFuture {
1399 async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
1400 let next = tokio::time::timeout(self.timeout, self.subscription)
1401 .await
1402 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1403 next.map_or_else(
1404 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1405 |m| {
1406 if m.status == Some(StatusCode::NO_RESPONDERS) {
1407 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1408 }
1409 let response = serde_json::from_slice(m.payload.as_ref())
1410 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1411 match response {
1412 Response::Err { error } => match error.error_code() {
1413 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1414 PublishErrorKind::WrongLastMessageId,
1415 error,
1416 )),
1417 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1418 PublishErrorKind::WrongLastSequence,
1419 error,
1420 )),
1421 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1422 },
1423 Response::Ok(publish_ack) => Ok(publish_ack),
1424 }
1425 },
1426 )
1427 }
1428}
1429impl IntoFuture for PublishAckFuture {
1430 type Output = Result<PublishAck, PublishError>;
1431
1432 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1433
1434 fn into_future(self) -> Self::IntoFuture {
1435 Box::pin(std::future::IntoFuture::into_future(
1436 self.next_with_timeout(),
1437 ))
1438 }
1439}
1440
/// One page of the `STREAM.NAMES` reply.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total reported by the server; paging stops once the running offset
    /// reaches it.
    total: usize,
    /// Stream names on this page, if the reply contained any.
    streams: Option<Vec<String>>,
}
1446
/// One page of the `STREAM.LIST` reply.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total reported by the server; paging stops once the running offset
    /// reaches it.
    total: usize,
    /// Stream infos on this page, if the reply contained any.
    streams: Option<Vec<super::stream::Info>>,
}
1452
/// Boxed in-flight request for one [`StreamPage`].
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1454
/// Paginated listing of stream names, consumed as a `futures::Stream`.
pub struct StreamNames {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of names received so far; sent as the `offset` of the next request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Optional subject filter sent with every page request.
    subject: Option<String>,
    /// Names of the current page that have not been yielded yet.
    streams: Vec<String>,
    /// Set once no further page should be requested.
    done: bool,
}
1463
1464impl futures::Stream for StreamNames {
1465 type Item = Result<String, StreamsError>;
1466
1467 fn poll_next(
1468 mut self: Pin<&mut Self>,
1469 cx: &mut std::task::Context<'_>,
1470 ) -> std::task::Poll<Option<Self::Item>> {
1471 match self.page_request.as_mut() {
1472 Some(page) => match page.try_poll_unpin(cx) {
1473 std::task::Poll::Ready(page) => {
1474 self.page_request = None;
1475 let page = page
1476 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1477 if let Some(streams) = page.streams {
1478 self.offset += streams.len();
1479 self.streams = streams;
1480 if self.offset >= page.total {
1481 self.done = true;
1482 }
1483 match self.streams.pop() {
1484 Some(stream) => Poll::Ready(Some(Ok(stream))),
1485 None => Poll::Ready(None),
1486 }
1487 } else {
1488 Poll::Ready(None)
1489 }
1490 }
1491 std::task::Poll::Pending => std::task::Poll::Pending,
1492 },
1493 None => {
1494 if let Some(stream) = self.streams.pop() {
1495 Poll::Ready(Some(Ok(stream)))
1496 } else {
1497 if self.done {
1498 return Poll::Ready(None);
1499 }
1500 let context = self.context.clone();
1501 let offset = self.offset;
1502 let subject = self.subject.clone();
1503 self.page_request = Some(Box::pin(async move {
1504 match context
1505 .request(
1506 "STREAM.NAMES",
1507 &json!({
1508 "offset": offset,
1509 "subject": subject
1510 }),
1511 )
1512 .await?
1513 {
1514 Response::Err { error } => {
1515 Err(RequestError::with_source(RequestErrorKind::Other, error))
1516 }
1517 Response::Ok(page) => Ok(page),
1518 }
1519 }));
1520 self.poll_next(cx)
1521 }
1522 }
1523 }
1524 }
1525}
1526
/// Boxed in-flight request for one [`StreamInfoPage`].
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1528
/// Error kind of the stream listings; an alias of [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error yielded by the stream listings; an alias of [`RequestError`].
pub type StreamsError = RequestError;
1531
/// Paginated listing of stream infos, consumed as a `futures::Stream`.
pub struct Streams {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of infos received so far; sent as the `offset` of the next request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    streams: Vec<super::stream::Info>,
    /// Set once no further page should be requested.
    done: bool,
}
1539
1540impl futures::Stream for Streams {
1541 type Item = Result<super::stream::Info, StreamsError>;
1542
1543 fn poll_next(
1544 mut self: Pin<&mut Self>,
1545 cx: &mut std::task::Context<'_>,
1546 ) -> std::task::Poll<Option<Self::Item>> {
1547 match self.page_request.as_mut() {
1548 Some(page) => match page.try_poll_unpin(cx) {
1549 std::task::Poll::Ready(page) => {
1550 self.page_request = None;
1551 let page = page
1552 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1553 if let Some(streams) = page.streams {
1554 self.offset += streams.len();
1555 self.streams = streams;
1556 if self.offset >= page.total {
1557 self.done = true;
1558 }
1559 match self.streams.pop() {
1560 Some(stream) => Poll::Ready(Some(Ok(stream))),
1561 None => Poll::Ready(None),
1562 }
1563 } else {
1564 Poll::Ready(None)
1565 }
1566 }
1567 std::task::Poll::Pending => std::task::Poll::Pending,
1568 },
1569 None => {
1570 if let Some(stream) = self.streams.pop() {
1571 Poll::Ready(Some(Ok(stream)))
1572 } else {
1573 if self.done {
1574 return Poll::Ready(None);
1575 }
1576 let context = self.context.clone();
1577 let offset = self.offset;
1578 self.page_request = Some(Box::pin(async move {
1579 match context
1580 .request(
1581 "STREAM.LIST",
1582 &json!({
1583 "offset": offset,
1584 }),
1585 )
1586 .await?
1587 {
1588 Response::Err { error } => {
1589 Err(RequestError::with_source(RequestErrorKind::Other, error))
1590 }
1591 Response::Ok(page) => Ok(page),
1592 }
1593 }));
1594 self.poll_next(cx)
1595 }
1596 }
1597 }
1598 }
1599}
/// Builder for a JetStream publish: a payload plus optional headers.
#[derive(Default, Clone, Debug)]
pub struct Publish {
    /// Message body.
    payload: Bytes,
    /// Headers to send with the message; `None` until a header is set.
    headers: Option<header::HeaderMap>,
}
1606impl Publish {
1607 pub fn build() -> Self {
1609 Default::default()
1610 }
1611
1612 pub fn payload(mut self, payload: Bytes) -> Self {
1614 self.payload = payload;
1615 self
1616 }
1617 pub fn headers(mut self, headers: HeaderMap) -> Self {
1619 self.headers = Some(headers);
1620 self
1621 }
1622 pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1624 self.headers
1625 .get_or_insert(header::HeaderMap::new())
1626 .insert(name, value);
1627 self
1628 }
1629 pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1631 self.header(header::NATS_MESSAGE_ID, id.as_ref())
1632 }
1633 pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1636 self.header(
1637 header::NATS_EXPECTED_LAST_MESSAGE_ID,
1638 last_message_id.as_ref(),
1639 )
1640 }
1641 pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1644 self.header(
1645 header::NATS_EXPECTED_LAST_SEQUENCE,
1646 HeaderValue::from(last_sequence),
1647 )
1648 }
1649 pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1652 self.header(
1653 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1654 HeaderValue::from(subject_sequence),
1655 )
1656 }
1657 pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1660 self.header(
1661 header::NATS_EXPECTED_STREAM,
1662 HeaderValue::from(stream.as_ref()),
1663 )
1664 }
1665
1666 #[cfg(feature = "server_2_11")]
1667 pub fn ttl(self, ttl: Duration) -> Self {
1670 self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
1671 }
1672}
1673
/// Reasons a JetStream API request can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// The underlying request got no responders.
    NoResponders,
    /// The underlying request timed out.
    TimedOut,
    /// Any other failure.
    Other,
}

impl Display for RequestErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1690
/// Error returned by JetStream API requests, tagged with a [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
1692
1693impl From<crate::RequestError> for RequestError {
1694 fn from(error: crate::RequestError) -> Self {
1695 match error.kind() {
1696 crate::RequestErrorKind::TimedOut => {
1697 RequestError::with_source(RequestErrorKind::TimedOut, error)
1698 }
1699 crate::RequestErrorKind::NoResponders => {
1700 RequestError::new(RequestErrorKind::NoResponders)
1701 }
1702 crate::RequestErrorKind::Other => {
1703 RequestError::with_source(RequestErrorKind::Other, error)
1704 }
1705 }
1706 }
1707}
1708
1709impl From<super::errors::Error> for RequestError {
1710 fn from(err: super::errors::Error) -> Self {
1711 RequestError::with_source(RequestErrorKind::Other, err)
1712 }
1713}
1714
/// Error type whose kind is a [`ConsumerInfoErrorKind`].
pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
1716
/// Kinds of failure carried by [`ConsumerInfoError`].
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerInfoErrorKind {
    /// The consumer name is invalid.
    InvalidName,
    /// The server answered with `ErrorCode::CONSUMER_OFFLINE`.
    Offline,
    /// The server answered with `ErrorCode::CONSUMER_NOT_FOUND`.
    NotFound,
    /// The server answered with `ErrorCode::STREAM_NOT_FOUND`.
    StreamNotFound,
    /// The underlying request failed with `RequestErrorKind::Other`.
    Request,
    /// Any other JetStream error, kept verbatim.
    JetStream(super::errors::Error),
    /// The underlying request timed out.
    TimedOut,
    /// The underlying request had no responders.
    NoResponders,
}
1728
1729impl Display for ConsumerInfoErrorKind {
1730 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1731 match self {
1732 Self::InvalidName => write!(f, "invalid consumer name"),
1733 Self::Offline => write!(f, "consumer is offline"),
1734 Self::NotFound => write!(f, "consumer not found"),
1735 Self::StreamNotFound => write!(f, "stream not found"),
1736 Self::Request => write!(f, "request error"),
1737 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1738 Self::TimedOut => write!(f, "timed out"),
1739 Self::NoResponders => write!(f, "no responders"),
1740 }
1741 }
1742}
1743
1744impl From<super::errors::Error> for ConsumerInfoError {
1745 fn from(error: super::errors::Error) -> Self {
1746 match error.error_code() {
1747 ErrorCode::CONSUMER_NOT_FOUND => {
1748 ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
1749 }
1750 ErrorCode::STREAM_NOT_FOUND => {
1751 ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
1752 }
1753 ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
1754 _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
1755 }
1756 }
1757}
1758
1759impl From<RequestError> for ConsumerInfoError {
1760 fn from(error: RequestError) -> Self {
1761 match error.kind() {
1762 RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
1763 RequestErrorKind::Other => {
1764 ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
1765 }
1766 RequestErrorKind::NoResponders => {
1767 ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
1768 }
1769 }
1770 }
1771}
1772
/// Kinds of failure carried by [`CreateStreamError`].
#[derive(Clone, Debug, PartialEq)]
pub enum CreateStreamErrorKind {
    /// The stream name is empty.
    EmptyStreamName,
    /// The stream name is not valid.
    InvalidStreamName,
    /// Both a domain and an external configuration were set.
    DomainAndExternalSet,
    /// The underlying request had no responders.
    JetStreamUnavailable,
    /// A JetStream error other than `STREAM_NOT_FOUND`, kept verbatim.
    JetStream(super::errors::Error),
    /// The underlying request timed out.
    TimedOut,
    /// The underlying request failed with `RequestErrorKind::Other`.
    Response,
    /// The server answered with `ErrorCode::STREAM_NOT_FOUND`.
    NotFound,
    /// The server response could not be parsed.
    ResponseParse,
}
1785
1786impl Display for CreateStreamErrorKind {
1787 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1788 match self {
1789 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
1790 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
1791 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
1792 Self::NotFound => write!(f, "stream not found"),
1793 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1794 Self::TimedOut => write!(f, "jetstream request timed out"),
1795 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
1796 Self::ResponseParse => write!(f, "failed to parse server response"),
1797 Self::Response => write!(f, "response error"),
1798 }
1799 }
1800}
1801
/// Error type whose kind is a [`CreateStreamErrorKind`].
pub type CreateStreamError = Error<CreateStreamErrorKind>;
1803
1804impl From<super::errors::Error> for CreateStreamError {
1805 fn from(error: super::errors::Error) -> Self {
1806 match error.kind() {
1807 super::errors::ErrorCode::STREAM_NOT_FOUND => {
1808 CreateStreamError::new(CreateStreamErrorKind::NotFound)
1809 }
1810 _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
1811 }
1812 }
1813}
1814
1815impl From<RequestError> for CreateStreamError {
1816 fn from(error: RequestError) -> Self {
1817 match error.kind() {
1818 RequestErrorKind::NoResponders => {
1819 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
1820 }
1821 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
1822 RequestErrorKind::Other => {
1823 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
1824 }
1825 }
1826 }
1827}
1828
/// Kinds of failure carried by [`GetStreamError`].
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamErrorKind {
    /// The stream name is empty.
    EmptyName,
    /// The request failed.
    Request,
    /// The stream name is not valid.
    InvalidStreamName,
    /// A JetStream error, kept verbatim.
    JetStream(super::errors::Error),
}
1836
1837impl Display for GetStreamErrorKind {
1838 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1839 match self {
1840 Self::EmptyName => write!(f, "empty name cannot be empty"),
1841 Self::Request => write!(f, "request error"),
1842 Self::InvalidStreamName => write!(f, "invalid stream name"),
1843 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1844 }
1845 }
1846}
1847
/// Kinds of failure carried by [`GetStreamByNameError`].
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamByNameErrorKind {
    /// The request failed.
    Request,
    /// No stream was found.
    NotFound,
    /// The subject is not valid.
    InvalidSubject,
    /// A JetStream error, kept verbatim.
    JetStream(super::errors::Error),
}
1855
1856impl Display for GetStreamByNameErrorKind {
1857 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1858 match self {
1859 Self::Request => write!(f, "request error"),
1860 Self::NotFound => write!(f, "stream not found"),
1861 Self::InvalidSubject => write!(f, "invalid subject"),
1862 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1863 }
1864 }
1865}
1866
/// Error type whose kind is a [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error type whose kind is a [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Alias of [`CreateStreamError`]: stream updates share the stream-creation error type.
pub type UpdateStreamError = CreateStreamError;
/// Alias of [`CreateStreamErrorKind`].
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Alias of [`GetStreamError`]: stream deletion shares the stream-lookup error type.
pub type DeleteStreamError = GetStreamError;
/// Alias of [`GetStreamErrorKind`].
pub type DeleteStreamErrorKind = GetStreamErrorKind;
1874
/// Kinds of failure carried by `Error<KeyValueErrorKind>`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum KeyValueErrorKind {
    /// The Key-Value store name is invalid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            KeyValueErrorKind::InvalidStoreName => "invalid Key Value Store name",
            KeyValueErrorKind::GetBucket => "failed to get the bucket",
            KeyValueErrorKind::JetStream => "JetStream error",
        })
    }
}
1891
/// Error type whose kind is a [`KeyValueErrorKind`].
pub type KeyValueError = Error<KeyValueErrorKind>;
1893
/// Kinds of failure carried by `Error<CreateKeyValueErrorKind>`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The Key-Value store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            CreateKeyValueErrorKind::InvalidStoreName => "invalid Key Value Store name",
            CreateKeyValueErrorKind::TooLongHistory => "too long history",
            CreateKeyValueErrorKind::JetStream => "JetStream error",
            CreateKeyValueErrorKind::BucketCreate => "bucket creation failed",
            CreateKeyValueErrorKind::TimedOut => "timed out",
            CreateKeyValueErrorKind::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(description)
    }
}
1918
/// Kinds of failure carried by `Error<UpdateKeyValueErrorKind>`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The Key-Value store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // This is the update path, so the message must not talk about creation.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error type whose kind is a [`CreateKeyValueErrorKind`].
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error type whose kind is an [`UpdateKeyValueErrorKind`].
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Alias of [`CreateKeyValueError`]: object store creation shares the Key-Value creation error type.
pub type CreateObjectStoreError = CreateKeyValueError;
/// Alias of [`CreateKeyValueErrorKind`].
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
1950
/// Kinds of failure carried by `Error<ObjectStoreErrorKind>`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is invalid.
    InvalidBucketName,
    /// The object store could not be fetched.
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            ObjectStoreErrorKind::GetStore => "failed to get Object Store",
            ObjectStoreErrorKind::InvalidBucketName => "invalid Object Store bucket name",
        };
        f.write_str(description)
    }
}
1965
/// Error type whose kind is an [`ObjectStoreErrorKind`].
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Alias of [`ObjectStoreError`].
// NOTE(review): unlike the sibling aliases this name has no `Error` suffix; renaming would
// break the public API, so it is left as is.
pub type DeleteObjectStore = ObjectStoreError;
/// Alias of [`ObjectStoreErrorKind`].
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
1970
/// Kinds of failure carried by [`AccountError`].
#[derive(Clone, Debug, PartialEq)]
pub enum AccountErrorKind {
    /// The underlying request timed out.
    TimedOut,
    /// A JetStream error, kept verbatim.
    JetStream(super::errors::Error),
    /// The underlying request had no responders.
    JetStreamUnavailable,
    /// The underlying request failed with `RequestErrorKind::Other`.
    Other,
}
1978
1979impl Display for AccountErrorKind {
1980 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1981 match self {
1982 Self::TimedOut => write!(f, "timed out"),
1983 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1984 Self::Other => write!(f, "error"),
1985 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
1986 }
1987 }
1988}
1989
/// Error type whose kind is an [`AccountErrorKind`].
pub type AccountError = Error<AccountErrorKind>;
1991
1992impl From<RequestError> for AccountError {
1993 fn from(err: RequestError) -> Self {
1994 match err.kind {
1995 RequestErrorKind::NoResponders => {
1996 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
1997 }
1998 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
1999 RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2000 }
2001 }
2002}
2003
/// Action tag with a serde string representation per variant.
///
/// NOTE(review): presumably sent along with consumer create/update requests; the usage
/// site is outside this part of the file.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string `""`.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2015
2016fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2018 let mut store = Store {
2019 prefix: format!("$KV.{}.", bucket.as_str()),
2020 name: bucket,
2021 stream: stream.clone(),
2022 stream_name: stream.info.config.name.clone(),
2023 put_prefix: None,
2024 use_jetstream_prefix: prefix != "$JS.API",
2025 };
2026 if let Some(ref mirror) = stream.info.config.mirror {
2027 let bucket = mirror.name.trim_start_matches("KV_");
2028 if let Some(ref external) = mirror.external {
2029 if !external.api_prefix.is_empty() {
2030 store.use_jetstream_prefix = false;
2031 store.prefix = format!("$KV.{bucket}.");
2032 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2033 } else {
2034 store.put_prefix = Some(format!("$KV.{bucket}."));
2035 }
2036 }
2037 };
2038 store
2039}
2040
/// Reasons why a Key-Value configuration cannot be turned into a stream configuration.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but the account's API level is below 1.
    // Only constructed behind the `server_2_11` feature, so without that feature the
    // variant would otherwise trigger a dead-code warning.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2046
2047impl From<KvToStreamConfigError> for CreateKeyValueError {
2048 fn from(err: KvToStreamConfigError) -> Self {
2049 match err {
2050 KvToStreamConfigError::TooLongHistory => {
2051 CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2052 }
2053 KvToStreamConfigError::LimitMarkersNotSupported => {
2054 CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2055 }
2056 }
2057 }
2058}
2059
2060impl From<KvToStreamConfigError> for UpdateKeyValueError {
2061 fn from(err: KvToStreamConfigError) -> Self {
2062 match err {
2063 KvToStreamConfigError::TooLongHistory => {
2064 UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2065 }
2066 KvToStreamConfigError::LimitMarkersNotSupported => {
2067 UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2068 }
2069 }
2070 }
2071}
2072
2073fn kv_to_stream_config(
2075 config: kv::Config,
2076 _account: Account,
2077) -> Result<super::stream::Config, KvToStreamConfigError> {
2078 let history = if config.history > 0 {
2079 if config.history > MAX_HISTORY {
2080 return Err(KvToStreamConfigError::TooLongHistory);
2081 }
2082 config.history
2083 } else {
2084 1
2085 };
2086
2087 let num_replicas = if config.num_replicas == 0 {
2088 1
2089 } else {
2090 config.num_replicas
2091 };
2092
2093 #[cfg(feature = "server_2_11")]
2094 let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2095
2096 #[cfg(feature = "server_2_11")]
2097 if let Some(duration) = config.limit_markers {
2098 if _account.requests.level < 1 {
2099 return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2100 }
2101 allow_message_ttl = true;
2102 subject_delete_marker_ttl = Some(duration);
2103 }
2104
2105 let mut mirror = config.mirror.clone();
2106 let mut sources = config.sources.clone();
2107 let mut mirror_direct = config.mirror_direct;
2108
2109 let mut subjects = Vec::new();
2110 if let Some(ref mut mirror) = mirror {
2111 if !mirror.name.starts_with("KV_") {
2112 mirror.name = format!("KV_{}", mirror.name);
2113 }
2114 mirror_direct = true;
2115 } else if let Some(ref mut sources) = sources {
2116 for source in sources {
2117 if !source.name.starts_with("KV_") {
2118 source.name = format!("KV_{}", source.name);
2119 }
2120 }
2121 } else {
2122 subjects = vec![format!("$KV.{}.>", config.bucket)];
2123 }
2124
2125 Ok(stream::Config {
2126 name: format!("KV_{}", config.bucket),
2127 description: Some(config.description),
2128 subjects,
2129 max_messages_per_subject: history,
2130 max_bytes: config.max_bytes,
2131 max_age: config.max_age,
2132 max_message_size: config.max_value_size,
2133 storage: config.storage,
2134 republish: config.republish,
2135 allow_rollup: true,
2136 deny_delete: true,
2137 deny_purge: false,
2138 allow_direct: true,
2139 sources,
2140 mirror,
2141 num_replicas,
2142 discard: stream::DiscardPolicy::New,
2143 mirror_direct,
2144 #[cfg(feature = "server_2_10")]
2145 compression: if config.compression {
2146 Some(stream::Compression::S2)
2147 } else {
2148 None
2149 },
2150 placement: config.placement,
2151 #[cfg(feature = "server_2_11")]
2152 allow_message_ttl,
2153 #[cfg(feature = "server_2_11")]
2154 subject_delete_marker_ttl,
2155 ..Default::default()
2156 })
2157}