1use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures_util::future::BoxFuture;
26use std::sync::LazyLock;
27use tokio::io::AsyncReadExt;
28
29use futures_util::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
/// Chunk size in bytes (128 KiB) used by `ObjectStore::put` when the object
/// metadata does not specify `chunk_size`.
const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
/// Name of the header attached to every metadata publish in this module.
const NATS_ROLLUP: &str = "Nats-Rollup";
/// Value sent in the `Nats-Rollup` header on metadata publishes.
const ROLLUP_SUBJECT: &str = "sub";
45
/// Pattern for bucket names: one or more ASCII letters, digits, `_` or `-`.
/// Compiled on first use.
static BUCKET_NAME_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
/// Pattern for object names: one or more ASCII letters, digits, or any of
/// `-`, `/`, `_`, `=`, `.`. Compiled on first use.
static OBJECT_NAME_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
50
51pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
52 BUCKET_NAME_RE.is_match(bucket_name)
53}
54
55pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
56 if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
57 return false;
58 }
59
60 OBJECT_NAME_RE.is_match(object_name)
61}
62
63pub(crate) fn encode_object_name(object_name: &str) -> String {
64 URL_SAFE.encode(object_name)
65}
66
/// Configuration of an object store bucket.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
    /// Name of the bucket.
    pub bucket: String,
    /// Optional human-readable description of the bucket.
    pub description: Option<String>,
    /// Maximum age setting; (de)serialized through `serde_nanos` and defaulted
    /// when absent from the input.
    // NOTE(review): presumably the retention period of stored objects — confirm.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// Maximum size setting, in bytes.
    // NOTE(review): presumably the total bucket size limit — confirm.
    pub max_bytes: i64,
    /// Storage backend type used for the bucket.
    pub storage: StorageType,
    /// Number of replicas requested for the bucket.
    pub num_replicas: usize,
    /// Whether compression is enabled.
    pub compression: bool,
    /// Optional placement directive.
    pub placement: Option<stream::Placement>,
}
88
/// Handle to a single object store bucket.
///
/// All operations publish to, read from, or purge subjects of the form
/// `$O.<name>.M.<encoded object name>` (metadata) and `$O.<name>.C.<nuid>`
/// (chunks) on the wrapped stream.
#[derive(Clone)]
pub struct ObjectStore {
    /// Bucket name, used as the second token of every subject.
    pub(crate) name: String,
    /// JetStream stream that holds the bucket's metadata and chunk messages.
    pub(crate) stream: crate::jetstream::stream::Stream,
}
95
96impl ObjectStore {
97 pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
121 self.get_impl(object_name).await
122 }
123
124 fn get_impl<'bucket, 'future, T>(
125 &'bucket self,
126 object_name: T,
127 ) -> BoxFuture<'future, Result<Object, GetError>>
128 where
129 T: AsRef<str> + Send + 'future,
130 'bucket: 'future,
131 {
132 Box::pin(async move {
133 let object_info = self.info(object_name).await?;
134 if object_info.deleted {
135 return Err(GetError::new(GetErrorKind::NotFound));
136 }
137 if let Some(ref options) = object_info.options {
138 if let Some(link) = options.link.as_ref() {
139 if let Some(link_name) = link.name.as_ref() {
140 let link_name = link_name.clone();
141 debug!("getting object via link");
142 if link.bucket == self.name {
143 return self.get_impl(link_name).await;
144 } else {
145 let bucket = self
146 .stream
147 .context
148 .get_object_store(&link.bucket)
149 .await
150 .map_err(|err| {
151 GetError::with_source(GetErrorKind::Other, err)
152 })?;
153 let object = bucket.get_impl(&link_name).await?;
154 return Ok(object);
155 }
156 } else {
157 return Err(GetError::new(GetErrorKind::BucketLink));
158 }
159 }
160 }
161
162 debug!("not a link. Getting the object");
163 Ok(Object::new(object_info, self.stream.clone()))
164 })
165 }
166
167 pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
183 let object_name = object_name.as_ref();
184 let mut object_info = self.info(object_name).await?;
185 object_info.chunks = 0;
186 object_info.size = 0;
187 object_info.deleted = true;
188
189 let data = serde_json::to_vec(&object_info).map_err(|err| {
190 DeleteError::with_source(
191 DeleteErrorKind::Other,
192 format!("failed deserializing object info: {err}"),
193 )
194 })?;
195
196 let mut headers = HeaderMap::default();
197 headers.insert(
198 NATS_ROLLUP,
199 HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
200 DeleteError::with_source(
201 DeleteErrorKind::Other,
202 format!("failed parsing header: {err}"),
203 )
204 })?,
205 );
206
207 let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
208
209 self.stream
210 .context
211 .publish_with_headers(subject, headers, data.into())
212 .await?
213 .await?;
214
215 let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
216
217 self.stream.purge().filter(&chunk_subject).await?;
218
219 Ok(())
220 }
221
222 pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
238 let object_name = object_name.as_ref();
239 let object_name = encode_object_name(object_name);
240 if !is_valid_object_name(&object_name) {
241 return Err(InfoError::new(InfoErrorKind::InvalidName));
242 }
243
244 let subject = format!("$O.{}.M.{}", &self.name, &object_name);
246
247 let message = self
249 .stream
250 .get_last_raw_message_by_subject(subject.as_str())
251 .await
252 .map_err(|err| match err.kind() {
253 stream::LastRawMessageErrorKind::NoMessageFound => {
254 InfoError::new(InfoErrorKind::NotFound)
255 }
256 _ => InfoError::with_source(InfoErrorKind::Other, err),
257 })?;
258 let object_info =
259 serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
260 InfoError::with_source(
261 InfoErrorKind::Other,
262 format!("failed to decode info payload: {err}"),
263 )
264 })?;
265
266 Ok(object_info)
267 }
268
269 pub async fn put<T>(
287 &self,
288 meta: T,
289 data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
290 ) -> Result<ObjectInfo, PutError>
291 where
292 ObjectMetadata: From<T>,
293 {
294 let object_meta: ObjectMetadata = meta.into();
295
296 let maybe_existing_object_info = (self.info(&object_meta.name).await).ok();
298
299 let object_nuid = crate::id_generator::next();
300 let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
301
302 let mut object_chunks = 0;
303 let mut object_size = 0;
304
305 let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
306 let mut buffer = BytesMut::with_capacity(chunk_size);
307 let mut sha256 = Sha256::new();
308
309 loop {
310 let n = data
311 .read_buf(&mut buffer)
312 .await
313 .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
314
315 if n == 0 {
316 break;
317 }
318
319 let payload = buffer.split().freeze();
320 sha256.update(&payload);
321
322 object_size += payload.len();
323 object_chunks += 1;
324
325 self.stream
326 .context
327 .publish(chunk_subject.clone(), payload)
328 .await
329 .map_err(|err| {
330 PutError::with_source(
331 PutErrorKind::PublishChunks,
332 format!("failed chunk publish: {err}"),
333 )
334 })?
335 .await
336 .map_err(|err| {
337 PutError::with_source(
338 PutErrorKind::PublishChunks,
339 format!("failed getting chunk ack: {err}"),
340 )
341 })?;
342 }
343 let digest = sha256.finish();
344
345 let encoded_object_name = encode_object_name(&object_meta.name);
346 if !is_valid_object_name(&encoded_object_name) {
347 return Err(PutError::new(PutErrorKind::InvalidName));
348 }
349 let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
350
351 let object_info = ObjectInfo {
352 name: object_meta.name,
353 description: object_meta.description,
354 options: Some(ObjectOptions {
355 max_chunk_size: Some(chunk_size),
356 link: None,
357 }),
358 bucket: self.name.clone(),
359 nuid: object_nuid.to_string(),
360 chunks: object_chunks,
361 size: object_size,
362 digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
363 modified: Some(OffsetDateTime::now_utc()),
364 deleted: false,
365 metadata: object_meta.metadata,
366 headers: object_meta.headers,
367 };
368
369 let mut headers = HeaderMap::new();
370 headers.insert(
371 NATS_ROLLUP,
372 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
373 PutError::with_source(PutErrorKind::Other, format!("failed parsing header: {err}"))
374 })?,
375 );
376 let data = serde_json::to_vec(&object_info).map_err(|err| {
377 PutError::with_source(
378 PutErrorKind::Other,
379 format!("failed serializing object info: {err}"),
380 )
381 })?;
382
383 self.stream
385 .context
386 .publish_with_headers(subject, headers, data.into())
387 .await
388 .map_err(|err| {
389 PutError::with_source(
390 PutErrorKind::PublishMetadata,
391 format!("failed publishing metadata: {err}"),
392 )
393 })?
394 .await
395 .map_err(|err| {
396 PutError::with_source(
397 PutErrorKind::PublishMetadata,
398 format!("failed ack from metadata publish: {err}"),
399 )
400 })?;
401
402 if let Some(existing_object_info) = maybe_existing_object_info {
404 let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
405
406 self.stream
407 .purge()
408 .filter(&chunk_subject)
409 .await
410 .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
411 }
412
413 Ok(object_info)
414 }
415
416 pub async fn watch(&self) -> Result<Watch, WatchError> {
436 self.watch_with_deliver_policy(DeliverPolicy::New).await
437 }
438
439 pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
442 self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
443 .await
444 }
445
446 async fn watch_with_deliver_policy(
447 &self,
448 deliver_policy: DeliverPolicy,
449 ) -> Result<Watch, WatchError> {
450 let subject = format!("$O.{}.M.>", self.name);
451 let ordered = self
452 .stream
453 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
454 deliver_policy,
455 deliver_subject: self.stream.context.client.new_inbox(),
456 description: Some("object store watcher".to_string()),
457 filter_subject: subject,
458 ..Default::default()
459 })
460 .await?;
461 Ok(Watch {
462 subscription: ordered.messages().await?,
463 })
464 }
465
466 pub async fn list(&self) -> Result<List, ListError> {
486 trace!("starting Object List");
487 let subject = format!("$O.{}.M.>", self.name);
488 let ordered = self
489 .stream
490 .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
491 deliver_policy: super::consumer::DeliverPolicy::All,
492 deliver_subject: self.stream.context.client.new_inbox(),
493 description: Some("object store list".to_string()),
494 filter_subject: subject,
495 ..Default::default()
496 })
497 .await?;
498 Ok(List {
499 done: ordered.info.num_pending == 0,
500 subscription: Some(ordered.messages().await?),
501 })
502 }
503
504 pub async fn seal(&mut self) -> Result<(), SealError> {
521 let mut stream_config = self
522 .stream
523 .info()
524 .await
525 .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
526 .to_owned();
527 stream_config.config.sealed = true;
528
529 self.stream
530 .context
531 .update_stream(&stream_config.config)
532 .await?;
533 Ok(())
534 }
535
536 pub async fn update_metadata<A: AsRef<str>>(
562 &self,
563 object: A,
564 metadata: UpdateMetadata,
565 ) -> Result<ObjectInfo, UpdateMetadataError> {
566 let mut info = self.info(object.as_ref()).await?;
567
568 if metadata.name != info.name {
571 tracing::info!("new metadata name is different than then old one");
572 if !is_valid_object_name(&metadata.name) {
573 return Err(UpdateMetadataError::new(
574 UpdateMetadataErrorKind::InvalidName,
575 ));
576 }
577 match self.info(&metadata.name).await {
578 Ok(_) => {
579 return Err(UpdateMetadataError::new(
580 UpdateMetadataErrorKind::NameAlreadyInUse,
581 ))
582 }
583 Err(err) => match err.kind() {
584 InfoErrorKind::NotFound => {
585 tracing::info!("purging old metadata: {}", info.name);
586 self.stream
587 .purge()
588 .filter(format!(
589 "$O.{}.M.{}",
590 self.name,
591 encode_object_name(&info.name)
592 ))
593 .await
594 .map_err(|err| {
595 UpdateMetadataError::with_source(
596 UpdateMetadataErrorKind::Purge,
597 err,
598 )
599 })?;
600 }
601 _ => {
602 return Err(UpdateMetadataError::with_source(
603 UpdateMetadataErrorKind::Other,
604 err,
605 ))
606 }
607 },
608 }
609 }
610
611 info.name = metadata.name;
612 info.description = metadata.description;
613
614 let name = encode_object_name(&info.name);
615 let subject = format!("$O.{}.M.{}", &self.name, &name);
616
617 let mut headers = HeaderMap::new();
618 headers.insert(
619 NATS_ROLLUP,
620 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
621 UpdateMetadataError::with_source(
622 UpdateMetadataErrorKind::Other,
623 format!("failed parsing header: {err}"),
624 )
625 })?,
626 );
627 let data = serde_json::to_vec(&info).map_err(|err| {
628 UpdateMetadataError::with_source(
629 UpdateMetadataErrorKind::Other,
630 format!("failed serializing object info: {err}"),
631 )
632 })?;
633
634 self.stream
636 .context
637 .publish_with_headers(subject, headers, data.into())
638 .await
639 .map_err(|err| {
640 UpdateMetadataError::with_source(
641 UpdateMetadataErrorKind::PublishMetadata,
642 format!("failed publishing metadata: {err}"),
643 )
644 })?
645 .await
646 .map_err(|err| {
647 UpdateMetadataError::with_source(
648 UpdateMetadataErrorKind::PublishMetadata,
649 format!("failed ack from metadata publish: {err}"),
650 )
651 })?;
652
653 Ok(info)
654 }
655
656 pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
676 where
677 T: ToString,
678 O: AsObjectInfo,
679 {
680 let object = object.as_info();
681 let name = name.to_string();
682 if name.is_empty() {
683 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
684 }
685 if object.name.is_empty() {
686 return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
687 }
688 if object.deleted {
689 return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
690 }
691 if let Some(ref options) = object.options {
692 if options.link.is_some() {
693 return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
694 }
695 }
696 match self.info(&name).await {
697 Ok(info) => {
698 if let Some(options) = info.options {
699 if options.link.is_none() {
700 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
701 }
702 } else {
703 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
704 }
705 }
706 Err(err) if err.kind() != InfoErrorKind::NotFound => {
707 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
708 }
709 _ => (),
710 }
711
712 let info = ObjectInfo {
713 name,
714 description: None,
715 options: Some(ObjectOptions {
716 link: Some(ObjectLink {
717 name: Some(object.name.clone()),
718 bucket: object.bucket.clone(),
719 }),
720 max_chunk_size: None,
721 }),
722 bucket: self.name.clone(),
723 nuid: crate::id_generator::next(),
724 size: 0,
725 chunks: 0,
726 modified: Some(OffsetDateTime::now_utc()),
727 digest: None,
728 deleted: false,
729 metadata: HashMap::default(),
730 headers: None,
731 };
732 publish_meta(self, &info).await?;
733 Ok(info)
734 }
735
736 pub async fn add_bucket_link<T: ToString, U: ToString>(
756 &self,
757 name: T,
758 bucket: U,
759 ) -> Result<ObjectInfo, AddLinkError> {
760 let name = name.to_string();
761 let bucket = bucket.to_string();
762 if name.is_empty() {
763 return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
764 }
765
766 match self.info(&name).await {
767 Ok(info) => {
768 if let Some(options) = info.options {
769 if options.link.is_none() {
770 return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
771 }
772 }
773 }
774 Err(err) if err.kind() != InfoErrorKind::NotFound => {
775 return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
776 }
777 _ => (),
778 }
779
780 let info = ObjectInfo {
781 name: name.clone(),
782 description: None,
783 options: Some(ObjectOptions {
784 link: Some(ObjectLink { name: None, bucket }),
785 max_chunk_size: None,
786 }),
787 bucket: self.name.clone(),
788 nuid: crate::id_generator::next(),
789 size: 0,
790 chunks: 0,
791 modified: Some(OffsetDateTime::now_utc()),
792 digest: None,
793 deleted: false,
794 metadata: HashMap::default(),
795 headers: None,
796 };
797 publish_meta(self, &info).await?;
798 Ok(info)
799 }
800}
801
802async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
803 let encoded_object_name = encode_object_name(&info.name);
804 let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
805
806 let mut headers = HeaderMap::new();
807 headers.insert(
808 NATS_ROLLUP,
809 ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
810 PublishMetadataError::with_source(
811 PublishMetadataErrorKind::Other,
812 format!("failed parsing header: {err}"),
813 )
814 })?,
815 );
816 let data = serde_json::to_vec(&info).map_err(|err| {
817 PublishMetadataError::with_source(
818 PublishMetadataErrorKind::Other,
819 format!("failed serializing object info: {err}"),
820 )
821 })?;
822
823 store
824 .stream
825 .context
826 .publish_with_headers(subject, headers, data.into())
827 .await
828 .map_err(|err| {
829 PublishMetadataError::with_source(
830 PublishMetadataErrorKind::PublishMetadata,
831 format!("failed publishing metadata: {err}"),
832 )
833 })?
834 .await
835 .map_err(|err| {
836 PublishMetadataError::with_source(
837 PublishMetadataErrorKind::PublishMetadata,
838 format!("failed ack from metadata publish: {err}"),
839 )
840 })?;
841 Ok(())
842}
843
/// Stream of [`ObjectInfo`] updates produced by the `ObjectStore::watch*`
/// methods.
pub struct Watch {
    /// Ordered push consumer message stream whose payloads are decoded as
    /// object metadata.
    subscription: crate::jetstream::consumer::push::Ordered,
}
847
848impl Stream for Watch {
849 type Item = Result<ObjectInfo, WatcherError>;
850
851 fn poll_next(
852 mut self: std::pin::Pin<&mut Self>,
853 cx: &mut std::task::Context<'_>,
854 ) -> Poll<Option<Self::Item>> {
855 match self.subscription.poll_next_unpin(cx) {
856 Poll::Ready(message) => match message {
857 Some(message) => Poll::Ready(
858 serde_json::from_slice::<ObjectInfo>(&message?.payload)
859 .map_err(|err| {
860 WatcherError::with_source(
861 WatcherErrorKind::Other,
862 format!("failed to deserialize object info: {err}"),
863 )
864 })
865 .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
866 ),
867 None => Poll::Ready(None),
868 },
869 Poll::Pending => Poll::Pending,
870 }
871 }
872}
873
/// Stream of the non-deleted [`ObjectInfo`] entries of a bucket, produced by
/// `ObjectStore::list`.
pub struct List {
    /// Ordered consumer message stream; set to `None` once the listing is done.
    subscription: Option<crate::jetstream::consumer::push::Ordered>,
    /// Set when the last received message reported zero pending messages (or
    /// the consumer had nothing pending at creation).
    done: bool,
}
878
879impl Stream for List {
880 type Item = Result<ObjectInfo, ListerError>;
881
882 fn poll_next(
883 mut self: std::pin::Pin<&mut Self>,
884 cx: &mut std::task::Context<'_>,
885 ) -> Poll<Option<Self::Item>> {
886 loop {
887 if self.done {
888 debug!("Object Store list done");
889 self.subscription = None;
890 return Poll::Ready(None);
891 }
892
893 if let Some(subscription) = self.subscription.as_mut() {
894 match subscription.poll_next_unpin(cx) {
895 Poll::Ready(message) => match message {
896 None => return Poll::Ready(None),
897 Some(message) => {
898 let message = message?;
899 let info = message.info().map_err(|err| {
900 ListerError::with_source(ListerErrorKind::Other, err)
901 })?;
902 trace!("num pending: {}", info.pending);
903 if info.pending == 0 {
904 self.done = true;
905 }
906 let response: ObjectInfo = serde_json::from_slice(&message.payload)
907 .map_err(|err| {
908 ListerError::with_source(
909 ListerErrorKind::Other,
910 format!("failed deserializing object info: {err}"),
911 )
912 })?;
913 if response.deleted {
914 continue;
915 }
916 return Poll::Ready(Some(Ok(response)));
917 }
918 },
919 Poll::Pending => return Poll::Pending,
920 }
921 } else {
922 return Poll::Ready(None);
923 }
924 }
925 }
926}
927
/// Reader over the contents of a stored object; implements
/// `tokio::io::AsyncRead` by consuming the object's chunk messages.
pub struct Object {
    /// Metadata of the object being read.
    pub info: ObjectInfo,
    /// Bytes of an already received chunk that did not fit into the caller's
    /// buffer and are handed out on subsequent reads.
    remaining_bytes: VecDeque<u8>,
    /// `true` while chunk messages are still expected from the subscription.
    has_pending_messages: bool,
    /// Running SHA-256 over the received chunks; taken when the final chunk
    /// arrives so it can be compared with `info.digest`.
    digest: Option<Sha256>,
    /// Chunk subscription, created lazily on the first read.
    subscription: Option<crate::jetstream::consumer::push::Ordered>,
    /// In-flight future that creates the chunk subscription.
    subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
    /// Stream the chunks are read from.
    stream: crate::jetstream::stream::Stream,
}
938
939impl std::fmt::Debug for Object {
940 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
941 f.debug_struct("Object")
942 .field("info", &self.info)
943 .field("remaining_bytes", &self.remaining_bytes)
944 .field("has_pending_messages", &self.has_pending_messages)
945 .finish()
946 }
947}
948
949impl Object {
950 pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
951 let has_pending_messages = info.chunks > 0;
952 Object {
953 subscription: None,
954 info,
955 remaining_bytes: VecDeque::new(),
956 has_pending_messages,
957 digest: Some(Sha256::new()),
958 subscription_future: None,
959 stream,
960 }
961 }
962
963 pub fn info(&self) -> &ObjectInfo {
965 &self.info
966 }
967}
968
969impl tokio::io::AsyncRead for Object {
970 fn poll_read(
971 mut self: std::pin::Pin<&mut Self>,
972 cx: &mut std::task::Context<'_>,
973 buf: &mut tokio::io::ReadBuf<'_>,
974 ) -> std::task::Poll<std::io::Result<()>> {
975 let (buf1, _buf2) = self.remaining_bytes.as_slices();
976 if !buf1.is_empty() {
977 let len = cmp::min(buf.remaining(), buf1.len());
978 buf.put_slice(&buf1[..len]);
979 self.remaining_bytes.drain(..len);
980 return Poll::Ready(Ok(()));
981 }
982
983 if self.has_pending_messages {
984 if self.subscription.is_none() {
985 let future = match self.subscription_future.as_mut() {
986 Some(future) => future,
987 None => {
988 let stream = self.stream.clone();
989 let bucket = self.info.bucket.clone();
990 let nuid = self.info.nuid.clone();
991 self.subscription_future.insert(Box::pin(async move {
992 stream
993 .create_consumer(OrderedConfig {
994 deliver_subject: stream.context.client.new_inbox(),
995 filter_subject: format!("$O.{bucket}.C.{nuid}"),
996 ..Default::default()
997 })
998 .await
999 .unwrap()
1000 .messages()
1001 .await
1002 }))
1003 }
1004 };
1005 match future.as_mut().poll(cx) {
1006 Poll::Ready(subscription) => {
1007 self.subscription = Some(subscription.unwrap());
1008 }
1009 Poll::Pending => (),
1010 }
1011 }
1012 if let Some(subscription) = self.subscription.as_mut() {
1013 match subscription.poll_next_unpin(cx) {
1014 Poll::Ready(message) => match message {
1015 Some(message) => {
1016 let message = message.map_err(|err| {
1017 std::io::Error::other(format!(
1018 "error from JetStream subscription: {err}"
1019 ))
1020 })?;
1021 let len = cmp::min(buf.remaining(), message.payload.len());
1022 buf.put_slice(&message.payload[..len]);
1023 if let Some(context) = &mut self.digest {
1024 context.update(&message.payload);
1025 }
1026 self.remaining_bytes.extend(&message.payload[len..]);
1027
1028 let info = message.info().map_err(|err| {
1029 std::io::Error::other(format!(
1030 "error from JetStream subscription: {err}"
1031 ))
1032 })?;
1033 if info.pending == 0 {
1034 let digest = self.digest.take().map(Sha256::finish);
1035 if let Some(digest) = digest {
1036 if self
1037 .info
1038 .digest
1039 .as_ref()
1040 .map(|digest_self| {
1041 format!("SHA-256={}", URL_SAFE.encode(digest))
1042 != *digest_self
1043 })
1044 .unwrap_or(false)
1045 {
1046 return Poll::Ready(Err(std::io::Error::new(
1047 std::io::ErrorKind::InvalidData,
1048 "wrong digest",
1049 )));
1050 }
1051 } else {
1052 return Poll::Ready(Err(std::io::Error::new(
1053 std::io::ErrorKind::InvalidData,
1054 "digest should be Some",
1055 )));
1056 }
1057 self.has_pending_messages = false;
1058 self.subscription = None;
1059 }
1060 Poll::Ready(Ok(()))
1061 }
1062 None => Poll::Ready(Err(std::io::Error::other(
1063 "subscription ended before reading whole object",
1064 ))),
1065 },
1066 Poll::Pending => Poll::Pending,
1067 }
1068 } else {
1069 Poll::Pending
1070 }
1071 } else {
1072 Poll::Ready(Ok(()))
1073 }
1074 }
1075}
1076
/// Optional settings stored alongside an object's metadata.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectOptions {
    /// Link target when the entry is a link rather than a regular object.
    pub link: Option<ObjectLink>,
    /// Chunk size that was used when the object was stored by `put`.
    pub max_chunk_size: Option<usize>,
}
1082
/// Metadata describing a stored object (or link), serialized as JSON in the
/// object's metadata message.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectInfo {
    /// Name of the object.
    pub name: String,
    /// Optional human-readable description.
    #[serde(default)]
    pub description: Option<String>,
    /// User-supplied key/value metadata.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Optional user-supplied headers.
    #[serde(default)]
    pub headers: Option<HeaderMap>,
    /// Optional settings, including the link target for link entries.
    #[serde(default)]
    pub options: Option<ObjectOptions>,
    /// Name of the bucket the entry belongs to.
    pub bucket: String,
    /// Unique id of the object; the last token of its chunk subject.
    #[serde(default)]
    pub nuid: String,
    /// Total size of the object data in bytes.
    #[serde(default)]
    pub size: usize,
    /// Number of chunk messages the object was stored as.
    #[serde(default)]
    pub chunks: usize,
    /// Modification time; serialized as an RFC 3339 string under the key `mtime`.
    #[serde(default, with = "rfc3339::option")]
    #[serde(rename = "mtime")]
    pub modified: Option<time::OffsetDateTime>,
    /// Digest of the data in the form `SHA-256=<URL-safe base64>`; omitted
    /// from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub digest: Option<String>,
    /// Whether the object has been deleted; omitted from the serialized form
    /// when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deleted: bool,
}
1122
/// Returns `true` when `t` equals its type's `Default` value; used as a serde
/// `skip_serializing_if` predicate.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let baseline = T::default();
    *t == baseline
}
/// Target of a link entry.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectLink {
    /// Name of the linked object; `None` when the link points to a whole bucket.
    pub name: Option<String>,
    /// Name of the bucket the link points into.
    pub bucket: String,
}
1134
/// New metadata values passed to `ObjectStore::update_metadata`.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpdateMetadata {
    /// New name of the object; a name different from the current one renames it.
    pub name: String,
    /// New description of the object.
    pub description: Option<String>,
    /// User-supplied key/value metadata.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Optional user-supplied headers.
    pub headers: Option<HeaderMap>,
}
1147
/// Description of an object to be stored with `ObjectStore::put`.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectMetadata {
    /// Name of the object.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Chunk size in bytes; `put` falls back to `DEFAULT_CHUNK_SIZE` when `None`.
    pub chunk_size: Option<usize>,
    /// User-supplied key/value metadata.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Optional user-supplied headers.
    pub headers: Option<HeaderMap>,
}
1163
1164impl From<&str> for ObjectMetadata {
1165 fn from(s: &str) -> ObjectMetadata {
1166 ObjectMetadata {
1167 name: s.to_string(),
1168 ..Default::default()
1169 }
1170 }
1171}
1172
/// Types from which an [`ObjectInfo`] can be borrowed; lets
/// `ObjectStore::add_link` accept either an `&Object` or an `&ObjectInfo`.
pub trait AsObjectInfo {
    /// Returns the object metadata.
    fn as_info(&self) -> &ObjectInfo;
}
1176
1177impl AsObjectInfo for &Object {
1178 fn as_info(&self) -> &ObjectInfo {
1179 &self.info
1180 }
1181}
1182impl AsObjectInfo for &ObjectInfo {
1183 fn as_info(&self) -> &ObjectInfo {
1184 self
1185 }
1186}
1187
1188impl From<ObjectInfo> for ObjectMetadata {
1189 fn from(info: ObjectInfo) -> Self {
1190 ObjectMetadata {
1191 name: info.name,
1192 description: info.description,
1193 metadata: info.metadata,
1194 headers: info.headers,
1195 chunk_size: None,
1196 }
1197 }
1198}
1199
/// Kinds of failure reported by `ObjectStore::update_metadata`.
#[derive(Debug, PartialEq, Clone)]
pub enum UpdateMetadataErrorKind {
    InvalidName,
    NotFound,
    TimedOut,
    Other,
    PublishMetadata,
    NameAlreadyInUse,
    Purge,
}

