1use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
42const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
43const NATS_ROLLUP: &str = "Nats-Rollup";
44const ROLLUP_SUBJECT: &str = "sub";
45
46static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
47static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50 BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54 if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55 return false;
56 }
57
58 OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62 URL_SAFE.encode(object_name)
63}
64
65#[derive(Debug, Default, Clone, Serialize, Deserialize)]
67pub struct Config {
68 pub bucket: String,
70 pub description: Option<String>,
72 #[serde(default, with = "serde_nanos")]
74 pub max_age: Duration,
75 pub max_bytes: i64,
77 pub storage: StorageType,
79 pub num_replicas: usize,
81 pub compression: bool,
83 pub placement: Option<stream::Placement>,
85}
86
87#[derive(Clone)]
89pub struct ObjectStore {
90 pub(crate) name: String,
91 pub(crate) stream: crate::jetstream::stream::Stream,
92}
93
94impl ObjectStore {
95 pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
119 self.get_impl(object_name).await
120 }
121
122 fn get_impl<'bucket, 'future, T>(
123 &'bucket self,
124 object_name: T,
125 ) -> BoxFuture<'future, Result<Object, GetError>>
126 where
127 T: AsRef<str> + Send + 'future,
128 'bucket: 'future,
129 {
130 Box::pin(async move {
131 let object_info = self.info(object_name).await?;
132 if object_info.deleted {
133 return Err(GetError::new(GetErrorKind::NotFound));
134 }
135 if let Some(ref options) = object_info.options {
136 if let Some(link) = options.link.as_ref() {
137 if let Some(link_name) = link.name.as_ref() {
138 let link_name = link_name.clone();
139 debug!("getting object via link");
140 if link.bucket == self.name {
141 return self.get_impl(link_name).await;
142 } else {
143 let bucket = self
144 .stream
145 .context
146 .get_object_store(&link.bucket)
147 .await
148 .map_err(|err| {
149 GetError::with_source(GetErrorKind::Other, err)
150 })?;
151 let object = bucket.get_impl(&link_name).await?;
152 return Ok(object);
153 }
154 } else {
155 return Err(GetError::new(GetErrorKind::BucketLink));
156 }
157 }
158 }
159
160 debug!("not a link. Getting the object");
161 Ok(Object::new(object_info, self.stream.clone()))
162 })
163 }
164
165 pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
181 let object_name = object_name.as_ref();
182 let mut object_info = self.info(object_name).await?;
183 object_info.chunks = 0;
184 object_info.size = 0;
185 object_info.deleted = true;
186
187 let data = serde_json::to_vec(&object_info).map_err(|err| {
188 DeleteError::with_source(
189 DeleteErrorKind::Other,
190 format!("failed deserializing object info: {}", err),
191 )
192 })?;
193
194 let mut headers = HeaderMap::default();
195 headers.insert(
196 NATS_ROLLUP,
197 HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
198 DeleteError::with_source(
199 DeleteErrorKind::Other,
200 format!("failed parsing header: {}", err),
201 )
202 })?,
203 );
204
205 let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
206
207 self.stream
208 .context
209 .publish_with_headers(subject, headers, data.into())
210 .await?
211 .await?;
212
213 let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
214
215 self.stream.purge().filter(&chunk_subject).await?;
216
217 Ok(())
218 }
219
220 pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
236 let object_name = object_name.as_ref();
237 let object_name = encode_object_name(object_name);
238 if !is_valid_object_name(&object_name) {
239 return Err(InfoError::new(InfoErrorKind::InvalidName));
240 }
241
242 let subject = format!("$O.{}.M.{}", &self.name, &object_name);
244
245 let message = self
247 .stream
248 .get_last_raw_message_by_subject(subject.as_str())
249 .await
250 .map_err(|err| match err.kind() {
251 stream::LastRawMessageErrorKind::NoMessageFound => {
252 InfoError::new(InfoErrorKind::NotFound)
253 }
254 _ => InfoError::with_source(InfoErrorKind::Other, err),
255 })?;
256 let object_info =
257 serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
258 InfoError::with_source(
259 InfoErrorKind::Other,
260 format!("failed to decode info payload: {}", err),
261 )
262 })?;
263
264 Ok(object_info)
265 }
266
267 pub async fn put<T>(
285 &self,
286 meta: T,
287 data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
288 ) -> Result<ObjectInfo, PutError>
289 where
290 ObjectMetadata: From<T>,
291 {
292 let object_meta: ObjectMetadata = meta.into();
293
294 let maybe_existing_object_info = (self.info(&object_meta.name).await).ok();
296
297 let object_nuid = nuid::next();
298 let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
299
300 let mut object_chunks = 0;
301 let mut object_size = 0;
302
303 let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
304 let mut buffer = BytesMut::with_capacity(chunk_size);
305 let mut sha256 = Sha256::new();
306
307 loop {
308 let n = data
309 .read_buf(&mut buffer)
310 .await
311 .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
312
313 if n == 0 {
314 break;
315 }
316
317 let payload = buffer.split().freeze();
318 sha256.update(&payload);
319
320 object_size += payload.len();
321 object_chunks += 1;
322
323 self.stream
324 .context
325 .publish(chunk_subject.clone(), payload)
326 .await
327 .map_err(|err| {
328 PutError::with_source(
329 PutErrorKind::PublishChunks,
330 format!("failed chunk publish: {}", err),
331 )
332 })?
333 .await
334 .map_err(|err| {
335 PutError::with_source(
336 PutErrorKind::PublishChunks,
337 format!("failed getting chunk ack: {}", err),
338 )
339 })?;
340 }
341 let digest = sha256.finish();
342
343 let encoded_object_name = encode_object_name(&object_meta.name);
344 if !is_valid_object_name(&encoded_object_name) {
345 return Err(PutError::new(PutErrorKind::InvalidName));
346 }
347 let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
348
349 let object_info = ObjectInfo {
350 name: object_meta.name,
351 description: object_meta.description,
352 options: Some(ObjectOptions {
353 max_chunk_size: Some(chunk_size),
354 link: None,
355 }),
356 bucket: self.name.clone(),
357 nuid: object_nuid.to_string(),
358 chunks: object_chunks,
359 size: object_size,
360 digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
361 modified: Some(OffsetDateTime::now_utc()),
362 deleted: false,
363 metadata: object_meta.metadata,
364 headers: object_meta.headers,
365 };
366
367 let mut headers = HeaderMap::new();
368 headers.insert(
369 NATS_ROLLUP,
370 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
371 PutError::with_source(
372 PutErrorKind::Other,
373 format!("failed parsing header: {}", err),
374 )
375 })?,
376 );
377 let data = serde_json::to_vec(&object_info).map_err(|err| {
378 PutError::with_source(
379 PutErrorKind::Other,
380 format!("failed serializing object info: {}", err),
381 )
382 })?;
383
384 self.stream
386 .context
387 .publish_with_headers(subject, headers, data.into())
388 .await
389 .map_err(|err| {
390 PutError::with_source(
391 PutErrorKind::PublishMetadata,
392 format!("failed publishing metadata: {}", err),
393 )
394 })?
395 .await
396 .map_err(|err| {
397 PutError::with_source(
398 PutErrorKind::PublishMetadata,
399 format!("failed ack from metadata publish: {}", err),
400 )
401 })?;
402
403 if let Some(existing_object_info) = maybe_existing_object_info {
405 let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
406
407 self.stream
408 .purge()
409 .filter(&chunk_subject)
410 .await
411 .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
412 }
413
414 Ok(object_info)
415 }
416
417 pub async fn watch(&self) -> Result<Watch, WatchError> {
437 self.watch_with_deliver_policy(DeliverPolicy::New).await
438 }
439
440 pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
443 self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
444 .await
445 }
446
447 async fn watch_with_deliver_policy(
448 &self,
449 deliver_policy: DeliverPolicy,
450 ) -> Result<Watch, WatchError> {
451 let subject = format!("$O.{}.M.>", self.name);
452 let ordered = self
453 .stream
454 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
455 deliver_policy,
456 deliver_subject: self.stream.context.client.new_inbox(),
457 description: Some("object store watcher".to_string()),
458 filter_subject: subject,
459 ..Default::default()
460 })
461 .await?;
462 Ok(Watch {
463 subscription: ordered.messages().await?,
464 })
465 }
466
467 pub async fn list(&self) -> Result<List, ListError> {
487 trace!("starting Object List");
488 let subject = format!("$O.{}.M.>", self.name);
489 let ordered = self
490 .stream
491 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
492 deliver_policy: super::consumer::DeliverPolicy::All,
493 deliver_subject: self.stream.context.client.new_inbox(),
494 description: Some("object store list".to_string()),
495 filter_subject: subject,
496 ..Default::default()
497 })
498 .await?;
499 Ok(List {
500 done: ordered.info.num_pending == 0,
501 subscription: Some(ordered.messages().await?),
502 })
503 }
504
505 pub async fn seal(&mut self) -> Result<(), SealError> {
522 let mut stream_config = self
523 .stream
524 .info()
525 .await
526 .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
527 .to_owned();
528 stream_config.config.sealed = true;
529
530 self.stream
531 .context
532 .update_stream(&stream_config.config)
533 .await?;
534 Ok(())
535 }
536
537 pub async fn update_metadata<A: AsRef<str>>(
563 &self,
564 object: A,
565 metadata: UpdateMetadata,
566 ) -> Result<ObjectInfo, UpdateMetadataError> {
567 let mut info = self.info(object.as_ref()).await?;
568
569 if metadata.name != info.name {
572 tracing::info!("new metadata name is different than then old one");
573 if !is_valid_object_name(&metadata.name) {
574 return Err(UpdateMetadataError::new(
575 UpdateMetadataErrorKind::InvalidName,
576 ));
577 }
578 match self.info(&metadata.name).await {
579 Ok(_) => {
580 return Err(UpdateMetadataError::new(
581 UpdateMetadataErrorKind::NameAlreadyInUse,
582 ))
583 }
584 Err(err) => match err.kind() {
585 InfoErrorKind::NotFound => {
586 tracing::info!("purging old metadata: {}", info.name);
587 self.stream
588 .purge()
589 .filter(format!(
590 "$O.{}.M.{}",
591 self.name,
592 encode_object_name(&info.name)
593 ))
594 .await
595 .map_err(|err| {
596 UpdateMetadataError::with_source(
597 UpdateMetadataErrorKind::Purge,
598 err,
599 )
600 })?;
601 }
602 _ => {
603 return Err(UpdateMetadataError::with_source(
604 UpdateMetadataErrorKind::Other,
605 err,
606 ))
607 }
608 },
609 }
610 }
611
612 info.name = metadata.name;
613 info.description = metadata.description;
614
615 let name = encode_object_name(&info.name);
616 let subject = format!("$O.{}.M.{}", &self.name, &name);
617
618 let mut headers = HeaderMap::new();
619 headers.insert(
620 NATS_ROLLUP,
621 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
622 UpdateMetadataError::with_source(
623 UpdateMetadataErrorKind::Other,
624 format!("failed parsing header: {}", err),
625 )
626 })?,
627 );
628 let data = serde_json::to_vec(&info).map_err(|err| {
629 UpdateMetadataError::with_source(
630 UpdateMetadataErrorKind::Other,
631 format!("failed serializing object info: {}", err),
632 )
633 })?;
634
635 self.stream
637 .context
638 .publish_with_headers(subject, headers, data.into())
639 .await
640 .map_err(|err| {
641 UpdateMetadataError::with_source(
642 UpdateMetadataErrorKind::PublishMetadata,
643 format!("failed publishing metadata: {}", err),
644 )
645 })?
646 .await
647 .map_err(|err| {
648 UpdateMetadataError::with_source(
649 UpdateMetadataErrorKind::PublishMetadata,
650 format!("failed ack from metadata publish: {}", err),
651 )
652 })?;
653
654 Ok(info)
655 }
656
657 pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
677 where
678 T: ToString,
679 O: AsObjectInfo,
680 {
681 let object = object.as_info();
682 let name = name.to_string();
683 if name.is_empty() {
684 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
685 }
686 if object.name.is_empty() {
687 return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
688 }
689 if object.deleted {
690 return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
691 }
692 if let Some(ref options) = object.options {
693 if options.link.is_some() {
694 return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
695 }
696 }
697 match self.info(&name).await {
698 Ok(info) => {
699 if let Some(options) = info.options {
700 if options.link.is_none() {
701 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
702 }
703 } else {
704 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
705 }
706 }
707 Err(err) if err.kind() != InfoErrorKind::NotFound => {
708 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
709 }
710 _ => (),
711 }
712
713 let info = ObjectInfo {
714 name,
715 description: None,
716 options: Some(ObjectOptions {
717 link: Some(ObjectLink {
718 name: Some(object.name.clone()),
719 bucket: object.bucket.clone(),
720 }),
721 max_chunk_size: None,
722 }),
723 bucket: self.name.clone(),
724 nuid: nuid::next().to_string(),
725 size: 0,
726 chunks: 0,
727 modified: Some(OffsetDateTime::now_utc()),
728 digest: None,
729 deleted: false,
730 metadata: HashMap::default(),
731 headers: None,
732 };
733 publish_meta(self, &info).await?;
734 Ok(info)
735 }
736
737 pub async fn add_bucket_link<T: ToString, U: ToString>(
757 &self,
758 name: T,
759 bucket: U,
760 ) -> Result<ObjectInfo, AddLinkError> {
761 let name = name.to_string();
762 let bucket = bucket.to_string();
763 if name.is_empty() {
764 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
765 }
766
767 match self.info(&name).await {
768 Ok(info) => {
769 if let Some(options) = info.options {
770 if options.link.is_none() {
771 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
772 }
773 }
774 }
775 Err(err) if err.kind() != InfoErrorKind::NotFound => {
776 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
777 }
778 _ => (),
779 }
780
781 let info = ObjectInfo {
782 name: name.clone(),
783 description: None,
784 options: Some(ObjectOptions {
785 link: Some(ObjectLink { name: None, bucket }),
786 max_chunk_size: None,
787 }),
788 bucket: self.name.clone(),
789 nuid: nuid::next().to_string(),
790 size: 0,
791 chunks: 0,
792 modified: Some(OffsetDateTime::now_utc()),
793 digest: None,
794 deleted: false,
795 metadata: HashMap::default(),
796 headers: None,
797 };
798 publish_meta(self, &info).await?;
799 Ok(info)
800 }
801}
802
803async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
804 let encoded_object_name = encode_object_name(&info.name);
805 let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
806
807 let mut headers = HeaderMap::new();
808 headers.insert(
809 NATS_ROLLUP,
810 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
811 PublishMetadataError::with_source(
812 PublishMetadataErrorKind::Other,
813 format!("failed parsing header: {}", err),
814 )
815 })?,
816 );
817 let data = serde_json::to_vec(&info).map_err(|err| {
818 PublishMetadataError::with_source(
819 PublishMetadataErrorKind::Other,
820 format!("failed serializing object info: {}", err),
821 )
822 })?;
823
824 store
825 .stream
826 .context
827 .publish_with_headers(subject, headers, data.into())
828 .await
829 .map_err(|err| {
830 PublishMetadataError::with_source(
831 PublishMetadataErrorKind::PublishMetadata,
832 format!("failed publishing metadata: {}", err),
833 )
834 })?
835 .await
836 .map_err(|err| {
837 PublishMetadataError::with_source(
838 PublishMetadataErrorKind::PublishMetadata,
839 format!("failed ack from metadata publish: {}", err),
840 )
841 })?;
842 Ok(())
843}
844
845pub struct Watch {
846 subscription: crate::jetstream::consumer::push::Ordered,
847}
848
849impl Stream for Watch {
850 type Item = Result<ObjectInfo, WatcherError>;
851
852 fn poll_next(
853 mut self: std::pin::Pin<&mut Self>,
854 cx: &mut std::task::Context<'_>,
855 ) -> Poll<Option<Self::Item>> {
856 match self.subscription.poll_next_unpin(cx) {
857 Poll::Ready(message) => match message {
858 Some(message) => Poll::Ready(
859 serde_json::from_slice::<ObjectInfo>(&message?.payload)
860 .map_err(|err| {
861 WatcherError::with_source(
862 WatcherErrorKind::Other,
863 format!("failed to deserialize object info: {}", err),
864 )
865 })
866 .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
867 ),
868 None => Poll::Ready(None),
869 },
870 Poll::Pending => Poll::Pending,
871 }
872 }
873}
874
875pub struct List {
876 subscription: Option<crate::jetstream::consumer::push::Ordered>,
877 done: bool,
878}
879
880impl Stream for List {
881 type Item = Result<ObjectInfo, ListerError>;
882
883 fn poll_next(
884 mut self: std::pin::Pin<&mut Self>,
885 cx: &mut std::task::Context<'_>,
886 ) -> Poll<Option<Self::Item>> {
887 loop {
888 if self.done {
889 debug!("Object Store list done");
890 self.subscription = None;
891 return Poll::Ready(None);
892 }
893
894 if let Some(subscription) = self.subscription.as_mut() {
895 match subscription.poll_next_unpin(cx) {
896 Poll::Ready(message) => match message {
897 None => return Poll::Ready(None),
898 Some(message) => {
899 let message = message?;
900 let info = message.info().map_err(|err| {
901 ListerError::with_source(ListerErrorKind::Other, err)
902 })?;
903 trace!("num pending: {}", info.pending);
904 if info.pending == 0 {
905 self.done = true;
906 }
907 let response: ObjectInfo = serde_json::from_slice(&message.payload)
908 .map_err(|err| {
909 ListerError::with_source(
910 ListerErrorKind::Other,
911 format!("failed deserializing object info: {}", err),
912 )
913 })?;
914 if response.deleted {
915 continue;
916 }
917 return Poll::Ready(Some(Ok(response)));
918 }
919 },
920 Poll::Pending => return Poll::Pending,
921 }
922 } else {
923 return Poll::Ready(None);
924 }
925 }
926 }
927}
928
929pub struct Object {
931 pub info: ObjectInfo,
932 remaining_bytes: VecDeque<u8>,
933 has_pending_messages: bool,
934 digest: Option<Sha256>,
935 subscription: Option<crate::jetstream::consumer::push::Ordered>,
936 subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
937 stream: crate::jetstream::stream::Stream,
938}
939
940impl std::fmt::Debug for Object {
941 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
942 f.debug_struct("Object")
943 .field("info", &self.info)
944 .field("remaining_bytes", &self.remaining_bytes)
945 .field("has_pending_messages", &self.has_pending_messages)
946 .finish()
947 }
948}
949
950impl Object {
951 pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
952 let has_pending_messages = info.chunks > 0;
953 Object {
954 subscription: None,
955 info,
956 remaining_bytes: VecDeque::new(),
957 has_pending_messages,
958 digest: Some(Sha256::new()),
959 subscription_future: None,
960 stream,
961 }
962 }
963
964 pub fn info(&self) -> &ObjectInfo {
966 &self.info
967 }
968}
969
970impl tokio::io::AsyncRead for Object {
971 fn poll_read(
972 mut self: std::pin::Pin<&mut Self>,
973 cx: &mut std::task::Context<'_>,
974 buf: &mut tokio::io::ReadBuf<'_>,
975 ) -> std::task::Poll<std::io::Result<()>> {
976 let (buf1, _buf2) = self.remaining_bytes.as_slices();
977 if !buf1.is_empty() {
978 let len = cmp::min(buf.remaining(), buf1.len());
979 buf.put_slice(&buf1[..len]);
980 self.remaining_bytes.drain(..len);
981 return Poll::Ready(Ok(()));
982 }
983
984 if self.has_pending_messages {
985 if self.subscription.is_none() {
986 let future = match self.subscription_future.as_mut() {
987 Some(future) => future,
988 None => {
989 let stream = self.stream.clone();
990 let bucket = self.info.bucket.clone();
991 let nuid = self.info.nuid.clone();
992 self.subscription_future.insert(Box::pin(async move {
993 stream
994 .create_consumer(OrderedConfig {
995 deliver_subject: stream.context.client.new_inbox(),
996 filter_subject: format!("$O.{}.C.{}", bucket, nuid),
997 ..Default::default()
998 })
999 .await
1000 .unwrap()
1001 .messages()
1002 .await
1003 }))
1004 }
1005 };
1006 match future.as_mut().poll(cx) {
1007 Poll::Ready(subscription) => {
1008 self.subscription = Some(subscription.unwrap());
1009 }
1010 Poll::Pending => (),
1011 }
1012 }
1013 if let Some(subscription) = self.subscription.as_mut() {
1014 match subscription.poll_next_unpin(cx) {
1015 Poll::Ready(message) => match message {
1016 Some(message) => {
1017 let message = message.map_err(|err| {
1018 std::io::Error::other(format!(
1019 "error from JetStream subscription: {err}"
1020 ))
1021 })?;
1022 let len = cmp::min(buf.remaining(), message.payload.len());
1023 buf.put_slice(&message.payload[..len]);
1024 if let Some(context) = &mut self.digest {
1025 context.update(&message.payload);
1026 }
1027 self.remaining_bytes.extend(&message.payload[len..]);
1028
1029 let info = message.info().map_err(|err| {
1030 std::io::Error::other(format!(
1031 "error from JetStream subscription: {err}"
1032 ))
1033 })?;
1034 if info.pending == 0 {
1035 let digest = self.digest.take().map(Sha256::finish);
1036 if let Some(digest) = digest {
1037 if self
1038 .info
1039 .digest
1040 .as_ref()
1041 .map(|digest_self| {
1042 format!("SHA-256={}", URL_SAFE.encode(digest))
1043 != *digest_self
1044 })
1045 .unwrap_or(false)
1046 {
1047 return Poll::Ready(Err(std::io::Error::new(
1048 std::io::ErrorKind::InvalidData,
1049 "wrong digest",
1050 )));
1051 }
1052 } else {
1053 return Poll::Ready(Err(std::io::Error::new(
1054 std::io::ErrorKind::InvalidData,
1055 "digest should be Some",
1056 )));
1057 }
1058 self.has_pending_messages = false;
1059 self.subscription = None;
1060 }
1061 Poll::Ready(Ok(()))
1062 }
1063 None => Poll::Ready(Err(std::io::Error::other(
1064 "subscription ended before reading whole object",
1065 ))),
1066 },
1067 Poll::Pending => Poll::Pending,
1068 }
1069 } else {
1070 Poll::Pending
1071 }
1072 } else {
1073 Poll::Ready(Ok(()))
1074 }
1075 }
1076}
1077
1078#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1079pub struct ObjectOptions {
1080 pub link: Option<ObjectLink>,
1081 pub max_chunk_size: Option<usize>,
1082}
1083
1084#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1086pub struct ObjectInfo {
1087 pub name: String,
1089 #[serde(default)]
1091 pub description: Option<String>,
1092 #[serde(default)]
1094 pub metadata: HashMap<String, String>,
1095 #[serde(default)]
1097 pub headers: Option<HeaderMap>,
1098 #[serde(default)]
1100 pub options: Option<ObjectOptions>,
1101 pub bucket: String,
1103 #[serde(default)]
1105 pub nuid: String,
1106 #[serde(default)]
1108 pub size: usize,
1109 #[serde(default)]
1111 pub chunks: usize,
1112 #[serde(default, with = "rfc3339::option")]
1114 #[serde(rename = "mtime")]
1115 pub modified: Option<time::OffsetDateTime>,
1116 #[serde(default, skip_serializing_if = "Option::is_none")]
1118 pub digest: Option<String>,
1119 #[serde(default, skip_serializing_if = "is_default")]
1121 pub deleted: bool,
1122}
1123
1124fn is_default<T: Default + Eq>(t: &T) -> bool {
1125 t == &T::default()
1126}
1127#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1129pub struct ObjectLink {
1130 pub name: Option<String>,
1132 pub bucket: String,
1134}
1135
1136#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1137pub struct UpdateMetadata {
1138 pub name: String,
1140 pub description: Option<String>,
1142 #[serde(default)]
1144 pub metadata: HashMap<String, String>,
1145 pub headers: Option<HeaderMap>,
1147}
1148
1149#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1151pub struct ObjectMetadata {
1152 pub name: String,
1154 pub description: Option<String>,
1156 pub chunk_size: Option<usize>,
1158 #[serde(default)]
1160 pub metadata: HashMap<String, String>,
1161 pub headers: Option<HeaderMap>,
1163}
1164
1165impl From<&str> for ObjectMetadata {
1166 fn from(s: &str) -> ObjectMetadata {
1167 ObjectMetadata {
1168 name: s.to_string(),
1169 ..Default::default()
1170 }
1171 }
1172}
1173
1174pub trait AsObjectInfo {
1175 fn as_info(&self) -> &ObjectInfo;
1176}
1177
1178impl AsObjectInfo for &Object {
1179 fn as_info(&self) -> &ObjectInfo {
1180 &self.info
1181 }
1182}
1183impl AsObjectInfo for &ObjectInfo {
1184 fn as_info(&self) -> &ObjectInfo {
1185 self
1186 }
1187}
1188
1189impl From<ObjectInfo> for ObjectMetadata {
1190 fn from(info: ObjectInfo) -> Self {
1191 ObjectMetadata {
1192 name: info.name,
1193 description: info.description,
1194 metadata: info.metadata,
1195 headers: info.headers,
1196 chunk_size: None,
1197 }
1198 }
1199}
1200
1201#[derive(Debug, PartialEq, Clone)]
1202pub enum UpdateMetadataErrorKind {
1203 InvalidName,
1204 NotFound,
1205 TimedOut,
1206 Other,
1207 PublishMetadata,
1208 NameAlreadyInUse,
1209 Purge,
1210}
1211
1212impl Display for UpdateMetadataErrorKind {
1213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1214 match self {
1215 Self::InvalidName => write!(f, "invalid object name"),
1216 Self::NotFound => write!(f, "object not found"),
1217 Self::TimedOut => write!(f, "timed out"),
1218 Self::Other => write!(f, "error"),
1219 Self::PublishMetadata => {
1220 write!(f, "failed publishing metadata")
1221 }
1222 Self::NameAlreadyInUse => {
1223 write!(f, "object with updated name already exists")
1224 }
1225 Self::Purge => write!(f, "failed purging old name metadata"),
1226 }
1227 }
1228}
1229
1230impl From<InfoError> for UpdateMetadataError {
1231 fn from(error: InfoError) -> Self {
1232 match error.kind() {
1233 InfoErrorKind::InvalidName => {
1234 UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1235 }
1236 InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1237 InfoErrorKind::Other => {
1238 UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1239 }
1240 InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1241 }
1242 }
1243}
1244
1245pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1246
1247#[derive(Clone, Copy, Debug, PartialEq)]
1248pub enum InfoErrorKind {
1249 InvalidName,
1250 NotFound,
1251 Other,
1252 TimedOut,
1253}
1254
1255impl Display for InfoErrorKind {
1256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1257 match self {
1258 Self::InvalidName => write!(f, "invalid object name"),
1259 Self::Other => write!(f, "getting info failed"),
1260 Self::NotFound => write!(f, "not found"),
1261 Self::TimedOut => write!(f, "timed out"),
1262 }
1263 }
1264}
1265
1266pub type InfoError = Error<InfoErrorKind>;
1267
1268#[derive(Clone, Copy, Debug, PartialEq)]
1269pub enum GetErrorKind {
1270 InvalidName,
1271 ConsumerCreate,
1272 NotFound,
1273 BucketLink,
1274 Other,
1275 TimedOut,
1276}
1277
1278impl Display for GetErrorKind {
1279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1280 match self {
1281 Self::ConsumerCreate => write!(f, "failed creating consumer for fetching object"),
1282 Self::Other => write!(f, "failed getting object"),
1283 Self::NotFound => write!(f, "object not found"),
1284 Self::TimedOut => write!(f, "timed out"),
1285 Self::InvalidName => write!(f, "invalid object name"),
1286 Self::BucketLink => write!(f, "object is a link to a bucket"),
1287 }
1288 }
1289}
1290
1291pub type GetError = Error<GetErrorKind>;
1292
1293crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1294crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1295
1296impl From<InfoError> for GetError {
1297 fn from(err: InfoError) -> Self {
1298 match err.kind() {
1299 InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1300 InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1301 InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1302 InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1303 }
1304 }
1305}
1306
1307#[derive(Clone, Copy, Debug, PartialEq)]
1308pub enum DeleteErrorKind {
1309 TimedOut,
1310 NotFound,
1311 Metadata,
1312 InvalidName,
1313 Chunks,
1314 Other,
1315}
1316
1317impl Display for DeleteErrorKind {
1318 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1319 match self {
1320 Self::TimedOut => write!(f, "timed out"),
1321 Self::Metadata => write!(f, "failed rolling up metadata"),
1322 Self::Chunks => write!(f, "failed purging chunks"),
1323 Self::Other => write!(f, "delete failed"),
1324 Self::NotFound => write!(f, "object not found"),
1325 Self::InvalidName => write!(f, "invalid object name"),
1326 }
1327 }
1328}
1329
1330pub type DeleteError = Error<DeleteErrorKind>;
1331
1332impl From<InfoError> for DeleteError {
1333 fn from(err: InfoError) -> Self {
1334 match err.kind() {
1335 InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1336 InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1337 InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1338 InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1339 }
1340 }
1341}
1342
1343crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1344crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1345
1346#[derive(Clone, Copy, Debug, PartialEq)]
1347pub enum PutErrorKind {
1348 InvalidName,
1349 ReadChunks,
1350 PublishChunks,
1351 PublishMetadata,
1352 PurgeOldChunks,
1353 TimedOut,
1354 Other,
1355}
1356
1357impl Display for PutErrorKind {
1358 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1359 match self {
1360 Self::PublishChunks => write!(f, "failed publishing object chunks"),
1361 Self::PublishMetadata => write!(f, "failed publishing metadata"),
1362 Self::PurgeOldChunks => write!(f, "failed purging old chunks"),
1363 Self::TimedOut => write!(f, "timed out"),
1364 Self::Other => write!(f, "error"),
1365 Self::InvalidName => write!(f, "invalid object name"),
1366 Self::ReadChunks => write!(f, "error while reading the buffer"),
1367 }
1368 }
1369}
1370
1371pub type PutError = Error<PutErrorKind>;
1372
1373pub type AddLinkError = Error<AddLinkErrorKind>;
1374
1375#[derive(Clone, Copy, Debug, PartialEq)]
1376pub enum AddLinkErrorKind {
1377 EmptyName,
1378 ObjectRequired,
1379 Deleted,
1380 LinkToLink,
1381 PublishMetadata,
1382 AlreadyExists,
1383 Other,
1384}
1385
1386impl Display for AddLinkErrorKind {
1387 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1388 match self {
1389 AddLinkErrorKind::ObjectRequired => write!(f, "cannot link to empty Object"),
1390 AddLinkErrorKind::Deleted => write!(f, "cannot link a deleted Object"),
1391 AddLinkErrorKind::LinkToLink => write!(f, "cannot link to another link"),
1392 AddLinkErrorKind::EmptyName => write!(f, "link name cannot be empty"),
1393 AddLinkErrorKind::PublishMetadata => write!(f, "failed publishing link metadata"),
1394 AddLinkErrorKind::Other => write!(f, "error"),
1395 AddLinkErrorKind::AlreadyExists => write!(f, "object already exists"),
1396 }
1397 }
1398}
1399
1400type PublishMetadataError = Error<PublishMetadataErrorKind>;
1401
1402#[derive(Clone, Copy, Debug, PartialEq)]
1403enum PublishMetadataErrorKind {
1404 PublishMetadata,
1405 Other,
1406}
1407
1408impl Display for PublishMetadataErrorKind {
1409 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1410 match self {
1411 PublishMetadataErrorKind::PublishMetadata => write!(f, "failed to publish metadata"),
1412 PublishMetadataErrorKind::Other => write!(f, "error"),
1413 }
1414 }
1415}
1416
1417impl From<PublishMetadataError> for AddLinkError {
1418 fn from(error: PublishMetadataError) -> Self {
1419 match error.kind {
1420 PublishMetadataErrorKind::PublishMetadata => {
1421 AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1422 }
1423 PublishMetadataErrorKind::Other => {
1424 AddLinkError::with_source(AddLinkErrorKind::Other, error)
1425 }
1426 }
1427 }
1428}
1429impl From<PublishMetadataError> for PutError {
1430 fn from(error: PublishMetadataError) -> Self {
1431 match error.kind {
1432 PublishMetadataErrorKind::PublishMetadata => {
1433 PutError::new(PutErrorKind::PublishMetadata)
1434 }
1435 PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1436 }
1437 }
1438}
1439
1440#[derive(Clone, Copy, Debug, PartialEq)]
1441pub enum WatchErrorKind {
1442 TimedOut,
1443 ConsumerCreate,
1444 Other,
1445}
1446
1447impl Display for WatchErrorKind {
1448 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1449 match self {
1450 Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1451 Self::Other => write!(f, "watch failed"),
1452 Self::TimedOut => write!(f, "timed out"),
1453 }
1454 }
1455}
1456
1457pub type WatchError = Error<WatchErrorKind>;
1458
1459crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
1460crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1461
1462pub type ListError = WatchError;
1463pub type ListErrorKind = WatchErrorKind;
1464
1465#[derive(Clone, Copy, Debug, PartialEq)]
1466pub enum SealErrorKind {
1467 TimedOut,
1468 Other,
1469 Info,
1470 Update,
1471}
1472
1473impl Display for SealErrorKind {
1474 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1475 match self {
1476 Self::TimedOut => write!(f, "timed out"),
1477 Self::Other => write!(f, "seal failed"),
1478 Self::Info => write!(f, "failed getting stream info before sealing bucket"),
1479 Self::Update => write!(f, "failed sealing the bucket"),
1480 }
1481 }
1482}
1483
1484pub type SealError = Error<SealErrorKind>;
1485
1486impl From<super::context::UpdateStreamError> for SealError {
1487 fn from(err: super::context::UpdateStreamError) -> Self {
1488 match err.kind() {
1489 super::context::CreateStreamErrorKind::TimedOut => {
1490 SealError::new(SealErrorKind::TimedOut)
1491 }
1492 _ => SealError::with_source(SealErrorKind::Update, err),
1493 }
1494 }
1495}
1496
1497#[derive(Clone, Copy, Debug, PartialEq)]
1498pub enum WatcherErrorKind {
1499 ConsumerError,
1500 Other,
1501}
1502
1503impl Display for WatcherErrorKind {
1504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1505 match self {
1506 Self::ConsumerError => write!(f, "watcher consumer error"),
1507 Self::Other => write!(f, "watcher error"),
1508 }
1509 }
1510}
1511
1512pub type WatcherError = Error<WatcherErrorKind>;
1513
1514impl From<OrderedError> for WatcherError {
1515 fn from(err: OrderedError) -> Self {
1516 WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1517 }
1518}
1519
1520pub type ListerError = WatcherError;
1521pub type ListerErrorKind = WatcherErrorKind;