1use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
42const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
43const NATS_ROLLUP: &str = "Nats-Rollup";
44const ROLLUP_SUBJECT: &str = "sub";
45
46static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
47static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50 BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54 if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55 return false;
56 }
57
58 OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62 URL_SAFE.encode(object_name)
63}
64
65#[derive(Debug, Default, Clone, Serialize, Deserialize)]
67pub struct Config {
68 pub bucket: String,
70 pub description: Option<String>,
72 #[serde(default, with = "serde_nanos")]
74 pub max_age: Duration,
75 pub max_bytes: i64,
77 pub storage: StorageType,
79 pub num_replicas: usize,
81 pub compression: bool,
83 pub placement: Option<stream::Placement>,
85}
86
87#[derive(Clone)]
89pub struct ObjectStore {
90 pub(crate) name: String,
91 pub(crate) stream: crate::jetstream::stream::Stream,
92}
93
94impl ObjectStore {
95 pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
119 self.get_impl(object_name).await
120 }
121
122 fn get_impl<'bucket, 'future, T>(
123 &'bucket self,
124 object_name: T,
125 ) -> BoxFuture<'future, Result<Object, GetError>>
126 where
127 T: AsRef<str> + Send + 'future,
128 'bucket: 'future,
129 {
130 Box::pin(async move {
131 let object_info = self.info(object_name).await?;
132 if let Some(ref options) = object_info.options {
133 if let Some(link) = options.link.as_ref() {
134 if let Some(link_name) = link.name.as_ref() {
135 let link_name = link_name.clone();
136 debug!("getting object via link");
137 if link.bucket == self.name {
138 return self.get_impl(link_name).await;
139 } else {
140 let bucket = self
141 .stream
142 .context
143 .get_object_store(&link.bucket)
144 .await
145 .map_err(|err| {
146 GetError::with_source(GetErrorKind::Other, err)
147 })?;
148 let object = bucket.get_impl(&link_name).await?;
149 return Ok(object);
150 }
151 } else {
152 return Err(GetError::new(GetErrorKind::BucketLink));
153 }
154 }
155 }
156
157 debug!("not a link. Getting the object");
158 Ok(Object::new(object_info, self.stream.clone()))
159 })
160 }
161
162 pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
178 let object_name = object_name.as_ref();
179 let mut object_info = self.info(object_name).await?;
180 object_info.chunks = 0;
181 object_info.size = 0;
182 object_info.deleted = true;
183
184 let data = serde_json::to_vec(&object_info).map_err(|err| {
185 DeleteError::with_source(
186 DeleteErrorKind::Other,
187 format!("failed deserializing object info: {}", err),
188 )
189 })?;
190
191 let mut headers = HeaderMap::default();
192 headers.insert(
193 NATS_ROLLUP,
194 HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
195 DeleteError::with_source(
196 DeleteErrorKind::Other,
197 format!("failed parsing header: {}", err),
198 )
199 })?,
200 );
201
202 let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
203
204 self.stream
205 .context
206 .publish_with_headers(subject, headers, data.into())
207 .await?
208 .await?;
209
210 let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
211
212 self.stream.purge().filter(&chunk_subject).await?;
213
214 Ok(())
215 }
216
217 pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
233 let object_name = object_name.as_ref();
234 let object_name = encode_object_name(object_name);
235 if !is_valid_object_name(&object_name) {
236 return Err(InfoError::new(InfoErrorKind::InvalidName));
237 }
238
239 let subject = format!("$O.{}.M.{}", &self.name, &object_name);
241
242 let message = self
244 .stream
245 .get_last_raw_message_by_subject(subject.as_str())
246 .await
247 .map_err(|err| match err.kind() {
248 stream::LastRawMessageErrorKind::NoMessageFound => {
249 InfoError::new(InfoErrorKind::NotFound)
250 }
251 _ => InfoError::with_source(InfoErrorKind::Other, err),
252 })?;
253 let object_info =
254 serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
255 InfoError::with_source(
256 InfoErrorKind::Other,
257 format!("failed to decode info payload: {}", err),
258 )
259 })?;
260
261 Ok(object_info)
262 }
263
264 pub async fn put<T>(
282 &self,
283 meta: T,
284 data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
285 ) -> Result<ObjectInfo, PutError>
286 where
287 ObjectMetadata: From<T>,
288 {
289 let object_meta: ObjectMetadata = meta.into();
290
291 let maybe_existing_object_info = (self.info(&object_meta.name).await).ok();
293
294 let object_nuid = nuid::next();
295 let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
296
297 let mut object_chunks = 0;
298 let mut object_size = 0;
299
300 let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
301 let mut buffer = BytesMut::with_capacity(chunk_size);
302 let mut sha256 = Sha256::new();
303
304 loop {
305 let n = data
306 .read_buf(&mut buffer)
307 .await
308 .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
309
310 if n == 0 {
311 break;
312 }
313
314 let payload = buffer.split().freeze();
315 sha256.update(&payload);
316
317 object_size += payload.len();
318 object_chunks += 1;
319
320 self.stream
321 .context
322 .publish(chunk_subject.clone(), payload)
323 .await
324 .map_err(|err| {
325 PutError::with_source(
326 PutErrorKind::PublishChunks,
327 format!("failed chunk publish: {}", err),
328 )
329 })?
330 .await
331 .map_err(|err| {
332 PutError::with_source(
333 PutErrorKind::PublishChunks,
334 format!("failed getting chunk ack: {}", err),
335 )
336 })?;
337 }
338 let digest = sha256.finish();
339
340 let encoded_object_name = encode_object_name(&object_meta.name);
341 if !is_valid_object_name(&encoded_object_name) {
342 return Err(PutError::new(PutErrorKind::InvalidName));
343 }
344 let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
345
346 let object_info = ObjectInfo {
347 name: object_meta.name,
348 description: object_meta.description,
349 options: Some(ObjectOptions {
350 max_chunk_size: Some(chunk_size),
351 link: None,
352 }),
353 bucket: self.name.clone(),
354 nuid: object_nuid.to_string(),
355 chunks: object_chunks,
356 size: object_size,
357 digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
358 modified: Some(OffsetDateTime::now_utc()),
359 deleted: false,
360 metadata: object_meta.metadata,
361 headers: object_meta.headers,
362 };
363
364 let mut headers = HeaderMap::new();
365 headers.insert(
366 NATS_ROLLUP,
367 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
368 PutError::with_source(
369 PutErrorKind::Other,
370 format!("failed parsing header: {}", err),
371 )
372 })?,
373 );
374 let data = serde_json::to_vec(&object_info).map_err(|err| {
375 PutError::with_source(
376 PutErrorKind::Other,
377 format!("failed serializing object info: {}", err),
378 )
379 })?;
380
381 self.stream
383 .context
384 .publish_with_headers(subject, headers, data.into())
385 .await
386 .map_err(|err| {
387 PutError::with_source(
388 PutErrorKind::PublishMetadata,
389 format!("failed publishing metadata: {}", err),
390 )
391 })?
392 .await
393 .map_err(|err| {
394 PutError::with_source(
395 PutErrorKind::PublishMetadata,
396 format!("failed ack from metadata publish: {}", err),
397 )
398 })?;
399
400 if let Some(existing_object_info) = maybe_existing_object_info {
402 let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
403
404 self.stream
405 .purge()
406 .filter(&chunk_subject)
407 .await
408 .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
409 }
410
411 Ok(object_info)
412 }
413
414 pub async fn watch(&self) -> Result<Watch, WatchError> {
434 self.watch_with_deliver_policy(DeliverPolicy::New).await
435 }
436
437 pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
440 self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
441 .await
442 }
443
444 async fn watch_with_deliver_policy(
445 &self,
446 deliver_policy: DeliverPolicy,
447 ) -> Result<Watch, WatchError> {
448 let subject = format!("$O.{}.M.>", self.name);
449 let ordered = self
450 .stream
451 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
452 deliver_policy,
453 deliver_subject: self.stream.context.client.new_inbox(),
454 description: Some("object store watcher".to_string()),
455 filter_subject: subject,
456 ..Default::default()
457 })
458 .await?;
459 Ok(Watch {
460 subscription: ordered.messages().await?,
461 })
462 }
463
464 pub async fn list(&self) -> Result<List, ListError> {
484 trace!("starting Object List");
485 let subject = format!("$O.{}.M.>", self.name);
486 let ordered = self
487 .stream
488 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
489 deliver_policy: super::consumer::DeliverPolicy::All,
490 deliver_subject: self.stream.context.client.new_inbox(),
491 description: Some("object store list".to_string()),
492 filter_subject: subject,
493 ..Default::default()
494 })
495 .await?;
496 Ok(List {
497 done: ordered.info.num_pending == 0,
498 subscription: Some(ordered.messages().await?),
499 })
500 }
501
502 pub async fn seal(&mut self) -> Result<(), SealError> {
519 let mut stream_config = self
520 .stream
521 .info()
522 .await
523 .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
524 .to_owned();
525 stream_config.config.sealed = true;
526
527 self.stream
528 .context
529 .update_stream(&stream_config.config)
530 .await?;
531 Ok(())
532 }
533
534 pub async fn update_metadata<A: AsRef<str>>(
560 &self,
561 object: A,
562 metadata: UpdateMetadata,
563 ) -> Result<ObjectInfo, UpdateMetadataError> {
564 let mut info = self.info(object.as_ref()).await?;
565
566 if metadata.name != info.name {
569 tracing::info!("new metadata name is different than then old one");
570 if !is_valid_object_name(&metadata.name) {
571 return Err(UpdateMetadataError::new(
572 UpdateMetadataErrorKind::InvalidName,
573 ));
574 }
575 match self.info(&metadata.name).await {
576 Ok(_) => {
577 return Err(UpdateMetadataError::new(
578 UpdateMetadataErrorKind::NameAlreadyInUse,
579 ))
580 }
581 Err(err) => match err.kind() {
582 InfoErrorKind::NotFound => {
583 tracing::info!("purging old metadata: {}", info.name);
584 self.stream
585 .purge()
586 .filter(format!(
587 "$O.{}.M.{}",
588 self.name,
589 encode_object_name(&info.name)
590 ))
591 .await
592 .map_err(|err| {
593 UpdateMetadataError::with_source(
594 UpdateMetadataErrorKind::Purge,
595 err,
596 )
597 })?;
598 }
599 _ => {
600 return Err(UpdateMetadataError::with_source(
601 UpdateMetadataErrorKind::Other,
602 err,
603 ))
604 }
605 },
606 }
607 }
608
609 info.name = metadata.name;
610 info.description = metadata.description;
611
612 let name = encode_object_name(&info.name);
613 let subject = format!("$O.{}.M.{}", &self.name, &name);
614
615 let mut headers = HeaderMap::new();
616 headers.insert(
617 NATS_ROLLUP,
618 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
619 UpdateMetadataError::with_source(
620 UpdateMetadataErrorKind::Other,
621 format!("failed parsing header: {}", err),
622 )
623 })?,
624 );
625 let data = serde_json::to_vec(&info).map_err(|err| {
626 UpdateMetadataError::with_source(
627 UpdateMetadataErrorKind::Other,
628 format!("failed serializing object info: {}", err),
629 )
630 })?;
631
632 self.stream
634 .context
635 .publish_with_headers(subject, headers, data.into())
636 .await
637 .map_err(|err| {
638 UpdateMetadataError::with_source(
639 UpdateMetadataErrorKind::PublishMetadata,
640 format!("failed publishing metadata: {}", err),
641 )
642 })?
643 .await
644 .map_err(|err| {
645 UpdateMetadataError::with_source(
646 UpdateMetadataErrorKind::PublishMetadata,
647 format!("failed ack from metadata publish: {}", err),
648 )
649 })?;
650
651 Ok(info)
652 }
653
654 pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
674 where
675 T: ToString,
676 O: AsObjectInfo,
677 {
678 let object = object.as_info();
679 let name = name.to_string();
680 if name.is_empty() {
681 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
682 }
683 if object.name.is_empty() {
684 return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
685 }
686 if object.deleted {
687 return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
688 }
689 if let Some(ref options) = object.options {
690 if options.link.is_some() {
691 return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
692 }
693 }
694 match self.info(&name).await {
695 Ok(info) => {
696 if let Some(options) = info.options {
697 if options.link.is_none() {
698 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
699 }
700 } else {
701 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
702 }
703 }
704 Err(err) if err.kind() != InfoErrorKind::NotFound => {
705 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
706 }
707 _ => (),
708 }
709
710 let info = ObjectInfo {
711 name,
712 description: None,
713 options: Some(ObjectOptions {
714 link: Some(ObjectLink {
715 name: Some(object.name.clone()),
716 bucket: object.bucket.clone(),
717 }),
718 max_chunk_size: None,
719 }),
720 bucket: self.name.clone(),
721 nuid: nuid::next().to_string(),
722 size: 0,
723 chunks: 0,
724 modified: Some(OffsetDateTime::now_utc()),
725 digest: None,
726 deleted: false,
727 metadata: HashMap::default(),
728 headers: None,
729 };
730 publish_meta(self, &info).await?;
731 Ok(info)
732 }
733
734 pub async fn add_bucket_link<T: ToString, U: ToString>(
754 &self,
755 name: T,
756 bucket: U,
757 ) -> Result<ObjectInfo, AddLinkError> {
758 let name = name.to_string();
759 let bucket = bucket.to_string();
760 if name.is_empty() {
761 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
762 }
763
764 match self.info(&name).await {
765 Ok(info) => {
766 if let Some(options) = info.options {
767 if options.link.is_none() {
768 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
769 }
770 }
771 }
772 Err(err) if err.kind() != InfoErrorKind::NotFound => {
773 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
774 }
775 _ => (),
776 }
777
778 let info = ObjectInfo {
779 name: name.clone(),
780 description: None,
781 options: Some(ObjectOptions {
782 link: Some(ObjectLink { name: None, bucket }),
783 max_chunk_size: None,
784 }),
785 bucket: self.name.clone(),
786 nuid: nuid::next().to_string(),
787 size: 0,
788 chunks: 0,
789 modified: Some(OffsetDateTime::now_utc()),
790 digest: None,
791 deleted: false,
792 metadata: HashMap::default(),
793 headers: None,
794 };
795 publish_meta(self, &info).await?;
796 Ok(info)
797 }
798}
799
800async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
801 let encoded_object_name = encode_object_name(&info.name);
802 let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
803
804 let mut headers = HeaderMap::new();
805 headers.insert(
806 NATS_ROLLUP,
807 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
808 PublishMetadataError::with_source(
809 PublishMetadataErrorKind::Other,
810 format!("failed parsing header: {}", err),
811 )
812 })?,
813 );
814 let data = serde_json::to_vec(&info).map_err(|err| {
815 PublishMetadataError::with_source(
816 PublishMetadataErrorKind::Other,
817 format!("failed serializing object info: {}", err),
818 )
819 })?;
820
821 store
822 .stream
823 .context
824 .publish_with_headers(subject, headers, data.into())
825 .await
826 .map_err(|err| {
827 PublishMetadataError::with_source(
828 PublishMetadataErrorKind::PublishMetadata,
829 format!("failed publishing metadata: {}", err),
830 )
831 })?
832 .await
833 .map_err(|err| {
834 PublishMetadataError::with_source(
835 PublishMetadataErrorKind::PublishMetadata,
836 format!("failed ack from metadata publish: {}", err),
837 )
838 })?;
839 Ok(())
840}
841
842pub struct Watch {
843 subscription: crate::jetstream::consumer::push::Ordered,
844}
845
846impl Stream for Watch {
847 type Item = Result<ObjectInfo, WatcherError>;
848
849 fn poll_next(
850 mut self: std::pin::Pin<&mut Self>,
851 cx: &mut std::task::Context<'_>,
852 ) -> Poll<Option<Self::Item>> {
853 match self.subscription.poll_next_unpin(cx) {
854 Poll::Ready(message) => match message {
855 Some(message) => Poll::Ready(
856 serde_json::from_slice::<ObjectInfo>(&message?.payload)
857 .map_err(|err| {
858 WatcherError::with_source(
859 WatcherErrorKind::Other,
860 format!("failed to deserialize object info: {}", err),
861 )
862 })
863 .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
864 ),
865 None => Poll::Ready(None),
866 },
867 Poll::Pending => Poll::Pending,
868 }
869 }
870}
871
872pub struct List {
873 subscription: Option<crate::jetstream::consumer::push::Ordered>,
874 done: bool,
875}
876
877impl Stream for List {
878 type Item = Result<ObjectInfo, ListerError>;
879
880 fn poll_next(
881 mut self: std::pin::Pin<&mut Self>,
882 cx: &mut std::task::Context<'_>,
883 ) -> Poll<Option<Self::Item>> {
884 loop {
885 if self.done {
886 debug!("Object Store list done");
887 self.subscription = None;
888 return Poll::Ready(None);
889 }
890
891 if let Some(subscription) = self.subscription.as_mut() {
892 match subscription.poll_next_unpin(cx) {
893 Poll::Ready(message) => match message {
894 None => return Poll::Ready(None),
895 Some(message) => {
896 let message = message?;
897 let info = message.info().map_err(|err| {
898 ListerError::with_source(ListerErrorKind::Other, err)
899 })?;
900 trace!("num pending: {}", info.pending);
901 if info.pending == 0 {
902 self.done = true;
903 }
904 let response: ObjectInfo = serde_json::from_slice(&message.payload)
905 .map_err(|err| {
906 ListerError::with_source(
907 ListerErrorKind::Other,
908 format!("failed deserializing object info: {}", err),
909 )
910 })?;
911 if response.deleted {
912 continue;
913 }
914 return Poll::Ready(Some(Ok(response)));
915 }
916 },
917 Poll::Pending => return Poll::Pending,
918 }
919 } else {
920 return Poll::Ready(None);
921 }
922 }
923 }
924}
925
926pub struct Object {
928 pub info: ObjectInfo,
929 remaining_bytes: VecDeque<u8>,
930 has_pending_messages: bool,
931 digest: Option<Sha256>,
932 subscription: Option<crate::jetstream::consumer::push::Ordered>,
933 subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
934 stream: crate::jetstream::stream::Stream,
935}
936
937impl Object {
938 pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
939 Object {
940 subscription: None,
941 info,
942 remaining_bytes: VecDeque::new(),
943 has_pending_messages: true,
944 digest: Some(Sha256::new()),
945 subscription_future: None,
946 stream,
947 }
948 }
949
950 pub fn info(&self) -> &ObjectInfo {
952 &self.info
953 }
954}
955
956impl tokio::io::AsyncRead for Object {
957 fn poll_read(
958 mut self: std::pin::Pin<&mut Self>,
959 cx: &mut std::task::Context<'_>,
960 buf: &mut tokio::io::ReadBuf<'_>,
961 ) -> std::task::Poll<std::io::Result<()>> {
962 let (buf1, _buf2) = self.remaining_bytes.as_slices();
963 if !buf1.is_empty() {
964 let len = cmp::min(buf.remaining(), buf1.len());
965 buf.put_slice(&buf1[..len]);
966 self.remaining_bytes.drain(..len);
967 return Poll::Ready(Ok(()));
968 }
969
970 if self.has_pending_messages {
971 if self.subscription.is_none() {
972 let future = match self.subscription_future.as_mut() {
973 Some(future) => future,
974 None => {
975 let stream = self.stream.clone();
976 let bucket = self.info.bucket.clone();
977 let nuid = self.info.nuid.clone();
978 self.subscription_future.insert(Box::pin(async move {
979 stream
980 .create_consumer(OrderedConfig {
981 deliver_subject: stream.context.client.new_inbox(),
982 filter_subject: format!("$O.{}.C.{}", bucket, nuid),
983 ..Default::default()
984 })
985 .await
986 .unwrap()
987 .messages()
988 .await
989 }))
990 }
991 };
992 match future.as_mut().poll(cx) {
993 Poll::Ready(subscription) => {
994 self.subscription = Some(subscription.unwrap());
995 }
996 Poll::Pending => (),
997 }
998 }
999 if let Some(subscription) = self.subscription.as_mut() {
1000 match subscription.poll_next_unpin(cx) {
1001 Poll::Ready(message) => match message {
1002 Some(message) => {
1003 let message = message.map_err(|err| {
1004 std::io::Error::new(
1005 std::io::ErrorKind::Other,
1006 format!("error from JetStream subscription: {err}"),
1007 )
1008 })?;
1009 let len = cmp::min(buf.remaining(), message.payload.len());
1010 buf.put_slice(&message.payload[..len]);
1011 if let Some(context) = &mut self.digest {
1012 context.update(&message.payload);
1013 }
1014 self.remaining_bytes.extend(&message.payload[len..]);
1015
1016 let info = message.info().map_err(|err| {
1017 std::io::Error::new(
1018 std::io::ErrorKind::Other,
1019 format!("error from JetStream subscription: {err}"),
1020 )
1021 })?;
1022 if info.pending == 0 {
1023 let digest = self.digest.take().map(Sha256::finish);
1024 if let Some(digest) = digest {
1025 if self
1026 .info
1027 .digest
1028 .as_ref()
1029 .map(|digest_self| {
1030 format!("SHA-256={}", URL_SAFE.encode(digest))
1031 != *digest_self
1032 })
1033 .unwrap_or(false)
1034 {
1035 return Poll::Ready(Err(std::io::Error::new(
1036 std::io::ErrorKind::InvalidData,
1037 "wrong digest",
1038 )));
1039 }
1040 } else {
1041 return Poll::Ready(Err(std::io::Error::new(
1042 std::io::ErrorKind::InvalidData,
1043 "digest should be Some",
1044 )));
1045 }
1046 self.has_pending_messages = false;
1047 self.subscription = None;
1048 }
1049 Poll::Ready(Ok(()))
1050 }
1051 None => Poll::Ready(Err(std::io::Error::new(
1052 std::io::ErrorKind::Other,
1053 "subscription ended before reading whole object",
1054 ))),
1055 },
1056 Poll::Pending => Poll::Pending,
1057 }
1058 } else {
1059 Poll::Pending
1060 }
1061 } else {
1062 Poll::Ready(Ok(()))
1063 }
1064 }
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1068pub struct ObjectOptions {
1069 pub link: Option<ObjectLink>,
1070 pub max_chunk_size: Option<usize>,
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1075pub struct ObjectInfo {
1076 pub name: String,
1078 #[serde(default)]
1080 pub description: Option<String>,
1081 #[serde(default)]
1083 pub metadata: HashMap<String, String>,
1084 #[serde(default)]
1086 pub headers: Option<HeaderMap>,
1087 #[serde(default)]
1089 pub options: Option<ObjectOptions>,
1090 pub bucket: String,
1092 #[serde(default)]
1094 pub nuid: String,
1095 #[serde(default)]
1097 pub size: usize,
1098 #[serde(default)]
1100 pub chunks: usize,
1101 #[serde(default, with = "rfc3339::option")]
1103 #[serde(rename = "mtime")]
1104 pub modified: Option<time::OffsetDateTime>,
1105 #[serde(default, skip_serializing_if = "Option::is_none")]
1107 pub digest: Option<String>,
1108 #[serde(default, skip_serializing_if = "is_default")]
1110 pub deleted: bool,
1111}
1112
/// Returns `true` when `t` equals its type's `Default` value (serde `skip_serializing_if` helper).
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let default_value = T::default();
    *t == default_value
}
1116#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1118pub struct ObjectLink {
1119 pub name: Option<String>,
1121 pub bucket: String,
1123}
1124
1125#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1126pub struct UpdateMetadata {
1127 pub name: String,
1129 pub description: Option<String>,
1131 #[serde(default)]
1133 pub metadata: HashMap<String, String>,
1134 pub headers: Option<HeaderMap>,
1136}
1137
1138#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1140pub struct ObjectMetadata {
1141 pub name: String,
1143 pub description: Option<String>,
1145 pub chunk_size: Option<usize>,
1147 #[serde(default)]
1149 pub metadata: HashMap<String, String>,
1150 pub headers: Option<HeaderMap>,
1152}
1153
1154impl From<&str> for ObjectMetadata {
1155 fn from(s: &str) -> ObjectMetadata {
1156 ObjectMetadata {
1157 name: s.to_string(),
1158 ..Default::default()
1159 }
1160 }
1161}
1162
1163pub trait AsObjectInfo {
1164 fn as_info(&self) -> &ObjectInfo;
1165}
1166
1167impl AsObjectInfo for &Object {
1168 fn as_info(&self) -> &ObjectInfo {
1169 &self.info
1170 }
1171}
1172impl AsObjectInfo for &ObjectInfo {
1173 fn as_info(&self) -> &ObjectInfo {
1174 self
1175 }
1176}
1177
1178impl From<ObjectInfo> for ObjectMetadata {
1179 fn from(info: ObjectInfo) -> Self {
1180 ObjectMetadata {
1181 name: info.name,
1182 description: info.description,
1183 metadata: info.metadata,
1184 headers: info.headers,
1185 chunk_size: None,
1186 }
1187 }
1188}
1189
/// Failure categories of `ObjectStore::update_metadata`.
#[derive(Clone, Debug, PartialEq)]
pub enum UpdateMetadataErrorKind {
    /// The object name failed validation.
    InvalidName,
    /// The object does not exist.
    NotFound,
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
    /// Publishing the updated metadata failed.
    PublishMetadata,
    /// The requested new name is already taken.
    NameAlreadyInUse,
    /// Purging the metadata stored under the old name failed.
    Purge,
}

