1use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures_util::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures_util::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
/// Chunk size in bytes (128 KiB) used by `put` when the object metadata does not set one.
const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
/// Name of the header attached to every metadata publish in this module.
const NATS_ROLLUP: &str = "Nats-Rollup";
/// Value sent in the `Nats-Rollup` header.
// NOTE(review): presumably requests a per-subject rollup on the server — confirm against JetStream docs.
const ROLLUP_SUBJECT: &str = "sub";

/// Bucket names: one or more ASCII letters, digits, `_` or `-`.
static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
/// Object names: one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`.
static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50 BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54 if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55 return false;
56 }
57
58 OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62 URL_SAFE.encode(object_name)
63}
64
/// Configuration describing an object store bucket.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
    /// Name of the bucket.
    pub bucket: String,
    /// Optional human-readable description of the bucket.
    pub description: Option<String>,
    /// Maximum age setting; (de)serialized through `serde_nanos`, i.e. as nanoseconds.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// Size limit for the bucket.
    // NOTE(review): presumably total bytes across all objects — confirm against server docs.
    pub max_bytes: i64,
    /// Storage backend used for the bucket.
    pub storage: StorageType,
    /// Number of replicas to keep.
    pub num_replicas: usize,
    /// Whether compression is enabled for the bucket.
    pub compression: bool,
    /// Optional placement directive for the bucket.
    pub placement: Option<stream::Placement>,
}
86
/// Handle to an object store bucket.
#[derive(Clone)]
pub struct ObjectStore {
    /// Bucket name; used to build the `$O.<bucket>.M.*` and `$O.<bucket>.C.*` subjects.
    pub(crate) name: String,
    /// Stream through which metadata and chunks are published, fetched, and purged.
    pub(crate) stream: crate::jetstream::stream::Stream,
}
93
94impl ObjectStore {
95 pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
119 self.get_impl(object_name).await
120 }
121
122 fn get_impl<'bucket, 'future, T>(
123 &'bucket self,
124 object_name: T,
125 ) -> BoxFuture<'future, Result<Object, GetError>>
126 where
127 T: AsRef<str> + Send + 'future,
128 'bucket: 'future,
129 {
130 Box::pin(async move {
131 let object_info = self.info(object_name).await?;
132 if object_info.deleted {
133 return Err(GetError::new(GetErrorKind::NotFound));
134 }
135 if let Some(ref options) = object_info.options {
136 if let Some(link) = options.link.as_ref() {
137 if let Some(link_name) = link.name.as_ref() {
138 let link_name = link_name.clone();
139 debug!("getting object via link");
140 if link.bucket == self.name {
141 return self.get_impl(link_name).await;
142 } else {
143 let bucket = self
144 .stream
145 .context
146 .get_object_store(&link.bucket)
147 .await
148 .map_err(|err| {
149 GetError::with_source(GetErrorKind::Other, err)
150 })?;
151 let object = bucket.get_impl(&link_name).await?;
152 return Ok(object);
153 }
154 } else {
155 return Err(GetError::new(GetErrorKind::BucketLink));
156 }
157 }
158 }
159
160 debug!("not a link. Getting the object");
161 Ok(Object::new(object_info, self.stream.clone()))
162 })
163 }
164
165 pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
181 let object_name = object_name.as_ref();
182 let mut object_info = self.info(object_name).await?;
183 object_info.chunks = 0;
184 object_info.size = 0;
185 object_info.deleted = true;
186
187 let data = serde_json::to_vec(&object_info).map_err(|err| {
188 DeleteError::with_source(
189 DeleteErrorKind::Other,
190 format!("failed deserializing object info: {err}"),
191 )
192 })?;
193
194 let mut headers = HeaderMap::default();
195 headers.insert(
196 NATS_ROLLUP,
197 HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
198 DeleteError::with_source(
199 DeleteErrorKind::Other,
200 format!("failed parsing header: {err}"),
201 )
202 })?,
203 );
204
205 let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
206
207 self.stream
208 .context
209 .publish_with_headers(subject, headers, data.into())
210 .await?
211 .await?;
212
213 let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
214
215 self.stream.purge().filter(&chunk_subject).await?;
216
217 Ok(())
218 }
219
220 pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
236 let object_name = object_name.as_ref();
237 let object_name = encode_object_name(object_name);
238 if !is_valid_object_name(&object_name) {
239 return Err(InfoError::new(InfoErrorKind::InvalidName));
240 }
241
242 let subject = format!("$O.{}.M.{}", &self.name, &object_name);
244
245 let message = self
247 .stream
248 .get_last_raw_message_by_subject(subject.as_str())
249 .await
250 .map_err(|err| match err.kind() {
251 stream::LastRawMessageErrorKind::NoMessageFound => {
252 InfoError::new(InfoErrorKind::NotFound)
253 }
254 _ => InfoError::with_source(InfoErrorKind::Other, err),
255 })?;
256 let object_info =
257 serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
258 InfoError::with_source(
259 InfoErrorKind::Other,
260 format!("failed to decode info payload: {err}"),
261 )
262 })?;
263
264 Ok(object_info)
265 }
266
267 pub async fn put<T>(
285 &self,
286 meta: T,
287 data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
288 ) -> Result<ObjectInfo, PutError>
289 where
290 ObjectMetadata: From<T>,
291 {
292 let object_meta: ObjectMetadata = meta.into();
293
294 let maybe_existing_object_info = (self.info(&object_meta.name).await).ok();
296
297 let object_nuid = nuid::next();
298 let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
299
300 let mut object_chunks = 0;
301 let mut object_size = 0;
302
303 let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
304 let mut buffer = BytesMut::with_capacity(chunk_size);
305 let mut sha256 = Sha256::new();
306
307 loop {
308 let n = data
309 .read_buf(&mut buffer)
310 .await
311 .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
312
313 if n == 0 {
314 break;
315 }
316
317 let payload = buffer.split().freeze();
318 sha256.update(&payload);
319
320 object_size += payload.len();
321 object_chunks += 1;
322
323 self.stream
324 .context
325 .publish(chunk_subject.clone(), payload)
326 .await
327 .map_err(|err| {
328 PutError::with_source(
329 PutErrorKind::PublishChunks,
330 format!("failed chunk publish: {err}"),
331 )
332 })?
333 .await
334 .map_err(|err| {
335 PutError::with_source(
336 PutErrorKind::PublishChunks,
337 format!("failed getting chunk ack: {err}"),
338 )
339 })?;
340 }
341 let digest = sha256.finish();
342
343 let encoded_object_name = encode_object_name(&object_meta.name);
344 if !is_valid_object_name(&encoded_object_name) {
345 return Err(PutError::new(PutErrorKind::InvalidName));
346 }
347 let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
348
349 let object_info = ObjectInfo {
350 name: object_meta.name,
351 description: object_meta.description,
352 options: Some(ObjectOptions {
353 max_chunk_size: Some(chunk_size),
354 link: None,
355 }),
356 bucket: self.name.clone(),
357 nuid: object_nuid.to_string(),
358 chunks: object_chunks,
359 size: object_size,
360 digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
361 modified: Some(OffsetDateTime::now_utc()),
362 deleted: false,
363 metadata: object_meta.metadata,
364 headers: object_meta.headers,
365 };
366
367 let mut headers = HeaderMap::new();
368 headers.insert(
369 NATS_ROLLUP,
370 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
371 PutError::with_source(PutErrorKind::Other, format!("failed parsing header: {err}"))
372 })?,
373 );
374 let data = serde_json::to_vec(&object_info).map_err(|err| {
375 PutError::with_source(
376 PutErrorKind::Other,
377 format!("failed serializing object info: {err}"),
378 )
379 })?;
380
381 self.stream
383 .context
384 .publish_with_headers(subject, headers, data.into())
385 .await
386 .map_err(|err| {
387 PutError::with_source(
388 PutErrorKind::PublishMetadata,
389 format!("failed publishing metadata: {err}"),
390 )
391 })?
392 .await
393 .map_err(|err| {
394 PutError::with_source(
395 PutErrorKind::PublishMetadata,
396 format!("failed ack from metadata publish: {err}"),
397 )
398 })?;
399
400 if let Some(existing_object_info) = maybe_existing_object_info {
402 let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
403
404 self.stream
405 .purge()
406 .filter(&chunk_subject)
407 .await
408 .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
409 }
410
411 Ok(object_info)
412 }
413
414 pub async fn watch(&self) -> Result<Watch, WatchError> {
434 self.watch_with_deliver_policy(DeliverPolicy::New).await
435 }
436
437 pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
440 self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
441 .await
442 }
443
444 async fn watch_with_deliver_policy(
445 &self,
446 deliver_policy: DeliverPolicy,
447 ) -> Result<Watch, WatchError> {
448 let subject = format!("$O.{}.M.>", self.name);
449 let ordered = self
450 .stream
451 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
452 deliver_policy,
453 deliver_subject: self.stream.context.client.new_inbox(),
454 description: Some("object store watcher".to_string()),
455 filter_subject: subject,
456 ..Default::default()
457 })
458 .await?;
459 Ok(Watch {
460 subscription: ordered.messages().await?,
461 })
462 }
463
464 pub async fn list(&self) -> Result<List, ListError> {
484 trace!("starting Object List");
485 let subject = format!("$O.{}.M.>", self.name);
486 let ordered = self
487 .stream
488 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
489 deliver_policy: super::consumer::DeliverPolicy::All,
490 deliver_subject: self.stream.context.client.new_inbox(),
491 description: Some("object store list".to_string()),
492 filter_subject: subject,
493 ..Default::default()
494 })
495 .await?;
496 Ok(List {
497 done: ordered.info.num_pending == 0,
498 subscription: Some(ordered.messages().await?),
499 })
500 }
501
502 pub async fn seal(&mut self) -> Result<(), SealError> {
519 let mut stream_config = self
520 .stream
521 .info()
522 .await
523 .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
524 .to_owned();
525 stream_config.config.sealed = true;
526
527 self.stream
528 .context
529 .update_stream(&stream_config.config)
530 .await?;
531 Ok(())
532 }
533
534 pub async fn update_metadata<A: AsRef<str>>(
560 &self,
561 object: A,
562 metadata: UpdateMetadata,
563 ) -> Result<ObjectInfo, UpdateMetadataError> {
564 let mut info = self.info(object.as_ref()).await?;
565
566 if metadata.name != info.name {
569 tracing::info!("new metadata name is different than then old one");
570 if !is_valid_object_name(&metadata.name) {
571 return Err(UpdateMetadataError::new(
572 UpdateMetadataErrorKind::InvalidName,
573 ));
574 }
575 match self.info(&metadata.name).await {
576 Ok(_) => {
577 return Err(UpdateMetadataError::new(
578 UpdateMetadataErrorKind::NameAlreadyInUse,
579 ))
580 }
581 Err(err) => match err.kind() {
582 InfoErrorKind::NotFound => {
583 tracing::info!("purging old metadata: {}", info.name);
584 self.stream
585 .purge()
586 .filter(format!(
587 "$O.{}.M.{}",
588 self.name,
589 encode_object_name(&info.name)
590 ))
591 .await
592 .map_err(|err| {
593 UpdateMetadataError::with_source(
594 UpdateMetadataErrorKind::Purge,
595 err,
596 )
597 })?;
598 }
599 _ => {
600 return Err(UpdateMetadataError::with_source(
601 UpdateMetadataErrorKind::Other,
602 err,
603 ))
604 }
605 },
606 }
607 }
608
609 info.name = metadata.name;
610 info.description = metadata.description;
611
612 let name = encode_object_name(&info.name);
613 let subject = format!("$O.{}.M.{}", &self.name, &name);
614
615 let mut headers = HeaderMap::new();
616 headers.insert(
617 NATS_ROLLUP,
618 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
619 UpdateMetadataError::with_source(
620 UpdateMetadataErrorKind::Other,
621 format!("failed parsing header: {err}"),
622 )
623 })?,
624 );
625 let data = serde_json::to_vec(&info).map_err(|err| {
626 UpdateMetadataError::with_source(
627 UpdateMetadataErrorKind::Other,
628 format!("failed serializing object info: {err}"),
629 )
630 })?;
631
632 self.stream
634 .context
635 .publish_with_headers(subject, headers, data.into())
636 .await
637 .map_err(|err| {
638 UpdateMetadataError::with_source(
639 UpdateMetadataErrorKind::PublishMetadata,
640 format!("failed publishing metadata: {err}"),
641 )
642 })?
643 .await
644 .map_err(|err| {
645 UpdateMetadataError::with_source(
646 UpdateMetadataErrorKind::PublishMetadata,
647 format!("failed ack from metadata publish: {err}"),
648 )
649 })?;
650
651 Ok(info)
652 }
653
654 pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
674 where
675 T: ToString,
676 O: AsObjectInfo,
677 {
678 let object = object.as_info();
679 let name = name.to_string();
680 if name.is_empty() {
681 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
682 }
683 if object.name.is_empty() {
684 return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
685 }
686 if object.deleted {
687 return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
688 }
689 if let Some(ref options) = object.options {
690 if options.link.is_some() {
691 return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
692 }
693 }
694 match self.info(&name).await {
695 Ok(info) => {
696 if let Some(options) = info.options {
697 if options.link.is_none() {
698 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
699 }
700 } else {
701 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
702 }
703 }
704 Err(err) if err.kind() != InfoErrorKind::NotFound => {
705 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
706 }
707 _ => (),
708 }
709
710 let info = ObjectInfo {
711 name,
712 description: None,
713 options: Some(ObjectOptions {
714 link: Some(ObjectLink {
715 name: Some(object.name.clone()),
716 bucket: object.bucket.clone(),
717 }),
718 max_chunk_size: None,
719 }),
720 bucket: self.name.clone(),
721 nuid: nuid::next().to_string(),
722 size: 0,
723 chunks: 0,
724 modified: Some(OffsetDateTime::now_utc()),
725 digest: None,
726 deleted: false,
727 metadata: HashMap::default(),
728 headers: None,
729 };
730 publish_meta(self, &info).await?;
731 Ok(info)
732 }
733
734 pub async fn add_bucket_link<T: ToString, U: ToString>(
754 &self,
755 name: T,
756 bucket: U,
757 ) -> Result<ObjectInfo, AddLinkError> {
758 let name = name.to_string();
759 let bucket = bucket.to_string();
760 if name.is_empty() {
761 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
762 }
763
764 match self.info(&name).await {
765 Ok(info) => {
766 if let Some(options) = info.options {
767 if options.link.is_none() {
768 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
769 }
770 }
771 }
772 Err(err) if err.kind() != InfoErrorKind::NotFound => {
773 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
774 }
775 _ => (),
776 }
777
778 let info = ObjectInfo {
779 name: name.clone(),
780 description: None,
781 options: Some(ObjectOptions {
782 link: Some(ObjectLink { name: None, bucket }),
783 max_chunk_size: None,
784 }),
785 bucket: self.name.clone(),
786 nuid: nuid::next().to_string(),
787 size: 0,
788 chunks: 0,
789 modified: Some(OffsetDateTime::now_utc()),
790 digest: None,
791 deleted: false,
792 metadata: HashMap::default(),
793 headers: None,
794 };
795 publish_meta(self, &info).await?;
796 Ok(info)
797 }
798}
799
800async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
801 let encoded_object_name = encode_object_name(&info.name);
802 let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
803
804 let mut headers = HeaderMap::new();
805 headers.insert(
806 NATS_ROLLUP,
807 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
808 PublishMetadataError::with_source(
809 PublishMetadataErrorKind::Other,
810 format!("failed parsing header: {err}"),
811 )
812 })?,
813 );
814 let data = serde_json::to_vec(&info).map_err(|err| {
815 PublishMetadataError::with_source(
816 PublishMetadataErrorKind::Other,
817 format!("failed serializing object info: {err}"),
818 )
819 })?;
820
821 store
822 .stream
823 .context
824 .publish_with_headers(subject, headers, data.into())
825 .await
826 .map_err(|err| {
827 PublishMetadataError::with_source(
828 PublishMetadataErrorKind::PublishMetadata,
829 format!("failed publishing metadata: {err}"),
830 )
831 })?
832 .await
833 .map_err(|err| {
834 PublishMetadataError::with_source(
835 PublishMetadataErrorKind::PublishMetadata,
836 format!("failed ack from metadata publish: {err}"),
837 )
838 })?;
839 Ok(())
840}
841
/// Stream of [`ObjectInfo`] updates decoded from an ordered push consumer.
pub struct Watch {
    /// Message stream of the ordered consumer created by the watch methods.
    subscription: crate::jetstream::consumer::push::Ordered,
}
845
846impl Stream for Watch {
847 type Item = Result<ObjectInfo, WatcherError>;
848
849 fn poll_next(
850 mut self: std::pin::Pin<&mut Self>,
851 cx: &mut std::task::Context<'_>,
852 ) -> Poll<Option<Self::Item>> {
853 match self.subscription.poll_next_unpin(cx) {
854 Poll::Ready(message) => match message {
855 Some(message) => Poll::Ready(
856 serde_json::from_slice::<ObjectInfo>(&message?.payload)
857 .map_err(|err| {
858 WatcherError::with_source(
859 WatcherErrorKind::Other,
860 format!("failed to deserialize object info: {err}"),
861 )
862 })
863 .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
864 ),
865 None => Poll::Ready(None),
866 },
867 Poll::Pending => Poll::Pending,
868 }
869 }
870}
871
/// Finite stream over the non-deleted [`ObjectInfo`] entries of a bucket.
pub struct List {
    /// Message stream of the listing consumer; cleared once the list is done.
    subscription: Option<crate::jetstream::consumer::push::Ordered>,
    /// Set when a message reports zero pending messages (or none were pending at creation).
    done: bool,
}
876
877impl Stream for List {
878 type Item = Result<ObjectInfo, ListerError>;
879
880 fn poll_next(
881 mut self: std::pin::Pin<&mut Self>,
882 cx: &mut std::task::Context<'_>,
883 ) -> Poll<Option<Self::Item>> {
884 loop {
885 if self.done {
886 debug!("Object Store list done");
887 self.subscription = None;
888 return Poll::Ready(None);
889 }
890
891 if let Some(subscription) = self.subscription.as_mut() {
892 match subscription.poll_next_unpin(cx) {
893 Poll::Ready(message) => match message {
894 None => return Poll::Ready(None),
895 Some(message) => {
896 let message = message?;
897 let info = message.info().map_err(|err| {
898 ListerError::with_source(ListerErrorKind::Other, err)
899 })?;
900 trace!("num pending: {}", info.pending);
901 if info.pending == 0 {
902 self.done = true;
903 }
904 let response: ObjectInfo = serde_json::from_slice(&message.payload)
905 .map_err(|err| {
906 ListerError::with_source(
907 ListerErrorKind::Other,
908 format!("failed deserializing object info: {err}"),
909 )
910 })?;
911 if response.deleted {
912 continue;
913 }
914 return Poll::Ready(Some(Ok(response)));
915 }
916 },
917 Poll::Pending => return Poll::Pending,
918 }
919 } else {
920 return Poll::Ready(None);
921 }
922 }
923 }
924}
925
926pub struct Object {
928 pub info: ObjectInfo,
929 remaining_bytes: VecDeque<u8>,
930 has_pending_messages: bool,
931 digest: Option<Sha256>,
932 subscription: Option<crate::jetstream::consumer::push::Ordered>,
933 subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
934 stream: crate::jetstream::stream::Stream,
935}
936
937impl std::fmt::Debug for Object {
938 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
939 f.debug_struct("Object")
940 .field("info", &self.info)
941 .field("remaining_bytes", &self.remaining_bytes)
942 .field("has_pending_messages", &self.has_pending_messages)
943 .finish()
944 }
945}
946
947impl Object {
948 pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
949 let has_pending_messages = info.chunks > 0;
950 Object {
951 subscription: None,
952 info,
953 remaining_bytes: VecDeque::new(),
954 has_pending_messages,
955 digest: Some(Sha256::new()),
956 subscription_future: None,
957 stream,
958 }
959 }
960
961 pub fn info(&self) -> &ObjectInfo {
963 &self.info
964 }
965}
966
967impl tokio::io::AsyncRead for Object {
968 fn poll_read(
969 mut self: std::pin::Pin<&mut Self>,
970 cx: &mut std::task::Context<'_>,
971 buf: &mut tokio::io::ReadBuf<'_>,
972 ) -> std::task::Poll<std::io::Result<()>> {
973 let (buf1, _buf2) = self.remaining_bytes.as_slices();
974 if !buf1.is_empty() {
975 let len = cmp::min(buf.remaining(), buf1.len());
976 buf.put_slice(&buf1[..len]);
977 self.remaining_bytes.drain(..len);
978 return Poll::Ready(Ok(()));
979 }
980
981 if self.has_pending_messages {
982 if self.subscription.is_none() {
983 let future = match self.subscription_future.as_mut() {
984 Some(future) => future,
985 None => {
986 let stream = self.stream.clone();
987 let bucket = self.info.bucket.clone();
988 let nuid = self.info.nuid.clone();
989 self.subscription_future.insert(Box::pin(async move {
990 stream
991 .create_consumer(OrderedConfig {
992 deliver_subject: stream.context.client.new_inbox(),
993 filter_subject: format!("$O.{bucket}.C.{nuid}"),
994 ..Default::default()
995 })
996 .await
997 .unwrap()
998 .messages()
999 .await
1000 }))
1001 }
1002 };
1003 match future.as_mut().poll(cx) {
1004 Poll::Ready(subscription) => {
1005 self.subscription = Some(subscription.unwrap());
1006 }
1007 Poll::Pending => (),
1008 }
1009 }
1010 if let Some(subscription) = self.subscription.as_mut() {
1011 match subscription.poll_next_unpin(cx) {
1012 Poll::Ready(message) => match message {
1013 Some(message) => {
1014 let message = message.map_err(|err| {
1015 std::io::Error::other(format!(
1016 "error from JetStream subscription: {err}"
1017 ))
1018 })?;
1019 let len = cmp::min(buf.remaining(), message.payload.len());
1020 buf.put_slice(&message.payload[..len]);
1021 if let Some(context) = &mut self.digest {
1022 context.update(&message.payload);
1023 }
1024 self.remaining_bytes.extend(&message.payload[len..]);
1025
1026 let info = message.info().map_err(|err| {
1027 std::io::Error::other(format!(
1028 "error from JetStream subscription: {err}"
1029 ))
1030 })?;
1031 if info.pending == 0 {
1032 let digest = self.digest.take().map(Sha256::finish);
1033 if let Some(digest) = digest {
1034 if self
1035 .info
1036 .digest
1037 .as_ref()
1038 .map(|digest_self| {
1039 format!("SHA-256={}", URL_SAFE.encode(digest))
1040 != *digest_self
1041 })
1042 .unwrap_or(false)
1043 {
1044 return Poll::Ready(Err(std::io::Error::new(
1045 std::io::ErrorKind::InvalidData,
1046 "wrong digest",
1047 )));
1048 }
1049 } else {
1050 return Poll::Ready(Err(std::io::Error::new(
1051 std::io::ErrorKind::InvalidData,
1052 "digest should be Some",
1053 )));
1054 }
1055 self.has_pending_messages = false;
1056 self.subscription = None;
1057 }
1058 Poll::Ready(Ok(()))
1059 }
1060 None => Poll::Ready(Err(std::io::Error::other(
1061 "subscription ended before reading whole object",
1062 ))),
1063 },
1064 Poll::Pending => Poll::Pending,
1065 }
1066 } else {
1067 Poll::Pending
1068 }
1069 } else {
1070 Poll::Ready(Ok(()))
1071 }
1072 }
1073}
1074
/// Optional settings stored alongside an object's metadata.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectOptions {
    /// Present when the entry is a link to another object or bucket.
    pub link: Option<ObjectLink>,
    /// Chunk size recorded by `put` when the object was stored.
    pub max_chunk_size: Option<usize>,
}
1080
/// Metadata entry describing a stored object (or link), as serialized on the bucket's
/// metadata subjects.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectInfo {
    /// Name of the object.
    pub name: String,
    /// Optional description of the object.
    #[serde(default)]
    pub description: Option<String>,
    /// Key/value metadata supplied by the caller.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Optional headers supplied by the caller.
    #[serde(default)]
    pub headers: Option<HeaderMap>,
    /// Link and chunk-size options.
    #[serde(default)]
    pub options: Option<ObjectOptions>,
    /// Name of the bucket the entry is stored in.
    pub bucket: String,
    /// Unique id; part of the subject the object's chunks are published on.
    #[serde(default)]
    pub nuid: String,
    /// Total number of bytes published as chunks.
    #[serde(default)]
    pub size: usize,
    /// Number of chunk messages published for the object.
    #[serde(default)]
    pub chunks: usize,
    /// Modification time; serialized as RFC 3339 under the key `mtime`.
    #[serde(default, with = "rfc3339::option")]
    #[serde(rename = "mtime")]
    pub modified: Option<time::OffsetDateTime>,
    /// Digest of the contents in the form `SHA-256=<base64>`; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub digest: Option<String>,
    /// Set when the object has been deleted; omitted from the output when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deleted: bool,
}
1120
/// Returns `true` when `t` equals its type's `Default` value (serde `skip_serializing_if` helper).
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let baseline = T::default();
    *t == baseline
}
/// Target of a link entry.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectLink {
    /// Name of the linked object; `None` for a link to a whole bucket.
    pub name: Option<String>,
    /// Name of the bucket the link points into.
    pub bucket: String,
}
1132
/// User-editable metadata passed to `ObjectStore::update_metadata`.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpdateMetadata {
    /// (New) name of the object.
    pub name: String,
    /// (New) description of the object.
    pub description: Option<String>,
    /// Key/value metadata.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Optional headers.
    pub headers: Option<HeaderMap>,
}
1145
/// Metadata supplied when storing an object with `ObjectStore::put`.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectMetadata {
    /// Name of the object.
    pub name: String,
    /// Optional description of the object.
    pub description: Option<String>,
    /// Chunk size for the upload; `put` falls back to `DEFAULT_CHUNK_SIZE` when `None`.
    pub chunk_size: Option<usize>,
    /// Key/value metadata.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Optional headers.
    pub headers: Option<HeaderMap>,
}
1161
1162impl From<&str> for ObjectMetadata {
1163 fn from(s: &str) -> ObjectMetadata {
1164 ObjectMetadata {
1165 name: s.to_string(),
1166 ..Default::default()
1167 }
1168 }
1169}
1170
/// Types that can expose a borrowed [`ObjectInfo`]; accepted by `ObjectStore::add_link`.
pub trait AsObjectInfo {
    /// Returns the [`ObjectInfo`] backing this value.
    fn as_info(&self) -> &ObjectInfo;
}
1174
1175impl AsObjectInfo for &Object {
1176 fn as_info(&self) -> &ObjectInfo {
1177 &self.info
1178 }
1179}
1180impl AsObjectInfo for &ObjectInfo {
1181 fn as_info(&self) -> &ObjectInfo {
1182 self
1183 }
1184}
1185
1186impl From<ObjectInfo> for ObjectMetadata {
1187 fn from(info: ObjectInfo) -> Self {
1188 ObjectMetadata {
1189 name: info.name,
1190 description: info.description,
1191 metadata: info.metadata,
1192 headers: info.headers,
1193 chunk_size: None,
1194 }
1195 }
1196}
1197
/// Kinds of failure reported by `ObjectStore::update_metadata`.
#[derive(Debug, PartialEq, Clone)]
pub enum UpdateMetadataErrorKind {
    /// The new object name is not valid.
    InvalidName,
    /// The object to update does not exist.
    NotFound,
    /// An operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
    /// Publishing the updated metadata (or receiving its ack) failed.
    PublishMetadata,
    /// Another object already uses the new name.
    NameAlreadyInUse,
    /// Purging the metadata stored under the old name failed.
    Purge,
}
1208
1209impl Display for UpdateMetadataErrorKind {
1210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1211 match self {
1212 Self::InvalidName => write!(f, "invalid object name"),
1213 Self::NotFound => write!(f, "object not found"),
1214 Self::TimedOut => write!(f, "timed out"),
1215 Self::Other => write!(f, "error"),
1216 Self::PublishMetadata => {
1217 write!(f, "failed publishing metadata")
1218 }
1219 Self::NameAlreadyInUse => {
1220 write!(f, "object with updated name already exists")
1221 }
1222 Self::Purge => write!(f, "failed purging old name metadata"),
1223 }
1224 }
1225}
1226
1227impl From<InfoError> for UpdateMetadataError {
1228 fn from(error: InfoError) -> Self {
1229 match error.kind() {
1230 InfoErrorKind::InvalidName => {
1231 UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1232 }
1233 InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1234 InfoErrorKind::Other => {
1235 UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1236 }
1237 InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1238 }
1239 }
1240}
1241
/// Error returned by `ObjectStore::update_metadata`.
pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;