impl Display for UpdateMetadataErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "object not found",
            Self::TimedOut => "timed out",
            Self::Other => "error",
            Self::PublishMetadata => "failed publishing metadata",
            Self::NameAlreadyInUse => "object with updated name already exists",
            Self::Purge => "failed purging old name metadata",
        };
        f.write_str(text)
    }
}
1228
1229impl From<InfoError> for UpdateMetadataError {
1230 fn from(error: InfoError) -> Self {
1231 match error.kind() {
1232 InfoErrorKind::InvalidName => {
1233 UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1234 }
1235 InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1236 InfoErrorKind::Other => {
1237 UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1238 }
1239 InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1240 }
1241 }
1242}
1243
1244pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1245
/// Kinds of failure reported by `ObjectStore::info`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum InfoErrorKind {
    InvalidName,
    NotFound,
    Other,
    TimedOut,
}

impl Display for InfoErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "not found",
            Self::Other => "getting info failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(text)
    }
}
1264
/// Error returned by `ObjectStore::info`.
pub type InfoError = Error<InfoErrorKind>;
1266
/// Kinds of failure reported by `ObjectStore::get`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GetErrorKind {
    InvalidName,
    ConsumerCreate,
    NotFound,
    BucketLink,
    Other,
    TimedOut,
}

