Skip to main content

async_nats/jetstream/object_store/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Object Store module
15use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures_util::future::BoxFuture;
26use std::sync::LazyLock;
27use tokio::io::AsyncReadExt;
28
29use futures_util::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
/// Chunk size (128 KiB) used by `put` when the object metadata does not specify one.
const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
/// Name of the header attached to metadata publishes (see `put`, `delete`, `publish_meta`).
const NATS_ROLLUP: &str = "Nats-Rollup";
/// Value sent in the `NATS_ROLLUP` header.
// NOTE(review): presumably asks the server to roll up the published subject — confirm.
const ROLLUP_SUBJECT: &str = "sub";

/// Accepted bucket names: one or more ASCII letters, digits, `_` or `-`.
static BUCKET_NAME_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
/// Accepted object names: one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`.
static OBJECT_NAME_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
50
51pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
52    BUCKET_NAME_RE.is_match(bucket_name)
53}
54
55pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
56    if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
57        return false;
58    }
59
60    OBJECT_NAME_RE.is_match(object_name)
61}
62
63pub(crate) fn encode_object_name(object_name: &str) -> String {
64    URL_SAFE.encode(object_name)
65}
66
/// Configuration values for object store buckets.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
    /// Name of the storage bucket.
    pub bucket: String,
    /// A short description of the purpose of this storage bucket.
    pub description: Option<String>,
    /// Maximum age of any value in the bucket, expressed in nanoseconds when serialized.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// How large the storage bucket may become in total bytes.
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) and `Memory`
    pub storage: StorageType,
    /// How many replicas to keep for each value in a cluster, maximum 5.
    pub num_replicas: usize,
    /// Sets compression of the underlying stream.
    pub compression: bool,
    /// Cluster and tag placement.
    pub placement: Option<stream::Placement>,
}
88
/// A blob store capable of storing large objects efficiently in streams.
#[derive(Clone)]
pub struct ObjectStore {
    /// Bucket name; interpolated into the `$O.<name>.M.*` and `$O.<name>.C.*` subjects.
    pub(crate) name: String,
    /// Stream handle used for publishing, purging and creating consumers.
    pub(crate) stream: crate::jetstream::stream::Stream,
}
95
96impl ObjectStore {
97    /// Gets an [Object] from the [ObjectStore].
98    ///
99    /// [Object] implements [tokio::io::AsyncRead] that allows
100    /// to read the data from Object Store.
101    ///
102    /// # Examples
103    ///
104    /// ```no_run
105    /// # #[tokio::main]
106    /// # async fn main() -> Result<(), async_nats::Error> {
107    /// use tokio::io::AsyncReadExt;
108    /// let client = async_nats::connect("demo.nats.io").await?;
109    /// let jetstream = async_nats::jetstream::new(client);
110    ///
111    /// let bucket = jetstream.get_object_store("store").await?;
112    /// let mut object = bucket.get("FOO").await?;
113    ///
114    /// // Object implements `tokio::io::AsyncRead`.
115    /// let mut bytes = vec![];
116    /// object.read_to_end(&mut bytes).await?;
117    /// # Ok(())
118    /// # }
119    /// ```
120    pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
121        self.get_impl(object_name).await
122    }
123
124    fn get_impl<'bucket, 'future, T>(
125        &'bucket self,
126        object_name: T,
127    ) -> BoxFuture<'future, Result<Object, GetError>>
128    where
129        T: AsRef<str> + Send + 'future,
130        'bucket: 'future,
131    {
132        Box::pin(async move {
133            let object_info = self.info(object_name).await?;
134            if object_info.deleted {
135                return Err(GetError::new(GetErrorKind::NotFound));
136            }
137            if let Some(ref options) = object_info.options {
138                if let Some(link) = options.link.as_ref() {
139                    if let Some(link_name) = link.name.as_ref() {
140                        let link_name = link_name.clone();
141                        debug!("getting object via link");
142                        if link.bucket == self.name {
143                            return self.get_impl(link_name).await;
144                        } else {
145                            let bucket = self
146                                .stream
147                                .context
148                                .get_object_store(&link.bucket)
149                                .await
150                                .map_err(|err| {
151                                GetError::with_source(GetErrorKind::Other, err)
152                            })?;
153                            let object = bucket.get_impl(&link_name).await?;
154                            return Ok(object);
155                        }
156                    } else {
157                        return Err(GetError::new(GetErrorKind::BucketLink));
158                    }
159                }
160            }
161
162            debug!("not a link. Getting the object");
163            Ok(Object::new(object_info, self.stream.clone()))
164        })
165    }
166
167    /// Deletes an [Object] from the [ObjectStore].
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// # #[tokio::main]
173    /// # async fn main() -> Result<(), async_nats::Error> {
174    /// let client = async_nats::connect("demo.nats.io").await?;
175    /// let jetstream = async_nats::jetstream::new(client);
176    ///
177    /// let bucket = jetstream.get_object_store("store").await?;
178    /// bucket.delete("FOO").await?;
179    /// # Ok(())
180    /// # }
181    /// ```
182    pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
183        let object_name = object_name.as_ref();
184        let mut object_info = self.info(object_name).await?;
185        object_info.chunks = 0;
186        object_info.size = 0;
187        object_info.deleted = true;
188
189        let data = serde_json::to_vec(&object_info).map_err(|err| {
190            DeleteError::with_source(
191                DeleteErrorKind::Other,
192                format!("failed deserializing object info: {err}"),
193            )
194        })?;
195
196        let mut headers = HeaderMap::default();
197        headers.insert(
198            NATS_ROLLUP,
199            HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
200                DeleteError::with_source(
201                    DeleteErrorKind::Other,
202                    format!("failed parsing header: {err}"),
203                )
204            })?,
205        );
206
207        let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
208
209        self.stream
210            .context
211            .publish_with_headers(subject, headers, data.into())
212            .await?
213            .await?;
214
215        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
216
217        self.stream.purge().filter(&chunk_subject).await?;
218
219        Ok(())
220    }
221
222    /// Retrieves [Object] [ObjectInfo].
223    ///
224    /// # Examples
225    ///
226    /// ```no_run
227    /// # #[tokio::main]
228    /// # async fn main() -> Result<(), async_nats::Error> {
229    /// let client = async_nats::connect("demo.nats.io").await?;
230    /// let jetstream = async_nats::jetstream::new(client);
231    ///
232    /// let bucket = jetstream.get_object_store("store").await?;
233    /// let info = bucket.info("FOO").await?;
234    /// # Ok(())
235    /// # }
236    /// ```
237    pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
238        let object_name = object_name.as_ref();
239        let object_name = encode_object_name(object_name);
240        if !is_valid_object_name(&object_name) {
241            return Err(InfoError::new(InfoErrorKind::InvalidName));
242        }
243
244        // Grab last meta value we have.
245        let subject = format!("$O.{}.M.{}", &self.name, &object_name);
246
247        // FIXME(jrm): we should use direct get here when possible.
248        let message = self
249            .stream
250            .get_last_raw_message_by_subject(subject.as_str())
251            .await
252            .map_err(|err| match err.kind() {
253                stream::LastRawMessageErrorKind::NoMessageFound => {
254                    InfoError::new(InfoErrorKind::NotFound)
255                }
256                _ => InfoError::with_source(InfoErrorKind::Other, err),
257            })?;
258        let object_info =
259            serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
260                InfoError::with_source(
261                    InfoErrorKind::Other,
262                    format!("failed to decode info payload: {err}"),
263                )
264            })?;
265
266        Ok(object_info)
267    }
268
269    /// Puts an [Object] into the [ObjectStore].
    /// The object data is read from any source implementing `tokio::io::AsyncRead`.