/// Kinds of failure reported by `ObjectStore::info`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum InfoErrorKind {
    /// The (encoded) object name is not valid.
    InvalidName,
    /// No metadata entry exists for the object.
    NotFound,
    /// Any other failure while fetching or decoding the info.
    Other,
    /// The operation timed out.
    TimedOut,
}
1251
1252impl Display for InfoErrorKind {
1253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1254 match self {
1255 Self::InvalidName => write!(f, "invalid object name"),
1256 Self::Other => write!(f, "getting info failed"),
1257 Self::NotFound => write!(f, "not found"),
1258 Self::TimedOut => write!(f, "timed out"),
1259 }
1260 }
1261}
1262
/// Error returned by `ObjectStore::info`.
pub type InfoError = Error<InfoErrorKind>;

/// Kinds of failure reported by `ObjectStore::get`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GetErrorKind {
    /// The object name is not valid.
    InvalidName,
    /// Creating the consumer used for fetching the object failed.
    ConsumerCreate,
    /// The object does not exist or is marked as deleted.
    NotFound,
    /// The entry is a link to a bucket and cannot be fetched as an object.
    BucketLink,
    /// Any other failure.
    Other,
    /// The operation timed out.
    TimedOut,
}
1274
1275impl Display for GetErrorKind {
1276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1277 match self {
1278 Self::ConsumerCreate => write!(f, "failed creating consumer for fetching object"),
1279 Self::Other => write!(f, "failed getting object"),
1280 Self::NotFound => write!(f, "object not found"),
1281 Self::TimedOut => write!(f, "timed out"),
1282 Self::InvalidName => write!(f, "invalid object name"),
1283 Self::BucketLink => write!(f, "object is a link to a bucket"),
1284 }
1285 }
1286}
1287
/// Error returned by `ObjectStore::get`.
pub type GetError = Error<GetErrorKind>;