impl Display for GetErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::ConsumerCreate => "failed creating consumer for fetching object",
            Self::NotFound => "object not found",
            Self::BucketLink => "object is a link to a bucket",
            Self::Other => "failed getting object",
            Self::TimedOut => "timed out",
        };
        f.write_str(text)
    }
}
1289
1290pub type GetError = Error<GetErrorKind>;
1291
1292crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1293crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1294
1295impl From<InfoError> for GetError {
1296 fn from(err: InfoError) -> Self {
1297 match err.kind() {
1298 InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1299 InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1300 InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1301 InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1302 }
1303 }
1304}
1305
/// Kinds of failure reported by `ObjectStore::delete`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DeleteErrorKind {
    TimedOut,
    NotFound,
    Metadata,
    InvalidName,
    Chunks,
    Other,
}

impl Display for DeleteErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::TimedOut => "timed out",
            Self::NotFound => "object not found",
            Self::Metadata => "failed rolling up metadata",
            Self::InvalidName => "invalid object name",
            Self::Chunks => "failed purging chunks",
            Self::Other => "delete failed",
        };
        f.write_str(text)
    }
}
1328
1329pub type DeleteError = Error<DeleteErrorKind>;
1330
1331impl From<InfoError> for DeleteError {
1332 fn from(err: InfoError) -> Self {
1333 match err.kind() {
1334 InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1335 InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1336 InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1337 InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1338 }
1339 }
1340}
1341
1342crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1343crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1344
/// Kinds of failure reported by `ObjectStore::put`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    InvalidName,
    ReadChunks,
    PublishChunks,
    PublishMetadata,
    PurgeOldChunks,
    TimedOut,
    Other,
}