271    ///
272    /// # Examples
273    ///
274    /// ```no_run
275    /// # #[tokio::main]
276    /// # async fn main() -> Result<(), async_nats::Error> {
277    /// let client = async_nats::connect("demo.nats.io").await?;
278    /// let jetstream = async_nats::jetstream::new(client);
279    ///
280    /// let bucket = jetstream.get_object_store("store").await?;
281    /// let mut file = tokio::fs::File::open("foo.txt").await?;
282    /// bucket.put("file", &mut file).await.unwrap();
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub async fn put<T>(
287        &self,
288        meta: T,
289        data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
290    ) -> Result<ObjectInfo, PutError>
291    where
292        ObjectMetadata: From<T>,
293    {
294        let object_meta: ObjectMetadata = meta.into();
295
296        // Fetch any existing object info, if there is any for later use.
297        let maybe_existing_object_info = (self.info(&object_meta.name).await).ok();
298
299        let object_nuid = crate::id_generator::next();
300        let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
301
302        let mut object_chunks = 0;
303        let mut object_size = 0;
304
305        let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
306        let mut buffer = BytesMut::with_capacity(chunk_size);
307        let mut sha256 = Sha256::new();
308
309        loop {
310            let n = data
311                .read_buf(&mut buffer)
312                .await
313                .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
314
315            if n == 0 {
316                break;
317            }
318
319            let payload = buffer.split().freeze();
320            sha256.update(&payload);
321
322            object_size += payload.len();
323            object_chunks += 1;
324
325            self.stream
326                .context
327                .publish(chunk_subject.clone(), payload)
328                .await
329                .map_err(|err| {
330                    PutError::with_source(
331                        PutErrorKind::PublishChunks,
332                        format!("failed chunk publish: {err}"),
333                    )
334                })?
335                .await
336                .map_err(|err| {
337                    PutError::with_source(
338                        PutErrorKind::PublishChunks,
339                        format!("failed getting chunk ack: {err}"),
340                    )
341                })?;
342        }
343        let digest = sha256.finish();
344
345        let encoded_object_name = encode_object_name(&object_meta.name);
346        if !is_valid_object_name(&encoded_object_name) {
347            return Err(PutError::new(PutErrorKind::InvalidName));
348        }
349        let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
350
351        let object_info = ObjectInfo {
352            name: object_meta.name,
353            description: object_meta.description,
354            options: Some(ObjectOptions {
355                max_chunk_size: Some(chunk_size),
356                link: None,
357            }),
358            bucket: self.name.clone(),
359            nuid: object_nuid.to_string(),
360            chunks: object_chunks,
361            size: object_size,
362            digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
363            modified: Some(OffsetDateTime::now_utc()),
364            deleted: false,
365            metadata: object_meta.metadata,
366            headers: object_meta.headers,
367        };
368
369        let mut headers = HeaderMap::new();
370        headers.insert(
371            NATS_ROLLUP,
372            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
373                PutError::with_source(PutErrorKind::Other, format!("failed parsing header: {err}"))
374            })?,
375        );
376        let data = serde_json::to_vec(&object_info).map_err(|err| {
377            PutError::with_source(
378                PutErrorKind::Other,
379                format!("failed serializing object info: {err}"),
380            )
381        })?;
382
383        // publish meta.
384        self.stream
385            .context
386            .publish_with_headers(subject, headers, data.into())
387            .await
388            .map_err(|err| {
389                PutError::with_source(
390                    PutErrorKind::PublishMetadata,
391                    format!("failed publishing metadata: {err}"),
392                )
393            })?
394            .await
395            .map_err(|err| {
396                PutError::with_source(
397                    PutErrorKind::PublishMetadata,
398                    format!("failed ack from metadata publish: {err}"),
399                )
400            })?;
401
402        // Purge any old chunks.
403        if let Some(existing_object_info) = maybe_existing_object_info {
404            let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
405
406            self.stream
407                .purge()
408                .filter(&chunk_subject)
409                .await
410                .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
411        }
412
413        Ok(object_info)
414    }
415
416    /// Creates a [Watch] stream over changes in the [ObjectStore].
417    ///
418    /// # Examples
419    ///
420    /// ```no_run
421    /// # #[tokio::main]
422    /// # async fn main() -> Result<(), async_nats::Error> {
423    /// use futures_util::StreamExt;
424    /// let client = async_nats::connect("demo.nats.io").await?;
425    /// let jetstream = async_nats::jetstream::new(client);
426    ///
427    /// let bucket = jetstream.get_object_store("store").await?;
428    /// let mut watcher = bucket.watch().await.unwrap();
429    /// while let Some(object) = watcher.next().await {
430    ///     println!("detected changes in {:?}", object?);
431    /// }
432    /// # Ok(())
433    /// # }
434    /// ```
435    pub async fn watch(&self) -> Result<Watch, WatchError> {
436        self.watch_with_deliver_policy(DeliverPolicy::New).await
437    }
438
439    /// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever
440    /// there are changes for that key with as well as last value.
441    pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
442        self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
443            .await
444    }
445
446    async fn watch_with_deliver_policy(
447        &self,
448        deliver_policy: DeliverPolicy,
449    ) -> Result<Watch, WatchError> {
450        let subject = format!("$O.{}.M.>", self.name);
451        let ordered = self
452            .stream
453            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
454                deliver_policy,
455                deliver_subject: self.stream.context.client.new_inbox(),
456                description: Some("object store watcher".to_string()),
457                filter_subject: subject,
458                ..Default::default()
459            })
460            .await?;
461        Ok(Watch {
462            subscription: ordered.messages().await?,
463        })
464    }
465
466    /// Returns a [List] stream with all not deleted [Objects][Object] in the [ObjectStore].
467    ///
468    /// # Examples
469    ///
470    /// ```no_run
471    /// # #[tokio::main]
472    /// # async fn main() -> Result<(), async_nats::Error> {
473    /// use futures_util::StreamExt;
474    /// let client = async_nats::connect("demo.nats.io").await?;
475    /// let jetstream = async_nats::jetstream::new(client);
476    ///
477    /// let bucket = jetstream.get_object_store("store").await?;
478    /// let mut list = bucket.list().await.unwrap();
479    /// while let Some(object) = list.next().await {
480    ///     println!("object {:?}", object?);
481    /// }
482    /// # Ok(())
483    /// # }
484    /// ```
485    pub async fn list(&self) -> Result<List, ListError> {
486        trace!("starting Object List");
487        let subject = format!("$O.{}.M.>", self.name);
488        let ordered = self
489            .stream
490            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
491                deliver_policy: super::consumer::DeliverPolicy::All,
492                deliver_subject: self.stream.context.client.new_inbox(),
493                description: Some("object store list".to_string()),
494                filter_subject: subject,
495                ..Default::default()
496            })
497            .await?;
498        Ok(List {
499            done: ordered.info.num_pending == 0,
500            subscription: Some(ordered.messages().await?),
501        })
502    }
503
504    /// Seals a [ObjectStore], preventing any further changes to it or its [Objects][Object].
505    ///
506    /// # Examples
507    ///
508    /// ```no_run
509    /// # #[tokio::main]
510    /// # async fn main() -> Result<(), async_nats::Error> {
511    /// use futures_util::StreamExt;
512    /// let client = async_nats::connect("demo.nats.io").await?;
513    /// let jetstream = async_nats::jetstream::new(client);
514    ///
515    /// let mut bucket = jetstream.get_object_store("store").await?;
516    /// bucket.seal().await.unwrap();
517    /// # Ok(())
518    /// # }
519    /// ```
520    pub async fn seal(&mut self) -> Result<(), SealError> {
521        let mut stream_config = self
522            .stream
523            .info()
524            .await
525            .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
526            .to_owned();
527        stream_config.config.sealed = true;
528
529        self.stream
530            .context
531            .update_stream(&stream_config.config)
532            .await?;
533        Ok(())
534    }
535
536    /// Updates [Object] [ObjectMetadata].
537    ///
538    /// # Examples
539    ///
540    /// ```no_run
541    /// # #[tokio::main]
542    /// # async fn main() -> Result<(), async_nats::Error> {
543    /// use async_nats::jetstream::object_store;
544    /// let client = async_nats::connect("demo.nats.io").await?;
545    /// let jetstream = async_nats::jetstream::new(client);
546    ///
547    /// let mut bucket = jetstream.get_object_store("store").await?;
548    /// bucket
549    ///     .update_metadata(
550    ///         "object",
551    ///         object_store::UpdateMetadata {
552    ///             name: "new_name".to_string(),
553    ///             description: Some("a new description".to_string()),
554    ///             ..Default::default()
555    ///         },
556    ///     )
557    ///     .await?;
558    /// # Ok(())
559    /// # }
560    /// ```
561    pub async fn update_metadata<A: AsRef<str>>(
562        &self,
563        object: A,
564        metadata: UpdateMetadata,
565    ) -> Result<ObjectInfo, UpdateMetadataError> {
566        let mut info = self.info(object.as_ref()).await?;
567
568        // If name is being update, we need to check if other metadata with it already exists.
569        // If does, error. Otherwise, purge old name metadata.
570        if metadata.name != info.name {
571            tracing::info!("new metadata name is different than then old one");
572            if !is_valid_object_name(&metadata.name) {
573                return Err(UpdateMetadataError::new(
574                    UpdateMetadataErrorKind::InvalidName,
575                ));
576            }
577            match self.info(&metadata.name).await {
578                Ok(_) => {
579                    return Err(UpdateMetadataError::new(
580                        UpdateMetadataErrorKind::NameAlreadyInUse,
581                    ))
582                }
583                Err(err) => match err.kind() {
584                    InfoErrorKind::NotFound => {
585                        tracing::info!("purging old metadata: {}", info.name);
586                        self.stream
587                            .purge()
588                            .filter(format!(
589                                "$O.{}.M.{}",
590                                self.name,
591                                encode_object_name(&info.name)
592                            ))
593                            .await
594                            .map_err(|err| {
595                                UpdateMetadataError::with_source(
596                                    UpdateMetadataErrorKind::Purge,
597                                    err,
598                                )
599                            })?;
600                    }
601                    _ => {
602                        return Err(UpdateMetadataError::with_source(
603                            UpdateMetadataErrorKind::Other,
604                            err,
605                        ))
606                    }
607                },
608            }
609        }
610
611        info.name = metadata.name;
612        info.description = metadata.description;
613
614        let name = encode_object_name(&info.name);
615        let subject = format!("$O.{}.M.{}", &self.name, &name);
616
617        let mut headers = HeaderMap::new();
618        headers.insert(
619            NATS_ROLLUP,
620            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
621                UpdateMetadataError::with_source(
622                    UpdateMetadataErrorKind::Other,
623                    format!("failed parsing header: {err}"),
624                )
625            })?,
626        );
627        let data = serde_json::to_vec(&info).map_err(|err| {
628            UpdateMetadataError::with_source(
629                UpdateMetadataErrorKind::Other,
630                format!("failed serializing object info: {err}"),
631            )
632        })?;
633
634        // publish meta.
635        self.stream
636            .context
637            .publish_with_headers(subject, headers, data.into())
638            .await
639            .map_err(|err| {
640                UpdateMetadataError::with_source(
641                    UpdateMetadataErrorKind::PublishMetadata,
642                    format!("failed publishing metadata: {err}"),
643                )
644            })?
645            .await
646            .map_err(|err| {
647                UpdateMetadataError::with_source(
648                    UpdateMetadataErrorKind::PublishMetadata,
649                    format!("failed ack from metadata publish: {err}"),
650                )
651            })?;
652
653        Ok(info)
654    }
655
656    /// Adds a link to an [Object].
657    /// It creates a new [Object] in the [ObjectStore] that points to another [Object]
    /// and does not have any contents of its own.
