1use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23 header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use futures::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Display;
33use std::future::IntoFuture;
34use std::pin::Pin;
35use std::str::from_utf8;
36use std::task::Poll;
37use std::time::Duration;
38use tokio::sync::oneshot;
39use tracing::debug;
40
41use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
42use super::errors::ErrorCode;
43use super::kv::{Store, MAX_HISTORY};
44use super::object_store::{is_valid_bucket_name, ObjectStore};
45use super::stream::{
46 self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
47 Stream,
48};
49#[cfg(feature = "server_2_10")]
50use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
51use super::{is_valid_name, kv};
52
53#[derive(Debug, Clone)]
55pub struct Context {
56 pub(crate) client: Client,
57 pub(crate) prefix: String,
58 pub(crate) timeout: Duration,
59}
60
61impl Context {
62 pub(crate) fn new(client: Client) -> Context {
63 Context {
64 client,
65 prefix: "$JS.API".to_string(),
66 timeout: Duration::from_secs(5),
67 }
68 }
69
70 pub fn set_timeout(&mut self, timeout: Duration) {
71 self.timeout = timeout
72 }
73
74 pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
75 Context {
76 client,
77 prefix: prefix.to_string(),
78 timeout: Duration::from_secs(5),
79 }
80 }
81
82 pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
83 Context {
84 client,
85 prefix: format!("$JS.{}.API", domain.as_ref()),
86 timeout: Duration::from_secs(5),
87 }
88 }
89
90 pub async fn publish<S: ToSubject>(
131 &self,
132 subject: S,
133 payload: Bytes,
134 ) -> Result<PublishAckFuture, PublishError> {
135 self.send_publish(subject, Publish::build().payload(payload))
136 .await
137 }
138
139 pub async fn publish_with_headers<S: ToSubject>(
161 &self,
162 subject: S,
163 headers: crate::header::HeaderMap,
164 payload: Bytes,
165 ) -> Result<PublishAckFuture, PublishError> {
166 self.send_publish(subject, Publish::build().payload(payload).headers(headers))
167 .await
168 }
169
170 pub async fn send_publish<S: ToSubject>(
193 &self,
194 subject: S,
195 publish: Publish,
196 ) -> Result<PublishAckFuture, PublishError> {
197 let subject = subject.to_subject();
198 let (sender, receiver) = oneshot::channel();
199
200 let respond = self.client.new_inbox().into();
201
202 let send_fut = self
203 .client
204 .sender
205 .send(Command::Request {
206 subject,
207 payload: publish.payload,
208 respond,
209 headers: publish.headers,
210 sender,
211 })
212 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
213
214 tokio::time::timeout(self.timeout, send_fut)
215 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
216 .await??;
217
218 Ok(PublishAckFuture {
219 timeout: self.timeout,
220 subscription: receiver,
221 })
222 }
223
224 pub async fn query_account(&self) -> Result<Account, AccountError> {
226 let response: Response<Account> = self.request("INFO", b"").await?;
227
228 match response {
229 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
230 Response::Ok(account) => Ok(account),
231 }
232 }
233
234 pub async fn create_stream<S>(
259 &self,
260 stream_config: S,
261 ) -> Result<Stream<Info>, CreateStreamError>
262 where
263 Config: From<S>,
264 {
265 let mut config: Config = stream_config.into();
266 if config.name.is_empty() {
267 return Err(CreateStreamError::new(
268 CreateStreamErrorKind::EmptyStreamName,
269 ));
270 }
271 if !is_valid_name(config.name.as_str()) {
272 return Err(CreateStreamError::new(
273 CreateStreamErrorKind::InvalidStreamName,
274 ));
275 }
276 if let Some(ref mut mirror) = config.mirror {
277 if let Some(ref mut domain) = mirror.domain {
278 if mirror.external.is_some() {
279 return Err(CreateStreamError::new(
280 CreateStreamErrorKind::DomainAndExternalSet,
281 ));
282 }
283 mirror.external = Some(External {
284 api_prefix: format!("$JS.{domain}.API"),
285 delivery_prefix: None,
286 })
287 }
288 }
289
290 if let Some(ref mut sources) = config.sources {
291 for source in sources {
292 if let Some(ref mut domain) = source.domain {
293 if source.external.is_some() {
294 return Err(CreateStreamError::new(
295 CreateStreamErrorKind::DomainAndExternalSet,
296 ));
297 }
298 source.external = Some(External {
299 api_prefix: format!("$JS.{domain}.API"),
300 delivery_prefix: None,
301 })
302 }
303 }
304 }
305 let subject = format!("STREAM.CREATE.{}", config.name);
306 let response: Response<Info> = self.request(subject, &config).await?;
307
308 match response {
309 Response::Err { error } => Err(error.into()),
310 Response::Ok(info) => Ok(Stream {
311 context: self.clone(),
312 info,
313 name: config.name,
314 }),
315 }
316 }
317
318 pub async fn get_stream_no_info<T: AsRef<str>>(
338 &self,
339 stream: T,
340 ) -> Result<Stream<()>, GetStreamError> {
341 let stream = stream.as_ref();
342 if stream.is_empty() {
343 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
344 }
345
346 if !is_valid_name(stream) {
347 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
348 }
349
350 Ok(Stream {
351 context: self.clone(),
352 info: (),
353 name: stream.to_string(),
354 })
355 }
356
357 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
373 let stream = stream.as_ref();
374 if stream.is_empty() {
375 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
376 }
377
378 if !is_valid_name(stream) {
379 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
380 }
381
382 let subject = format!("STREAM.INFO.{stream}");
383 let request: Response<Info> = self
384 .request(subject, &())
385 .await
386 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
387 match request {
388 Response::Err { error } => {
389 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
390 }
391 Response::Ok(info) => Ok(Stream {
392 context: self.clone(),
393 info,
394 name: stream.to_string(),
395 }),
396 }
397 }
398
399 pub async fn get_or_create_stream<S>(
423 &self,
424 stream_config: S,
425 ) -> Result<Stream, CreateStreamError>
426 where
427 S: Into<Config>,
428 {
429 let config: Config = stream_config.into();
430
431 if config.name.is_empty() {
432 return Err(CreateStreamError::new(
433 CreateStreamErrorKind::EmptyStreamName,
434 ));
435 }
436
437 if !is_valid_name(config.name.as_str()) {
438 return Err(CreateStreamError::new(
439 CreateStreamErrorKind::InvalidStreamName,
440 ));
441 }
442 let subject = format!("STREAM.INFO.{}", config.name);
443
444 let request: Response<Info> = self.request(subject, &()).await?;
445 match request {
446 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
447 Response::Err { error } => Err(error.into()),
448 Response::Ok(info) => Ok(Stream {
449 context: self.clone(),
450 info,
451 name: config.name,
452 }),
453 }
454 }
455
456 pub async fn delete_stream<T: AsRef<str>>(
472 &self,
473 stream: T,
474 ) -> Result<DeleteStatus, DeleteStreamError> {
475 let stream = stream.as_ref();
476 if stream.is_empty() {
477 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
478 }
479
480 if !is_valid_name(stream) {
481 return Err(DeleteStreamError::new(
482 DeleteStreamErrorKind::InvalidStreamName,
483 ));
484 }
485
486 let subject = format!("STREAM.DELETE.{stream}");
487 match self
488 .request(subject, &json!({}))
489 .await
490 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
491 {
492 Response::Err { error } => Err(DeleteStreamError::new(
493 DeleteStreamErrorKind::JetStream(error),
494 )),
495 Response::Ok(delete_response) => Ok(delete_response),
496 }
497 }
498
499 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
524 where
525 S: Borrow<Config>,
526 {
527 let config = config.borrow();
528
529 if config.name.is_empty() {
530 return Err(CreateStreamError::new(
531 CreateStreamErrorKind::EmptyStreamName,
532 ));
533 }
534
535 if !is_valid_name(config.name.as_str()) {
536 return Err(CreateStreamError::new(
537 CreateStreamErrorKind::InvalidStreamName,
538 ));
539 }
540
541 let subject = format!("STREAM.UPDATE.{}", config.name);
542 match self.request(subject, config).await? {
543 Response::Err { error } => Err(error.into()),
544 Response::Ok(info) => Ok(info),
545 }
546 }
547
548 pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
572 match self.update_stream(config.clone()).await {
573 Ok(stream) => Ok(stream),
574 Err(err) => match err.kind() {
575 CreateStreamErrorKind::NotFound => {
576 let stream = self
577 .create_stream(config)
578 .await
579 .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
580 Ok(stream.info)
581 }
582 _ => Err(err),
583 },
584 }
585 }
586
587 pub async fn stream_by_subject<T: Into<String>>(
602 &self,
603 subject: T,
604 ) -> Result<String, GetStreamByNameError> {
605 let subject = subject.into();
606 if !is_valid_subject(subject.as_str()) {
607 return Err(GetStreamByNameError::new(
608 GetStreamByNameErrorKind::InvalidSubject,
609 ));
610 }
611 let mut names = StreamNames {
612 context: self.clone(),
613 offset: 0,
614 page_request: None,
615 streams: Vec::new(),
616 subject: Some(subject),
617 done: false,
618 };
619 match names.next().await {
620 Some(name) => match name {
621 Ok(name) => Ok(name),
622 Err(err) => Err(GetStreamByNameError::with_source(
623 GetStreamByNameErrorKind::Request,
624 err,
625 )),
626 },
627 None => Err(GetStreamByNameError::new(
628 GetStreamByNameErrorKind::NotFound,
629 )),
630 }
631 }
632
633 pub fn stream_names(&self) -> StreamNames {
651 StreamNames {
652 context: self.clone(),
653 offset: 0,
654 page_request: None,
655 streams: Vec::new(),
656 subject: None,
657 done: false,
658 }
659 }
660
661 pub fn streams(&self) -> Streams {
679 Streams {
680 context: self.clone(),
681 offset: 0,
682 page_request: None,
683 streams: Vec::new(),
684 done: false,
685 }
686 }
687 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
701 let bucket: String = bucket.into();
702 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
703 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
704 }
705
706 let stream_name = format!("KV_{}", &bucket);
707 let stream = self
708 .get_stream(stream_name.clone())
709 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
710 .await?;
711
712 if stream.info.config.max_messages_per_subject < 1 {
713 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
714 }
715 let mut store = Store {
716 prefix: format!("$KV.{}.", &bucket),
717 name: bucket,
718 stream_name,
719 stream: stream.clone(),
720 put_prefix: None,
721 use_jetstream_prefix: self.prefix != "$JS.API",
722 };
723 if let Some(ref mirror) = stream.info.config.mirror {
724 let bucket = mirror.name.trim_start_matches("KV_");
725 if let Some(ref external) = mirror.external {
726 if !external.api_prefix.is_empty() {
727 store.use_jetstream_prefix = false;
728 store.prefix = format!("$KV.{bucket}.");
729 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
730 } else {
731 store.put_prefix = Some(format!("$KV.{bucket}."));
732 }
733 }
734 };
735
736 Ok(store)
737 }
738
739 pub async fn create_key_value(
759 &self,
760 config: crate::jetstream::kv::Config,
761 ) -> Result<Store, CreateKeyValueError> {
762 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
763 return Err(CreateKeyValueError::new(
764 CreateKeyValueErrorKind::InvalidStoreName,
765 ));
766 }
767 let info = self.query_account().await.map_err(|err| {
768 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
769 })?;
770
771 let bucket_name = config.bucket.clone();
772 let stream_config = kv_to_stream_config(config, info)?;
773
774 let stream = self.create_stream(stream_config).await.map_err(|err| {
775 if err.kind() == CreateStreamErrorKind::TimedOut {
776 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
777 } else {
778 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
779 }
780 })?;
781
782 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
783 }
784
785 pub async fn update_key_value(
805 &self,
806 config: crate::jetstream::kv::Config,
807 ) -> Result<Store, UpdateKeyValueError> {
808 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
809 return Err(UpdateKeyValueError::new(
810 UpdateKeyValueErrorKind::InvalidStoreName,
811 ));
812 }
813
814 let stream_name = format!("KV_{}", config.bucket);
815 let bucket_name = config.bucket.clone();
816
817 let account = self.query_account().await.map_err(|err| {
818 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
819 })?;
820 let stream = self
821 .update_stream(kv_to_stream_config(config, account)?)
822 .await
823 .map_err(|err| match err.kind() {
824 UpdateStreamErrorKind::NotFound => {
825 UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
826 }
827 _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
828 })?;
829
830 let stream = Stream {
831 context: self.clone(),
832 info: stream,
833 name: stream_name,
834 };
835
836 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
837 }
838
839 pub async fn create_or_update_key_value(
859 &self,
860 config: crate::jetstream::kv::Config,
861 ) -> Result<Store, CreateKeyValueError> {
862 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
863 return Err(CreateKeyValueError::new(
864 CreateKeyValueErrorKind::InvalidStoreName,
865 ));
866 }
867
868 let bucket_name = config.bucket.clone();
869 let stream_name = format!("KV_{}", config.bucket);
870
871 let account = self.query_account().await.map_err(|err| {
872 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
873 })?;
874 let stream = self
875 .create_or_update_stream(kv_to_stream_config(config, account)?)
876 .await
877 .map_err(|err| {
878 CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
879 })?;
880
881 let stream = Stream {
882 context: self.clone(),
883 info: stream,
884 name: stream_name,
885 };
886
887 Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
888 }
889
890 pub async fn delete_key_value<T: AsRef<str>>(
910 &self,
911 bucket: T,
912 ) -> Result<DeleteStatus, KeyValueError> {
913 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
914 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
915 }
916
917 let stream_name = format!("KV_{}", bucket.as_ref());
918 self.delete_stream(stream_name)
919 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
920 .await
921 }
922
923 pub async fn get_consumer_from_stream<T, C, S>(
961 &self,
962 consumer: C,
963 stream: S,
964 ) -> Result<Consumer<T>, ConsumerError>
965 where
966 T: FromConsumer + IntoConsumerConfig,
967 S: AsRef<str>,
968 C: AsRef<str>,
969 {
970 if !is_valid_name(stream.as_ref()) {
971 return Err(ConsumerError::with_source(
972 ConsumerErrorKind::InvalidName,
973 "invalid stream",
974 ));
975 }
976
977 if !is_valid_name(consumer.as_ref()) {
978 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
979 }
980
981 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
982
983 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
984 Response::Ok(info) => info,
985 Response::Err { error } => return Err(error.into()),
986 };
987
988 Ok(Consumer::new(
989 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
990 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
991 })?,
992 info,
993 self.clone(),
994 ))
995 }
996
997 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1020 &self,
1021 consumer: C,
1022 stream: S,
1023 ) -> Result<DeleteStatus, ConsumerError> {
1024 if !is_valid_name(consumer.as_ref()) {
1025 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1026 }
1027
1028 if !is_valid_name(stream.as_ref()) {
1029 return Err(ConsumerError::with_source(
1030 ConsumerErrorKind::Other,
1031 "invalid stream name",
1032 ));
1033 }
1034
1035 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1036
1037 match self.request(subject, &json!({})).await? {
1038 Response::Ok(delete_status) => Ok(delete_status),
1039 Response::Err { error } => Err(error.into()),
1040 }
1041 }
1042
1043 pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1069 &self,
1070 config: C,
1071 stream: S,
1072 ) -> Result<Consumer<C>, ConsumerError> {
1073 self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1074 .await
1075 }
1076
/// Updates a consumer on `stream` by sending the request with the
/// [`ConsumerAction::Update`] action.
///
/// Any [`ConsumerError`] is converted into [`ConsumerUpdateError`].
#[cfg(feature = "server_2_10")]
pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
    &self,
    config: C,
    stream: S,
) -> Result<Consumer<C>, ConsumerUpdateError> {
    let updated = self
        .create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
        .await;
    updated.map_err(Into::into)
}
1113
/// Creates a consumer on `stream` by sending the request with the
/// [`ConsumerAction::Create`] action.
///
/// Any [`ConsumerError`] is converted into [`ConsumerCreateStrictError`].
#[cfg(feature = "server_2_10")]
pub async fn create_consumer_strict_on_stream<
    C: IntoConsumerConfig + FromConsumer,
    S: AsRef<str>,