impl Display for PutErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::ReadChunks => "error while reading the buffer",
            Self::PublishChunks => "failed publishing object chunks",
            Self::PublishMetadata => "failed publishing metadata",
            Self::PurgeOldChunks => "failed purging old chunks",
            Self::TimedOut => "timed out",
            Self::Other => "error",
        };
        f.write_str(text)
    }
}
1369
/// Error returned by `ObjectStore::put`.
pub type PutError = Error<PutErrorKind>;

/// Error returned by `ObjectStore::add_link` and `ObjectStore::add_bucket_link`.
pub type AddLinkError = Error<AddLinkErrorKind>;
1373
/// Kinds of failure reported when adding an object or bucket link.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AddLinkErrorKind {
    EmptyName,
    ObjectRequired,
    Deleted,
    LinkToLink,
    PublishMetadata,
    AlreadyExists,
    Other,
}

impl Display for AddLinkErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::EmptyName => "link name cannot be empty",
            Self::ObjectRequired => "cannot link to empty Object",
            Self::Deleted => "cannot link a deleted Object",
            Self::LinkToLink => "cannot link to another link",
            Self::PublishMetadata => "failed publishing link metadata",
            Self::AlreadyExists => "object already exists",
            Self::Other => "error",
        };
        f.write_str(text)
    }
}
1398
/// Module-private error type whose kind is a [`PublishMetadataErrorKind`].
type PublishMetadataError = Error<PublishMetadataErrorKind>;
1400
/// Categories of failure carried by the module-private `PublishMetadataError`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PublishMetadataErrorKind {
    /// Publishing the metadata failed.
    PublishMetadata,
    /// Any failure not covered by the other variant.
    Other,
}