impl Display for UpdateMetadataErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "object not found",
            Self::TimedOut => "timed out",
            Self::Other => "error",
            Self::PublishMetadata => "failed publishing metadata",
            Self::NameAlreadyInUse => "object with updated name already exists",
            Self::Purge => "failed purging old name metadata",
        };
        f.write_str(text)
    }
}
1218
1219impl From<InfoError> for UpdateMetadataError {
1220 fn from(error: InfoError) -> Self {
1221 match error.kind() {
1222 InfoErrorKind::InvalidName => {
1223 UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1224 }
1225 InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1226 InfoErrorKind::Other => {
1227 UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1228 }
1229 InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1230 }
1231 }
1232}
1233
1234pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1235
/// Failure categories of `ObjectStore::info`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum InfoErrorKind {
    /// The object name failed validation.
    InvalidName,
    /// No metadata is stored for the object.
    NotFound,
    /// Any other failure.
    Other,
    /// The operation timed out.
    TimedOut,
}

impl Display for InfoErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "not found",
            Self::Other => "getting info failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(text)
    }
}
1254
1255pub type InfoError = Error<InfoErrorKind>;
1256
/// Failure categories of `ObjectStore::get`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GetErrorKind {
    /// The object name failed validation.
    InvalidName,
    /// Creating the consumer used to fetch the object failed.
    ConsumerCreate,
    /// The object does not exist.
    NotFound,
    /// The entry is a link to a bucket, not to an object.
    BucketLink,
    /// Any other failure.
    Other,
    /// The operation timed out.
    TimedOut,
}

