async_nats/jetstream/object_store/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Object Store module
15use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures_util::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures_util::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
/// Chunk size in bytes (128 KiB) used by `ObjectStore::put` when the object metadata
/// does not specify one.
const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
/// Name of the header attached to every object metadata publish in this module.
const NATS_ROLLUP: &str = "Nats-Rollup";
/// Value sent in the [`NATS_ROLLUP`] header.
// NOTE(review): presumably asks the server to roll up only the published subject —
// confirm against the JetStream documentation.
const ROLLUP_SUBJECT: &str = "sub";

/// Matches a whole string made of one or more ASCII letters, digits, `_` or `-`.
static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
/// Matches a whole string made of one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`.
static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50    BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54    if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55        return false;
56    }
57
58    OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62    URL_SAFE.encode(object_name)
63}
64
/// Configuration values for object store buckets.
///
/// All fields have defaults (the struct derives [`Default`]), so it can be built with
/// struct-update syntax: `Config { bucket: "name".into(), ..Default::default() }`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
    /// Name of the storage bucket.
    pub bucket: String,
    /// A short description of the purpose of this storage bucket.
    pub description: Option<String>,
    /// Maximum age of any value in the bucket.
    ///
    /// (De)serialized as a number of nanoseconds; defaults to a zero duration when absent.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// How large the storage bucket may become in total bytes.
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) and `Memory`
    pub storage: StorageType,
    /// How many replicas to keep for each value in a cluster, maximum 5.
    pub num_replicas: usize,
    /// Sets compression of the underlying stream.
    pub compression: bool,
    /// Cluster and tag placement.
    pub placement: Option<stream::Placement>,
}
86
/// A blob store capable of storing large objects efficiently in streams.
///
/// Object metadata is published on `$O.<name>.M.<encoded object name>` and object
/// chunks on `$O.<name>.C.<nuid>`, both through the wrapped stream's context.
#[derive(Clone)]
pub struct ObjectStore {
    // Bucket name; interpolated into every `$O.<name>.…` subject built by this type.
    pub(crate) name: String,
    // Stream handle used for lookups, purges, consumers and (via its context) publishes.
    pub(crate) stream: crate::jetstream::stream::Stream,
}
93
94impl ObjectStore {
95    /// Gets an [Object] from the [ObjectStore].
96    ///
97    /// [Object] implements [tokio::io::AsyncRead] that allows
98    /// to read the data from Object Store.
99    ///
100    /// # Examples
101    ///
102    /// ```no_run
103    /// # #[tokio::main]
104    /// # async fn main() -> Result<(), async_nats::Error> {
105    /// use tokio::io::AsyncReadExt;
106    /// let client = async_nats::connect("demo.nats.io").await?;
107    /// let jetstream = async_nats::jetstream::new(client);
108    ///
109    /// let bucket = jetstream.get_object_store("store").await?;
110    /// let mut object = bucket.get("FOO").await?;
111    ///
112    /// // Object implements `tokio::io::AsyncRead`.
113    /// let mut bytes = vec![];
114    /// object.read_to_end(&mut bytes).await?;
115    /// # Ok(())
116    /// # }
117    /// ```
118    pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
119        self.get_impl(object_name).await
120    }
121
122    fn get_impl<'bucket, 'future, T>(
123        &'bucket self,
124        object_name: T,
125    ) -> BoxFuture<'future, Result<Object, GetError>>
126    where
127        T: AsRef<str> + Send + 'future,
128        'bucket: 'future,
129    {
130        Box::pin(async move {
131            let object_info = self.info(object_name).await?;
132            if object_info.deleted {
133                return Err(GetError::new(GetErrorKind::NotFound));
134            }
135            if let Some(ref options) = object_info.options {
136                if let Some(link) = options.link.as_ref() {
137                    if let Some(link_name) = link.name.as_ref() {
138                        let link_name = link_name.clone();
139                        debug!("getting object via link");
140                        if link.bucket == self.name {
141                            return self.get_impl(link_name).await;
142                        } else {
143                            let bucket = self
144                                .stream
145                                .context
146                                .get_object_store(&link.bucket)
147                                .await
148                                .map_err(|err| {
149                                GetError::with_source(GetErrorKind::Other, err)
150                            })?;
151                            let object = bucket.get_impl(&link_name).await?;
152                            return Ok(object);
153                        }
154                    } else {
155                        return Err(GetError::new(GetErrorKind::BucketLink));
156                    }
157                }
158            }
159
160            debug!("not a link. Getting the object");
161            Ok(Object::new(object_info, self.stream.clone()))
162        })
163    }
164
165    /// Deletes an [Object] from the [ObjectStore].
166    ///
167    /// # Examples
168    ///
169    /// ```no_run
170    /// # #[tokio::main]
171    /// # async fn main() -> Result<(), async_nats::Error> {
172    /// let client = async_nats::connect("demo.nats.io").await?;
173    /// let jetstream = async_nats::jetstream::new(client);
174    ///
175    /// let bucket = jetstream.get_object_store("store").await?;
176    /// bucket.delete("FOO").await?;
177    /// # Ok(())
178    /// # }
179    /// ```
180    pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
181        let object_name = object_name.as_ref();
182        let mut object_info = self.info(object_name).await?;
183        object_info.chunks = 0;
184        object_info.size = 0;
185        object_info.deleted = true;
186
187        let data = serde_json::to_vec(&object_info).map_err(|err| {
188            DeleteError::with_source(
189                DeleteErrorKind::Other,
190                format!("failed deserializing object info: {err}"),
191            )
192        })?;
193
194        let mut headers = HeaderMap::default();
195        headers.insert(
196            NATS_ROLLUP,
197            HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
198                DeleteError::with_source(
199                    DeleteErrorKind::Other,
200                    format!("failed parsing header: {err}"),
201                )
202            })?,
203        );
204
205        let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
206
207        self.stream
208            .context
209            .publish_with_headers(subject, headers, data.into())
210            .await?
211            .await?;
212
213        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
214
215        self.stream.purge().filter(&chunk_subject).await?;
216
217        Ok(())
218    }
219
220    /// Retrieves [Object] [ObjectInfo].
221    ///
222    /// # Examples
223    ///
224    /// ```no_run
225    /// # #[tokio::main]
226    /// # async fn main() -> Result<(), async_nats::Error> {
227    /// let client = async_nats::connect("demo.nats.io").await?;
228    /// let jetstream = async_nats::jetstream::new(client);
229    ///
230    /// let bucket = jetstream.get_object_store("store").await?;
231    /// let info = bucket.info("FOO").await?;
232    /// # Ok(())
233    /// # }
234    /// ```
235    pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
236        let object_name = object_name.as_ref();
237        let object_name = encode_object_name(object_name);
238        if !is_valid_object_name(&object_name) {
239            return Err(InfoError::new(InfoErrorKind::InvalidName));
240        }
241
242        // Grab last meta value we have.
243        let subject = format!("$O.{}.M.{}", &self.name, &object_name);
244
245        // FIXME(jrm): we should use direct get here when possible.
246        let message = self
247            .stream
248            .get_last_raw_message_by_subject(subject.as_str())
249            .await
250            .map_err(|err| match err.kind() {
251                stream::LastRawMessageErrorKind::NoMessageFound => {
252                    InfoError::new(InfoErrorKind::NotFound)
253                }
254                _ => InfoError::with_source(InfoErrorKind::Other, err),
255            })?;
256        let object_info =
257            serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
258                InfoError::with_source(
259                    InfoErrorKind::Other,
260                    format!("failed to decode info payload: {err}"),
261                )
262            })?;
263
264        Ok(object_info)
265    }
266
267    /// Puts an [Object] into the [ObjectStore].
268    /// This method implements `tokio::io::AsyncRead`.
269    ///
270    /// # Examples
271    ///
272    /// ```no_run
273    /// # #[tokio::main]
274    /// # async fn main() -> Result<(), async_nats::Error> {
275    /// let client = async_nats::connect("demo.nats.io").await?;
276    /// let jetstream = async_nats::jetstream::new(client);
277    ///
278    /// let bucket = jetstream.get_object_store("store").await?;
279    /// let mut file = tokio::fs::File::open("foo.txt").await?;
280    /// bucket.put("file", &mut file).await.unwrap();
281    /// # Ok(())
282    /// # }
283    /// ```
284    pub async fn put<T>(
285        &self,
286        meta: T,
287        data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
288    ) -> Result<ObjectInfo, PutError>
289    where
290        ObjectMetadata: From<T>,
291    {
292        let object_meta: ObjectMetadata = meta.into();
293
294        // Fetch any existing object info, if there is any for later use.
295        let maybe_existing_object_info = (self.info(&object_meta.name).await).ok();
296
297        let object_nuid = nuid::next();
298        let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
299
300        let mut object_chunks = 0;
301        let mut object_size = 0;
302
303        let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
304        let mut buffer = BytesMut::with_capacity(chunk_size);
305        let mut sha256 = Sha256::new();
306
307        loop {
308            let n = data
309                .read_buf(&mut buffer)
310                .await
311                .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
312
313            if n == 0 {
314                break;
315            }
316
317            let payload = buffer.split().freeze();
318            sha256.update(&payload);
319
320            object_size += payload.len();
321            object_chunks += 1;
322
323            self.stream
324                .context
325                .publish(chunk_subject.clone(), payload)
326                .await
327                .map_err(|err| {
328                    PutError::with_source(
329                        PutErrorKind::PublishChunks,
330                        format!("failed chunk publish: {err}"),
331                    )
332                })?
333                .await
334                .map_err(|err| {
335                    PutError::with_source(
336                        PutErrorKind::PublishChunks,
337                        format!("failed getting chunk ack: {err}"),
338                    )
339                })?;
340        }
341        let digest = sha256.finish();
342
343        let encoded_object_name = encode_object_name(&object_meta.name);
344        if !is_valid_object_name(&encoded_object_name) {
345            return Err(PutError::new(PutErrorKind::InvalidName));
346        }
347        let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
348
349        let object_info = ObjectInfo {
350            name: object_meta.name,
351            description: object_meta.description,
352            options: Some(ObjectOptions {
353                max_chunk_size: Some(chunk_size),
354                link: None,
355            }),
356            bucket: self.name.clone(),
357            nuid: object_nuid.to_string(),
358            chunks: object_chunks,
359            size: object_size,
360            digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
361            modified: Some(OffsetDateTime::now_utc()),
362            deleted: false,
363            metadata: object_meta.metadata,
364            headers: object_meta.headers,
365        };
366
367        let mut headers = HeaderMap::new();
368        headers.insert(
369            NATS_ROLLUP,
370            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
371                PutError::with_source(PutErrorKind::Other, format!("failed parsing header: {err}"))
372            })?,
373        );
374        let data = serde_json::to_vec(&object_info).map_err(|err| {
375            PutError::with_source(
376                PutErrorKind::Other,
377                format!("failed serializing object info: {err}"),
378            )
379        })?;
380
381        // publish meta.
382        self.stream
383            .context
384            .publish_with_headers(subject, headers, data.into())
385            .await
386            .map_err(|err| {
387                PutError::with_source(
388                    PutErrorKind::PublishMetadata,
389                    format!("failed publishing metadata: {err}"),
390                )
391            })?
392            .await
393            .map_err(|err| {
394                PutError::with_source(
395                    PutErrorKind::PublishMetadata,
396                    format!("failed ack from metadata publish: {err}"),
397                )
398            })?;
399
400        // Purge any old chunks.
401        if let Some(existing_object_info) = maybe_existing_object_info {
402            let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
403
404            self.stream
405                .purge()
406                .filter(&chunk_subject)
407                .await
408                .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
409        }
410
411        Ok(object_info)
412    }
413
414    /// Creates a [Watch] stream over changes in the [ObjectStore].
415    ///
416    /// # Examples
417    ///
418    /// ```no_run
419    /// # #[tokio::main]
420    /// # async fn main() -> Result<(), async_nats::Error> {
421    /// use futures_util::StreamExt;
422    /// let client = async_nats::connect("demo.nats.io").await?;
423    /// let jetstream = async_nats::jetstream::new(client);
424    ///
425    /// let bucket = jetstream.get_object_store("store").await?;
426    /// let mut watcher = bucket.watch().await.unwrap();
427    /// while let Some(object) = watcher.next().await {
428    ///     println!("detected changes in {:?}", object?);
429    /// }
430    /// # Ok(())
431    /// # }
432    /// ```
433    pub async fn watch(&self) -> Result<Watch, WatchError> {
434        self.watch_with_deliver_policy(DeliverPolicy::New).await
435    }
436
437    /// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever
438    /// there are changes for that key with as well as last value.
439    pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
440        self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
441            .await
442    }
443
444    async fn watch_with_deliver_policy(
445        &self,
446        deliver_policy: DeliverPolicy,
447    ) -> Result<Watch, WatchError> {
448        let subject = format!("$O.{}.M.>", self.name);
449        let ordered = self
450            .stream
451            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
452                deliver_policy,
453                deliver_subject: self.stream.context.client.new_inbox(),
454                description: Some("object store watcher".to_string()),
455                filter_subject: subject,
456                ..Default::default()
457            })
458            .await?;
459        Ok(Watch {
460            subscription: ordered.messages().await?,
461        })
462    }
463
464    /// Returns a [List] stream with all not deleted [Objects][Object] in the [ObjectStore].
465    ///
466    /// # Examples
467    ///
468    /// ```no_run
469    /// # #[tokio::main]
470    /// # async fn main() -> Result<(), async_nats::Error> {
471    /// use futures_util::StreamExt;
472    /// let client = async_nats::connect("demo.nats.io").await?;
473    /// let jetstream = async_nats::jetstream::new(client);
474    ///
475    /// let bucket = jetstream.get_object_store("store").await?;
476    /// let mut list = bucket.list().await.unwrap();
477    /// while let Some(object) = list.next().await {
478    ///     println!("object {:?}", object?);
479    /// }
480    /// # Ok(())
481    /// # }
482    /// ```
483    pub async fn list(&self) -> Result<List, ListError> {
484        trace!("starting Object List");
485        let subject = format!("$O.{}.M.>", self.name);
486        let ordered = self
487            .stream
488            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
489                deliver_policy: super::consumer::DeliverPolicy::All,
490                deliver_subject: self.stream.context.client.new_inbox(),
491                description: Some("object store list".to_string()),
492                filter_subject: subject,
493                ..Default::default()
494            })
495            .await?;
496        Ok(List {
497            done: ordered.info.num_pending == 0,
498            subscription: Some(ordered.messages().await?),
499        })
500    }
501
502    /// Seals a [ObjectStore], preventing any further changes to it or its [Objects][Object].
503    ///
504    /// # Examples
505    ///
506    /// ```no_run
507    /// # #[tokio::main]
508    /// # async fn main() -> Result<(), async_nats::Error> {
509    /// use futures_util::StreamExt;
510    /// let client = async_nats::connect("demo.nats.io").await?;
511    /// let jetstream = async_nats::jetstream::new(client);
512    ///
513    /// let mut bucket = jetstream.get_object_store("store").await?;
514    /// bucket.seal().await.unwrap();
515    /// # Ok(())
516    /// # }
517    /// ```
518    pub async fn seal(&mut self) -> Result<(), SealError> {
519        let mut stream_config = self
520            .stream
521            .info()
522            .await
523            .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
524            .to_owned();
525        stream_config.config.sealed = true;
526
527        self.stream
528            .context
529            .update_stream(&stream_config.config)
530            .await?;
531        Ok(())
532    }
533
534    /// Updates [Object] [ObjectMetadata].
535    ///
536    /// # Examples
537    ///
538    /// ```no_run
539    /// # #[tokio::main]
540    /// # async fn main() -> Result<(), async_nats::Error> {
541    /// use async_nats::jetstream::object_store;
542    /// let client = async_nats::connect("demo.nats.io").await?;
543    /// let jetstream = async_nats::jetstream::new(client);
544    ///
545    /// let mut bucket = jetstream.get_object_store("store").await?;
546    /// bucket
547    ///     .update_metadata(
548    ///         "object",
549    ///         object_store::UpdateMetadata {
550    ///             name: "new_name".to_string(),
551    ///             description: Some("a new description".to_string()),
552    ///             ..Default::default()
553    ///         },
554    ///     )
555    ///     .await?;
556    /// # Ok(())
557    /// # }
558    /// ```
559    pub async fn update_metadata<A: AsRef<str>>(
560        &self,
561        object: A,
562        metadata: UpdateMetadata,
563    ) -> Result<ObjectInfo, UpdateMetadataError> {
564        let mut info = self.info(object.as_ref()).await?;
565
566        // If name is being update, we need to check if other metadata with it already exists.
567        // If does, error. Otherwise, purge old name metadata.
568        if metadata.name != info.name {
569            tracing::info!("new metadata name is different than then old one");
570            if !is_valid_object_name(&metadata.name) {
571                return Err(UpdateMetadataError::new(
572                    UpdateMetadataErrorKind::InvalidName,
573                ));
574            }
575            match self.info(&metadata.name).await {
576                Ok(_) => {
577                    return Err(UpdateMetadataError::new(
578                        UpdateMetadataErrorKind::NameAlreadyInUse,
579                    ))
580                }
581                Err(err) => match err.kind() {
582                    InfoErrorKind::NotFound => {
583                        tracing::info!("purging old metadata: {}", info.name);
584                        self.stream
585                            .purge()
586                            .filter(format!(
587                                "$O.{}.M.{}",
588                                self.name,
589                                encode_object_name(&info.name)
590                            ))
591                            .await
592                            .map_err(|err| {
593                                UpdateMetadataError::with_source(
594                                    UpdateMetadataErrorKind::Purge,
595                                    err,
596                                )
597                            })?;
598                    }
599                    _ => {
600                        return Err(UpdateMetadataError::with_source(
601                            UpdateMetadataErrorKind::Other,
602                            err,
603                        ))
604                    }
605                },
606            }
607        }
608
609        info.name = metadata.name;
610        info.description = metadata.description;
611
612        let name = encode_object_name(&info.name);
613        let subject = format!("$O.{}.M.{}", &self.name, &name);
614
615        let mut headers = HeaderMap::new();
616        headers.insert(
617            NATS_ROLLUP,
618            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
619                UpdateMetadataError::with_source(
620                    UpdateMetadataErrorKind::Other,
621                    format!("failed parsing header: {err}"),
622                )
623            })?,
624        );
625        let data = serde_json::to_vec(&info).map_err(|err| {
626            UpdateMetadataError::with_source(
627                UpdateMetadataErrorKind::Other,
628                format!("failed serializing object info: {err}"),
629            )
630        })?;
631
632        // publish meta.
633        self.stream
634            .context
635            .publish_with_headers(subject, headers, data.into())
636            .await
637            .map_err(|err| {
638                UpdateMetadataError::with_source(
639                    UpdateMetadataErrorKind::PublishMetadata,
640                    format!("failed publishing metadata: {err}"),
641                )
642            })?
643            .await
644            .map_err(|err| {
645                UpdateMetadataError::with_source(
646                    UpdateMetadataErrorKind::PublishMetadata,
647                    format!("failed ack from metadata publish: {err}"),
648                )
649            })?;
650
651        Ok(info)
652    }
653
654    /// Adds a link to an [Object].
655    /// It creates a new [Object] in the [ObjectStore] that points to another [Object]
656    /// and does not have any contents on it's own.
657    /// Links are automatically followed (one level deep) when calling [ObjectStore::get].
658    ///
659    /// # Examples
660    ///
661    /// ```no_run
662    /// # #[tokio::main]
663    /// # async fn main() -> Result<(), async_nats::Error> {
664    /// use async_nats::jetstream::object_store;
665    /// let client = async_nats::connect("demo.nats.io").await?;
666    /// let jetstream = async_nats::jetstream::new(client);
667    /// let bucket = jetstream.get_object_store("bucket").await?;
668    /// let object = bucket.get("object").await?;
669    /// bucket.add_link("link_to_object", &object).await?;
670    /// # Ok(())
671    /// # }
672    /// ```
673    pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
674    where
675        T: ToString,
676        O: AsObjectInfo,
677    {
678        let object = object.as_info();
679        let name = name.to_string();
680        if name.is_empty() {
681            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
682        }
683        if object.name.is_empty() {
684            return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
685        }
686        if object.deleted {
687            return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
688        }
689        if let Some(ref options) = object.options {
690            if options.link.is_some() {
691                return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
692            }
693        }
694        match self.info(&name).await {
695            Ok(info) => {
696                if let Some(options) = info.options {
697                    if options.link.is_none() {
698                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
699                    }
700                } else {
701                    return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
702                }
703            }
704            Err(err) if err.kind() != InfoErrorKind::NotFound => {
705                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
706            }
707            _ => (),
708        }
709
710        let info = ObjectInfo {
711            name,
712            description: None,
713            options: Some(ObjectOptions {
714                link: Some(ObjectLink {
715                    name: Some(object.name.clone()),
716                    bucket: object.bucket.clone(),
717                }),
718                max_chunk_size: None,
719            }),
720            bucket: self.name.clone(),
721            nuid: nuid::next().to_string(),
722            size: 0,
723            chunks: 0,
724            modified: Some(OffsetDateTime::now_utc()),
725            digest: None,
726            deleted: false,
727            metadata: HashMap::default(),
728            headers: None,
729        };
730        publish_meta(self, &info).await?;
731        Ok(info)
732    }
733
734    /// Adds a link to another [ObjectStore] bucket by creating a new [Object]
735    /// in the current [ObjectStore] that points to another [ObjectStore] and
736    /// does not contain any data.
737    ///
738    /// # Examples
739    ///
740    /// ```no_run
741    /// # #[tokio::main]
742    /// # async fn main() -> Result<(), async_nats::Error> {
743    /// use async_nats::jetstream::object_store;
744    /// let client = async_nats::connect("demo.nats.io").await?;
745    /// let jetstream = async_nats::jetstream::new(client);
746    /// let bucket = jetstream.get_object_store("bucket").await?;
747    /// bucket
748    ///     .add_bucket_link("link_to_object", "another_bucket")
749    ///     .await?;
750    /// # Ok(())
751    /// # }
752    /// ```
753    pub async fn add_bucket_link<T: ToString, U: ToString>(
754        &self,
755        name: T,
756        bucket: U,
757    ) -> Result<ObjectInfo, AddLinkError> {
758        let name = name.to_string();
759        let bucket = bucket.to_string();
760        if name.is_empty() {
761            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
762        }
763
764        match self.info(&name).await {
765            Ok(info) => {
766                if let Some(options) = info.options {
767                    if options.link.is_none() {
768                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
769                    }
770                }
771            }
772            Err(err) if err.kind() != InfoErrorKind::NotFound => {
773                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
774            }
775            _ => (),
776        }
777
778        let info = ObjectInfo {
779            name: name.clone(),
780            description: None,
781            options: Some(ObjectOptions {
782                link: Some(ObjectLink { name: None, bucket }),
783                max_chunk_size: None,
784            }),
785            bucket: self.name.clone(),
786            nuid: nuid::next().to_string(),
787            size: 0,
788            chunks: 0,
789            modified: Some(OffsetDateTime::now_utc()),
790            digest: None,
791            deleted: false,
792            metadata: HashMap::default(),
793            headers: None,
794        };
795        publish_meta(self, &info).await?;
796        Ok(info)
797    }
798}
799
800async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
801    let encoded_object_name = encode_object_name(&info.name);
802    let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
803
804    let mut headers = HeaderMap::new();
805    headers.insert(
806        NATS_ROLLUP,
807        ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
808            PublishMetadataError::with_source(
809                PublishMetadataErrorKind::Other,
810                format!("failed parsing header: {err}"),
811            )
812        })?,
813    );
814    let data = serde_json::to_vec(&info).map_err(|err| {
815        PublishMetadataError::with_source(
816            PublishMetadataErrorKind::Other,
817            format!("failed serializing object info: {err}"),
818        )
819    })?;
820
821    store
822        .stream
823        .context
824        .publish_with_headers(subject, headers, data.into())
825        .await
826        .map_err(|err| {
827            PublishMetadataError::with_source(
828                PublishMetadataErrorKind::PublishMetadata,
829                format!("failed publishing metadata: {err}"),
830            )
831        })?
832        .await
833        .map_err(|err| {
834            PublishMetadataError::with_source(
835                PublishMetadataErrorKind::PublishMetadata,
836                format!("failed ack from metadata publish: {err}"),
837            )
838        })?;
839    Ok(())
840}
841
842pub struct Watch {
843    subscription: crate::jetstream::consumer::push::Ordered,
844}
845
846impl Stream for Watch {
847    type Item = Result<ObjectInfo, WatcherError>;
848
849    fn poll_next(
850        mut self: std::pin::Pin<&mut Self>,
851        cx: &mut std::task::Context<'_>,
852    ) -> Poll<Option<Self::Item>> {
853        match self.subscription.poll_next_unpin(cx) {
854            Poll::Ready(message) => match message {
855                Some(message) => Poll::Ready(
856                    serde_json::from_slice::<ObjectInfo>(&message?.payload)
857                        .map_err(|err| {
858                            WatcherError::with_source(
859                                WatcherErrorKind::Other,
860                                format!("failed to deserialize object info: {err}"),
861                            )
862                        })
863                        .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
864                ),
865                None => Poll::Ready(None),
866            },
867            Poll::Pending => Poll::Pending,
868        }
869    }
870}
871
872pub struct List {
873    subscription: Option<crate::jetstream::consumer::push::Ordered>,
874    done: bool,
875}
876
877impl Stream for List {
878    type Item = Result<ObjectInfo, ListerError>;
879
880    fn poll_next(
881        mut self: std::pin::Pin<&mut Self>,
882        cx: &mut std::task::Context<'_>,
883    ) -> Poll<Option<Self::Item>> {
884        loop {
885            if self.done {
886                debug!("Object Store list done");
887                self.subscription = None;
888                return Poll::Ready(None);
889            }
890
891            if let Some(subscription) = self.subscription.as_mut() {
892                match subscription.poll_next_unpin(cx) {
893                    Poll::Ready(message) => match message {
894                        None => return Poll::Ready(None),
895                        Some(message) => {
896                            let message = message?;
897                            let info = message.info().map_err(|err| {
898                                ListerError::with_source(ListerErrorKind::Other, err)
899                            })?;
900                            trace!("num pending: {}", info.pending);
901                            if info.pending == 0 {
902                                self.done = true;
903                            }
904                            let response: ObjectInfo = serde_json::from_slice(&message.payload)
905                                .map_err(|err| {
906                                    ListerError::with_source(
907                                        ListerErrorKind::Other,
908                                        format!("failed deserializing object info: {err}"),
909                                    )
910                                })?;
911                            if response.deleted {
912                                continue;
913                            }
914                            return Poll::Ready(Some(Ok(response)));
915                        }
916                    },
917                    Poll::Pending => return Poll::Pending,
918                }
919            } else {
920                return Poll::Ready(None);
921            }
922        }
923    }
924}
925
926/// Represents an object stored in a bucket.
927pub struct Object {
928    pub info: ObjectInfo,
929    remaining_bytes: VecDeque<u8>,
930    has_pending_messages: bool,
931    digest: Option<Sha256>,
932    subscription: Option<crate::jetstream::consumer::push::Ordered>,
933    subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
934    stream: crate::jetstream::stream::Stream,
935}
936
937impl std::fmt::Debug for Object {
938    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
939        f.debug_struct("Object")
940            .field("info", &self.info)
941            .field("remaining_bytes", &self.remaining_bytes)
942            .field("has_pending_messages", &self.has_pending_messages)
943            .finish()
944    }
945}
946
947impl Object {
948    pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
949        let has_pending_messages = info.chunks > 0;
950        Object {
951            subscription: None,
952            info,
953            remaining_bytes: VecDeque::new(),
954            has_pending_messages,
955            digest: Some(Sha256::new()),
956            subscription_future: None,
957            stream,
958        }
959    }
960
961    /// Returns information about the object.
962    pub fn info(&self) -> &ObjectInfo {
963        &self.info
964    }
965}
966
967impl tokio::io::AsyncRead for Object {
968    fn poll_read(
969        mut self: std::pin::Pin<&mut Self>,
970        cx: &mut std::task::Context<'_>,
971        buf: &mut tokio::io::ReadBuf<'_>,
972    ) -> std::task::Poll<std::io::Result<()>> {
973        let (buf1, _buf2) = self.remaining_bytes.as_slices();
974        if !buf1.is_empty() {
975            let len = cmp::min(buf.remaining(), buf1.len());
976            buf.put_slice(&buf1[..len]);
977            self.remaining_bytes.drain(..len);
978            return Poll::Ready(Ok(()));
979        }
980
981        if self.has_pending_messages {
982            if self.subscription.is_none() {
983                let future = match self.subscription_future.as_mut() {
984                    Some(future) => future,
985                    None => {
986                        let stream = self.stream.clone();
987                        let bucket = self.info.bucket.clone();
988                        let nuid = self.info.nuid.clone();
989                        self.subscription_future.insert(Box::pin(async move {
990                            stream
991                                .create_consumer(OrderedConfig {
992                                    deliver_subject: stream.context.client.new_inbox(),
993                                    filter_subject: format!("$O.{bucket}.C.{nuid}"),
994                                    ..Default::default()
995                                })
996                                .await
997                                .unwrap()
998                                .messages()
999                                .await
1000                        }))
1001                    }
1002                };
1003                match future.as_mut().poll(cx) {
1004                    Poll::Ready(subscription) => {
1005                        self.subscription = Some(subscription.unwrap());
1006                    }
1007                    Poll::Pending => (),
1008                }
1009            }
1010            if let Some(subscription) = self.subscription.as_mut() {
1011                match subscription.poll_next_unpin(cx) {
1012                    Poll::Ready(message) => match message {
1013                        Some(message) => {
1014                            let message = message.map_err(|err| {
1015                                std::io::Error::other(format!(
1016                                    "error from JetStream subscription: {err}"
1017                                ))
1018                            })?;
1019                            let len = cmp::min(buf.remaining(), message.payload.len());
1020                            buf.put_slice(&message.payload[..len]);
1021                            if let Some(context) = &mut self.digest {
1022                                context.update(&message.payload);
1023                            }
1024                            self.remaining_bytes.extend(&message.payload[len..]);
1025
1026                            let info = message.info().map_err(|err| {
1027                                std::io::Error::other(format!(
1028                                    "error from JetStream subscription: {err}"
1029                                ))
1030                            })?;
1031                            if info.pending == 0 {
1032                                let digest = self.digest.take().map(Sha256::finish);
1033                                if let Some(digest) = digest {
1034                                    if self
1035                                        .info
1036                                        .digest
1037                                        .as_ref()
1038                                        .map(|digest_self| {
1039                                            format!("SHA-256={}", URL_SAFE.encode(digest))
1040                                                != *digest_self
1041                                        })
1042                                        .unwrap_or(false)
1043                                    {
1044                                        return Poll::Ready(Err(std::io::Error::new(
1045                                            std::io::ErrorKind::InvalidData,
1046                                            "wrong digest",
1047                                        )));
1048                                    }
1049                                } else {
1050                                    return Poll::Ready(Err(std::io::Error::new(
1051                                        std::io::ErrorKind::InvalidData,
1052                                        "digest should be Some",
1053                                    )));
1054                                }
1055                                self.has_pending_messages = false;
1056                                self.subscription = None;
1057                            }
1058                            Poll::Ready(Ok(()))
1059                        }
1060                        None => Poll::Ready(Err(std::io::Error::other(
1061                            "subscription ended before reading whole object",
1062                        ))),
1063                    },
1064                    Poll::Pending => Poll::Pending,
1065                }
1066            } else {
1067                Poll::Pending
1068            }
1069        } else {
1070            Poll::Ready(Ok(()))
1071        }
1072    }
1073}
1074
/// Optional settings stored alongside an object's metadata (see `ObjectInfo::options`).
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectOptions {
    /// Link target, when the object is a link.
    pub link: Option<ObjectLink>,
    /// Maximum chunk size, if one was recorded.
    pub max_chunk_size: Option<usize>,
}
1080
/// Meta and instance information about an object.
///
/// Only `name` and `bucket` are required when deserializing; every other field falls back
/// to its default.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectInfo {
    /// Name of the object
    pub name: String,
    /// A short human readable description of the object.
    #[serde(default)]
    pub description: Option<String>,
    /// Metadata for given object.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Headers for given object.
    #[serde(default)]
    pub headers: Option<HeaderMap>,
    /// Additional options of the object, such as the link it points to, if any.
    #[serde(default)]
    pub options: Option<ObjectOptions>,
    /// Name of the bucket the object is stored in.
    pub bucket: String,
    /// Unique identifier used to uniquely identify this version of the object.
    #[serde(default)]
    pub nuid: String,
    /// Size in bytes of the object.
    #[serde(default)]
    pub size: usize,
    /// Number of chunks the object is stored in.
    #[serde(default)]
    pub chunks: usize,
    /// Date and time the object was last modified (serialized as `mtime` in RFC 3339).
    #[serde(default, with = "rfc3339::option")]
    #[serde(rename = "mtime")]
    pub modified: Option<time::OffsetDateTime>,
    /// Digest of the object stream. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub digest: Option<String>,
    /// Set to true if the object has been deleted. Omitted from the serialized form when
    /// `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deleted: bool,
}
1120
/// Returns `true` when `t` equals `T::default()`; used as a serde `skip_serializing_if`
/// predicate.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let default_value = T::default();
    *t == default_value
}
/// A link to another object, potentially in another bucket.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectLink {
    /// Name of the linked object.
    // NOTE(review): `None` presumably means the link targets a whole bucket — confirm.
    pub name: Option<String>,
    /// Name of the bucket the object is stored in.
    pub bucket: String,
}
1132
/// Metadata values used when updating an existing object's metadata.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpdateMetadata {
    /// Name of the object
    pub name: String,
    /// A short human readable description of the object.
    pub description: Option<String>,
    /// Metadata for given object. Defaults to an empty map when absent on deserialization.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Headers for given object.
    pub headers: Option<HeaderMap>,
}
1145
/// Meta information about an object.
///
/// Can be built from a `&str` (name only) or from an [`ObjectInfo`].
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectMetadata {
    /// Name of the object
    pub name: String,
    /// A short human readable description of the object.
    pub description: Option<String>,
    /// Max chunk size. Default is 128k.
    pub chunk_size: Option<usize>,
    /// Metadata for given object. Defaults to an empty map when absent on deserialization.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Headers for given object.
    pub headers: Option<HeaderMap>,
}
1161
1162impl From<&str> for ObjectMetadata {
1163    fn from(s: &str) -> ObjectMetadata {
1164        ObjectMetadata {
1165            name: s.to_string(),
1166            ..Default::default()
1167        }
1168    }
1169}
1170
/// Gives access to an [`ObjectInfo`], implemented for references to both [`Object`] and
/// [`ObjectInfo`] so either can be passed where object information is needed.
pub trait AsObjectInfo {
    /// Returns the borrowed object information.
    fn as_info(&self) -> &ObjectInfo;
}