// Conversions from consumer and stream errors into `GetError`, generated by the crate's
// `from_with_timeout!` macro (defined outside this file).
crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1292
1293impl From<InfoError> for GetError {
1294 fn from(err: InfoError) -> Self {
1295 match err.kind() {
1296 InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1297 InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1298 InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1299 InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1300 }
1301 }
1302}
1303
/// Kinds of failure reported by `ObjectStore::delete`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DeleteErrorKind {
    /// The operation timed out.
    TimedOut,
    /// The object does not exist.
    NotFound,
    /// Rolling up the metadata failed.
    Metadata,
    /// The object name is not valid.
    InvalidName,
    /// Purging the object's chunks failed.
    Chunks,
    /// Any other failure.
    Other,
}
1313
1314impl Display for DeleteErrorKind {
1315 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1316 match self {
1317 Self::TimedOut => write!(f, "timed out"),
1318 Self::Metadata => write!(f, "failed rolling up metadata"),
1319 Self::Chunks => write!(f, "failed purging chunks"),
1320 Self::Other => write!(f, "delete failed"),
1321 Self::NotFound => write!(f, "object not found"),
1322 Self::InvalidName => write!(f, "invalid object name"),
1323 }
1324 }
1325}
1326
/// Error returned by `ObjectStore::delete`.
pub type DeleteError = Error<DeleteErrorKind>;
1328
1329impl From<InfoError> for DeleteError {
1330 fn from(err: InfoError) -> Self {
1331 match err.kind() {
1332 InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1333 InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1334 InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1335 InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1336 }
1337 }
1338}
1339
// Conversions from publish and purge errors into `DeleteError`, generated by the crate's
// `from_with_timeout!` macro (defined outside this file).
crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);