659    /// Links are automatically followed (one level deep) when calling [ObjectStore::get].
660    ///
661    /// # Examples
662    ///
663    /// ```no_run
664    /// # #[tokio::main]
665    /// # async fn main() -> Result<(), async_nats::Error> {
666    /// use async_nats::jetstream::object_store;
667    /// let client = async_nats::connect("demo.nats.io").await?;
668    /// let jetstream = async_nats::jetstream::new(client);
669    /// let bucket = jetstream.get_object_store("bucket").await?;
670    /// let object = bucket.get("object").await?;
671    /// bucket.add_link("link_to_object", &object).await?;
672    /// # Ok(())
673    /// # }
674    /// ```
675    pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
676    where
677        T: ToString,
678        O: AsObjectInfo,
679    {
680        let object = object.as_info();
681        let name = name.to_string();
682        if name.is_empty() {
683            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
684        }
685        if object.name.is_empty() {
686            return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
687        }
688        if object.deleted {
689            return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
690        }
691        if let Some(ref options) = object.options {
692            if options.link.is_some() {
693                return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
694            }
695        }
696        match self.info(&name).await {
697            Ok(info) => {
698                if let Some(options) = info.options {
699                    if options.link.is_none() {
700                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
701                    }
702                } else {
703                    return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
704                }
705            }
706            Err(err) if err.kind() != InfoErrorKind::NotFound => {
707                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
708            }
709            _ => (),
710        }
711
712        let info = ObjectInfo {
713            name,
714            description: None,
715            options: Some(ObjectOptions {
716                link: Some(ObjectLink {
717                    name: Some(object.name.clone()),
718                    bucket: object.bucket.clone(),
719                }),
720                max_chunk_size: None,
721            }),
722            bucket: self.name.clone(),
723            nuid: crate::id_generator::next(),
724            size: 0,
725            chunks: 0,
726            modified: Some(OffsetDateTime::now_utc()),
727            digest: None,
728            deleted: false,
729            metadata: HashMap::default(),
730            headers: None,
731        };
732        publish_meta(self, &info).await?;
733        Ok(info)
734    }
735
736    /// Adds a link to another [ObjectStore] bucket by creating a new [Object]
737    /// in the current [ObjectStore] that points to another [ObjectStore] and
738    /// does not contain any data.
739    ///
740    /// # Examples
741    ///
742    /// ```no_run
743    /// # #[tokio::main]
744    /// # async fn main() -> Result<(), async_nats::Error> {
745    /// use async_nats::jetstream::object_store;
746    /// let client = async_nats::connect("demo.nats.io").await?;
747    /// let jetstream = async_nats::jetstream::new(client);
748    /// let bucket = jetstream.get_object_store("bucket").await?;
749    /// bucket
750    ///     .add_bucket_link("link_to_object", "another_bucket")
751    ///     .await?;
752    /// # Ok(())
753    /// # }
754    /// ```
755    pub async fn add_bucket_link<T: ToString, U: ToString>(
756        &self,
757        name: T,
758        bucket: U,
759    ) -> Result<ObjectInfo, AddLinkError> {
760        let name = name.to_string();
761        let bucket = bucket.to_string();
762        if name.is_empty() {
763            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
764        }
765
766        match self.info(&name).await {
767            Ok(info) => {
768                if let Some(options) = info.options {
769                    if options.link.is_none() {
770                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
771                    }
772                }
773            }
774            Err(err) if err.kind() != InfoErrorKind::NotFound => {
775                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
776            }
777            _ => (),
778        }
779
780        let info = ObjectInfo {
781            name: name.clone(),
782            description: None,
783            options: Some(ObjectOptions {
784                link: Some(ObjectLink { name: None, bucket }),
785                max_chunk_size: None,
786            }),
787            bucket: self.name.clone(),
788            nuid: crate::id_generator::next(),
789            size: 0,
790            chunks: 0,
791            modified: Some(OffsetDateTime::now_utc()),
792            digest: None,
793            deleted: false,
794            metadata: HashMap::default(),
795            headers: None,
796        };
797        publish_meta(self, &info).await?;
798        Ok(info)
799    }
800}
801
802async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
803    let encoded_object_name = encode_object_name(&info.name);
804    let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
805
806    let mut headers = HeaderMap::new();
807    headers.insert(
808        NATS_ROLLUP,
809        ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
810            PublishMetadataError::with_source(
811                PublishMetadataErrorKind::Other,
812                format!("failed parsing header: {err}"),
813            )
814        })?,
815    );
816    let data = serde_json::to_vec(&info).map_err(|err| {
817        PublishMetadataError::with_source(
818            PublishMetadataErrorKind::Other,
819            format!("failed serializing object info: {err}"),
820        )
821    })?;
822
823    store
824        .stream
825        .context
826        .publish_with_headers(subject, headers, data.into())
827        .await
828        .map_err(|err| {
829            PublishMetadataError::with_source(
830                PublishMetadataErrorKind::PublishMetadata,
831                format!("failed publishing metadata: {err}"),
832            )
833        })?
834        .await
835        .map_err(|err| {
836            PublishMetadataError::with_source(
837                PublishMetadataErrorKind::PublishMetadata,
838                format!("failed ack from metadata publish: {err}"),
839            )
840        })?;
841    Ok(())
842}
843
844pub struct Watch {
845    subscription: crate::jetstream::consumer::push::Ordered,
846}
847
848impl Stream for Watch {
849    type Item = Result<ObjectInfo, WatcherError>;
850
851    fn poll_next(
852        mut self: std::pin::Pin<&mut Self>,
853        cx: &mut std::task::Context<'_>,
854    ) -> Poll<Option<Self::Item>> {
855        match self.subscription.poll_next_unpin(cx) {
856            Poll::Ready(message) => match message {
857                Some(message) => Poll::Ready(
858                    serde_json::from_slice::<ObjectInfo>(&message?.payload)
859                        .map_err(|err| {
860                            WatcherError::with_source(
861                                WatcherErrorKind::Other,
862                                format!("failed to deserialize object info: {err}"),
863                            )
864                        })
865                        .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
866                ),
867                None => Poll::Ready(None),
868            },
869            Poll::Pending => Poll::Pending,
870        }
871    }
872}
873
874pub struct List {
875    subscription: Option<crate::jetstream::consumer::push::Ordered>,
876    done: bool,
877}
878
879impl Stream for List {
880    type Item = Result<ObjectInfo, ListerError>;
881
882    fn poll_next(
883        mut self: std::pin::Pin<&mut Self>,
884        cx: &mut std::task::Context<'_>,
885    ) -> Poll<Option<Self::Item>> {
886        loop {
887            if self.done {
888                debug!("Object Store list done");
889                self.subscription = None;
890                return Poll::Ready(None);
891            }
892
893            if let Some(subscription) = self.subscription.as_mut() {
894                match subscription.poll_next_unpin(cx) {
895                    Poll::Ready(message) => match message {
896                        None => return Poll::Ready(None),
897                        Some(message) => {
898                            let message = message?;
899                            let info = message.info().map_err(|err| {
900                                ListerError::with_source(ListerErrorKind::Other, err)
901                            })?;
902                            trace!("num pending: {}", info.pending);
903                            if info.pending == 0 {
904                                self.done = true;
905                            }
906                            let response: ObjectInfo = serde_json::from_slice(&message.payload)
907                                .map_err(|err| {
908                                    ListerError::with_source(
909                                        ListerErrorKind::Other,
910                                        format!("failed deserializing object info: {err}"),
911                                    )
912                                })?;
913                            if response.deleted {
914                                continue;
915                            }
916                            return Poll::Ready(Some(Ok(response)));
917                        }
918                    },
919                    Poll::Pending => return Poll::Pending,
920                }
921            } else {
922                return Poll::Ready(None);
923            }
924        }
925    }
926}
927
928/// Represents an object stored in a bucket.
929pub struct Object {
930    pub info: ObjectInfo,
931    remaining_bytes: VecDeque<u8>,
932    has_pending_messages: bool,
933    digest: Option<Sha256>,
934    subscription: Option<crate::jetstream::consumer::push::Ordered>,
935    subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
936    stream: crate::jetstream::stream::Stream,
937}
938
939impl std::fmt::Debug for Object {
940    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
941        f.debug_struct("Object")
942            .field("info", &self.info)
943            .field("remaining_bytes", &self.remaining_bytes)
944            .field("has_pending_messages", &self.has_pending_messages)
945            .finish()
946    }
947}
948
949impl Object {
950    pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
951        let has_pending_messages = info.chunks > 0;
952        Object {
953            subscription: None,
954            info,
955            remaining_bytes: VecDeque::new(),
956            has_pending_messages,
957            digest: Some(Sha256::new()),
958            subscription_future: None,
959            stream,
960        }
961    }
962
963    /// Returns information about the object.
964    pub fn info(&self) -> &ObjectInfo {
965        &self.info
966    }
967}
968
969impl tokio::io::AsyncRead for Object {
970    fn poll_read(
971        mut self: std::pin::Pin<&mut Self>,
972        cx: &mut std::task::Context<'_>,
973        buf: &mut tokio::io::ReadBuf<'_>,
974    ) -> std::task::Poll<std::io::Result<()>> {
975        let (buf1, _buf2) = self.remaining_bytes.as_slices();
976        if !buf1.is_empty() {
977            let len = cmp::min(buf.remaining(), buf1.len());
978            buf.put_slice(&buf1[..len]);
979            self.remaining_bytes.drain(..len);
980            return Poll::Ready(Ok(()));
981        }
982
983        if self.has_pending_messages {
984            if self.subscription.is_none() {
985                let future = match self.subscription_future.as_mut() {
986                    Some(future) => future,
987                    None => {
988                        let stream = self.stream.clone();
989                        let bucket = self.info.bucket.clone();
990                        let nuid = self.info.nuid.clone();
991                        self.subscription_future.insert(Box::pin(async move {
992                            stream
993                                .create_consumer(OrderedConfig {
994                                    deliver_subject: stream.context.client.new_inbox(),
995                                    filter_subject: format!("$O.{bucket}.C.{nuid}"),
996                                    ..Default::default()
997                                })
998                                .await
999                                .unwrap()
1000                                .messages()
1001                                .await
1002                        }))
1003                    }
1004                };
1005                match future.as_mut().poll(cx) {
1006                    Poll::Ready(subscription) => {
1007                        self.subscription = Some(subscription.unwrap());
1008                    }
1009                    Poll::Pending => (),
1010                }
1011            }
1012            if let Some(subscription) = self.subscription.as_mut() {
1013                match subscription.poll_next_unpin(cx) {
1014                    Poll::Ready(message) => match message {
1015                        Some(message) => {
1016                            let message = message.map_err(|err| {
1017                                std::io::Error::other(format!(
1018                                    "error from JetStream subscription: {err}"
1019                                ))
1020                            })?;
1021                            let len = cmp::min(buf.remaining(), message.payload.len());
1022                            buf.put_slice(&message.payload[..len]);
1023                            if let Some(context) = &mut self.digest {
1024                                context.update(&message.payload);
1025                            }
1026                            self.remaining_bytes.extend(&message.payload[len..]);
1027
1028                            let info = message.info().map_err(|err| {
1029                                std::io::Error::other(format!(
1030                                    "error from JetStream subscription: {err}"
1031                                ))
1032                            })?;
1033                            if info.pending == 0 {
1034                                let digest = self.digest.take().map(Sha256::finish);
1035                                if let Some(digest) = digest {
1036                                    if self
1037                                        .info
1038                                        .digest
1039                                        .as_ref()
1040                                        .map(|digest_self| {
1041                                            format!("SHA-256={}", URL_SAFE.encode(digest))
1042                                                != *digest_self
1043                                        })
1044                                        .unwrap_or(false)
1045                                    {
1046                                        return Poll::Ready(Err(std::io::Error::new(
1047                                            std::io::ErrorKind::InvalidData,
1048                                            "wrong digest",
1049                                        )));
1050                                    }
1051                                } else {
1052                                    return Poll::Ready(Err(std::io::Error::new(
1053                                        std::io::ErrorKind::InvalidData,
1054                                        "digest should be Some",
1055                                    )));
1056                                }
1057                                self.has_pending_messages = false;
1058                                self.subscription = None;
1059                            }
1060                            Poll::Ready(Ok(()))
1061                        }
1062                        None => Poll::Ready(Err(std::io::Error::other(
1063                            "subscription ended before reading whole object",
1064                        ))),
1065                    },
1066                    Poll::Pending => Poll::Pending,
1067                }
1068            } else {
1069                Poll::Pending
1070            }
1071        } else {
1072            Poll::Ready(Ok(()))
1073        }
1074    }
1075}
1076
/// Optional settings carried in an object's metadata (see the `options` field of `ObjectInfo`).
1077#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1078pub struct ObjectOptions {
    /// Link target, when the object is a link.