impl AsObjectInfo for &Object {
    /// Returns the `info` field of the referenced object.
    fn as_info(&self) -> &ObjectInfo {
        &self.info
    }
}
impl AsObjectInfo for &ObjectInfo {
    /// Returns the referenced info itself.
    fn as_info(&self) -> &ObjectInfo {
        self
    }
}
1185
1186impl From<ObjectInfo> for ObjectMetadata {
1187    fn from(info: ObjectInfo) -> Self {
1188        ObjectMetadata {
1189            name: info.name,
1190            description: info.description,
1191            metadata: info.metadata,
1192            headers: info.headers,
1193            chunk_size: None,
1194        }
1195    }
1196}
1197
/// Kinds of failures reported when updating object metadata.
#[derive(Debug, PartialEq, Clone)]
pub enum UpdateMetadataErrorKind {
    InvalidName,
    NotFound,
    TimedOut,
    Other,
    PublishMetadata,
    NameAlreadyInUse,
    Purge,
}

impl Display for UpdateMetadataErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "object not found",
            Self::TimedOut => "timed out",
            Self::Other => "error",
            Self::PublishMetadata => "failed publishing metadata",
            Self::NameAlreadyInUse => "object with updated name already exists",
            Self::Purge => "failed purging old name metadata",
        };
        f.write_str(description)
    }
}
1226
1227impl From<InfoError> for UpdateMetadataError {
1228    fn from(error: InfoError) -> Self {
1229        match error.kind() {
1230            InfoErrorKind::InvalidName => {
1231                UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1232            }
1233            InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1234            InfoErrorKind::Other => {
1235                UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1236            }
1237            InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1238        }
1239    }
1240}
1241
1242pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1243
/// Kinds of failures reported when fetching object information.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum InfoErrorKind {
    InvalidName,
    NotFound,
    Other,
    TimedOut,
}