/// Kinds of failure reported by `ObjectStore::put`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The object name is not valid.
    InvalidName,
    /// Reading from the supplied data source failed.
    ReadChunks,
    /// Publishing a chunk (or receiving its ack) failed.
    PublishChunks,
    /// Publishing the metadata (or receiving its ack) failed.
    PublishMetadata,
    /// Purging the chunks of the replaced object failed.
    PurgeOldChunks,
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
}
1353
1354impl Display for PutErrorKind {
1355 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1356 match self {
1357 Self::PublishChunks => write!(f, "failed publishing object chunks"),
1358 Self::PublishMetadata => write!(f, "failed publishing metadata"),
1359 Self::PurgeOldChunks => write!(f, "failed purging old chunks"),
1360 Self::TimedOut => write!(f, "timed out"),
1361 Self::Other => write!(f, "error"),
1362 Self::InvalidName => write!(f, "invalid object name"),
1363 Self::ReadChunks => write!(f, "error while reading the buffer"),
1364 }
1365 }
1366}
1367
/// Error returned by `ObjectStore::put`.
pub type PutError = Error<PutErrorKind>;

/// Error returned by `ObjectStore::add_link` and `ObjectStore::add_bucket_link`.
pub type AddLinkError = Error<AddLinkErrorKind>;