1079    pub link: Option<ObjectLink>,
    /// Maximum chunk size, when recorded. NOTE(review): presumably in bytes — confirm.
1080    pub max_chunk_size: Option<usize>,
1081}
1082
1083/// Meta and instance information about an object.
///
/// Fields marked `#[serde(default)]` fall back to their default value when absent from the
/// serialized form.
1084#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1085pub struct ObjectInfo {
1086    /// Name of the object.
1087    pub name: String,
1088    /// A short human readable description of the object.
1089    #[serde(default)]
1090    pub description: Option<String>,
1091    /// Metadata for given object.
1092    #[serde(default)]
1093    pub metadata: HashMap<String, String>,
1094    /// Headers for given object.
1095    #[serde(default)]
1096    pub headers: Option<HeaderMap>,
1097    /// Optional settings of the object, including the link it points to, if any.
1098    #[serde(default)]
1099    pub options: Option<ObjectOptions>,
1100    /// Name of the bucket the object is stored in.
1101    pub bucket: String,
1102    /// Unique identifier used to uniquely identify this version of the object.
1103    #[serde(default)]
1104    pub nuid: String,
1105    /// Size in bytes of the object.
1106    #[serde(default)]
1107    pub size: usize,
1108    /// Number of chunks the object is stored in.
1109    #[serde(default)]
1110    pub chunks: usize,
1111    /// Date and time the object was last modified; serialized as RFC 3339 under the key `mtime`.
1112    #[serde(default, with = "rfc3339::option")]
1113    #[serde(rename = "mtime")]
1114    pub modified: Option<time::OffsetDateTime>,
1115    /// Digest of the object stream; omitted from the serialized form when `None`.
1116    #[serde(default, skip_serializing_if = "Option::is_none")]
1117    pub digest: Option<String>,
1118    /// Set to true if the object has been deleted; omitted from the serialized form when `false`.
1119    #[serde(default, skip_serializing_if = "is_default")]
1120    pub deleted: bool,
1121}
1122
/// Returns `true` when `t` equals the `Default` value of its type.
///
/// Referenced by `skip_serializing_if` so that default-valued fields are left out of the
/// serialized form.
fn is_default<T>(t: &T) -> bool
where
    T: Default + Eq,
{
    let baseline = T::default();
    *t == baseline
}
1126/// A link to another object, potentially in another bucket.
1127#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1128pub struct ObjectLink {
1129    /// Name of the linked object.
    /// NOTE(review): `None` presumably denotes a link to the bucket as a whole — confirm.