impl Display for InfoErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "not found",
            Self::Other => "getting info failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
1262
/// Error returned when fetching object information fails.
pub type InfoError = Error<InfoErrorKind>;
1264
/// Kinds of failures reported when getting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GetErrorKind {
    InvalidName,
    ConsumerCreate,
    NotFound,
    BucketLink,
    Other,
    TimedOut,
}

impl Display for GetErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidName => "invalid object name",
            Self::ConsumerCreate => "failed creating consumer for fetching object",
            Self::NotFound => "object not found",
            Self::BucketLink => "object is a link to a bucket",
            Self::Other => "failed getting object",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
1287
1288pub type GetError = Error<GetErrorKind>;
1289
1290crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1291crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1292
1293impl From<InfoError> for GetError {
1294    fn from(err: InfoError) -> Self {
1295        match err.kind() {
1296            InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1297            InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1298            InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1299            InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1300        }
1301    }
1302}
1303
/// Kinds of failures reported when deleting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DeleteErrorKind {
    TimedOut,
    NotFound,
    Metadata,
    InvalidName,
    Chunks,
    Other,
}

impl Display for DeleteErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "timed out",
            Self::NotFound => "object not found",
            Self::Metadata => "failed rolling up metadata",
            Self::InvalidName => "invalid object name",
            Self::Chunks => "failed purging chunks",
            Self::Other => "delete failed",
        };
        f.write_str(description)
    }
}
1326
1327pub type DeleteError = Error<DeleteErrorKind>;
1328
1329impl From<InfoError> for DeleteError {
1330    fn from(err: InfoError) -> Self {
1331        match err.kind() {
1332            InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1333            InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1334            InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1335            InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1336        }
1337    }
1338}
1339
1340crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1341crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1342
/// Kinds of failures reported when putting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    InvalidName,
    ReadChunks,
    PublishChunks,
    PublishMetadata,
    PurgeOldChunks,
    TimedOut,
    Other,
}