>(
    &self,
    config: C,
    stream: S,
) -> Result<Consumer<C>, ConsumerCreateStrictError> {
    let created = self
        .create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
        .await;
    created.map_err(Into::into)
}
1153
1154 async fn create_consumer_on_stream_action<
1155 C: IntoConsumerConfig + FromConsumer,
1156 S: AsRef<str>,
1157 >(
1158 &self,
1159 config: C,
1160 stream: S,
1161 action: ConsumerAction,
1162 ) -> Result<Consumer<C>, ConsumerError> {
1163 let config = config.into_consumer_config();
1164
1165 let subject = {
1166 let filter = if config.filter_subject.is_empty() {
1167 "".to_string()
1168 } else {
1169 format!(".{}", config.filter_subject)
1170 };
1171 config
1172 .name
1173 .as_ref()
1174 .or(config.durable_name.as_ref())
1175 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1176 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1177 };
1178
1179 match self
1180 .request(
1181 subject,
1182 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1183 )
1184 .await?
1185 {
1186 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1187 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1188 FromConsumer::try_from_consumer_config(info.clone().config)
1189 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1190 info,
1191 self.clone(),
1192 )),
1193 }
1194 }
1195
1196 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1216 where
1217 S: ToSubject,
1218 T: ?Sized + Serialize,
1219 V: DeserializeOwned,
1220 {
1221 let subject = subject.to_subject();
1222 let request = serde_json::to_vec(&payload)
1223 .map(Bytes::from)
1224 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1225
1226 debug!("JetStream request sent: {:?}", request);
1227
1228 let message = self
1229 .client
1230 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1231 .await;
1232 let message = message?;
1233 debug!(
1234 "JetStream request response: {:?}",
1235 from_utf8(&message.payload)
1236 );
1237 let response = serde_json::from_slice(message.payload.as_ref())
1238 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1239
1240 Ok(response)
1241 }
1242
1243 pub async fn create_object_store(
1262 &self,
1263 config: super::object_store::Config,
1264 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1265 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1266 return Err(CreateObjectStoreError::new(
1267 CreateKeyValueErrorKind::InvalidStoreName,
1268 ));
1269 }
1270
1271 let bucket_name = config.bucket.clone();
1272 let stream_name = format!("OBJ_{bucket_name}");
1273 let chunk_subject = format!("$O.{bucket_name}.C.>");
1274 let meta_subject = format!("$O.{bucket_name}.M.>");
1275
1276 let stream = self
1277 .create_stream(super::stream::Config {
1278 name: stream_name,
1279 description: config.description.clone(),
1280 subjects: vec![chunk_subject, meta_subject],
1281 max_age: config.max_age,
1282 max_bytes: config.max_bytes,
1283 storage: config.storage,
1284 num_replicas: config.num_replicas,
1285 discard: DiscardPolicy::New,
1286 allow_rollup: true,
1287 allow_direct: true,
1288 #[cfg(feature = "server_2_10")]
1289 compression: if config.compression {
1290 Some(Compression::S2)
1291 } else {
1292 None
1293 },
1294 placement: config.placement,
1295 ..Default::default()
1296 })
1297 .await
1298 .map_err(|err| {
1299 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1300 })?;
1301
1302 Ok(ObjectStore {
1303 name: bucket_name,
1304 stream,
1305 })
1306 }
1307
1308 pub async fn get_object_store<T: AsRef<str>>(
1322 &self,
1323 bucket_name: T,
1324 ) -> Result<ObjectStore, ObjectStoreError> {
1325 let bucket_name = bucket_name.as_ref();
1326 if !is_valid_bucket_name(bucket_name) {
1327 return Err(ObjectStoreError::new(
1328 ObjectStoreErrorKind::InvalidBucketName,
1329 ));
1330 }
1331 let stream_name = format!("OBJ_{bucket_name}");
1332 let stream = self
1333 .get_stream(stream_name)
1334 .await
1335 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1336
1337 Ok(ObjectStore {
1338 name: bucket_name.to_string(),
1339 stream,
1340 })
1341 }
1342
1343 pub async fn delete_object_store<T: AsRef<str>>(
1357 &self,
1358 bucket_name: T,
1359 ) -> Result<(), DeleteObjectStore> {
1360 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1361 self.delete_stream(stream_name)
1362 .await
1363 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1364 Ok(())
1365 }
1366}
1367
/// Categories of failure reported while publishing to JetStream.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The publish request came back with a "no responders" status.
    StreamNotFound,
    /// The server replied with the "wrong last message id" error code.
    WrongLastMessageId,
    /// The server replied with the "wrong last sequence" error code.
    WrongLastSequence,
    /// A timeout elapsed while sending or while waiting for the ack.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// Any other failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1390