1130    pub name: Option<String>,
1131    /// Name of the bucket the object is stored in.
1132    pub bucket: String,
1133}
1134
/// Name, description, metadata and headers of an object.
/// NOTE(review): presumably the input accepted when updating an object's metadata — confirm
/// against callers.
1135#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1136pub struct UpdateMetadata {
1137    /// Name of the object.
1138    pub name: String,
1139    /// A short human readable description of the object.
1140    pub description: Option<String>,
1141    /// Metadata for given object; defaults to an empty map when absent from the serialized form.
1142    #[serde(default)]
1143    pub metadata: HashMap<String, String>,
1144    /// Headers for given object.
1145    pub headers: Option<HeaderMap>,
1146}
1147
1148/// Meta information about an object.
1149#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1150pub struct ObjectMetadata {
1151    /// Name of the object
1152    pub name: String,
1153    /// A short human readable description of the object.
1154    pub description: Option<String>,
1155    /// Max chunk size. Default is 128k.
1156    pub chunk_size: Option<usize>,
1157    /// Metadata for given object.
1158    #[serde(default)]
1159    pub metadata: HashMap<String, String>,
1160    /// Headers for given object.
1161    pub headers: Option<HeaderMap>,
1162}
1163
1164impl From<&str> for ObjectMetadata {
1165    fn from(s: &str) -> ObjectMetadata {
1166        ObjectMetadata {
1167            name: s.to_string(),
1168            ..Default::default()
1169        }
1170    }
1171}
1172
1173pub trait AsObjectInfo {
1174    fn as_info(&self) -> &ObjectInfo;
1175}
1176
1177impl AsObjectInfo for &Object {
1178    fn as_info(&self) -> &ObjectInfo {
1179        &self.info
1180    }
1181}
1182impl AsObjectInfo for &ObjectInfo {
1183    fn as_info(&self) -> &ObjectInfo {
1184        self
1185    }
1186}
1187
1188impl From<ObjectInfo> for ObjectMetadata {
1189    fn from(info: ObjectInfo) -> Self {
1190        ObjectMetadata {
1191            name: info.name,
1192            description: info.description,
1193            metadata: info.metadata,
1194            headers: info.headers,
1195            chunk_size: None,
1196        }
1197    }
1198}
1199
/// Kinds of failure carried by `UpdateMetadataError`.
#[derive(Debug, PartialEq, Clone)]
pub enum UpdateMetadataErrorKind {
    InvalidName,
    NotFound,
    TimedOut,
    Other,
    PublishMetadata,
    NameAlreadyInUse,
    Purge,
}