impl Display for PutErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidName => "invalid object name",
            Self::ReadChunks => "error while reading the buffer",
            Self::PublishChunks => "failed publishing object chunks",
            Self::PublishMetadata => "failed publishing metadata",
            Self::PurgeOldChunks => "failed purging old chunks",
            Self::TimedOut => "timed out",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1367
/// Error returned when putting an object fails.
pub type PutError = Error<PutErrorKind>;

/// Error returned when adding a link fails.
pub type AddLinkError = Error<AddLinkErrorKind>;
1371
/// Kinds of failures reported when adding a link.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AddLinkErrorKind {
    EmptyName,
    ObjectRequired,
    Deleted,
    LinkToLink,
    PublishMetadata,
    AlreadyExists,
    Other,
}

impl Display for AddLinkErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::EmptyName => "link name cannot be empty",
            Self::ObjectRequired => "cannot link to empty Object",
            Self::Deleted => "cannot link a deleted Object",
            Self::LinkToLink => "cannot link to another link",
            Self::PublishMetadata => "failed publishing link metadata",
            Self::AlreadyExists => "object already exists",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1396
/// Module-private error produced while publishing object metadata; converted into
/// [`AddLinkError`] or [`PutError`] before reaching callers.
type PublishMetadataError = Error<PublishMetadataErrorKind>;
1398
/// Kinds of failures that can occur while publishing object metadata.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PublishMetadataErrorKind {
    PublishMetadata,
    Other,
}