1391pub type PublishError = Error<PublishErrorKind>;
1392
1393#[derive(Debug)]
1394pub struct PublishAckFuture {
1395 timeout: Duration,
1396 subscription: oneshot::Receiver<Message>,
1397}
1398
1399impl PublishAckFuture {
1400 async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
1401 let next = tokio::time::timeout(self.timeout, self.subscription)
1402 .await
1403 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1404 next.map_or_else(
1405 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1406 |m| {
1407 if m.status == Some(StatusCode::NO_RESPONDERS) {
1408 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1409 }
1410 let response = serde_json::from_slice(m.payload.as_ref())
1411 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1412 match response {
1413 Response::Err { error } => match error.error_code() {
1414 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1415 PublishErrorKind::WrongLastMessageId,
1416 error,
1417 )),
1418 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1419 PublishErrorKind::WrongLastSequence,
1420 error,
1421 )),
1422 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1423 },
1424 Response::Ok(publish_ack) => Ok(publish_ack),
1425 }
1426 },
1427 )
1428 }
1429}
1430impl IntoFuture for PublishAckFuture {
1431 type Output = Result<PublishAck, PublishError>;
1432
1433 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1434
1435 fn into_future(self) -> Self::IntoFuture {
1436 Box::pin(std::future::IntoFuture::into_future(
1437 self.next_with_timeout(),
1438 ))
1439 }
1440}
1441
1442#[derive(Deserialize, Debug)]
1443struct StreamPage {
1444 total: usize,
1445 streams: Option<Vec<String>>,
1446}
1447
1448#[derive(Deserialize, Debug)]
1449struct StreamInfoPage {
1450 total: usize,
1451 streams: Option<Vec<super::stream::Info>>,
1452}
1453
1454type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1455
1456pub struct StreamNames {
1457 context: Context,
1458 offset: usize,
1459 page_request: Option<PageRequest>,
1460 subject: Option<String>,
1461 streams: Vec<String>,
1462 done: bool,
1463}
1464
1465impl futures::Stream for StreamNames {
1466 type Item = Result<String, StreamsError>;
1467
1468 fn poll_next(
1469 mut self: Pin<&mut Self>,
1470 cx: &mut std::task::Context<'_>,
1471 ) -> std::task::Poll<Option<Self::Item>> {
1472 match self.page_request.as_mut() {
1473 Some(page) => match page.try_poll_unpin(cx) {
1474 std::task::Poll::Ready(page) => {
1475 self.page_request = None;
1476 let page = page
1477 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1478 if let Some(streams) = page.streams {
1479 self.offset += streams.len();
1480 self.streams = streams;
1481 if self.offset >= page.total {
1482 self.done = true;
1483 }
1484 match self.streams.pop() {
1485 Some(stream) => Poll::Ready(Some(Ok(stream))),
1486 None => Poll::Ready(None),
1487 }
1488 } else {
1489 Poll::Ready(None)
1490 }
1491 }
1492 std::task::Poll::Pending => std::task::Poll::Pending,
1493 },
1494 None => {
1495 if let Some(stream) = self.streams.pop() {
1496 Poll::Ready(Some(Ok(stream)))
1497 } else {
1498 if self.done {
1499 return Poll::Ready(None);
1500 }
1501 let context = self.context.clone();
1502 let offset = self.offset;
1503 let subject = self.subject.clone();
1504 self.page_request = Some(Box::pin(async move {
1505 match context
1506 .request(
1507 "STREAM.NAMES",
1508 &json!({
1509 "offset": offset,
1510 "subject": subject
1511 }),
1512 )
1513 .await?
1514 {
1515 Response::Err { error } => {
1516 Err(RequestError::with_source(RequestErrorKind::Other, error))
1517 }
1518 Response::Ok(page) => Ok(page),
1519 }
1520 }));
1521 self.poll_next(cx)
1522 }
1523 }
1524 }
1525 }
1526}
1527
1528type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1529
1530pub type StreamsErrorKind = RequestErrorKind;
1531pub type StreamsError = RequestError;
1532
1533pub struct Streams {
1534 context: Context,
1535 offset: usize,
1536 page_request: Option<PageInfoRequest>,
1537 streams: Vec<super::stream::Info>,
1538 done: bool,
1539}
1540
1541impl futures::Stream for Streams {
1542 type Item = Result<super::stream::Info, StreamsError>;
1543
1544 fn poll_next(
1545 mut self: Pin<&mut Self>,
1546 cx: &mut std::task::Context<'_>,
1547 ) -> std::task::Poll<Option<Self::Item>> {
1548 match self.page_request.as_mut() {
1549 Some(page) => match page.try_poll_unpin(cx) {
1550 std::task::Poll::Ready(page) => {
1551 self.page_request = None;
1552 let page = page
1553 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1554 if let Some(streams) = page.streams {
1555 self.offset += streams.len();
1556 self.streams = streams;
1557 if self.offset >= page.total {
1558 self.done = true;
1559 }
1560 match self.streams.pop() {
1561 Some(stream) => Poll::Ready(Some(Ok(stream))),
1562 None => Poll::Ready(None),
1563 }
1564 } else {
1565 Poll::Ready(None)
1566 }
1567 }
1568 std::task::Poll::Pending => std::task::Poll::Pending,
1569 },
1570 None => {
1571 if let Some(stream) = self.streams.pop() {
1572 Poll::Ready(Some(Ok(stream)))
1573 } else {
1574 if self.done {
1575 return Poll::Ready(None);
1576 }
1577 let context = self.context.clone();
1578 let offset = self.offset;
1579 self.page_request = Some(Box::pin(async move {
1580 match context
1581 .request(
1582 "STREAM.LIST",
1583 &json!({
1584 "offset": offset,
1585 }),
1586 )
1587 .await?
1588 {
1589 Response::Err { error } => {
1590 Err(RequestError::with_source(RequestErrorKind::Other, error))
1591 }
1592 Response::Ok(page) => Ok(page),
1593 }
1594 }));
1595 self.poll_next(cx)
1596 }
1597 }
1598 }
1599 }
1600}
1601#[derive(Default, Clone, Debug)]
1603pub struct Publish {
1604 payload: Bytes,
1605 headers: Option<header::HeaderMap>,
1606}
1607impl Publish {
1608 pub fn build() -> Self {
1610 Default::default()
1611 }
1612
1613 pub fn payload(mut self, payload: Bytes) -> Self {
1615 self.payload = payload;
1616 self
1617 }
1618 pub fn headers(mut self, headers: HeaderMap) -> Self {
1620 self.headers = Some(headers);
1621 self
1622 }
1623 pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1625 self.headers
1626 .get_or_insert(header::HeaderMap::new())
1627 .insert(name, value);
1628 self
1629 }
1630 pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1632 self.header(header::NATS_MESSAGE_ID, id.as_ref())
1633 }
1634 pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1637 self.header(
1638 header::NATS_EXPECTED_LAST_MESSAGE_ID,
1639 last_message_id.as_ref(),
1640 )
1641 }
1642 pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1645 self.header(
1646 header::NATS_EXPECTED_LAST_SEQUENCE,
1647 HeaderValue::from(last_sequence),
1648 )
1649 }
1650 pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1653 self.header(
1654 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1655 HeaderValue::from(subject_sequence),
1656 )
1657 }
1658 pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1661 self.header(
1662 header::NATS_EXPECTED_STREAM,
1663 HeaderValue::from(stream.as_ref()),
1664 )
1665 }
1666
1667 #[cfg(feature = "server_2_11")]
1668 pub fn ttl(self, ttl: Duration) -> Self {
1671 self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
1672 }
1673}
1674
/// Categories of failure for a JetStream API request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody answered on the request subject.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other failure, including (de)serialization errors.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1691
1692pub type RequestError = Error<RequestErrorKind>;
1693
1694impl From<crate::RequestError> for RequestError {
1695 fn from(error: crate::RequestError) -> Self {
1696 match error.kind() {
1697 crate::RequestErrorKind::TimedOut => {
1698 RequestError::with_source(RequestErrorKind::TimedOut, error)
1699 }
1700 crate::RequestErrorKind::NoResponders => {
1701 RequestError::new(RequestErrorKind::NoResponders)
1702 }
1703 crate::RequestErrorKind::Other => {
1704 RequestError::with_source(RequestErrorKind::Other, error)
1705 }
1706 }
1707 }
1708}
1709
1710impl From<super::errors::Error> for RequestError {
1711 fn from(err: super::errors::Error) -> Self {
1712 RequestError::with_source(RequestErrorKind::Other, err)
1713 }
1714}
1715
1716pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
1717
1718#[derive(Clone, Debug, PartialEq)]
1719pub enum ConsumerInfoErrorKind {
1720 InvalidName,
1721 Offline,
1722 NotFound,
1723 StreamNotFound,
1724 Request,
1725 JetStream(super::errors::Error),
1726 TimedOut,
1727 NoResponders,
1728}
1729
1730impl Display for ConsumerInfoErrorKind {
1731 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1732 match self {
1733 Self::InvalidName => write!(f, "invalid consumer name"),
1734 Self::Offline => write!(f, "consumer is offline"),
1735 Self::NotFound => write!(f, "consumer not found"),
1736 Self::StreamNotFound => write!(f, "stream not found"),
1737 Self::Request => write!(f, "request error"),
1738 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1739 Self::TimedOut => write!(f, "timed out"),
1740 Self::NoResponders => write!(f, "no responders"),
1741 }
1742 }
1743}
1744
1745impl From<super::errors::Error> for ConsumerInfoError {
1746 fn from(error: super::errors::Error) -> Self {
1747 match error.error_code() {
1748 ErrorCode::CONSUMER_NOT_FOUND => {
1749 ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
1750 }
1751 ErrorCode::STREAM_NOT_FOUND => {
1752 ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
1753 }
1754 ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
1755 _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
1756 }
1757 }
1758}
1759
1760impl From<RequestError> for ConsumerInfoError {
1761 fn from(error: RequestError) -> Self {
1762 match error.kind() {
1763 RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
1764 RequestErrorKind::Other => {
1765 ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
1766 }
1767 RequestErrorKind::NoResponders => {
1768 ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
1769 }
1770 }
1771 }
1772}
1773
1774#[derive(Clone, Debug, PartialEq)]
1775pub enum CreateStreamErrorKind {
1776 EmptyStreamName,
1777 InvalidStreamName,
1778 DomainAndExternalSet,
1779 JetStreamUnavailable,
1780 JetStream(super::errors::Error),
1781 TimedOut,
1782 Response,
1783 NotFound,
1784 ResponseParse,
1785}
1786
1787impl Display for CreateStreamErrorKind {
1788 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1789 match self {
1790 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
1791 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
1792 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
1793 Self::NotFound => write!(f, "stream not found"),
1794 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1795 Self::TimedOut => write!(f, "jetstream request timed out"),
1796 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
1797 Self::ResponseParse => write!(f, "failed to parse server response"),
1798 Self::Response => write!(f, "response error"),
1799 }
1800 }
1801}
1802
1803pub type CreateStreamError = Error<CreateStreamErrorKind>;
1804
1805impl From<super::errors::Error> for CreateStreamError {
1806 fn from(error: super::errors::Error) -> Self {
1807 match error.kind() {
1808 super::errors::ErrorCode::STREAM_NOT_FOUND => {
1809 CreateStreamError::new(CreateStreamErrorKind::NotFound)
1810 }
1811 _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
1812 }
1813 }
1814}
1815
1816impl From<RequestError> for CreateStreamError {
1817 fn from(error: RequestError) -> Self {
1818 match error.kind() {
1819 RequestErrorKind::NoResponders => {
1820 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
1821 }
1822 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
1823 RequestErrorKind::Other => {
1824 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
1825 }
1826 }
1827 }
1828}
1829
1830#[derive(Clone, Debug, PartialEq)]
1831pub enum GetStreamErrorKind {
1832 EmptyName,
1833 Request,
1834 InvalidStreamName,
1835 JetStream(super::errors::Error),
1836}
1837
1838impl Display for GetStreamErrorKind {
1839 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1840 match self {
1841 Self::EmptyName => write!(f, "empty name cannot be empty"),
1842 Self::Request => write!(f, "request error"),
1843 Self::InvalidStreamName => write!(f, "invalid stream name"),
1844 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1845 }
1846 }
1847}
1848
1849#[derive(Clone, Debug, PartialEq)]
1850pub enum GetStreamByNameErrorKind {
1851 Request,
1852 NotFound,
1853 InvalidSubject,
1854 JetStream(super::errors::Error),
1855}
1856
1857impl Display for GetStreamByNameErrorKind {
1858 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1859 match self {
1860 Self::Request => write!(f, "request error"),
1861 Self::NotFound => write!(f, "stream not found"),
1862 Self::InvalidSubject => write!(f, "invalid subject"),
1863 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1864 }
1865 }
1866}
1867
1868pub type GetStreamError = Error<GetStreamErrorKind>;
1869pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;
1870
1871pub type UpdateStreamError = CreateStreamError;
1872pub type UpdateStreamErrorKind = CreateStreamErrorKind;
1873pub type DeleteStreamError = GetStreamError;
1874pub type DeleteStreamErrorKind = GetStreamErrorKind;
1875
/// Classifies the failures reported through [`KeyValueError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// Displayed as "invalid Key Value Store name".
    InvalidStoreName,
    /// Displayed as "failed to get the bucket".
    GetBucket,
    /// Displayed as "JetStream error".
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(text)
    }
}
1892
1893pub type KeyValueError = Error<KeyValueErrorKind>;
1894
/// Classifies failures when creating a Key-Value bucket.
///
/// Also used for Object Store creation through the `CreateObjectStoreErrorKind` alias.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// Displayed as "invalid Key Value Store name".
    InvalidStoreName,
    /// Displayed as "too long history".
    TooLongHistory,
    /// Displayed as "JetStream error".
    JetStream,
    /// Displayed as "bucket creation failed".
    BucketCreate,
    /// Displayed as "timed out".
    TimedOut,
    /// Displayed as "limit markers not supported".
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(text)
    }
}
1919
/// Classifies failures when updating a Key-Value bucket.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// Displayed as "invalid Key Value Store name".
    InvalidStoreName,
    /// Displayed as "too long history".
    TooLongHistory,
    /// Displayed as "JetStream error".
    JetStream,
    /// Displayed as "bucket update failed".
    BucketUpdate,
    /// Displayed as "timed out".
    TimedOut,
    /// Displayed as "limit markers not supported".
    LimitMarkersNotSupported,
    /// Displayed as "bucket does not exist".
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed, human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            // Previously displayed the copy-pasted "bucket creation failed", which
            // misreported a failed update as a failed creation.
            Self::BucketUpdate => "bucket update failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
            Self::NotFound => "bucket does not exist",
        };
        f.write_str(text)
    }
}
1946pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
1947pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;
1948
1949pub type CreateObjectStoreError = CreateKeyValueError;
1950pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
1951
/// Classifies the failures reported through [`ObjectStoreError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// Displayed as "invalid Object Store bucket name".
    InvalidBucketName,
    /// Displayed as "failed to get Object Store".
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(text)
    }
}
1966
1967pub type ObjectStoreError = Error<ObjectStoreErrorKind>;
1968
1969pub type DeleteObjectStore = ObjectStoreError;
1970pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
1971
1972#[derive(Clone, Debug, PartialEq)]
1973pub enum AccountErrorKind {
1974 TimedOut,
1975 JetStream(super::errors::Error),
1976 JetStreamUnavailable,
1977 Other,
1978}
1979
1980impl Display for AccountErrorKind {
1981 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1982 match self {
1983 Self::TimedOut => write!(f, "timed out"),
1984 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1985 Self::Other => write!(f, "error"),
1986 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
1987 }
1988 }
1989}
1990
1991pub type AccountError = Error<AccountErrorKind>;
1992
1993impl From<RequestError> for AccountError {
1994 fn from(err: RequestError) -> Self {
1995 match err.kind {
1996 RequestErrorKind::NoResponders => {
1997 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
1998 }
1999 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2000 RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2001 }
2002 }
2003}
2004
2005#[derive(Clone, Debug, Serialize)]
2006enum ConsumerAction {
2007 #[serde(rename = "")]
2008 CreateOrUpdate,
2009 #[serde(rename = "create")]
2010 #[cfg(feature = "server_2_10")]
2011 Create,
2012 #[serde(rename = "update")]
2013 #[cfg(feature = "server_2_10")]
2014 Update,
2015}
2016
2017fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2019 let mut store = Store {
2020 prefix: format!("$KV.{}.", bucket.as_str()),
2021 name: bucket,
2022 stream: stream.clone(),
2023 stream_name: stream.info.config.name.clone(),
2024 put_prefix: None,
2025 use_jetstream_prefix: prefix != "$JS.API",
2026 };
2027 if let Some(ref mirror) = stream.info.config.mirror {
2028 let bucket = mirror.name.trim_start_matches("KV_");
2029 if let Some(ref external) = mirror.external {
2030 if !external.api_prefix.is_empty() {
2031 store.use_jetstream_prefix = false;
2032 store.prefix = format!("$KV.{bucket}.");
2033 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2034 } else {
2035 store.put_prefix = Some(format!("$KV.{bucket}."));
2036 }
2037 }
2038 };
2039 store
2040}
2041
/// Reasons a Key-Value configuration cannot be turned into a stream configuration.
///
/// Converted into the public create/update Key-Value errors by the `From` impls in this
/// file.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but are not supported.
    // Only constructed when the `server_2_11` feature is enabled, so the variant is unused
    // in builds without it.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2047