impl Display for UpdateMetadataErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "object not found",
            Self::TimedOut => "timed out",
            Self::Other => "error",
            Self::PublishMetadata => "failed publishing metadata",
            Self::NameAlreadyInUse => "object with updated name already exists",
            Self::Purge => "failed purging old name metadata",
        };
        f.write_str(text)
    }
}
1228
1229impl From<InfoError> for UpdateMetadataError {
1230    fn from(error: InfoError) -> Self {
1231        match error.kind() {
1232            InfoErrorKind::InvalidName => {
1233                UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1234            }
1235            InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1236            InfoErrorKind::Other => {
1237                UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1238            }
1239            InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1240        }
1241    }
1242}
1243
1244pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1245
/// Kinds of failure carried by `InfoError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum InfoErrorKind {
    InvalidName,
    NotFound,
    Other,
    TimedOut,
}

impl Display for InfoErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "not found",
            Self::Other => "getting info failed",
            Self::TimedOut => "timed out",
        })
    }
}
1264
/// Error type whose kind is an [`InfoErrorKind`].
1265pub type InfoError = Error<InfoErrorKind>;
1266
/// Kinds of failure carried by `GetError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GetErrorKind {
    InvalidName,
    ConsumerCreate,
    NotFound,
    BucketLink,
    Other,
    TimedOut,
}