/// Kinds of failure reported when adding a link.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AddLinkErrorKind {
    /// The link name is empty.
    EmptyName,
    /// The target object has an empty name.
    ObjectRequired,
    /// The target object is marked as deleted.
    Deleted,
    /// The target object is itself a link.
    LinkToLink,
    /// Publishing the link metadata failed.
    PublishMetadata,
    /// The link name is already used by an entry that is not a link.
    AlreadyExists,
    /// Any other failure.
    Other,
}
1382
1383impl Display for AddLinkErrorKind {
1384 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1385 match self {
1386 AddLinkErrorKind::ObjectRequired => write!(f, "cannot link to empty Object"),
1387 AddLinkErrorKind::Deleted => write!(f, "cannot link a deleted Object"),
1388 AddLinkErrorKind::LinkToLink => write!(f, "cannot link to another link"),
1389 AddLinkErrorKind::EmptyName => write!(f, "link name cannot be empty"),
1390 AddLinkErrorKind::PublishMetadata => write!(f, "failed publishing link metadata"),
1391 AddLinkErrorKind::Other => write!(f, "error"),
1392 AddLinkErrorKind::AlreadyExists => write!(f, "object already exists"),
1393 }
1394 }
1395}
1396
/// Module-private error whose kind is a [`PublishMetadataErrorKind`]; it is converted into
/// `AddLinkError` or `PutError` by the `From` impls in this module.
type PublishMetadataError = Error<PublishMetadataErrorKind>;
1398
/// Failure categories carried by `PublishMetadataError`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PublishMetadataErrorKind {
    /// Publishing the metadata failed.
    PublishMetadata,
    /// A failure not described by any other variant.
    Other,
}