impl Display for PublishMetadataErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::PublishMetadata => "failed to publish metadata",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1413
1414impl From<PublishMetadataError> for AddLinkError {
1415    fn from(error: PublishMetadataError) -> Self {
1416        match error.kind {
1417            PublishMetadataErrorKind::PublishMetadata => {
1418                AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1419            }
1420            PublishMetadataErrorKind::Other => {
1421                AddLinkError::with_source(AddLinkErrorKind::Other, error)
1422            }
1423        }
1424    }
1425}
1426impl From<PublishMetadataError> for PutError {
1427    fn from(error: PublishMetadataError) -> Self {
1428        match error.kind {
1429            PublishMetadataErrorKind::PublishMetadata => {
1430                PutError::new(PutErrorKind::PublishMetadata)
1431            }
1432            PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1433        }
1434    }
1435}
1436
/// Kinds of failures reported when setting up a watch.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    TimedOut,
    ConsumerCreate,
    Other,
}

impl Display for WatchErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(description)
    }
}
1453
/// Error returned when setting up a watch fails.
pub type WatchError = Error<WatchErrorKind>;

crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);

/// Listing shares its error type with watching.
pub type ListError = WatchError;
/// Listing shares its error kind with watching.
pub type ListErrorKind = WatchErrorKind;
1461
/// Kinds of failures reported when sealing a bucket.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SealErrorKind {
    TimedOut,
    Other,
    Info,
    Update,
}