impl Display for GetErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::ConsumerCreate => "failed creating consumer for fetching object",
            Self::NotFound => "object not found",
            Self::BucketLink => "object is a link to a bucket",
            Self::Other => "failed getting object",
            Self::TimedOut => "timed out",
        };
        f.write_str(text)
    }
}
1289
1290pub type GetError = Error<GetErrorKind>;
1291
1292crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1293crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1294
1295impl From<InfoError> for GetError {
1296    fn from(err: InfoError) -> Self {
1297        match err.kind() {
1298            InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1299            InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1300            InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1301            InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1302        }
1303    }
1304}
1305
/// Kinds of failure carried by `DeleteError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DeleteErrorKind {
    TimedOut,
    NotFound,
    Metadata,
    InvalidName,
    Chunks,
    Other,
}

impl Display for DeleteErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::TimedOut => "timed out",
            Self::NotFound => "object not found",
            Self::Metadata => "failed rolling up metadata",
            Self::InvalidName => "invalid object name",
            Self::Chunks => "failed purging chunks",
            Self::Other => "delete failed",
        };
        f.write_str(text)
    }
}
1328
1329pub type DeleteError = Error<DeleteErrorKind>;
1330
1331impl From<InfoError> for DeleteError {
1332    fn from(err: InfoError) -> Self {
1333        match err.kind() {
1334            InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1335            InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1336            InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1337            InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1338        }
1339    }
1340}
1341
1342crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1343crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1344
/// Kinds of failure carried by `PutError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    InvalidName,
    ReadChunks,
    PublishChunks,
    PublishMetadata,
    PurgeOldChunks,
    TimedOut,
    Other,
}

