async_nats/jetstream/object_store/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Object Store module
15use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
42const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
43const NATS_ROLLUP: &str = "Nats-Rollup";
44const ROLLUP_SUBJECT: &str = "sub";
45
46static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
47static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50    BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54    if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55        return false;
56    }
57
58    OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62    URL_SAFE.encode(object_name)
63}
64
65/// Configuration values for object store buckets.
66#[derive(Debug, Default, Clone, Serialize, Deserialize)]
67pub struct Config {
68    /// Name of the storage bucket.
69    pub bucket: String,
70    /// A short description of the purpose of this storage bucket.
71    pub description: Option<String>,
72    /// Maximum age of any value in the bucket, expressed in nanoseconds
73    #[serde(default, with = "serde_nanos")]
74    pub max_age: Duration,
75    /// How large the storage bucket may become in total bytes.
76    pub max_bytes: i64,
77    /// The type of storage backend, `File` (default) and `Memory`
78    pub storage: StorageType,
79    /// How many replicas to keep for each value in a cluster, maximum 5.
80    pub num_replicas: usize,
81    /// Sets compression of the underlying stream.
82    pub compression: bool,
83    // Cluster and tag placement.
84    pub placement: Option<stream::Placement>,
85}
86
87/// A blob store capable of storing large objects efficiently in streams.
88#[derive(Clone)]
89pub struct ObjectStore {
90    pub(crate) name: String,
91    pub(crate) stream: crate::jetstream::stream::Stream,
92}
93
94impl ObjectStore {
95    /// Gets an [Object] from the [ObjectStore].
96    ///
97    /// [Object] implements [tokio::io::AsyncRead] that allows
98    /// to read the data from Object Store.
99    ///
100    /// # Examples
101    ///
102    /// ```no_run
103    /// # #[tokio::main]
104    /// # async fn main() -> Result<(), async_nats::Error> {
105    /// use tokio::io::AsyncReadExt;
106    /// let client = async_nats::connect("demo.nats.io").await?;
107    /// let jetstream = async_nats::jetstream::new(client);
108    ///
109    /// let bucket = jetstream.get_object_store("store").await?;
110    /// let mut object = bucket.get("FOO").await?;
111    ///
112    /// // Object implements `tokio::io::AsyncRead`.
113    /// let mut bytes = vec![];
114    /// object.read_to_end(&mut bytes).await?;
115    /// # Ok(())
116    /// # }
117    /// ```
118    pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
119        self.get_impl(object_name).await
120    }
121
122    fn get_impl<'bucket, 'future, T>(
123        &'bucket self,
124        object_name: T,
125    ) -> BoxFuture<'future, Result<Object, GetError>>
126    where
127        T: AsRef<str> + Send + 'future,
128        'bucket: 'future,
129    {
130        Box::pin(async move {
131            let object_info = self.info(object_name).await?;
132            if let Some(ref options) = object_info.options {
133                if let Some(link) = options.link.as_ref() {
134                    if let Some(link_name) = link.name.as_ref() {
135                        let link_name = link_name.clone();
136                        debug!("getting object via link");
137                        if link.bucket == self.name {
138                            return self.get_impl(link_name).await;
139                        } else {
140                            let bucket = self
141                                .stream
142                                .context
143                                .get_object_store(&link.bucket)
144                                .await
145                                .map_err(|err| {
146                                GetError::with_source(GetErrorKind::Other, err)
147                            })?;
148                            let object = bucket.get_impl(&link_name).await?;
149                            return Ok(object);
150                        }
151                    } else {
152                        return Err(GetError::new(GetErrorKind::BucketLink));
153                    }
154                }
155            }
156
157            debug!("not a link. Getting the object");
158            Ok(Object::new(object_info, self.stream.clone()))
159        })
160    }
161
162    /// Deletes an [Object] from the [ObjectStore].
163    ///
164    /// # Examples
165    ///
166    /// ```no_run
167    /// # #[tokio::main]
168    /// # async fn main() -> Result<(), async_nats::Error> {
169    /// let client = async_nats::connect("demo.nats.io").await?;
170    /// let jetstream = async_nats::jetstream::new(client);
171    ///
172    /// let bucket = jetstream.get_object_store("store").await?;
173    /// bucket.delete("FOO").await?;
174    /// # Ok(())
175    /// # }
176    /// ```
177    pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
178        let object_name = object_name.as_ref();
179        let mut object_info = self.info(object_name).await?;
180        object_info.chunks = 0;
181        object_info.size = 0;
182        object_info.deleted = true;
183
184        let data = serde_json::to_vec(&object_info).map_err(|err| {
185            DeleteError::with_source(
186                DeleteErrorKind::Other,
187                format!("failed deserializing object info: {}", err),
188            )
189        })?;
190
191        let mut headers = HeaderMap::default();
192        headers.insert(
193            NATS_ROLLUP,
194            HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
195                DeleteError::with_source(
196                    DeleteErrorKind::Other,
197                    format!("failed parsing header: {}", err),
198                )
199            })?,
200        );
201
202        let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
203
204        self.stream
205            .context
206            .publish_with_headers(subject, headers, data.into())
207            .await?
208            .await?;
209
210        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
211
212        self.stream.purge().filter(&chunk_subject).await?;
213
214        Ok(())
215    }
216
217    /// Retrieves [Object] [ObjectInfo].
218    ///
219    /// # Examples
220    ///
221    /// ```no_run
222    /// # #[tokio::main]
223    /// # async fn main() -> Result<(), async_nats::Error> {
224    /// let client = async_nats::connect("demo.nats.io").await?;
225    /// let jetstream = async_nats::jetstream::new(client);
226    ///
227    /// let bucket = jetstream.get_object_store("store").await?;
228    /// let info = bucket.info("FOO").await?;
229    /// # Ok(())
230    /// # }
231    /// ```
232    pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
233        let object_name = object_name.as_ref();
234        let object_name = encode_object_name(object_name);
235        if !is_valid_object_name(&object_name) {
236            return Err(InfoError::new(InfoErrorKind::InvalidName));
237        }
238
239        // Grab last meta value we have.
240        let subject = format!("$O.{}.M.{}", &self.name, &object_name);
241
242        // FIXME(jrm): we should use direct get here when possible.
243        let message = self
244            .stream
245            .get_last_raw_message_by_subject(subject.as_str())
246            .await
247            .map_err(|err| match err.kind() {
248                stream::LastRawMessageErrorKind::NoMessageFound => {
249                    InfoError::new(InfoErrorKind::NotFound)
250                }
251                _ => InfoError::with_source(InfoErrorKind::Other, err),
252            })?;
253        let object_info =
254            serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
255                InfoError::with_source(
256                    InfoErrorKind::Other,
257                    format!("failed to decode info payload: {}", err),
258                )
259            })?;
260
261        Ok(object_info)
262    }
263
264    /// Puts an [Object] into the [ObjectStore].
265    /// This method implements `tokio::io::AsyncRead`.
266    ///
267    /// # Examples
268    ///
269    /// ```no_run
270    /// # #[tokio::main]
271    /// # async fn main() -> Result<(), async_nats::Error> {
272    /// let client = async_nats::connect("demo.nats.io").await?;
273    /// let jetstream = async_nats::jetstream::new(client);
274    ///
275    /// let bucket = jetstream.get_object_store("store").await?;
276    /// let mut file = tokio::fs::File::open("foo.txt").await?;
277    /// bucket.put("file", &mut file).await.unwrap();
278    /// # Ok(())
279    /// # }
280    /// ```
281    pub async fn put<T>(
282        &self,
283        meta: T,
284        data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
285    ) -> Result<ObjectInfo, PutError>
286    where
287        ObjectMetadata: From<T>,
288    {
289        let object_meta: ObjectMetadata = meta.into();
290
291        // Fetch any existing object info, if there is any for later use.
292        let maybe_existing_object_info = (self.info(&object_meta.name).await).ok();
293
294        let object_nuid = nuid::next();
295        let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
296
297        let mut object_chunks = 0;
298        let mut object_size = 0;
299
300        let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
301        let mut buffer = BytesMut::with_capacity(chunk_size);
302        let mut sha256 = Sha256::new();
303
304        loop {
305            let n = data
306                .read_buf(&mut buffer)
307                .await
308                .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
309
310            if n == 0 {
311                break;
312            }
313
314            let payload = buffer.split().freeze();
315            sha256.update(&payload);
316
317            object_size += payload.len();
318            object_chunks += 1;
319
320            self.stream
321                .context
322                .publish(chunk_subject.clone(), payload)
323                .await
324                .map_err(|err| {
325                    PutError::with_source(
326                        PutErrorKind::PublishChunks,
327                        format!("failed chunk publish: {}", err),
328                    )
329                })?
330                .await
331                .map_err(|err| {
332                    PutError::with_source(
333                        PutErrorKind::PublishChunks,
334                        format!("failed getting chunk ack: {}", err),
335                    )
336                })?;
337        }
338        let digest = sha256.finish();
339
340        let encoded_object_name = encode_object_name(&object_meta.name);
341        if !is_valid_object_name(&encoded_object_name) {
342            return Err(PutError::new(PutErrorKind::InvalidName));
343        }
344        let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
345
346        let object_info = ObjectInfo {
347            name: object_meta.name,
348            description: object_meta.description,
349            options: Some(ObjectOptions {
350                max_chunk_size: Some(chunk_size),
351                link: None,
352            }),
353            bucket: self.name.clone(),
354            nuid: object_nuid.to_string(),
355            chunks: object_chunks,
356            size: object_size,
357            digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
358            modified: Some(OffsetDateTime::now_utc()),
359            deleted: false,
360            metadata: object_meta.metadata,
361            headers: object_meta.headers,
362        };
363
364        let mut headers = HeaderMap::new();
365        headers.insert(
366            NATS_ROLLUP,
367            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
368                PutError::with_source(
369                    PutErrorKind::Other,
370                    format!("failed parsing header: {}", err),
371                )
372            })?,
373        );
374        let data = serde_json::to_vec(&object_info).map_err(|err| {
375            PutError::with_source(
376                PutErrorKind::Other,
377                format!("failed serializing object info: {}", err),
378            )
379        })?;
380
381        // publish meta.
382        self.stream
383            .context
384            .publish_with_headers(subject, headers, data.into())
385            .await
386            .map_err(|err| {
387                PutError::with_source(
388                    PutErrorKind::PublishMetadata,
389                    format!("failed publishing metadata: {}", err),
390                )
391            })?
392            .await
393            .map_err(|err| {
394                PutError::with_source(
395                    PutErrorKind::PublishMetadata,
396                    format!("failed ack from metadata publish: {}", err),
397                )
398            })?;
399
400        // Purge any old chunks.
401        if let Some(existing_object_info) = maybe_existing_object_info {
402            let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
403
404            self.stream
405                .purge()
406                .filter(&chunk_subject)
407                .await
408                .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
409        }
410
411        Ok(object_info)
412    }
413
414    /// Creates a [Watch] stream over changes in the [ObjectStore].
415    ///
416    /// # Examples
417    ///
418    /// ```no_run
419    /// # #[tokio::main]
420    /// # async fn main() -> Result<(), async_nats::Error> {
421    /// use futures::StreamExt;
422    /// let client = async_nats::connect("demo.nats.io").await?;
423    /// let jetstream = async_nats::jetstream::new(client);
424    ///
425    /// let bucket = jetstream.get_object_store("store").await?;
426    /// let mut watcher = bucket.watch().await.unwrap();
427    /// while let Some(object) = watcher.next().await {
428    ///     println!("detected changes in {:?}", object?);
429    /// }
430    /// # Ok(())
431    /// # }
432    /// ```
433    pub async fn watch(&self) -> Result<Watch, WatchError> {
434        self.watch_with_deliver_policy(DeliverPolicy::New).await
435    }
436
437    /// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever
438    /// there are changes for that key with as well as last value.
439    pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
440        self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
441            .await
442    }
443
444    async fn watch_with_deliver_policy(
445        &self,
446        deliver_policy: DeliverPolicy,
447    ) -> Result<Watch, WatchError> {
448        let subject = format!("$O.{}.M.>", self.name);
449        let ordered = self
450            .stream
451            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
452                deliver_policy,
453                deliver_subject: self.stream.context.client.new_inbox(),
454                description: Some("object store watcher".to_string()),
455                filter_subject: subject,
456                ..Default::default()
457            })
458            .await?;
459        Ok(Watch {
460            subscription: ordered.messages().await?,
461        })
462    }
463
464    /// Returns a [List] stream with all not deleted [Objects][Object] in the [ObjectStore].
465    ///
466    /// # Examples
467    ///
468    /// ```no_run
469    /// # #[tokio::main]
470    /// # async fn main() -> Result<(), async_nats::Error> {
471    /// use futures::StreamExt;
472    /// let client = async_nats::connect("demo.nats.io").await?;
473    /// let jetstream = async_nats::jetstream::new(client);
474    ///
475    /// let bucket = jetstream.get_object_store("store").await?;
476    /// let mut list = bucket.list().await.unwrap();
477    /// while let Some(object) = list.next().await {
478    ///     println!("object {:?}", object?);
479    /// }
480    /// # Ok(())
481    /// # }
482    /// ```
483    pub async fn list(&self) -> Result<List, ListError> {
484        trace!("starting Object List");
485        let subject = format!("$O.{}.M.>", self.name);
486        let ordered = self
487            .stream
488            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
489                deliver_policy: super::consumer::DeliverPolicy::All,
490                deliver_subject: self.stream.context.client.new_inbox(),
491                description: Some("object store list".to_string()),
492                filter_subject: subject,
493                ..Default::default()
494            })
495            .await?;
496        Ok(List {
497            done: ordered.info.num_pending == 0,
498            subscription: Some(ordered.messages().await?),
499        })
500    }
501
502    /// Seals a [ObjectStore], preventing any further changes to it or its [Objects][Object].
503    ///
504    /// # Examples
505    ///
506    /// ```no_run
507    /// # #[tokio::main]
508    /// # async fn main() -> Result<(), async_nats::Error> {
509    /// use futures::StreamExt;
510    /// let client = async_nats::connect("demo.nats.io").await?;
511    /// let jetstream = async_nats::jetstream::new(client);
512    ///
513    /// let mut bucket = jetstream.get_object_store("store").await?;
514    /// bucket.seal().await.unwrap();
515    /// # Ok(())
516    /// # }
517    /// ```
518    pub async fn seal(&mut self) -> Result<(), SealError> {
519        let mut stream_config = self
520            .stream
521            .info()
522            .await
523            .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
524            .to_owned();
525        stream_config.config.sealed = true;
526
527        self.stream
528            .context
529            .update_stream(&stream_config.config)
530            .await?;
531        Ok(())
532    }
533
534    /// Updates [Object] [ObjectMetadata].
535    ///
536    /// # Examples
537    ///
538    /// ```no_run
539    /// # #[tokio::main]
540    /// # async fn main() -> Result<(), async_nats::Error> {
541    /// use async_nats::jetstream::object_store;
542    /// let client = async_nats::connect("demo.nats.io").await?;
543    /// let jetstream = async_nats::jetstream::new(client);
544    ///
545    /// let mut bucket = jetstream.get_object_store("store").await?;
546    /// bucket
547    ///     .update_metadata(
548    ///         "object",
549    ///         object_store::UpdateMetadata {
550    ///             name: "new_name".to_string(),
551    ///             description: Some("a new description".to_string()),
552    ///             ..Default::default()
553    ///         },
554    ///     )
555    ///     .await?;
556    /// # Ok(())
557    /// # }
558    /// ```
559    pub async fn update_metadata<A: AsRef<str>>(
560        &self,
561        object: A,
562        metadata: UpdateMetadata,
563    ) -> Result<ObjectInfo, UpdateMetadataError> {
564        let mut info = self.info(object.as_ref()).await?;
565
566        // If name is being update, we need to check if other metadata with it already exists.
567        // If does, error. Otherwise, purge old name metadata.
568        if metadata.name != info.name {
569            tracing::info!("new metadata name is different than then old one");
570            if !is_valid_object_name(&metadata.name) {
571                return Err(UpdateMetadataError::new(
572                    UpdateMetadataErrorKind::InvalidName,
573                ));
574            }
575            match self.info(&metadata.name).await {
576                Ok(_) => {
577                    return Err(UpdateMetadataError::new(
578                        UpdateMetadataErrorKind::NameAlreadyInUse,
579                    ))
580                }
581                Err(err) => match err.kind() {
582                    InfoErrorKind::NotFound => {
583                        tracing::info!("purging old metadata: {}", info.name);
584                        self.stream
585                            .purge()
586                            .filter(format!(
587                                "$O.{}.M.{}",
588                                self.name,
589                                encode_object_name(&info.name)
590                            ))
591                            .await
592                            .map_err(|err| {
593                                UpdateMetadataError::with_source(
594                                    UpdateMetadataErrorKind::Purge,
595                                    err,
596                                )
597                            })?;
598                    }
599                    _ => {
600                        return Err(UpdateMetadataError::with_source(
601                            UpdateMetadataErrorKind::Other,
602                            err,
603                        ))
604                    }
605                },
606            }
607        }
608
609        info.name = metadata.name;
610        info.description = metadata.description;
611
612        let name = encode_object_name(&info.name);
613        let subject = format!("$O.{}.M.{}", &self.name, &name);
614
615        let mut headers = HeaderMap::new();
616        headers.insert(
617            NATS_ROLLUP,
618            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
619                UpdateMetadataError::with_source(
620                    UpdateMetadataErrorKind::Other,
621                    format!("failed parsing header: {}", err),
622                )
623            })?,
624        );
625        let data = serde_json::to_vec(&info).map_err(|err| {
626            UpdateMetadataError::with_source(
627                UpdateMetadataErrorKind::Other,
628                format!("failed serializing object info: {}", err),
629            )
630        })?;
631
632        // publish meta.
633        self.stream
634            .context
635            .publish_with_headers(subject, headers, data.into())
636            .await
637            .map_err(|err| {
638                UpdateMetadataError::with_source(
639                    UpdateMetadataErrorKind::PublishMetadata,
640                    format!("failed publishing metadata: {}", err),
641                )
642            })?
643            .await
644            .map_err(|err| {
645                UpdateMetadataError::with_source(
646                    UpdateMetadataErrorKind::PublishMetadata,
647                    format!("failed ack from metadata publish: {}", err),
648                )
649            })?;
650
651        Ok(info)
652    }
653
654    /// Adds a link to an [Object].
655    /// It creates a new [Object] in the [ObjectStore] that points to another [Object]
656    /// and does not have any contents on it's own.
657    /// Links are automatically followed (one level deep) when calling [ObjectStore::get].
658    ///
659    /// # Examples
660    ///
661    /// ```no_run
662    /// # #[tokio::main]
663    /// # async fn main() -> Result<(), async_nats::Error> {
664    /// use async_nats::jetstream::object_store;
665    /// let client = async_nats::connect("demo.nats.io").await?;
666    /// let jetstream = async_nats::jetstream::new(client);
667    /// let bucket = jetstream.get_object_store("bucket").await?;
668    /// let object = bucket.get("object").await?;
669    /// bucket.add_link("link_to_object", &object).await?;
670    /// # Ok(())
671    /// # }
672    /// ```
673    pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
674    where
675        T: ToString,
676        O: AsObjectInfo,
677    {
678        let object = object.as_info();
679        let name = name.to_string();
680        if name.is_empty() {
681            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
682        }
683        if object.name.is_empty() {
684            return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
685        }
686        if object.deleted {
687            return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
688        }
689        if let Some(ref options) = object.options {
690            if options.link.is_some() {
691                return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
692            }
693        }
694        match self.info(&name).await {
695            Ok(info) => {
696                if let Some(options) = info.options {
697                    if options.link.is_none() {
698                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
699                    }
700                } else {
701                    return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
702                }
703            }
704            Err(err) if err.kind() != InfoErrorKind::NotFound => {
705                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
706            }
707            _ => (),
708        }
709
710        let info = ObjectInfo {
711            name,
712            description: None,
713            options: Some(ObjectOptions {
714                link: Some(ObjectLink {
715                    name: Some(object.name.clone()),
716                    bucket: object.bucket.clone(),
717                }),
718                max_chunk_size: None,
719            }),
720            bucket: self.name.clone(),
721            nuid: nuid::next().to_string(),
722            size: 0,
723            chunks: 0,
724            modified: Some(OffsetDateTime::now_utc()),
725            digest: None,
726            deleted: false,
727            metadata: HashMap::default(),
728            headers: None,
729        };
730        publish_meta(self, &info).await?;
731        Ok(info)
732    }
733
734    /// Adds a link to another [ObjectStore] bucket by creating a new [Object]
735    /// in the current [ObjectStore] that points to another [ObjectStore] and
736    /// does not contain any data.
737    ///
738    /// # Examples
739    ///
740    /// ```no_run
741    /// # #[tokio::main]
742    /// # async fn main() -> Result<(), async_nats::Error> {
743    /// use async_nats::jetstream::object_store;
744    /// let client = async_nats::connect("demo.nats.io").await?;
745    /// let jetstream = async_nats::jetstream::new(client);
746    /// let bucket = jetstream.get_object_store("bucket").await?;
747    /// bucket
748    ///     .add_bucket_link("link_to_object", "another_bucket")
749    ///     .await?;
750    /// # Ok(())
751    /// # }
752    /// ```
753    pub async fn add_bucket_link<T: ToString, U: ToString>(
754        &self,
755        name: T,
756        bucket: U,
757    ) -> Result<ObjectInfo, AddLinkError> {
758        let name = name.to_string();
759        let bucket = bucket.to_string();
760        if name.is_empty() {
761            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
762        }
763
764        match self.info(&name).await {
765            Ok(info) => {
766                if let Some(options) = info.options {
767                    if options.link.is_none() {
768                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
769                    }
770                }
771            }
772            Err(err) if err.kind() != InfoErrorKind::NotFound => {
773                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
774            }
775            _ => (),
776        }
777
778        let info = ObjectInfo {
779            name: name.clone(),
780            description: None,
781            options: Some(ObjectOptions {
782                link: Some(ObjectLink { name: None, bucket }),
783                max_chunk_size: None,
784            }),
785            bucket: self.name.clone(),
786            nuid: nuid::next().to_string(),
787            size: 0,
788            chunks: 0,
789            modified: Some(OffsetDateTime::now_utc()),
790            digest: None,
791            deleted: false,
792            metadata: HashMap::default(),
793            headers: None,
794        };
795        publish_meta(self, &info).await?;
796        Ok(info)
797    }
798}
799
800async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
801    let encoded_object_name = encode_object_name(&info.name);
802    let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
803
804    let mut headers = HeaderMap::new();
805    headers.insert(
806        NATS_ROLLUP,
807        ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
808            PublishMetadataError::with_source(
809                PublishMetadataErrorKind::Other,
810                format!("failed parsing header: {}", err),
811            )
812        })?,
813    );
814    let data = serde_json::to_vec(&info).map_err(|err| {
815        PublishMetadataError::with_source(
816            PublishMetadataErrorKind::Other,
817            format!("failed serializing object info: {}", err),
818        )
819    })?;
820
821    store
822        .stream
823        .context
824        .publish_with_headers(subject, headers, data.into())
825        .await
826        .map_err(|err| {
827            PublishMetadataError::with_source(
828                PublishMetadataErrorKind::PublishMetadata,
829                format!("failed publishing metadata: {}", err),
830            )
831        })?
832        .await
833        .map_err(|err| {
834            PublishMetadataError::with_source(
835                PublishMetadataErrorKind::PublishMetadata,
836                format!("failed ack from metadata publish: {}", err),
837            )
838        })?;
839    Ok(())
840}
841
842pub struct Watch {
843    subscription: crate::jetstream::consumer::push::Ordered,
844}
845
846impl Stream for Watch {
847    type Item = Result<ObjectInfo, WatcherError>;
848
849    fn poll_next(
850        mut self: std::pin::Pin<&mut Self>,
851        cx: &mut std::task::Context<'_>,
852    ) -> Poll<Option<Self::Item>> {
853        match self.subscription.poll_next_unpin(cx) {
854            Poll::Ready(message) => match message {
855                Some(message) => Poll::Ready(
856                    serde_json::from_slice::<ObjectInfo>(&message?.payload)
857                        .map_err(|err| {
858                            WatcherError::with_source(
859                                WatcherErrorKind::Other,
860                                format!("failed to deserialize object info: {}", err),
861                            )
862                        })
863                        .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
864                ),
865                None => Poll::Ready(None),
866            },
867            Poll::Pending => Poll::Pending,
868        }
869    }
870}
871
872pub struct List {
873    subscription: Option<crate::jetstream::consumer::push::Ordered>,
874    done: bool,
875}
876
877impl Stream for List {
878    type Item = Result<ObjectInfo, ListerError>;
879
880    fn poll_next(
881        mut self: std::pin::Pin<&mut Self>,
882        cx: &mut std::task::Context<'_>,
883    ) -> Poll<Option<Self::Item>> {
884        loop {
885            if self.done {
886                debug!("Object Store list done");
887                self.subscription = None;
888                return Poll::Ready(None);
889            }
890
891            if let Some(subscription) = self.subscription.as_mut() {
892                match subscription.poll_next_unpin(cx) {
893                    Poll::Ready(message) => match message {
894                        None => return Poll::Ready(None),
895                        Some(message) => {
896                            let message = message?;
897                            let info = message.info().map_err(|err| {
898                                ListerError::with_source(ListerErrorKind::Other, err)
899                            })?;
900                            trace!("num pending: {}", info.pending);
901                            if info.pending == 0 {
902                                self.done = true;
903                            }
904                            let response: ObjectInfo = serde_json::from_slice(&message.payload)
905                                .map_err(|err| {
906                                    ListerError::with_source(
907                                        ListerErrorKind::Other,
908                                        format!("failed deserializing object info: {}", err),
909                                    )
910                                })?;
911                            if response.deleted {
912                                continue;
913                            }
914                            return Poll::Ready(Some(Ok(response)));
915                        }
916                    },
917                    Poll::Pending => return Poll::Pending,
918                }
919            } else {
920                return Poll::Ready(None);
921            }
922        }
923    }
924}
925
926/// Represents an object stored in a bucket.
927pub struct Object {
928    pub info: ObjectInfo,
929    remaining_bytes: VecDeque<u8>,
930    has_pending_messages: bool,
931    digest: Option<Sha256>,
932    subscription: Option<crate::jetstream::consumer::push::Ordered>,
933    subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
934    stream: crate::jetstream::stream::Stream,
935}
936
937impl Object {
938    pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
939        Object {
940            subscription: None,
941            info,
942            remaining_bytes: VecDeque::new(),
943            has_pending_messages: true,
944            digest: Some(Sha256::new()),
945            subscription_future: None,
946            stream,
947        }
948    }
949
950    /// Returns information about the object.
951    pub fn info(&self) -> &ObjectInfo {
952        &self.info
953    }
954}
955
956impl tokio::io::AsyncRead for Object {
957    fn poll_read(
958        mut self: std::pin::Pin<&mut Self>,
959        cx: &mut std::task::Context<'_>,
960        buf: &mut tokio::io::ReadBuf<'_>,
961    ) -> std::task::Poll<std::io::Result<()>> {
962        let (buf1, _buf2) = self.remaining_bytes.as_slices();
963        if !buf1.is_empty() {
964            let len = cmp::min(buf.remaining(), buf1.len());
965            buf.put_slice(&buf1[..len]);
966            self.remaining_bytes.drain(..len);
967            return Poll::Ready(Ok(()));
968        }
969
970        if self.has_pending_messages {
971            if self.subscription.is_none() {
972                let future = match self.subscription_future.as_mut() {
973                    Some(future) => future,
974                    None => {
975                        let stream = self.stream.clone();
976                        let bucket = self.info.bucket.clone();
977                        let nuid = self.info.nuid.clone();
978                        self.subscription_future.insert(Box::pin(async move {
979                            stream
980                                .create_consumer(OrderedConfig {
981                                    deliver_subject: stream.context.client.new_inbox(),
982                                    filter_subject: format!("$O.{}.C.{}", bucket, nuid),
983                                    ..Default::default()
984                                })
985                                .await
986                                .unwrap()
987                                .messages()
988                                .await
989                        }))
990                    }
991                };
992                match future.as_mut().poll(cx) {
993                    Poll::Ready(subscription) => {
994                        self.subscription = Some(subscription.unwrap());
995                    }
996                    Poll::Pending => (),
997                }
998            }
999            if let Some(subscription) = self.subscription.as_mut() {
1000                match subscription.poll_next_unpin(cx) {
1001                    Poll::Ready(message) => match message {
1002                        Some(message) => {
1003                            let message = message.map_err(|err| {
1004                                std::io::Error::new(
1005                                    std::io::ErrorKind::Other,
1006                                    format!("error from JetStream subscription: {err}"),
1007                                )
1008                            })?;
1009                            let len = cmp::min(buf.remaining(), message.payload.len());
1010                            buf.put_slice(&message.payload[..len]);
1011                            if let Some(context) = &mut self.digest {
1012                                context.update(&message.payload);
1013                            }
1014                            self.remaining_bytes.extend(&message.payload[len..]);
1015
1016                            let info = message.info().map_err(|err| {
1017                                std::io::Error::new(
1018                                    std::io::ErrorKind::Other,
1019                                    format!("error from JetStream subscription: {err}"),
1020                                )
1021                            })?;
1022                            if info.pending == 0 {
1023                                let digest = self.digest.take().map(Sha256::finish);
1024                                if let Some(digest) = digest {
1025                                    if self
1026                                        .info
1027                                        .digest
1028                                        .as_ref()
1029                                        .map(|digest_self| {
1030                                            format!("SHA-256={}", URL_SAFE.encode(digest))
1031                                                != *digest_self
1032                                        })
1033                                        .unwrap_or(false)
1034                                    {
1035                                        return Poll::Ready(Err(std::io::Error::new(
1036                                            std::io::ErrorKind::InvalidData,
1037                                            "wrong digest",
1038                                        )));
1039                                    }
1040                                } else {
1041                                    return Poll::Ready(Err(std::io::Error::new(
1042                                        std::io::ErrorKind::InvalidData,
1043                                        "digest should be Some",
1044                                    )));
1045                                }
1046                                self.has_pending_messages = false;
1047                                self.subscription = None;
1048                            }
1049                            Poll::Ready(Ok(()))
1050                        }
1051                        None => Poll::Ready(Err(std::io::Error::new(
1052                            std::io::ErrorKind::Other,
1053                            "subscription ended before reading whole object",
1054                        ))),
1055                    },
1056                    Poll::Pending => Poll::Pending,
1057                }
1058            } else {
1059                Poll::Pending
1060            }
1061        } else {
1062            Poll::Ready(Ok(()))
1063        }
1064    }
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1068pub struct ObjectOptions {
1069    pub link: Option<ObjectLink>,
1070    pub max_chunk_size: Option<usize>,
1071}
1072
1073/// Meta and instance information about an object.
1074#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1075pub struct ObjectInfo {
1076    /// Name of the object
1077    pub name: String,
1078    /// A short human readable description of the object.
1079    #[serde(default)]
1080    pub description: Option<String>,
1081    /// Metadata for given object.
1082    #[serde(default)]
1083    pub metadata: HashMap<String, String>,
1084    /// Headers for given object.
1085    #[serde(default)]
1086    pub headers: Option<HeaderMap>,
1087    /// Link this object points to, if any.
1088    #[serde(default)]
1089    pub options: Option<ObjectOptions>,
1090    /// Name of the bucket the object is stored in.
1091    pub bucket: String,
1092    /// Unique identifier used to uniquely identify this version of the object.
1093    #[serde(default)]
1094    pub nuid: String,
1095    /// Size in bytes of the object.
1096    #[serde(default)]
1097    pub size: usize,
1098    /// Number of chunks the object is stored in.
1099    #[serde(default)]
1100    pub chunks: usize,
1101    /// Date and time the object was last modified.
1102    #[serde(default, with = "rfc3339::option")]
1103    #[serde(rename = "mtime")]
1104    pub modified: Option<time::OffsetDateTime>,
1105    /// Digest of the object stream.
1106    #[serde(default, skip_serializing_if = "Option::is_none")]
1107    pub digest: Option<String>,
1108    /// Set to true if the object has been deleted.
1109    #[serde(default, skip_serializing_if = "is_default")]
1110    pub deleted: bool,
1111}
1112
1113fn is_default<T: Default + Eq>(t: &T) -> bool {
1114    t == &T::default()
1115}
1116/// A link to another object, potentially in another bucket.
1117#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1118pub struct ObjectLink {
1119    /// Name of the object
1120    pub name: Option<String>,
1121    /// Name of the bucket the object is stored in.
1122    pub bucket: String,
1123}
1124
1125#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1126pub struct UpdateMetadata {
1127    /// Name of the object
1128    pub name: String,
1129    /// A short human readable description of the object.
1130    pub description: Option<String>,
1131    /// Metadata for given object.
1132    #[serde(default)]
1133    pub metadata: HashMap<String, String>,
1134    /// Headers for given object.
1135    pub headers: Option<HeaderMap>,
1136}
1137
1138/// Meta information about an object.
1139#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1140pub struct ObjectMetadata {
1141    /// Name of the object
1142    pub name: String,
1143    /// A short human readable description of the object.
1144    pub description: Option<String>,
1145    /// Max chunk size. Default is 128k.
1146    pub chunk_size: Option<usize>,
1147    /// Metadata for given object.
1148    #[serde(default)]
1149    pub metadata: HashMap<String, String>,
1150    /// Headers for given object.
1151    pub headers: Option<HeaderMap>,
1152}
1153
1154impl From<&str> for ObjectMetadata {
1155    fn from(s: &str) -> ObjectMetadata {
1156        ObjectMetadata {
1157            name: s.to_string(),
1158            ..Default::default()
1159        }
1160    }
1161}
1162
1163pub trait AsObjectInfo {
1164    fn as_info(&self) -> &ObjectInfo;
1165}
1166
1167impl AsObjectInfo for &Object {
1168    fn as_info(&self) -> &ObjectInfo {
1169        &self.info
1170    }
1171}
1172impl AsObjectInfo for &ObjectInfo {
1173    fn as_info(&self) -> &ObjectInfo {
1174        self
1175    }
1176}
1177
1178impl From<ObjectInfo> for ObjectMetadata {
1179    fn from(info: ObjectInfo) -> Self {
1180        ObjectMetadata {
1181            name: info.name,
1182            description: info.description,
1183            metadata: info.metadata,
1184            headers: info.headers,
1185            chunk_size: None,
1186        }
1187    }
1188}
1189
1190#[derive(Debug, PartialEq, Clone)]
1191pub enum UpdateMetadataErrorKind {
1192    InvalidName,
1193    NotFound,
1194    TimedOut,
1195    Other,
1196    PublishMetadata,
1197    NameAlreadyInUse,
1198    Purge,
1199}
1200
1201impl Display for UpdateMetadataErrorKind {
1202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1203        match self {
1204            Self::InvalidName => write!(f, "invalid object name"),
1205            Self::NotFound => write!(f, "object not found"),
1206            Self::TimedOut => write!(f, "timed out"),
1207            Self::Other => write!(f, "error"),
1208            Self::PublishMetadata => {
1209                write!(f, "failed publishing metadata")
1210            }
1211            Self::NameAlreadyInUse => {
1212                write!(f, "object with updated name already exists")
1213            }
1214            Self::Purge => write!(f, "failed purging old name metadata"),
1215        }
1216    }
1217}
1218
1219impl From<InfoError> for UpdateMetadataError {
1220    fn from(error: InfoError) -> Self {
1221        match error.kind() {
1222            InfoErrorKind::InvalidName => {
1223                UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1224            }
1225            InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1226            InfoErrorKind::Other => {
1227                UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1228            }
1229            InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1230        }
1231    }
1232}
1233
1234pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1235
1236#[derive(Clone, Copy, Debug, PartialEq)]
1237pub enum InfoErrorKind {
1238    InvalidName,
1239    NotFound,
1240    Other,
1241    TimedOut,
1242}
1243
1244impl Display for InfoErrorKind {
1245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1246        match self {
1247            Self::InvalidName => write!(f, "invalid object name"),
1248            Self::Other => write!(f, "getting info failed"),
1249            Self::NotFound => write!(f, "not found"),
1250            Self::TimedOut => write!(f, "timed out"),
1251        }
1252    }
1253}
1254
1255pub type InfoError = Error<InfoErrorKind>;
1256
1257#[derive(Clone, Copy, Debug, PartialEq)]
1258pub enum GetErrorKind {
1259    InvalidName,
1260    ConsumerCreate,
1261    NotFound,
1262    BucketLink,
1263    Other,
1264    TimedOut,
1265}
1266
1267impl Display for GetErrorKind {
1268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1269        match self {
1270            Self::ConsumerCreate => write!(f, "failed creating consumer for fetching object"),
1271            Self::Other => write!(f, "failed getting object"),
1272            Self::NotFound => write!(f, "object not found"),
1273            Self::TimedOut => write!(f, "timed out"),
1274            Self::InvalidName => write!(f, "invalid object name"),
1275            Self::BucketLink => write!(f, "object is a link to a bucket"),
1276        }
1277    }
1278}
1279
1280pub type GetError = Error<GetErrorKind>;
1281
1282crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1283crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1284
1285impl From<InfoError> for GetError {
1286    fn from(err: InfoError) -> Self {
1287        match err.kind() {
1288            InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1289            InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1290            InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1291            InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1292        }
1293    }
1294}
1295
1296#[derive(Clone, Copy, Debug, PartialEq)]
1297pub enum DeleteErrorKind {
1298    TimedOut,
1299    NotFound,
1300    Metadata,
1301    InvalidName,
1302    Chunks,
1303    Other,
1304}
1305
1306impl Display for DeleteErrorKind {
1307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1308        match self {
1309            Self::TimedOut => write!(f, "timed out"),
1310            Self::Metadata => write!(f, "failed rolling up metadata"),
1311            Self::Chunks => write!(f, "failed purging chunks"),
1312            Self::Other => write!(f, "delete failed"),
1313            Self::NotFound => write!(f, "object not found"),
1314            Self::InvalidName => write!(f, "invalid object name"),
1315        }
1316    }
1317}
1318
1319pub type DeleteError = Error<DeleteErrorKind>;
1320
1321impl From<InfoError> for DeleteError {
1322    fn from(err: InfoError) -> Self {
1323        match err.kind() {
1324            InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1325            InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1326            InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1327            InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1328        }
1329    }
1330}
1331
1332crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1333crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1334
1335#[derive(Clone, Copy, Debug, PartialEq)]
1336pub enum PutErrorKind {
1337    InvalidName,
1338    ReadChunks,
1339    PublishChunks,
1340    PublishMetadata,
1341    PurgeOldChunks,
1342    TimedOut,
1343    Other,
1344}
1345
1346impl Display for PutErrorKind {
1347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1348        match self {
1349            Self::PublishChunks => write!(f, "failed publishing object chunks"),
1350            Self::PublishMetadata => write!(f, "failed publishing metadata"),
1351            Self::PurgeOldChunks => write!(f, "failed purging old chunks"),
1352            Self::TimedOut => write!(f, "timed out"),
1353            Self::Other => write!(f, "error"),
1354            Self::InvalidName => write!(f, "invalid object name"),
1355            Self::ReadChunks => write!(f, "error while reading the buffer"),
1356        }
1357    }
1358}
1359
1360pub type PutError = Error<PutErrorKind>;
1361
1362pub type AddLinkError = Error<AddLinkErrorKind>;
1363
1364#[derive(Clone, Copy, Debug, PartialEq)]
1365pub enum AddLinkErrorKind {
1366    EmptyName,
1367    ObjectRequired,
1368    Deleted,
1369    LinkToLink,
1370    PublishMetadata,
1371    AlreadyExists,
1372    Other,
1373}
1374
1375impl Display for AddLinkErrorKind {
1376    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1377        match self {
1378            AddLinkErrorKind::ObjectRequired => write!(f, "cannot link to empty Object"),
1379            AddLinkErrorKind::Deleted => write!(f, "cannot link a deleted Object"),
1380            AddLinkErrorKind::LinkToLink => write!(f, "cannot link to another link"),
1381            AddLinkErrorKind::EmptyName => write!(f, "link name cannot be empty"),
1382            AddLinkErrorKind::PublishMetadata => write!(f, "failed publishing link metadata"),
1383            AddLinkErrorKind::Other => write!(f, "error"),
1384            AddLinkErrorKind::AlreadyExists => write!(f, "object already exists"),
1385        }
1386    }
1387}
1388
1389type PublishMetadataError = Error<PublishMetadataErrorKind>;
1390
1391#[derive(Clone, Copy, Debug, PartialEq)]
1392enum PublishMetadataErrorKind {
1393    PublishMetadata,
1394    Other,
1395}
1396
1397impl Display for PublishMetadataErrorKind {
1398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1399        match self {
1400            PublishMetadataErrorKind::PublishMetadata => write!(f, "failed to publish metadata"),
1401            PublishMetadataErrorKind::Other => write!(f, "error"),
1402        }
1403    }
1404}
1405
1406impl From<PublishMetadataError> for AddLinkError {
1407    fn from(error: PublishMetadataError) -> Self {
1408        match error.kind {
1409            PublishMetadataErrorKind::PublishMetadata => {
1410                AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1411            }
1412            PublishMetadataErrorKind::Other => {
1413                AddLinkError::with_source(AddLinkErrorKind::Other, error)
1414            }
1415        }
1416    }
1417}
1418impl From<PublishMetadataError> for PutError {
1419    fn from(error: PublishMetadataError) -> Self {
1420        match error.kind {
1421            PublishMetadataErrorKind::PublishMetadata => {
1422                PutError::new(PutErrorKind::PublishMetadata)
1423            }
1424            PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1425        }
1426    }
1427}
1428
1429#[derive(Clone, Copy, Debug, PartialEq)]
1430pub enum WatchErrorKind {
1431    TimedOut,
1432    ConsumerCreate,
1433    Other,
1434}
1435
1436impl Display for WatchErrorKind {
1437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1438        match self {
1439            Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1440            Self::Other => write!(f, "watch failed"),
1441            Self::TimedOut => write!(f, "timed out"),
1442        }
1443    }
1444}
1445
1446pub type WatchError = Error<WatchErrorKind>;
1447
1448crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
1449crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1450
1451pub type ListError = WatchError;
1452pub type ListErrorKind = WatchErrorKind;
1453
1454#[derive(Clone, Copy, Debug, PartialEq)]
1455pub enum SealErrorKind {
1456    TimedOut,
1457    Other,
1458    Info,
1459    Update,
1460}
1461
1462impl Display for SealErrorKind {
1463    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1464        match self {
1465            Self::TimedOut => write!(f, "timed out"),
1466            Self::Other => write!(f, "seal failed"),
1467            Self::Info => write!(f, "failed getting stream info before sealing bucket"),
1468            Self::Update => write!(f, "failed sealing the bucket"),
1469        }
1470    }
1471}
1472
1473pub type SealError = Error<SealErrorKind>;
1474
1475impl From<super::context::UpdateStreamError> for SealError {
1476    fn from(err: super::context::UpdateStreamError) -> Self {
1477        match err.kind() {
1478            super::context::CreateStreamErrorKind::TimedOut => {
1479                SealError::new(SealErrorKind::TimedOut)
1480            }
1481            _ => SealError::with_source(SealErrorKind::Update, err),
1482        }
1483    }
1484}
1485
1486#[derive(Clone, Copy, Debug, PartialEq)]
1487pub enum WatcherErrorKind {
1488    ConsumerError,
1489    Other,
1490}
1491
1492impl Display for WatcherErrorKind {
1493    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1494        match self {
1495            Self::ConsumerError => write!(f, "watcher consumer error"),
1496            Self::Other => write!(f, "watcher error"),
1497        }
1498    }
1499}
1500
1501pub type WatcherError = Error<WatcherErrorKind>;
1502
1503impl From<OrderedError> for WatcherError {
1504    fn from(err: OrderedError) -> Self {
1505        WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1506    }
1507}
1508
1509pub type ListerError = WatcherError;
1510pub type ListerErrorKind = WatcherErrorKind;