impl Display for GetErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::ConsumerCreate => "failed creating consumer for fetching object",
            Self::NotFound => "object not found",
            Self::BucketLink => "object is a link to a bucket",
            Self::Other => "failed getting object",
            Self::TimedOut => "timed out",
        };
        f.write_str(text)
    }
}
1279
1280pub type GetError = Error<GetErrorKind>;
1281
1282crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1283crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1284
1285impl From<InfoError> for GetError {
1286 fn from(err: InfoError) -> Self {
1287 match err.kind() {
1288 InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1289 InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1290 InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1291 InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1292 }
1293 }
1294}
1295
/// Failure categories of `ObjectStore::delete`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DeleteErrorKind {
    /// The operation timed out.
    TimedOut,
    /// The object does not exist.
    NotFound,
    /// Rolling up the metadata failed.
    Metadata,
    /// The object name failed validation.
    InvalidName,
    /// Purging the object's chunks failed.
    Chunks,
    /// Any other failure.
    Other,
}

impl Display for DeleteErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::TimedOut => "timed out",
            Self::NotFound => "object not found",
            Self::Metadata => "failed rolling up metadata",
            Self::InvalidName => "invalid object name",
            Self::Chunks => "failed purging chunks",
            Self::Other => "delete failed",
        };
        f.write_str(text)
    }
}
1318
1319pub type DeleteError = Error<DeleteErrorKind>;
1320
1321impl From<InfoError> for DeleteError {
1322 fn from(err: InfoError) -> Self {
1323 match err.kind() {
1324 InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1325 InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1326 InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1327 InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1328 }
1329 }
1330}
1331
1332crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1333crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1334
/// Failure categories of `ObjectStore::put`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The object name failed validation.
    InvalidName,
    /// Reading from the supplied data source failed.
    ReadChunks,
    /// Publishing a chunk (or receiving its ack) failed.
    PublishChunks,
    /// Publishing the metadata (or receiving its ack) failed.
    PublishMetadata,
    /// Purging the chunks of the replaced object failed.
    PurgeOldChunks,
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
}