impl Display for PutErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidName => "invalid object name",
            Self::ReadChunks => "error while reading the buffer",
            Self::PublishChunks => "failed publishing object chunks",
            Self::PublishMetadata => "failed publishing metadata",
            Self::PurgeOldChunks => "failed purging old chunks",
            Self::TimedOut => "timed out",
            Self::Other => "error",
        };
        f.write_str(text)
    }
}
1369
/// Error type whose kind is a [`PutErrorKind`].
1370pub type PutError = Error<PutErrorKind>;
1371
/// Error type whose kind is an [`AddLinkErrorKind`].
1372pub type AddLinkError = Error<AddLinkErrorKind>;
1373
/// Kinds of failure carried by `AddLinkError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AddLinkErrorKind {
    EmptyName,
    ObjectRequired,
    Deleted,
    LinkToLink,
    PublishMetadata,
    AlreadyExists,
    Other,
}

impl Display for AddLinkErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::EmptyName => "link name cannot be empty",
            Self::ObjectRequired => "cannot link to empty Object",
            Self::Deleted => "cannot link a deleted Object",
            Self::LinkToLink => "cannot link to another link",
            Self::PublishMetadata => "failed publishing link metadata",
            Self::AlreadyExists => "object already exists",
            Self::Other => "error",
        };
        f.write_str(text)
    }
}
1398
/// Module-private error type whose kind is a [`PublishMetadataErrorKind`]; returned by
/// `publish_meta`.
1399type PublishMetadataError = Error<PublishMetadataErrorKind>;
1400
/// Kinds of failure carried by `PublishMetadataError`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PublishMetadataErrorKind {
    PublishMetadata,
    Other,
}

impl Display for PublishMetadataErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::PublishMetadata => "failed to publish metadata",
            Self::Other => "error",
        };
        f.write_str(text)
    }
}
1415
1416impl From<PublishMetadataError> for AddLinkError {
1417    fn from(error: PublishMetadataError) -> Self {
1418        match error.kind {
1419            PublishMetadataErrorKind::PublishMetadata => {
1420                AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1421            }
1422            PublishMetadataErrorKind::Other => {
1423                AddLinkError::with_source(AddLinkErrorKind::Other, error)
1424            }
1425        }
1426    }
1427}
1428impl From<PublishMetadataError> for PutError {
1429    fn from(error: PublishMetadataError) -> Self {
1430        match error.kind {
1431            PublishMetadataErrorKind::PublishMetadata => {
1432                PutError::new(PutErrorKind::PublishMetadata)
1433            }
1434            PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1435        }
1436    }
1437}
1438
/// Kinds of failure carried by `WatchError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    TimedOut,
    ConsumerCreate,
    Other,
}

impl Display for WatchErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(text)
    }
}
1455
/// Error type whose kind is a [`WatchErrorKind`].
1456pub type WatchError = Error<WatchErrorKind>;
1457
// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate; presumably it generates
// the `From<ConsumerError>` / `From<StreamError>` conversions with timeouts mapped to
// `TimedOut` — confirm against the macro definition.
1458crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
1459crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1460
/// Alias of [`WatchError`].
1461pub type ListError = WatchError;
/// Alias of [`WatchErrorKind`].
1462pub type ListErrorKind = WatchErrorKind;
1463
/// Kinds of failure carried by `SealError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SealErrorKind {
    TimedOut,
    Other,
    Info,
    Update,
}

impl Display for SealErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::TimedOut => "timed out",
            Self::Other => "seal failed",
            Self::Info => "failed getting stream info before sealing bucket",
            Self::Update => "failed sealing the bucket",
        };
        f.write_str(text)
    }
}
1482
1483pub type SealError = Error<SealErrorKind>;
1484
1485impl From<super::context::UpdateStreamError> for SealError {
1486    fn from(err: super::context::UpdateStreamError) -> Self {
1487        match err.kind() {
1488            super::context::CreateStreamErrorKind::TimedOut => {
1489                SealError::new(SealErrorKind::TimedOut)
1490            }
1491            _ => SealError::with_source(SealErrorKind::Update, err),
1492        }
1493    }
1494}
1495
/// Kinds of failure carried by `WatcherError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    ConsumerError,
    Other,
}

impl Display for WatcherErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::ConsumerError => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(text)
    }
}
1510
1511pub type WatcherError = Error<WatcherErrorKind>;
1512
1513impl From<OrderedError> for WatcherError {
1514    fn from(err: OrderedError) -> Self {
1515        WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1516    }
1517}
1518
1519pub type ListerError = WatcherError;
1520pub type ListerErrorKind = WatcherErrorKind;