impl Display for PublishMetadataErrorKind {
    /// Writes the fixed, human-readable description of this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::PublishMetadata => "failed to publish metadata",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1413
1414impl From<PublishMetadataError> for AddLinkError {
1415 fn from(error: PublishMetadataError) -> Self {
1416 match error.kind {
1417 PublishMetadataErrorKind::PublishMetadata => {
1418 AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1419 }
1420 PublishMetadataErrorKind::Other => {
1421 AddLinkError::with_source(AddLinkErrorKind::Other, error)
1422 }
1423 }
1424 }
1425}
1426impl From<PublishMetadataError> for PutError {
1427 fn from(error: PublishMetadataError) -> Self {
1428 match error.kind {
1429 PublishMetadataErrorKind::PublishMetadata => {
1430 PutError::new(PutErrorKind::PublishMetadata)
1431 }
1432 PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1433 }
1434 }
1435}
1436
/// Failure categories carried by `WatchError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// A watch failure not described by any other variant.
    Other,
}

impl Display for WatchErrorKind {
    /// Writes the fixed, human-readable description of this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            WatchErrorKind::TimedOut => "timed out",
            WatchErrorKind::ConsumerCreate => "watch consumer creation failed",
            WatchErrorKind::Other => "watch failed",
        };
        f.write_str(description)
    }
}
1453
/// Error whose kind is a [`WatchErrorKind`].
pub type WatchError = Error<WatchErrorKind>;
1455
// Conversions into `WatchError` from `ConsumerError` and `StreamError`, produced by the
// crate-level `from_with_timeout!` macro.
// NOTE(review): the macro body is not visible here; presumably it maps the source's
// `TimedOut` kind to `WatchErrorKind::TimedOut` — confirm against the macro definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1458
/// Alias of [`WatchError`]; the two names refer to the same type.
pub type ListError = WatchError;
/// Alias of [`WatchErrorKind`]; the two names refer to the same type.
pub type ListErrorKind = WatchErrorKind;
1461
/// Failure categories carried by `SealError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SealErrorKind {
    /// The operation timed out.
    TimedOut,
    /// A seal failure not described by any other variant.
    Other,
    /// Getting the stream info before sealing the bucket failed.
    Info,
    /// Sealing the bucket failed.
    Update,
}