2048impl From<KvToStreamConfigError> for CreateKeyValueError {
2049 fn from(err: KvToStreamConfigError) -> Self {
2050 match err {
2051 KvToStreamConfigError::TooLongHistory => {
2052 CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2053 }
2054 KvToStreamConfigError::LimitMarkersNotSupported => {
2055 CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2056 }
2057 }
2058 }
2059}
2060
2061impl From<KvToStreamConfigError> for UpdateKeyValueError {
2062 fn from(err: KvToStreamConfigError) -> Self {
2063 match err {
2064 KvToStreamConfigError::TooLongHistory => {
2065 UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2066 }
2067 KvToStreamConfigError::LimitMarkersNotSupported => {
2068 UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2069 }
2070 }
2071 }
2072}
2073
2074fn kv_to_stream_config(
2076 config: kv::Config,
2077 _account: Account,
2078) -> Result<super::stream::Config, KvToStreamConfigError> {
2079 let history = if config.history > 0 {
2080 if config.history > MAX_HISTORY {
2081 return Err(KvToStreamConfigError::TooLongHistory);
2082 }
2083 config.history
2084 } else {
2085 1
2086 };
2087
2088 let num_replicas = if config.num_replicas == 0 {
2089 1
2090 } else {
2091 config.num_replicas
2092 };
2093
2094 #[cfg(feature = "server_2_11")]
2095 let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2096
2097 #[cfg(feature = "server_2_11")]
2098 if let Some(duration) = config.limit_markers {
2099 if _account.requests.level < 1 {
2100 return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2101 }
2102 allow_message_ttl = true;
2103 subject_delete_marker_ttl = Some(duration);
2104 }
2105
2106 let mut mirror = config.mirror.clone();
2107 let mut sources = config.sources.clone();
2108 let mut mirror_direct = config.mirror_direct;
2109
2110 let mut subjects = Vec::new();
2111 if let Some(ref mut mirror) = mirror {
2112 if !mirror.name.starts_with("KV_") {
2113 mirror.name = format!("KV_{}", mirror.name);
2114 }
2115 mirror_direct = true;
2116 } else if let Some(ref mut sources) = sources {
2117 for source in sources {
2118 if !source.name.starts_with("KV_") {
2119 source.name = format!("KV_{}", source.name);
2120 }
2121 }
2122 } else {
2123 subjects = vec![format!("$KV.{}.>", config.bucket)];
2124 }
2125
2126 Ok(stream::Config {
2127 name: format!("KV_{}", config.bucket),
2128 description: Some(config.description),
2129 subjects,
2130 max_messages_per_subject: history,
2131 max_bytes: config.max_bytes,
2132 max_age: config.max_age,
2133 max_message_size: config.max_value_size,
2134 storage: config.storage,
2135 republish: config.republish,
2136 allow_rollup: true,
2137 deny_delete: true,
2138 deny_purge: false,
2139 allow_direct: true,
2140 sources,
2141 mirror,
2142 num_replicas,
2143 discard: stream::DiscardPolicy::New,
2144 mirror_direct,
2145 #[cfg(feature = "server_2_10")]
2146 compression: if config.compression {
2147 Some(stream::Compression::S2)
2148 } else {
2149 None
2150 },
2151 placement: config.placement,
2152 #[cfg(feature = "server_2_11")]
2153 allow_message_ttl,
2154 #[cfg(feature = "server_2_11")]
2155 subject_delete_marker_ttl,
2156 ..Default::default()
2157 })
2158}