1use std::collections::VecDeque;
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::{STANDARD, URL_SAFE};
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
42const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
43const NATS_ROLLUP: &str = "Nats-Rollup";
44const ROLLUP_SUBJECT: &str = "sub";
45
46static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
47static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50 BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54 if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55 return false;
56 }
57
58 OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62 URL_SAFE.encode(object_name)
63}
64
65#[derive(Debug, Default, Clone, Serialize, Deserialize)]
67pub struct Config {
68 pub bucket: String,
70 pub description: Option<String>,
72 #[serde(default, with = "serde_nanos")]
74 pub max_age: Duration,
75 pub storage: StorageType,
77 pub num_replicas: usize,
79 pub compression: bool,
81 pub placement: Option<stream::Placement>,
83}
84
85#[derive(Clone)]
87pub struct ObjectStore {
88 pub(crate) name: String,
89 pub(crate) stream: crate::jetstream::stream::Stream,
90}
91
92impl ObjectStore {
93 pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
117 self.get_impl(object_name).await
118 }
119
120 fn get_impl<'bucket, 'future, T>(
121 &'bucket self,
122 object_name: T,
123 ) -> BoxFuture<'future, Result<Object, GetError>>
124 where
125 T: AsRef<str> + Send + 'future,
126 'bucket: 'future,
127 {
128 Box::pin(async move {
129 let object_info = self.info(object_name).await?;
130 if let Some(ref options) = object_info.options {
131 if let Some(link) = options.link.as_ref() {
132 if let Some(link_name) = link.name.as_ref() {
133 let link_name = link_name.clone();
134 debug!("getting object via link");
135 if link.bucket == self.name {
136 return self.get_impl(link_name).await;
137 } else {
138 let bucket = self
139 .stream
140 .context
141 .get_object_store(&link.bucket)
142 .await
143 .map_err(|err| {
144 GetError::with_source(GetErrorKind::Other, err)
145 })?;
146 let object = bucket.get_impl(&link_name).await?;
147 return Ok(object);
148 }
149 } else {
150 return Err(GetError::new(GetErrorKind::BucketLink));
151 }
152 }
153 }
154
155 debug!("not a link. Getting the object");
156 Ok(Object::new(object_info, self.stream.clone()))
157 })
158 }
159
160 pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
176 let object_name = object_name.as_ref();
177 let mut object_info = self.info(object_name).await?;
178 object_info.chunks = 0;
179 object_info.size = 0;
180 object_info.deleted = true;
181
182 let data = serde_json::to_vec(&object_info).map_err(|err| {
183 DeleteError::with_source(
184 DeleteErrorKind::Other,
185 format!("failed deserializing object info: {}", err),
186 )
187 })?;
188
189 let mut headers = HeaderMap::default();
190 headers.insert(
191 NATS_ROLLUP,
192 HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
193 DeleteError::with_source(
194 DeleteErrorKind::Other,
195 format!("failed parsing header: {}", err),
196 )
197 })?,
198 );
199
200 let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
201
202 self.stream
203 .context
204 .publish_with_headers(subject, headers, data.into())
205 .await?
206 .await?;
207
208 let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
209
210 self.stream.purge().filter(&chunk_subject).await?;
211
212 Ok(())
213 }
214
215 pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
231 let object_name = object_name.as_ref();
232 let object_name = encode_object_name(object_name);
233 if !is_valid_object_name(&object_name) {
234 return Err(InfoError::new(InfoErrorKind::InvalidName));
235 }
236
237 let subject = format!("$O.{}.M.{}", &self.name, &object_name);
239
240 let message = self
241 .stream
242 .get_last_raw_message_by_subject(subject.as_str())
243 .await
244 .map_err(|err| match err.kind() {
245 stream::LastRawMessageErrorKind::NoMessageFound => {
246 InfoError::new(InfoErrorKind::NotFound)
247 }
248 _ => InfoError::with_source(InfoErrorKind::Other, err),
249 })?;
250 let decoded_payload = STANDARD
251 .decode(message.payload)
252 .map_err(|err| InfoError::with_source(InfoErrorKind::Other, err))?;
253 let object_info =
254 serde_json::from_slice::<ObjectInfo>(&decoded_payload).map_err(|err| {
255 InfoError::with_source(
256 InfoErrorKind::Other,
257 format!("failed to decode info payload: {}", err),
258 )
259 })?;
260
261 Ok(object_info)
262 }
263
264 pub async fn put<T>(
282 &self,
283 meta: T,
284 data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
285 ) -> Result<ObjectInfo, PutError>
286 where
287 ObjectMetadata: From<T>,
288 {
289 let object_meta: ObjectMetadata = meta.into();
290
291 let encoded_object_name = encode_object_name(&object_meta.name);
292 if !is_valid_object_name(&encoded_object_name) {
293 return Err(PutError::new(PutErrorKind::InvalidName));
294 }
295 let maybe_existing_object_info = match self.info(&encoded_object_name).await {
297 Ok(object_info) => Some(object_info),
298 Err(_) => None,
299 };
300
301 let object_nuid = nuid::next();
302 let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
303
304 let mut object_chunks = 0;
305 let mut object_size = 0;
306
307 let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
308 let mut buffer = BytesMut::with_capacity(chunk_size);
309 let mut sha256 = Sha256::new();
310
311 loop {
312 let n = data
313 .read_buf(&mut buffer)
314 .await
315 .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
316
317 if n == 0 {
318 break;
319 }
320
321 let payload = buffer.split().freeze();
322 sha256.update(&payload);
323
324 object_size += payload.len();
325 object_chunks += 1;
326
327 self.stream
328 .context
329 .publish(chunk_subject.clone(), payload)
330 .await
331 .map_err(|err| {
332 PutError::with_source(
333 PutErrorKind::PublishChunks,
334 format!("failed chunk publish: {}", err),
335 )
336 })?
337 .await
338 .map_err(|err| {
339 PutError::with_source(
340 PutErrorKind::PublishChunks,
341 format!("failed getting chunk ack: {}", err),
342 )
343 })?;
344 }
345 let digest = sha256.finish();
346 let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
347 let object_info = ObjectInfo {
348 name: object_meta.name,
349 description: object_meta.description,
350 options: Some(ObjectOptions {
351 max_chunk_size: Some(chunk_size),
352 link: None,
353 }),
354 bucket: self.name.clone(),
355 nuid: object_nuid.to_string(),
356 chunks: object_chunks,
357 size: object_size,
358 digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
359 modified: Some(OffsetDateTime::now_utc()),
360 deleted: false,
361 };
362
363 let mut headers = HeaderMap::new();
364 headers.insert(
365 NATS_ROLLUP,
366 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
367 PutError::with_source(
368 PutErrorKind::Other,
369 format!("failed parsing header: {}", err),
370 )
371 })?,
372 );
373 let data = serde_json::to_vec(&object_info).map_err(|err| {
374 PutError::with_source(
375 PutErrorKind::Other,
376 format!("failed serializing object info: {}", err),
377 )
378 })?;
379
380 self.stream
382 .context
383 .publish_with_headers(subject, headers, data.into())
384 .await
385 .map_err(|err| {
386 PutError::with_source(
387 PutErrorKind::PublishMetadata,
388 format!("failed publishing metadata: {}", err),
389 )
390 })?
391 .await
392 .map_err(|err| {
393 PutError::with_source(
394 PutErrorKind::PublishMetadata,
395 format!("failed ack from metadata publish: {}", err),
396 )
397 })?;
398
399 if let Some(existing_object_info) = maybe_existing_object_info {
401 let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
402
403 self.stream
404 .purge()
405 .filter(&chunk_subject)
406 .await
407 .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
408 }
409
410 Ok(object_info)
411 }
412
413 pub async fn watch(&self) -> Result<Watch, WatchError> {
433 self.watch_with_deliver_policy(DeliverPolicy::New).await
434 }
435
436 pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
439 self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
440 .await
441 }
442
443 async fn watch_with_deliver_policy(
444 &self,
445 deliver_policy: DeliverPolicy,
446 ) -> Result<Watch, WatchError> {
447 let subject = format!("$O.{}.M.>", self.name);
448 let ordered = self
449 .stream
450 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
451 deliver_policy,
452 deliver_subject: self.stream.context.client.new_inbox(),
453 description: Some("object store watcher".to_string()),
454 filter_subject: subject,
455 ..Default::default()
456 })
457 .await?;
458 Ok(Watch {
459 subscription: ordered.messages().await?,
460 })
461 }
462
463 pub async fn list(&self) -> Result<List, ListError> {
483 trace!("starting Object List");
484 let subject = format!("$O.{}.M.>", self.name);
485 let ordered = self
486 .stream
487 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
488 deliver_policy: super::consumer::DeliverPolicy::All,
489 deliver_subject: self.stream.context.client.new_inbox(),
490 description: Some("object store list".to_string()),
491 filter_subject: subject,
492 ..Default::default()
493 })
494 .await?;
495 Ok(List {
496 done: ordered.info.num_pending == 0,
497 subscription: Some(ordered.messages().await?),
498 })
499 }
500
501 pub async fn seal(&mut self) -> Result<(), SealError> {
518 let mut stream_config = self
519 .stream
520 .info()
521 .await
522 .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
523 .to_owned();
524 stream_config.config.sealed = true;
525
526 self.stream
527 .context
528 .update_stream(&stream_config.config)
529 .await?;
530 Ok(())
531 }
532
533 pub async fn update_metadata<A: AsRef<str>>(
558 &self,
559 object: A,
560 metadata: UpdateMetadata,
561 ) -> Result<ObjectInfo, UpdateMetadataError> {
562 let mut info = self.info(object.as_ref()).await?;
563
564 if metadata.name != info.name {
567 tracing::info!("new metadata name is different than then old one");
568 if !is_valid_object_name(&metadata.name) {
569 return Err(UpdateMetadataError::new(
570 UpdateMetadataErrorKind::InvalidName,
571 ));
572 }
573 match self.info(&metadata.name).await {
574 Ok(_) => {
575 return Err(UpdateMetadataError::new(
576 UpdateMetadataErrorKind::NameAlreadyInUse,
577 ))
578 }
579 Err(err) => match err.kind() {
580 InfoErrorKind::NotFound => {
581 tracing::info!("purging old metadata: {}", info.name);
582 self.stream
583 .purge()
584 .filter(format!(
585 "$O.{}.M.{}",
586 self.name,
587 encode_object_name(&info.name)
588 ))
589 .await
590 .map_err(|err| {
591 UpdateMetadataError::with_source(
592 UpdateMetadataErrorKind::Purge,
593 err,
594 )
595 })?;
596 }
597 _ => {
598 return Err(UpdateMetadataError::with_source(
599 UpdateMetadataErrorKind::Other,
600 err,
601 ))
602 }
603 },
604 }
605 }
606
607 info.name = metadata.name;
608 info.description = metadata.description;
609
610 let name = encode_object_name(&info.name);
611 let subject = format!("$O.{}.M.{}", &self.name, &name);
612
613 let mut headers = HeaderMap::new();
614 headers.insert(
615 NATS_ROLLUP,
616 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
617 UpdateMetadataError::with_source(
618 UpdateMetadataErrorKind::Other,
619 format!("failed parsing header: {}", err),
620 )
621 })?,
622 );
623 let data = serde_json::to_vec(&info).map_err(|err| {
624 UpdateMetadataError::with_source(
625 UpdateMetadataErrorKind::Other,
626 format!("failed serializing object info: {}", err),
627 )
628 })?;
629
630 self.stream
632 .context
633 .publish_with_headers(subject, headers, data.into())
634 .await
635 .map_err(|err| {
636 UpdateMetadataError::with_source(
637 UpdateMetadataErrorKind::PublishMetadata,
638 format!("failed publishing metadata: {}", err),
639 )
640 })?
641 .await
642 .map_err(|err| {
643 UpdateMetadataError::with_source(
644 UpdateMetadataErrorKind::PublishMetadata,
645 format!("failed ack from metadata publish: {}", err),
646 )
647 })?;
648
649 Ok(info)
650 }
651
652 pub async fn add_link<'a, T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
672 where
673 T: ToString,
674 O: AsObjectInfo,
675 {
676 let object = object.as_info();
677 let name = name.to_string();
678 if name.is_empty() {
679 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
680 }
681 if object.name.is_empty() {
682 return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
683 }
684 if object.deleted {
685 return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
686 }
687 if let Some(ref options) = object.options {
688 if options.link.is_some() {
689 return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
690 }
691 }
692 match self.info(&name).await {
693 Ok(info) => {
694 if let Some(options) = info.options {
695 if options.link.is_none() {
696 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
697 }
698 } else {
699 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
700 }
701 }
702 Err(err) if err.kind() != InfoErrorKind::NotFound => {
703 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
704 }
705 _ => (),
706 }
707
708 let info = ObjectInfo {
709 name,
710 description: None,
711 options: Some(ObjectOptions {
712 link: Some(ObjectLink {
713 name: Some(object.name.clone()),
714 bucket: object.bucket.clone(),
715 }),
716 max_chunk_size: None,
717 }),
718 bucket: self.name.clone(),
719 nuid: nuid::next().to_string(),
720 size: 0,
721 chunks: 0,
722 modified: Some(OffsetDateTime::now_utc()),
723 digest: None,
724 deleted: false,
725 };
726 publish_meta(self, &info).await?;
727 Ok(info)
728 }
729
730 pub async fn add_bucket_link<T: ToString, U: ToString>(
750 &self,
751 name: T,
752 bucket: U,
753 ) -> Result<ObjectInfo, AddLinkError> {
754 let name = name.to_string();
755 let bucket = bucket.to_string();
756 if name.is_empty() {
757 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
758 }
759
760 match self.info(&name).await {
761 Ok(info) => {
762 if let Some(options) = info.options {
763 if options.link.is_none() {
764 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
765 }
766 }
767 }
768 Err(err) if err.kind() != InfoErrorKind::NotFound => {
769 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
770 }
771 _ => (),
772 }
773
774 let info = ObjectInfo {
775 name: name.clone(),
776 description: None,
777 options: Some(ObjectOptions {
778 link: Some(ObjectLink { name: None, bucket }),
779 max_chunk_size: None,
780 }),
781 bucket: self.name.clone(),
782 nuid: nuid::next().to_string(),
783 size: 0,
784 chunks: 0,
785 modified: Some(OffsetDateTime::now_utc()),
786 digest: None,
787 deleted: false,
788 };
789 publish_meta(self, &info).await?;
790 Ok(info)
791 }
792}
793
794async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
795 let encoded_object_name = encode_object_name(&info.name);
796 let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
797
798 let mut headers = HeaderMap::new();
799 headers.insert(
800 NATS_ROLLUP,
801 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
802 PublishMetadataError::with_source(
803 PublishMetadataErrorKind::Other,
804 format!("failed parsing header: {}", err),
805 )
806 })?,
807 );
808 let data = serde_json::to_vec(&info).map_err(|err| {
809 PublishMetadataError::with_source(
810 PublishMetadataErrorKind::Other,
811 format!("failed serializing object info: {}", err),
812 )
813 })?;
814
815 store
816 .stream
817 .context
818 .publish_with_headers(subject, headers, data.into())
819 .await
820 .map_err(|err| {
821 PublishMetadataError::with_source(
822 PublishMetadataErrorKind::PublishMetadata,
823 format!("failed publishing metadata: {}", err),
824 )
825 })?
826 .await
827 .map_err(|err| {
828 PublishMetadataError::with_source(
829 PublishMetadataErrorKind::PublishMetadata,
830 format!("failed ack from metadata publish: {}", err),
831 )
832 })?;
833 Ok(())
834}
835
836pub struct Watch {
837 subscription: crate::jetstream::consumer::push::Ordered,
838}
839
840impl Stream for Watch {
841 type Item = Result<ObjectInfo, WatcherError>;
842
843 fn poll_next(
844 mut self: std::pin::Pin<&mut Self>,
845 cx: &mut std::task::Context<'_>,
846 ) -> Poll<Option<Self::Item>> {
847 match self.subscription.poll_next_unpin(cx) {
848 Poll::Ready(message) => match message {
849 Some(message) => Poll::Ready(
850 serde_json::from_slice::<ObjectInfo>(&message?.payload)
851 .map_err(|err| {
852 WatcherError::with_source(
853 WatcherErrorKind::Other,
854 format!("failed to deserialize object info: {}", err),
855 )
856 })
857 .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
858 ),
859 None => Poll::Ready(None),
860 },
861 Poll::Pending => Poll::Pending,
862 }
863 }
864}
865
866pub struct List {
867 subscription: Option<crate::jetstream::consumer::push::Ordered>,
868 done: bool,
869}
870
871impl Stream for List {
872 type Item = Result<ObjectInfo, ListerError>;
873
874 fn poll_next(
875 mut self: std::pin::Pin<&mut Self>,
876 cx: &mut std::task::Context<'_>,
877 ) -> Poll<Option<Self::Item>> {
878 loop {
879 if self.done {
880 debug!("Object Store list done");
881 self.subscription = None;
882 return Poll::Ready(None);
883 }
884
885 if let Some(subscription) = self.subscription.as_mut() {
886 match subscription.poll_next_unpin(cx) {
887 Poll::Ready(message) => match message {
888 None => return Poll::Ready(None),
889 Some(message) => {
890 let message = message?;
891 let info = message.info().map_err(|err| {
892 ListerError::with_source(ListerErrorKind::Other, err)
893 })?;
894 trace!("num pending: {}", info.pending);
895 if info.pending == 0 {
896 self.done = true;
897 }
898 let response: ObjectInfo = serde_json::from_slice(&message.payload)
899 .map_err(|err| {
900 ListerError::with_source(
901 ListerErrorKind::Other,
902 format!("failed deserializing object info: {}", err),
903 )
904 })?;
905 if response.deleted {
906 continue;
907 }
908 return Poll::Ready(Some(Ok(response)));
909 }
910 },
911 Poll::Pending => return Poll::Pending,
912 }
913 } else {
914 return Poll::Ready(None);
915 }
916 }
917 }
918}
919
920pub struct Object {
922 pub info: ObjectInfo,
923 remaining_bytes: VecDeque<u8>,
924 has_pending_messages: bool,
925 digest: Option<Sha256>,
926 subscription: Option<crate::jetstream::consumer::push::Ordered>,
927 subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
928 stream: crate::jetstream::stream::Stream,
929}
930
931impl Object {
932 pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
933 Object {
934 subscription: None,
935 info,
936 remaining_bytes: VecDeque::new(),
937 has_pending_messages: true,
938 digest: Some(Sha256::new()),
939 subscription_future: None,
940 stream,
941 }
942 }
943
944 pub fn info(&self) -> &ObjectInfo {
946 &self.info
947 }
948}
949
950impl tokio::io::AsyncRead for Object {
951 fn poll_read(
952 mut self: std::pin::Pin<&mut Self>,
953 cx: &mut std::task::Context<'_>,
954 buf: &mut tokio::io::ReadBuf<'_>,
955 ) -> std::task::Poll<std::io::Result<()>> {
956 let (buf1, _buf2) = self.remaining_bytes.as_slices();
957 if !buf1.is_empty() {
958 let len = cmp::min(buf.remaining(), buf1.len());
959 buf.put_slice(&buf1[..len]);
960 self.remaining_bytes.drain(..len);
961 return Poll::Ready(Ok(()));
962 }
963
964 if self.has_pending_messages {
965 if self.subscription.is_none() {
966 let future = match self.subscription_future.as_mut() {
967 Some(future) => future,
968 None => {
969 let stream = self.stream.clone();
970 let bucket = self.info.bucket.clone();
971 let nuid = self.info.nuid.clone();
972 self.subscription_future.insert(Box::pin(async move {
973 stream
974 .create_consumer(OrderedConfig {
975 deliver_subject: stream.context.client.new_inbox(),
976 filter_subject: format!("$O.{}.C.{}", bucket, nuid),
977 ..Default::default()
978 })
979 .await
980 .unwrap()
981 .messages()
982 .await
983 }))
984 }
985 };
986 match future.as_mut().poll(cx) {
987 Poll::Ready(subscription) => {
988 self.subscription = Some(subscription.unwrap());
989 }
990 Poll::Pending => (),
991 }
992 }
993 if let Some(subscription) = self.subscription.as_mut() {
994 match subscription.poll_next_unpin(cx) {
995 Poll::Ready(message) => match message {
996 Some(message) => {
997 let message = message.map_err(|err| {
998 std::io::Error::new(
999 std::io::ErrorKind::Other,
1000 format!("error from JetStream subscription: {err}"),
1001 )
1002 })?;
1003 let len = cmp::min(buf.remaining(), message.payload.len());
1004 buf.put_slice(&message.payload[..len]);
1005 if let Some(context) = &mut self.digest {
1006 context.update(&message.payload);
1007 }
1008 self.remaining_bytes.extend(&message.payload[len..]);
1009
1010 let info = message.info().map_err(|err| {
1011 std::io::Error::new(
1012 std::io::ErrorKind::Other,
1013 format!("error from JetStream subscription: {err}"),
1014 )
1015 })?;
1016 if info.pending == 0 {
1017 let digest = self.digest.take().map(Sha256::finish);
1018 if let Some(digest) = digest {
1019 if self
1020 .info
1021 .digest
1022 .as_ref()
1023 .map(|digest_self| {
1024 format!("SHA-256={}", URL_SAFE.encode(digest))
1025 != *digest_self
1026 })
1027 .unwrap_or(false)
1028 {
1029 return Poll::Ready(Err(std::io::Error::new(
1030 std::io::ErrorKind::InvalidData,
1031 "wrong digest",
1032 )));
1033 }
1034 } else {
1035 return Poll::Ready(Err(std::io::Error::new(
1036 std::io::ErrorKind::InvalidData,
1037 "digest should be Some",
1038 )));
1039 }
1040 self.has_pending_messages = false;
1041 self.subscription = None;
1042 }
1043 Poll::Ready(Ok(()))
1044 }
1045 None => Poll::Ready(Err(std::io::Error::new(
1046 std::io::ErrorKind::Other,
1047 "subscription ended before reading whole object",
1048 ))),
1049 },
1050 Poll::Pending => Poll::Pending,
1051 }
1052 } else {
1053 Poll::Pending
1054 }
1055 } else {
1056 Poll::Ready(Ok(()))
1057 }
1058 }
1059}
1060
1061#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1062pub struct ObjectOptions {
1063 pub link: Option<ObjectLink>,
1064 pub max_chunk_size: Option<usize>,
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1069pub struct ObjectInfo {
1070 pub name: String,
1072 #[serde(default)]
1074 pub description: Option<String>,
1075 #[serde(default)]
1077 pub options: Option<ObjectOptions>,
1078 pub bucket: String,
1080 #[serde(default)]
1082 pub nuid: String,
1083 #[serde(default)]
1085 pub size: usize,
1086 #[serde(default)]
1088 pub chunks: usize,
1089 #[serde(default, with = "rfc3339::option")]
1091 #[serde(rename = "mtime")]
1092 pub modified: Option<time::OffsetDateTime>,
1093 #[serde(default, skip_serializing_if = "Option::is_none")]
1095 pub digest: Option<String>,
1096 #[serde(default, skip_serializing_if = "is_default")]
1098 pub deleted: bool,
1099}
1100
1101fn is_default<T: Default + Eq>(t: &T) -> bool {
1102 t == &T::default()
1103}
1104#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1106pub struct ObjectLink {
1107 pub name: Option<String>,
1109 pub bucket: String,
1111}
1112
1113#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1114pub struct UpdateMetadata {
1115 pub name: String,
1117 pub description: Option<String>,
1119}
1120
1121#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1123pub struct ObjectMetadata {
1124 pub name: String,
1126 pub description: Option<String>,
1128 pub chunk_size: Option<usize>,
1130}
1131
1132impl From<&str> for ObjectMetadata {
1133 fn from(s: &str) -> ObjectMetadata {
1134 ObjectMetadata {
1135 name: s.to_string(),
1136 ..Default::default()
1137 }
1138 }
1139}
1140
1141pub trait AsObjectInfo {
1142 fn as_info(&self) -> &ObjectInfo;
1143}
1144
1145impl AsObjectInfo for &Object {
1146 fn as_info(&self) -> &ObjectInfo {
1147 &self.info
1148 }
1149}
1150impl AsObjectInfo for &ObjectInfo {
1151 fn as_info(&self) -> &ObjectInfo {
1152 self
1153 }
1154}
1155
1156impl From<ObjectInfo> for ObjectMetadata {
1157 fn from(info: ObjectInfo) -> Self {
1158 ObjectMetadata {
1159 name: info.name,
1160 description: info.description,
1161 chunk_size: None,
1162 }
1163 }
1164}
1165
1166#[derive(Debug, PartialEq, Clone)]
1167pub enum UpdateMetadataErrorKind {
1168 InvalidName,
1169 NotFound,
1170 TimedOut,
1171 Other,
1172 PublishMetadata,
1173 NameAlreadyInUse,
1174 Purge,
1175}
1176
1177impl Display for UpdateMetadataErrorKind {
1178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1179 match self {
1180 Self::InvalidName => write!(f, "invalid object name"),
1181 Self::NotFound => write!(f, "object not found"),
1182 Self::TimedOut => write!(f, "timed out"),
1183 Self::Other => write!(f, "error"),
1184 Self::PublishMetadata => {
1185 write!(f, "failed publishing metadata")
1186 }
1187 Self::NameAlreadyInUse => {
1188 write!(f, "object with updated name already exists")
1189 }
1190 Self::Purge => write!(f, "failed purging old name metadata"),
1191 }
1192 }
1193}
1194
1195impl From<InfoError> for UpdateMetadataError {
1196 fn from(error: InfoError) -> Self {
1197 match error.kind() {
1198 InfoErrorKind::InvalidName => {
1199 UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1200 }
1201 InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1202 InfoErrorKind::Other => {
1203 UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1204 }
1205 InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1206 }
1207 }
1208}
1209
1210pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1211
1212#[derive(Clone, Copy, Debug, PartialEq)]
1213pub enum InfoErrorKind {
1214 InvalidName,
1215 NotFound,
1216 Other,
1217 TimedOut,
1218}
1219
1220impl Display for InfoErrorKind {
1221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1222 match self {
1223 Self::InvalidName => write!(f, "invalid object name"),
1224 Self::Other => write!(f, "getting info failed"),
1225 Self::NotFound => write!(f, "not found"),
1226 Self::TimedOut => write!(f, "timed out"),
1227 }
1228 }
1229}
1230
1231pub type InfoError = Error<InfoErrorKind>;
1232
1233#[derive(Clone, Copy, Debug, PartialEq)]
1234pub enum GetErrorKind {
1235 InvalidName,
1236 ConsumerCreate,
1237 NotFound,
1238 BucketLink,
1239 Other,
1240 TimedOut,
1241}
1242
1243impl Display for GetErrorKind {
1244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1245 match self {
1246 Self::ConsumerCreate => write!(f, "failed creating consumer for fetching object"),
1247 Self::Other => write!(f, "failed getting object"),
1248 Self::NotFound => write!(f, "object not found"),
1249 Self::TimedOut => write!(f, "timed out"),
1250 Self::InvalidName => write!(f, "invalid object name"),
1251 Self::BucketLink => write!(f, "object is a link to a bucket"),
1252 }
1253 }
1254}
1255
1256pub type GetError = Error<GetErrorKind>;
1257
1258crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1259crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1260
1261impl From<InfoError> for GetError {
1262 fn from(err: InfoError) -> Self {
1263 match err.kind() {
1264 InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1265 InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1266 InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1267 InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1268 }
1269 }
1270}
1271
1272#[derive(Clone, Copy, Debug, PartialEq)]
1273pub enum DeleteErrorKind {
1274 TimedOut,
1275 NotFound,
1276 Metadata,
1277 InvalidName,
1278 Chunks,
1279 Other,
1280}
1281
1282impl Display for DeleteErrorKind {
1283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1284 match self {
1285 Self::TimedOut => write!(f, "timed out"),
1286 Self::Metadata => write!(f, "failed rolling up metadata"),
1287 Self::Chunks => write!(f, "failed purging chunks"),
1288 Self::Other => write!(f, "delete failed"),
1289 Self::NotFound => write!(f, "object not found"),
1290 Self::InvalidName => write!(f, "invalid object name"),
1291 }
1292 }
1293}
1294
1295pub type DeleteError = Error<DeleteErrorKind>;
1296
1297impl From<InfoError> for DeleteError {
1298 fn from(err: InfoError) -> Self {
1299 match err.kind() {
1300 InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1301 InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1302 InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1303 InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1304 }
1305 }
1306}
1307
1308crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1309crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1310
1311#[derive(Clone, Copy, Debug, PartialEq)]
1312pub enum PutErrorKind {
1313 InvalidName,
1314 ReadChunks,
1315 PublishChunks,
1316 PublishMetadata,
1317 PurgeOldChunks,
1318 TimedOut,
1319 Other,
1320}
1321
1322impl Display for PutErrorKind {
1323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1324 match self {
1325 Self::PublishChunks => write!(f, "failed publishing object chunks"),
1326 Self::PublishMetadata => write!(f, "failed publishing metadata"),
1327 Self::PurgeOldChunks => write!(f, "failed purging old chunks"),
1328 Self::TimedOut => write!(f, "timed out"),
1329 Self::Other => write!(f, "error"),
1330 Self::InvalidName => write!(f, "invalid object name"),
1331 Self::ReadChunks => write!(f, "error while reading the buffer"),
1332 }
1333 }
1334}
1335
1336pub type PutError = Error<PutErrorKind>;
1337
1338pub type AddLinkError = Error<AddLinkErrorKind>;
1339
1340#[derive(Clone, Copy, Debug, PartialEq)]
1341pub enum AddLinkErrorKind {
1342 EmptyName,
1343 ObjectRequired,
1344 Deleted,
1345 LinkToLink,
1346 PublishMetadata,
1347 AlreadyExists,
1348 Other,
1349}
1350
1351impl Display for AddLinkErrorKind {
1352 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1353 match self {
1354 AddLinkErrorKind::ObjectRequired => write!(f, "cannot link to empty Object"),
1355 AddLinkErrorKind::Deleted => write!(f, "cannot link a deleted Object"),
1356 AddLinkErrorKind::LinkToLink => write!(f, "cannot link to another link"),
1357 AddLinkErrorKind::EmptyName => write!(f, "link name cannot be empty"),
1358 AddLinkErrorKind::PublishMetadata => write!(f, "failed publishing link metadata"),
1359 AddLinkErrorKind::Other => write!(f, "error"),
1360 AddLinkErrorKind::AlreadyExists => write!(f, "object already exists"),
1361 }
1362 }
1363}
1364
1365type PublishMetadataError = Error<PublishMetadataErrorKind>;
1366
1367#[derive(Clone, Copy, Debug, PartialEq)]
1368enum PublishMetadataErrorKind {
1369 PublishMetadata,
1370 Other,
1371}
1372
1373impl Display for PublishMetadataErrorKind {
1374 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1375 match self {
1376 PublishMetadataErrorKind::PublishMetadata => write!(f, "failed to publish metadata"),
1377 PublishMetadataErrorKind::Other => write!(f, "error"),
1378 }
1379 }
1380}
1381
1382impl From<PublishMetadataError> for AddLinkError {
1383 fn from(error: PublishMetadataError) -> Self {
1384 match error.kind {
1385 PublishMetadataErrorKind::PublishMetadata => {
1386 AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1387 }
1388 PublishMetadataErrorKind::Other => {
1389 AddLinkError::with_source(AddLinkErrorKind::Other, error)
1390 }
1391 }
1392 }
1393}
1394impl From<PublishMetadataError> for PutError {
1395 fn from(error: PublishMetadataError) -> Self {
1396 match error.kind {
1397 PublishMetadataErrorKind::PublishMetadata => {
1398 PutError::new(PutErrorKind::PublishMetadata)
1399 }
1400 PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1401 }
1402 }
1403}
1404
1405#[derive(Clone, Copy, Debug, PartialEq)]
1406pub enum WatchErrorKind {
1407 TimedOut,
1408 ConsumerCreate,
1409 Other,
1410}
1411
1412impl Display for WatchErrorKind {
1413 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1414 match self {
1415 Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1416 Self::Other => write!(f, "watch failed"),
1417 Self::TimedOut => write!(f, "timed out"),
1418 }
1419 }
1420}
1421
1422pub type WatchError = Error<WatchErrorKind>;
1423
1424crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
1425crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1426
1427pub type ListError = WatchError;
1428pub type ListErrorKind = WatchErrorKind;
1429
1430#[derive(Clone, Copy, Debug, PartialEq)]
1431pub enum SealErrorKind {
1432 TimedOut,
1433 Other,
1434 Info,
1435 Update,
1436}
1437
1438impl Display for SealErrorKind {
1439 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1440 match self {
1441 Self::TimedOut => write!(f, "timed out"),
1442 Self::Other => write!(f, "seal failed"),
1443 Self::Info => write!(f, "failed getting stream info before sealing bucket"),
1444 Self::Update => write!(f, "failed sealing the bucket"),
1445 }
1446 }
1447}
1448
1449pub type SealError = Error<SealErrorKind>;
1450
1451impl From<super::context::UpdateStreamError> for SealError {
1452 fn from(err: super::context::UpdateStreamError) -> Self {
1453 match err.kind() {
1454 super::context::CreateStreamErrorKind::TimedOut => {
1455 SealError::new(SealErrorKind::TimedOut)
1456 }
1457 _ => SealError::with_source(SealErrorKind::Update, err),
1458 }
1459 }
1460}
1461
1462#[derive(Clone, Copy, Debug, PartialEq)]
1463pub enum WatcherErrorKind {
1464 ConsumerError,
1465 Other,
1466}
1467
1468impl Display for WatcherErrorKind {
1469 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1470 match self {
1471 Self::ConsumerError => write!(f, "watcher consumer error"),
1472 Self::Other => write!(f, "watcher error"),
1473 }
1474 }
1475}
1476
1477pub type WatcherError = Error<WatcherErrorKind>;
1478
1479impl From<OrderedError> for WatcherError {
1480 fn from(err: OrderedError) -> Self {
1481 WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1482 }
1483}
1484
1485pub type ListerError = WatcherError;
1486pub type ListerErrorKind = WatcherErrorKind;