impl Display for PublishMetadataErrorKind {
    /// Writes the fixed, human-readable message associated with the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::PublishMetadata => "failed to publish metadata",
            Self::Other => "error",
        };
        f.write_str(message)
    }
}
1415
1416impl From<PublishMetadataError> for AddLinkError {
1417 fn from(error: PublishMetadataError) -> Self {
1418 match error.kind {
1419 PublishMetadataErrorKind::PublishMetadata => {
1420 AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1421 }
1422 PublishMetadataErrorKind::Other => {
1423 AddLinkError::with_source(AddLinkErrorKind::Other, error)
1424 }
1425 }
1426 }
1427}
1428impl From<PublishMetadataError> for PutError {
1429 fn from(error: PublishMetadataError) -> Self {
1430 match error.kind {
1431 PublishMetadataErrorKind::PublishMetadata => {
1432 PutError::new(PutErrorKind::PublishMetadata)
1433 }
1434 PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1435 }
1436 }
1437}
1438
/// Categories of failure carried by [`WatchError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// Any failure not covered by the other variants.
    Other,
}

impl Display for WatchErrorKind {
    /// Writes the fixed, human-readable message associated with the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(message)
    }
}
1455
/// Error type whose kind is a [`WatchErrorKind`].
pub type WatchError = Error<WatchErrorKind>;