impl Display for SealErrorKind {
    /// Writes the fixed, human-readable description of this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            SealErrorKind::TimedOut => "timed out",
            SealErrorKind::Other => "seal failed",
            SealErrorKind::Info => "failed getting stream info before sealing bucket",
            SealErrorKind::Update => "failed sealing the bucket",
        };
        f.write_str(description)
    }
}
1480
/// Error whose kind is a [`SealErrorKind`].
pub type SealError = Error<SealErrorKind>;
1482
1483impl From<super::context::UpdateStreamError> for SealError {
1484 fn from(err: super::context::UpdateStreamError) -> Self {
1485 match err.kind() {
1486 super::context::CreateStreamErrorKind::TimedOut => {
1487 SealError::new(SealErrorKind::TimedOut)
1488 }
1489 _ => SealError::with_source(SealErrorKind::Update, err),
1490 }
1491 }
1492}
1493
/// Failure categories carried by `WatcherError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The watcher's consumer reported an error.
    ConsumerError,
    /// A watcher failure not described by any other variant.
    Other,
}

impl Display for WatcherErrorKind {
    /// Writes the fixed, human-readable description of this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            WatcherErrorKind::ConsumerError => "watcher consumer error",
            WatcherErrorKind::Other => "watcher error",
        };
        f.write_str(description)
    }
}
1508
/// Error whose kind is a [`WatcherErrorKind`].
pub type WatcherError = Error<WatcherErrorKind>;
1510
1511impl From<OrderedError> for WatcherError {
1512 fn from(err: OrderedError) -> Self {
1513 WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1514 }
1515}
1516
/// Alias of [`WatcherError`]; the two names refer to the same type.
pub type ListerError = WatcherError;
/// Alias of [`WatcherErrorKind`]; the two names refer to the same type.
pub type ListerErrorKind = WatcherErrorKind;