impl Display for PutErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::ReadChunks => "error while reading the buffer",
            Self::PublishChunks => "failed publishing object chunks",
            Self::PublishMetadata => "failed publishing metadata",
            Self::PurgeOldChunks => "failed purging old chunks",
            Self::TimedOut => "timed out",
            Self::Other => "error",
        };
        f.write_str(text)
    }
}
1359
1360pub type PutError = Error<PutErrorKind>;
1361
1362pub type AddLinkError = Error<AddLinkErrorKind>;
1363
/// Failure categories of `ObjectStore::add_link` and `ObjectStore::add_bucket_link`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AddLinkErrorKind {
    /// The link name is empty.
    EmptyName,
    /// The link target has an empty name.
    ObjectRequired,
    /// The link target is marked deleted.
    Deleted,
    /// The link target is itself a link.
    LinkToLink,
    /// Publishing the link metadata failed.
    PublishMetadata,
    /// The link name is already taken by a non-link entry.
    AlreadyExists,
    /// Any other failure.
    Other,
}

impl Display for AddLinkErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::EmptyName => "link name cannot be empty",
            Self::ObjectRequired => "cannot link to empty Object",
            Self::Deleted => "cannot link a deleted Object",
            Self::LinkToLink => "cannot link to another link",
            Self::PublishMetadata => "failed publishing link metadata",
            Self::AlreadyExists => "object already exists",
            Self::Other => "error",
        };
        f.write_str(text)
    }
}
1388
/// Module-private error type whose kind is a [`PublishMetadataErrorKind`].
type PublishMetadataError = Error<PublishMetadataErrorKind>;
1390
/// Kinds of failure carried by a `PublishMetadataError`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PublishMetadataErrorKind {
    /// Publishing the metadata failed.
    PublishMetadata,
    /// Any failure not covered by the other kinds.
    Other,
}