// NOTE(review): presumably these expand to `From<ConsumerError>` and
// `From<StreamError>` impls for `WatchError` — confirm against the
// `from_with_timeout!` definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);

/// Alias of [`WatchError`]; the two names refer to the same type.
pub type ListError = WatchError;
/// Alias of [`WatchErrorKind`]; the two names refer to the same type.
pub type ListErrorKind = WatchErrorKind;
1463
/// Categories of failure carried by [`SealError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SealErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Any failure not covered by the other variants.
    Other,
    /// Getting the stream info before sealing the bucket failed.
    Info,
    /// Sealing the bucket failed.
    Update,
}

impl Display for SealErrorKind {
    /// Writes the fixed, human-readable message associated with the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::TimedOut => "timed out",
            Self::Other => "seal failed",
            Self::Info => "failed getting stream info before sealing bucket",
            Self::Update => "failed sealing the bucket",
        };
        f.write_str(message)
    }
}
1482
1483pub type SealError = Error<SealErrorKind>;
1484
1485impl From<super::context::UpdateStreamError> for SealError {
1486 fn from(err: super::context::UpdateStreamError) -> Self {
1487 match err.kind() {
1488 super::context::CreateStreamErrorKind::TimedOut => {
1489 SealError::new(SealErrorKind::TimedOut)
1490 }
1491 _ => SealError::with_source(SealErrorKind::Update, err),
1492 }
1493 }
1494}
1495
/// Categories of failure carried by [`WatcherError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The watcher's consumer reported an error.
    ConsumerError,
    /// Any failure not covered by the other variant.
    Other,
}

impl Display for WatcherErrorKind {
    /// Writes the fixed, human-readable message associated with the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::ConsumerError => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(message)
    }
}
1510
1511pub type WatcherError = Error<WatcherErrorKind>;
1512
1513impl From<OrderedError> for WatcherError {
1514 fn from(err: OrderedError) -> Self {
1515 WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1516 }
1517}
1518
1519pub type ListerError = WatcherError;
1520pub type ListerErrorKind = WatcherErrorKind;