impl Display for SealErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "timed out",
            Self::Other => "seal failed",
            Self::Info => "failed getting stream info before sealing bucket",
            Self::Update => "failed sealing the bucket",
        };
        f.write_str(description)
    }
}
1480
1481pub type SealError = Error<SealErrorKind>;
1482
1483impl From<super::context::UpdateStreamError> for SealError {
1484    fn from(err: super::context::UpdateStreamError) -> Self {
1485        match err.kind() {
1486            super::context::CreateStreamErrorKind::TimedOut => {
1487                SealError::new(SealErrorKind::TimedOut)
1488            }
1489            _ => SealError::with_source(SealErrorKind::Update, err),
1490        }
1491    }
1492}
1493
/// Kinds of failures yielded by the watch and list streams.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    ConsumerError,
    Other,
}

impl Display for WatcherErrorKind {
    /// Writes the human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::ConsumerError => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(description)
    }
}
1508
/// Error yielded as an item by the [`Watch`] stream.
pub type WatcherError = Error<WatcherErrorKind>;

impl From<OrderedError> for WatcherError {
    /// Wraps an ordered consumer error as `WatcherErrorKind::ConsumerError`, keeping it as
    /// the source.
    fn from(err: OrderedError) -> Self {
        WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
    }
}

/// The [`List`] stream shares its error type with [`Watch`].
pub type ListerError = WatcherError;
/// The [`List`] stream shares its error kind with [`Watch`].
pub type ListerErrorKind = WatcherErrorKind;