impl Display for PublishMetadataErrorKind {
    /// Writes the fixed, human-readable message associated with this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::PublishMetadata => "failed to publish metadata",
            Self::Other => "error",
        };
        f.write_str(message)
    }
}
1405
1406impl From<PublishMetadataError> for AddLinkError {
1407 fn from(error: PublishMetadataError) -> Self {
1408 match error.kind {
1409 PublishMetadataErrorKind::PublishMetadata => {
1410 AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1411 }
1412 PublishMetadataErrorKind::Other => {
1413 AddLinkError::with_source(AddLinkErrorKind::Other, error)
1414 }
1415 }
1416 }
1417}
1418impl From<PublishMetadataError> for PutError {
1419 fn from(error: PublishMetadataError) -> Self {
1420 match error.kind {
1421 PublishMetadataErrorKind::PublishMetadata => {
1422 PutError::new(PutErrorKind::PublishMetadata)
1423 }
1424 PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1425 }
1426 }
1427}
1428
/// Kinds of failure carried by a [`WatchError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// Any other watch failure.
    Other,
}

impl Display for WatchErrorKind {
    /// Writes the fixed, human-readable message associated with this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(message)
    }
}
1445
/// Error type whose kind is a [`WatchErrorKind`].
pub type WatchError = Error<WatchErrorKind>;

// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate. Judging by its name
// and arguments, these invocations presumably generate `From<ConsumerError>` and
// `From<StreamError>` impls for `WatchError`, with timeouts mapped to
// `WatchErrorKind::TimedOut` — confirm against the macro definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);

/// Alias of [`WatchError`]: listing reuses the watch error type.
pub type ListError = WatchError;
/// Alias of [`WatchErrorKind`]: listing reuses the watch error kinds.
pub type ListErrorKind = WatchErrorKind;
1453
/// Kinds of failure carried by a [`SealError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SealErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Any other seal failure.
    Other,
    /// Getting the stream info before sealing the bucket failed.
    Info,
    /// Sealing the bucket failed.
    Update,
}

impl Display for SealErrorKind {
    /// Writes the fixed, human-readable message associated with this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::TimedOut => "timed out",
            Self::Other => "seal failed",
            Self::Info => "failed getting stream info before sealing bucket",
            Self::Update => "failed sealing the bucket",
        };
        f.write_str(message)
    }
}
1472
/// Error type whose kind is a [`SealErrorKind`].
pub type SealError = Error<SealErrorKind>;
1474
1475impl From<super::context::UpdateStreamError> for SealError {
1476 fn from(err: super::context::UpdateStreamError) -> Self {
1477 match err.kind() {
1478 super::context::CreateStreamErrorKind::TimedOut => {
1479 SealError::new(SealErrorKind::TimedOut)
1480 }
1481 _ => SealError::with_source(SealErrorKind::Update, err),
1482 }
1483 }
1484}
1485
/// Kinds of failure carried by a [`WatcherError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The watcher's consumer reported an error.
    ConsumerError,
    /// Any other watcher failure.
    Other,
}

impl Display for WatcherErrorKind {
    /// Writes the fixed, human-readable message associated with this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ConsumerError => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(message)
    }
}
1500
/// Error type whose kind is a [`WatcherErrorKind`].
pub type WatcherError = Error<WatcherErrorKind>;
1502
1503impl From<OrderedError> for WatcherError {
1504 fn from(err: OrderedError) -> Self {
1505 WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1506 }
1507}
1508
/// Alias of [`WatcherError`]: the lister reuses the watcher error type.
pub type ListerError = WatcherError;
/// Alias of [`WatcherErrorKind`]: the lister reuses the watcher error kinds.
pub type ListerErrorKind = WatcherErrorKind;