async_nats/jetstream/object_store/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Object Store module
15use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
42const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
43const NATS_ROLLUP: &str = "Nats-Rollup";
44const ROLLUP_SUBJECT: &str = "sub";
45
46static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
47static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50    BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54    if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55        return false;
56    }
57
58    OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62    URL_SAFE.encode(object_name)
63}
64
65/// Configuration values for object store buckets.
66#[derive(Debug, Default, Clone, Serialize, Deserialize)]
67pub struct Config {
68    /// Name of the storage bucket.
69    pub bucket: String,
70    /// A short description of the purpose of this storage bucket.
71    pub description: Option<String>,
72    /// Maximum age of any value in the bucket, expressed in nanoseconds
73    #[serde(default, with = "serde_nanos")]
74    pub max_age: Duration,
75    /// How large the storage bucket may become in total bytes.
76    pub max_bytes: i64,
77    /// The type of storage backend, `File` (default) and `Memory`
78    pub storage: StorageType,
79    /// How many replicas to keep for each value in a cluster, maximum 5.
80    pub num_replicas: usize,
81    /// Sets compression of the underlying stream.
82    pub compression: bool,
83    // Cluster and tag placement.
84    pub placement: Option<stream::Placement>,
85}
86
87/// A blob store capable of storing large objects efficiently in streams.
88#[derive(Clone)]
89pub struct ObjectStore {
90    pub(crate) name: String,
91    pub(crate) stream: crate::jetstream::stream::Stream,
92}
93
94impl ObjectStore {
95    /// Gets an [Object] from the [ObjectStore].
96    ///
97    /// [Object] implements [tokio::io::AsyncRead] that allows
98    /// to read the data from Object Store.
99    ///
100    /// # Examples
101    ///
102    /// ```no_run
103    /// # #[tokio::main]
104    /// # async fn main() -> Result<(), async_nats::Error> {
105    /// use tokio::io::AsyncReadExt;
106    /// let client = async_nats::connect("demo.nats.io").await?;
107    /// let jetstream = async_nats::jetstream::new(client);
108    ///
109    /// let bucket = jetstream.get_object_store("store").await?;
110    /// let mut object = bucket.get("FOO").await?;
111    ///
112    /// // Object implements `tokio::io::AsyncRead`.
113    /// let mut bytes = vec![];
114    /// object.read_to_end(&mut bytes).await?;
115    /// # Ok(())
116    /// # }
117    /// ```
118    pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
119        self.get_impl(object_name).await
120    }
121
122    fn get_impl<'bucket, 'future, T>(
123        &'bucket self,
124        object_name: T,
125    ) -> BoxFuture<'future, Result<Object, GetError>>
126    where
127        T: AsRef<str> + Send + 'future,
128        'bucket: 'future,
129    {
130        Box::pin(async move {
131            let object_info = self.info(object_name).await?;
132            if object_info.deleted {
133                return Err(GetError::new(GetErrorKind::NotFound));
134            }
135            if let Some(ref options) = object_info.options {
136                if let Some(link) = options.link.as_ref() {
137                    if let Some(link_name) = link.name.as_ref() {
138                        let link_name = link_name.clone();
139                        debug!("getting object via link");
140                        if link.bucket == self.name {
141                            return self.get_impl(link_name).await;
142                        } else {
143                            let bucket = self
144                                .stream
145                                .context
146                                .get_object_store(&link.bucket)
147                                .await
148                                .map_err(|err| {
149                                GetError::with_source(GetErrorKind::Other, err)
150                            })?;
151                            let object = bucket.get_impl(&link_name).await?;
152                            return Ok(object);
153                        }
154                    } else {
155                        return Err(GetError::new(GetErrorKind::BucketLink));
156                    }
157                }
158            }
159
160            debug!("not a link. Getting the object");
161            Ok(Object::new(object_info, self.stream.clone()))
162        })
163    }
164
165    /// Deletes an [Object] from the [ObjectStore].
166    ///
167    /// # Examples
168    ///
169    /// ```no_run
170    /// # #[tokio::main]
171    /// # async fn main() -> Result<(), async_nats::Error> {
172    /// let client = async_nats::connect("demo.nats.io").await?;
173    /// let jetstream = async_nats::jetstream::new(client);
174    ///
175    /// let bucket = jetstream.get_object_store("store").await?;
176    /// bucket.delete("FOO").await?;
177    /// # Ok(())
178    /// # }
179    /// ```
180    pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
181        let object_name = object_name.as_ref();
182        let mut object_info = self.info(object_name).await?;
183        object_info.chunks = 0;
184        object_info.size = 0;
185        object_info.deleted = true;
186
187        let data = serde_json::to_vec(&object_info).map_err(|err| {
188            DeleteError::with_source(
189                DeleteErrorKind::Other,
190                format!("failed deserializing object info: {}", err),
191            )
192        })?;
193
194        let mut headers = HeaderMap::default();
195        headers.insert(
196            NATS_ROLLUP,
197            HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
198                DeleteError::with_source(
199                    DeleteErrorKind::Other,
200                    format!("failed parsing header: {}", err),
201                )
202            })?,
203        );
204
205        let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
206
207        self.stream
208            .context
209            .publish_with_headers(subject, headers, data.into())
210            .await?
211            .await?;
212
213        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
214
215        self.stream.purge().filter(&chunk_subject).await?;
216
217        Ok(())
218    }
219
220    /// Retrieves [Object] [ObjectInfo].
221    ///
222    /// # Examples
223    ///
224    /// ```no_run
225    /// # #[tokio::main]
226    /// # async fn main() -> Result<(), async_nats::Error> {
227    /// let client = async_nats::connect("demo.nats.io").await?;
228    /// let jetstream = async_nats::jetstream::new(client);
229    ///
230    /// let bucket = jetstream.get_object_store("store").await?;
231    /// let info = bucket.info("FOO").await?;
232    /// # Ok(())
233    /// # }
234    /// ```
235    pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
236        let object_name = object_name.as_ref();
237        let object_name = encode_object_name(object_name);
238        if !is_valid_object_name(&object_name) {
239            return Err(InfoError::new(InfoErrorKind::InvalidName));
240        }
241
242        // Grab last meta value we have.
243        let subject = format!("$O.{}.M.{}", &self.name, &object_name);
244
245        // FIXME(jrm): we should use direct get here when possible.
246        let message = self
247            .stream
248            .get_last_raw_message_by_subject(subject.as_str())
249            .await
250            .map_err(|err| match err.kind() {
251                stream::LastRawMessageErrorKind::NoMessageFound => {
252                    InfoError::new(InfoErrorKind::NotFound)
253                }
254                _ => InfoError::with_source(InfoErrorKind::Other, err),
255            })?;
256        let object_info =
257            serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
258                InfoError::with_source(
259                    InfoErrorKind::Other,
260                    format!("failed to decode info payload: {}", err),
261                )
262            })?;
263
264        Ok(object_info)
265    }
266
267    /// Puts an [Object] into the [ObjectStore].
268    /// This method implements `tokio::io::AsyncRead`.
269    ///
270    /// # Examples
271    ///
272    /// ```no_run
273    /// # #[tokio::main]
274    /// # async fn main() -> Result<(), async_nats::Error> {
275    /// let client = async_nats::connect("demo.nats.io").await?;
276    /// let jetstream = async_nats::jetstream::new(client);
277    ///
278    /// let bucket = jetstream.get_object_store("store").await?;
279    /// let mut file = tokio::fs::File::open("foo.txt").await?;
280    /// bucket.put("file", &mut file).await.unwrap();
281    /// # Ok(())
282    /// # }
283    /// ```
284    pub async fn put<T>(
285        &self,
286        meta: T,
287        data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
288    ) -> Result<ObjectInfo, PutError>
289    where
290        ObjectMetadata: From<T>,
291    {
292        let object_meta: ObjectMetadata = meta.into();
293
294        // Fetch any existing object info, if there is any for later use.
295        let maybe_existing_object_info = (self.info(&object_meta.name).await).ok();
296
297        let object_nuid = nuid::next();
298        let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
299
300        let mut object_chunks = 0;
301        let mut object_size = 0;
302
303        let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
304        let mut buffer = BytesMut::with_capacity(chunk_size);
305        let mut sha256 = Sha256::new();
306
307        loop {
308            let n = data
309                .read_buf(&mut buffer)
310                .await
311                .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
312
313            if n == 0 {
314                break;
315            }
316
317            let payload = buffer.split().freeze();
318            sha256.update(&payload);
319
320            object_size += payload.len();
321            object_chunks += 1;
322
323            self.stream
324                .context
325                .publish(chunk_subject.clone(), payload)
326                .await
327                .map_err(|err| {
328                    PutError::with_source(
329                        PutErrorKind::PublishChunks,
330                        format!("failed chunk publish: {}", err),
331                    )
332                })?
333                .await
334                .map_err(|err| {
335                    PutError::with_source(
336                        PutErrorKind::PublishChunks,
337                        format!("failed getting chunk ack: {}", err),
338                    )
339                })?;
340        }
341        let digest = sha256.finish();
342
343        let encoded_object_name = encode_object_name(&object_meta.name);
344        if !is_valid_object_name(&encoded_object_name) {
345            return Err(PutError::new(PutErrorKind::InvalidName));
346        }
347        let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
348
349        let object_info = ObjectInfo {
350            name: object_meta.name,
351            description: object_meta.description,
352            options: Some(ObjectOptions {
353                max_chunk_size: Some(chunk_size),
354                link: None,
355            }),
356            bucket: self.name.clone(),
357            nuid: object_nuid.to_string(),
358            chunks: object_chunks,
359            size: object_size,
360            digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
361            modified: Some(OffsetDateTime::now_utc()),
362            deleted: false,
363            metadata: object_meta.metadata,
364            headers: object_meta.headers,
365        };
366
367        let mut headers = HeaderMap::new();
368        headers.insert(
369            NATS_ROLLUP,
370            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
371                PutError::with_source(
372                    PutErrorKind::Other,
373                    format!("failed parsing header: {}", err),
374                )
375            })?,
376        );
377        let data = serde_json::to_vec(&object_info).map_err(|err| {
378            PutError::with_source(
379                PutErrorKind::Other,
380                format!("failed serializing object info: {}", err),
381            )
382        })?;
383
384        // publish meta.
385        self.stream
386            .context
387            .publish_with_headers(subject, headers, data.into())
388            .await
389            .map_err(|err| {
390                PutError::with_source(
391                    PutErrorKind::PublishMetadata,
392                    format!("failed publishing metadata: {}", err),
393                )
394            })?
395            .await
396            .map_err(|err| {
397                PutError::with_source(
398                    PutErrorKind::PublishMetadata,
399                    format!("failed ack from metadata publish: {}", err),
400                )
401            })?;
402
403        // Purge any old chunks.
404        if let Some(existing_object_info) = maybe_existing_object_info {
405            let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
406
407            self.stream
408                .purge()
409                .filter(&chunk_subject)
410                .await
411                .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
412        }
413
414        Ok(object_info)
415    }
416
417    /// Creates a [Watch] stream over changes in the [ObjectStore].
418    ///
419    /// # Examples
420    ///
421    /// ```no_run
422    /// # #[tokio::main]
423    /// # async fn main() -> Result<(), async_nats::Error> {
424    /// use futures::StreamExt;
425    /// let client = async_nats::connect("demo.nats.io").await?;
426    /// let jetstream = async_nats::jetstream::new(client);
427    ///
428    /// let bucket = jetstream.get_object_store("store").await?;
429    /// let mut watcher = bucket.watch().await.unwrap();
430    /// while let Some(object) = watcher.next().await {
431    ///     println!("detected changes in {:?}", object?);
432    /// }
433    /// # Ok(())
434    /// # }
435    /// ```
436    pub async fn watch(&self) -> Result<Watch, WatchError> {
437        self.watch_with_deliver_policy(DeliverPolicy::New).await
438    }
439
440    /// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever
441    /// there are changes for that key with as well as last value.
442    pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
443        self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
444            .await
445    }
446
447    async fn watch_with_deliver_policy(
448        &self,
449        deliver_policy: DeliverPolicy,
450    ) -> Result<Watch, WatchError> {
451        let subject = format!("$O.{}.M.>", self.name);
452        let ordered = self
453            .stream
454            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
455                deliver_policy,
456                deliver_subject: self.stream.context.client.new_inbox(),
457                description: Some("object store watcher".to_string()),
458                filter_subject: subject,
459                ..Default::default()
460            })
461            .await?;
462        Ok(Watch {
463            subscription: ordered.messages().await?,
464        })
465    }
466
467    /// Returns a [List] stream with all not deleted [Objects][Object] in the [ObjectStore].
468    ///
469    /// # Examples
470    ///
471    /// ```no_run
472    /// # #[tokio::main]
473    /// # async fn main() -> Result<(), async_nats::Error> {
474    /// use futures::StreamExt;
475    /// let client = async_nats::connect("demo.nats.io").await?;
476    /// let jetstream = async_nats::jetstream::new(client);
477    ///
478    /// let bucket = jetstream.get_object_store("store").await?;
479    /// let mut list = bucket.list().await.unwrap();
480    /// while let Some(object) = list.next().await {
481    ///     println!("object {:?}", object?);
482    /// }
483    /// # Ok(())
484    /// # }
485    /// ```
486    pub async fn list(&self) -> Result<List, ListError> {
487        trace!("starting Object List");
488        let subject = format!("$O.{}.M.>", self.name);
489        let ordered = self
490            .stream
491            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
492                deliver_policy: super::consumer::DeliverPolicy::All,
493                deliver_subject: self.stream.context.client.new_inbox(),
494                description: Some("object store list".to_string()),
495                filter_subject: subject,
496                ..Default::default()
497            })
498            .await?;
499        Ok(List {
500            done: ordered.info.num_pending == 0,
501            subscription: Some(ordered.messages().await?),
502        })
503    }
504
505    /// Seals a [ObjectStore], preventing any further changes to it or its [Objects][Object].
506    ///
507    /// # Examples
508    ///
509    /// ```no_run
510    /// # #[tokio::main]
511    /// # async fn main() -> Result<(), async_nats::Error> {
512    /// use futures::StreamExt;
513    /// let client = async_nats::connect("demo.nats.io").await?;
514    /// let jetstream = async_nats::jetstream::new(client);
515    ///
516    /// let mut bucket = jetstream.get_object_store("store").await?;
517    /// bucket.seal().await.unwrap();
518    /// # Ok(())
519    /// # }
520    /// ```
521    pub async fn seal(&mut self) -> Result<(), SealError> {
522        let mut stream_config = self
523            .stream
524            .info()
525            .await
526            .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
527            .to_owned();
528        stream_config.config.sealed = true;
529
530        self.stream
531            .context
532            .update_stream(&stream_config.config)
533            .await?;
534        Ok(())
535    }
536
537    /// Updates [Object] [ObjectMetadata].
538    ///
539    /// # Examples
540    ///
541    /// ```no_run
542    /// # #[tokio::main]
543    /// # async fn main() -> Result<(), async_nats::Error> {
544    /// use async_nats::jetstream::object_store;
545    /// let client = async_nats::connect("demo.nats.io").await?;
546    /// let jetstream = async_nats::jetstream::new(client);
547    ///
548    /// let mut bucket = jetstream.get_object_store("store").await?;
549    /// bucket
550    ///     .update_metadata(
551    ///         "object",
552    ///         object_store::UpdateMetadata {
553    ///             name: "new_name".to_string(),
554    ///             description: Some("a new description".to_string()),
555    ///             ..Default::default()
556    ///         },
557    ///     )
558    ///     .await?;
559    /// # Ok(())
560    /// # }
561    /// ```
562    pub async fn update_metadata<A: AsRef<str>>(
563        &self,
564        object: A,
565        metadata: UpdateMetadata,
566    ) -> Result<ObjectInfo, UpdateMetadataError> {
567        let mut info = self.info(object.as_ref()).await?;
568
569        // If name is being update, we need to check if other metadata with it already exists.
570        // If does, error. Otherwise, purge old name metadata.
571        if metadata.name != info.name {
572            tracing::info!("new metadata name is different than then old one");
573            if !is_valid_object_name(&metadata.name) {
574                return Err(UpdateMetadataError::new(
575                    UpdateMetadataErrorKind::InvalidName,
576                ));
577            }
578            match self.info(&metadata.name).await {
579                Ok(_) => {
580                    return Err(UpdateMetadataError::new(
581                        UpdateMetadataErrorKind::NameAlreadyInUse,
582                    ))
583                }
584                Err(err) => match err.kind() {
585                    InfoErrorKind::NotFound => {
586                        tracing::info!("purging old metadata: {}", info.name);
587                        self.stream
588                            .purge()
589                            .filter(format!(
590                                "$O.{}.M.{}",
591                                self.name,
592                                encode_object_name(&info.name)
593                            ))
594                            .await
595                            .map_err(|err| {
596                                UpdateMetadataError::with_source(
597                                    UpdateMetadataErrorKind::Purge,
598                                    err,
599                                )
600                            })?;
601                    }
602                    _ => {
603                        return Err(UpdateMetadataError::with_source(
604                            UpdateMetadataErrorKind::Other,
605                            err,
606                        ))
607                    }
608                },
609            }
610        }
611
612        info.name = metadata.name;
613        info.description = metadata.description;
614
615        let name = encode_object_name(&info.name);
616        let subject = format!("$O.{}.M.{}", &self.name, &name);
617
618        let mut headers = HeaderMap::new();
619        headers.insert(
620            NATS_ROLLUP,
621            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
622                UpdateMetadataError::with_source(
623                    UpdateMetadataErrorKind::Other,
624                    format!("failed parsing header: {}", err),
625                )
626            })?,
627        );
628        let data = serde_json::to_vec(&info).map_err(|err| {
629            UpdateMetadataError::with_source(
630                UpdateMetadataErrorKind::Other,
631                format!("failed serializing object info: {}", err),
632            )
633        })?;
634
635        // publish meta.
636        self.stream
637            .context
638            .publish_with_headers(subject, headers, data.into())
639            .await
640            .map_err(|err| {
641                UpdateMetadataError::with_source(
642                    UpdateMetadataErrorKind::PublishMetadata,
643                    format!("failed publishing metadata: {}", err),
644                )
645            })?
646            .await
647            .map_err(|err| {
648                UpdateMetadataError::with_source(
649                    UpdateMetadataErrorKind::PublishMetadata,
650                    format!("failed ack from metadata publish: {}", err),
651                )
652            })?;
653
654        Ok(info)
655    }
656
657    /// Adds a link to an [Object].
658    /// It creates a new [Object] in the [ObjectStore] that points to another [Object]
659    /// and does not have any contents on it's own.
660    /// Links are automatically followed (one level deep) when calling [ObjectStore::get].
661    ///
662    /// # Examples
663    ///
664    /// ```no_run
665    /// # #[tokio::main]
666    /// # async fn main() -> Result<(), async_nats::Error> {
667    /// use async_nats::jetstream::object_store;
668    /// let client = async_nats::connect("demo.nats.io").await?;
669    /// let jetstream = async_nats::jetstream::new(client);
670    /// let bucket = jetstream.get_object_store("bucket").await?;
671    /// let object = bucket.get("object").await?;
672    /// bucket.add_link("link_to_object", &object).await?;
673    /// # Ok(())
674    /// # }
675    /// ```
676    pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
677    where
678        T: ToString,
679        O: AsObjectInfo,
680    {
681        let object = object.as_info();
682        let name = name.to_string();
683        if name.is_empty() {
684            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
685        }
686        if object.name.is_empty() {
687            return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
688        }
689        if object.deleted {
690            return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
691        }
692        if let Some(ref options) = object.options {
693            if options.link.is_some() {
694                return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
695            }
696        }
697        match self.info(&name).await {
698            Ok(info) => {
699                if let Some(options) = info.options {
700                    if options.link.is_none() {
701                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
702                    }
703                } else {
704                    return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
705                }
706            }
707            Err(err) if err.kind() != InfoErrorKind::NotFound => {
708                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
709            }
710            _ => (),
711        }
712
713        let info = ObjectInfo {
714            name,
715            description: None,
716            options: Some(ObjectOptions {
717                link: Some(ObjectLink {
718                    name: Some(object.name.clone()),
719                    bucket: object.bucket.clone(),
720                }),
721                max_chunk_size: None,
722            }),
723            bucket: self.name.clone(),
724            nuid: nuid::next().to_string(),
725            size: 0,
726            chunks: 0,
727            modified: Some(OffsetDateTime::now_utc()),
728            digest: None,
729            deleted: false,
730            metadata: HashMap::default(),
731            headers: None,
732        };
733        publish_meta(self, &info).await?;
734        Ok(info)
735    }
736
737    /// Adds a link to another [ObjectStore] bucket by creating a new [Object]
738    /// in the current [ObjectStore] that points to another [ObjectStore] and
739    /// does not contain any data.
740    ///
741    /// # Examples
742    ///
743    /// ```no_run
744    /// # #[tokio::main]
745    /// # async fn main() -> Result<(), async_nats::Error> {
746    /// use async_nats::jetstream::object_store;
747    /// let client = async_nats::connect("demo.nats.io").await?;
748    /// let jetstream = async_nats::jetstream::new(client);
749    /// let bucket = jetstream.get_object_store("bucket").await?;
750    /// bucket
751    ///     .add_bucket_link("link_to_object", "another_bucket")
752    ///     .await?;
753    /// # Ok(())
754    /// # }
755    /// ```
756    pub async fn add_bucket_link<T: ToString, U: ToString>(
757        &self,
758        name: T,
759        bucket: U,
760    ) -> Result<ObjectInfo, AddLinkError> {
761        let name = name.to_string();
762        let bucket = bucket.to_string();
763        if name.is_empty() {
764            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
765        }
766
767        match self.info(&name).await {
768            Ok(info) => {
769                if let Some(options) = info.options {
770                    if options.link.is_none() {
771                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
772                    }
773                }
774            }
775            Err(err) if err.kind() != InfoErrorKind::NotFound => {
776                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
777            }
778            _ => (),
779        }
780
781        let info = ObjectInfo {
782            name: name.clone(),
783            description: None,
784            options: Some(ObjectOptions {
785                link: Some(ObjectLink { name: None, bucket }),
786                max_chunk_size: None,
787            }),
788            bucket: self.name.clone(),
789            nuid: nuid::next().to_string(),
790            size: 0,
791            chunks: 0,
792            modified: Some(OffsetDateTime::now_utc()),
793            digest: None,
794            deleted: false,
795            metadata: HashMap::default(),
796            headers: None,
797        };
798        publish_meta(self, &info).await?;
799        Ok(info)
800    }
801}
802
803async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
804    let encoded_object_name = encode_object_name(&info.name);
805    let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
806
807    let mut headers = HeaderMap::new();
808    headers.insert(
809        NATS_ROLLUP,
810        ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
811            PublishMetadataError::with_source(
812                PublishMetadataErrorKind::Other,
813                format!("failed parsing header: {}", err),
814            )
815        })?,
816    );
817    let data = serde_json::to_vec(&info).map_err(|err| {
818        PublishMetadataError::with_source(
819            PublishMetadataErrorKind::Other,
820            format!("failed serializing object info: {}", err),
821        )
822    })?;
823
824    store
825        .stream
826        .context
827        .publish_with_headers(subject, headers, data.into())
828        .await
829        .map_err(|err| {
830            PublishMetadataError::with_source(
831                PublishMetadataErrorKind::PublishMetadata,
832                format!("failed publishing metadata: {}", err),
833            )
834        })?
835        .await
836        .map_err(|err| {
837            PublishMetadataError::with_source(
838                PublishMetadataErrorKind::PublishMetadata,
839                format!("failed ack from metadata publish: {}", err),
840            )
841        })?;
842    Ok(())
843}
844
845pub struct Watch {
846    subscription: crate::jetstream::consumer::push::Ordered,
847}
848
849impl Stream for Watch {
850    type Item = Result<ObjectInfo, WatcherError>;
851
852    fn poll_next(
853        mut self: std::pin::Pin<&mut Self>,
854        cx: &mut std::task::Context<'_>,
855    ) -> Poll<Option<Self::Item>> {
856        match self.subscription.poll_next_unpin(cx) {
857            Poll::Ready(message) => match message {
858                Some(message) => Poll::Ready(
859                    serde_json::from_slice::<ObjectInfo>(&message?.payload)
860                        .map_err(|err| {
861                            WatcherError::with_source(
862                                WatcherErrorKind::Other,
863                                format!("failed to deserialize object info: {}", err),
864                            )
865                        })
866                        .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
867                ),
868                None => Poll::Ready(None),
869            },
870            Poll::Pending => Poll::Pending,
871        }
872    }
873}
874
875pub struct List {
876    subscription: Option<crate::jetstream::consumer::push::Ordered>,
877    done: bool,
878}
879
880impl Stream for List {
881    type Item = Result<ObjectInfo, ListerError>;
882
883    fn poll_next(
884        mut self: std::pin::Pin<&mut Self>,
885        cx: &mut std::task::Context<'_>,
886    ) -> Poll<Option<Self::Item>> {
887        loop {
888            if self.done {
889                debug!("Object Store list done");
890                self.subscription = None;
891                return Poll::Ready(None);
892            }
893
894            if let Some(subscription) = self.subscription.as_mut() {
895                match subscription.poll_next_unpin(cx) {
896                    Poll::Ready(message) => match message {
897                        None => return Poll::Ready(None),
898                        Some(message) => {
899                            let message = message?;
900                            let info = message.info().map_err(|err| {
901                                ListerError::with_source(ListerErrorKind::Other, err)
902                            })?;
903                            trace!("num pending: {}", info.pending);
904                            if info.pending == 0 {
905                                self.done = true;
906                            }
907                            let response: ObjectInfo = serde_json::from_slice(&message.payload)
908                                .map_err(|err| {
909                                    ListerError::with_source(
910                                        ListerErrorKind::Other,
911                                        format!("failed deserializing object info: {}", err),
912                                    )
913                                })?;
914                            if response.deleted {
915                                continue;
916                            }
917                            return Poll::Ready(Some(Ok(response)));
918                        }
919                    },
920                    Poll::Pending => return Poll::Pending,
921                }
922            } else {
923                return Poll::Ready(None);
924            }
925        }
926    }
927}
928
929/// Represents an object stored in a bucket.
930pub struct Object {
931    pub info: ObjectInfo,
932    remaining_bytes: VecDeque<u8>,
933    has_pending_messages: bool,
934    digest: Option<Sha256>,
935    subscription: Option<crate::jetstream::consumer::push::Ordered>,
936    subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
937    stream: crate::jetstream::stream::Stream,
938}
939
940impl std::fmt::Debug for Object {
941    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
942        f.debug_struct("Object")
943            .field("info", &self.info)
944            .field("remaining_bytes", &self.remaining_bytes)
945            .field("has_pending_messages", &self.has_pending_messages)
946            .finish()
947    }
948}
949
950impl Object {
951    pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
952        let has_pending_messages = info.chunks > 0;
953        Object {
954            subscription: None,
955            info,
956            remaining_bytes: VecDeque::new(),
957            has_pending_messages,
958            digest: Some(Sha256::new()),
959            subscription_future: None,
960            stream,
961        }
962    }
963
964    /// Returns information about the object.
965    pub fn info(&self) -> &ObjectInfo {
966        &self.info
967    }
968}
969
970impl tokio::io::AsyncRead for Object {
971    fn poll_read(
972        mut self: std::pin::Pin<&mut Self>,
973        cx: &mut std::task::Context<'_>,
974        buf: &mut tokio::io::ReadBuf<'_>,
975    ) -> std::task::Poll<std::io::Result<()>> {
976        let (buf1, _buf2) = self.remaining_bytes.as_slices();
977        if !buf1.is_empty() {
978            let len = cmp::min(buf.remaining(), buf1.len());
979            buf.put_slice(&buf1[..len]);
980            self.remaining_bytes.drain(..len);
981            return Poll::Ready(Ok(()));
982        }
983
984        if self.has_pending_messages {
985            if self.subscription.is_none() {
986                let future = match self.subscription_future.as_mut() {
987                    Some(future) => future,
988                    None => {
989                        let stream = self.stream.clone();
990                        let bucket = self.info.bucket.clone();
991                        let nuid = self.info.nuid.clone();
992                        self.subscription_future.insert(Box::pin(async move {
993                            stream
994                                .create_consumer(OrderedConfig {
995                                    deliver_subject: stream.context.client.new_inbox(),
996                                    filter_subject: format!("$O.{}.C.{}", bucket, nuid),
997                                    ..Default::default()
998                                })
999                                .await
1000                                .unwrap()
1001                                .messages()
1002                                .await
1003                        }))
1004                    }
1005                };
1006                match future.as_mut().poll(cx) {
1007                    Poll::Ready(subscription) => {
1008                        self.subscription = Some(subscription.unwrap());
1009                    }
1010                    Poll::Pending => (),
1011                }
1012            }
1013            if let Some(subscription) = self.subscription.as_mut() {
1014                match subscription.poll_next_unpin(cx) {
1015                    Poll::Ready(message) => match message {
1016                        Some(message) => {
1017                            let message = message.map_err(|err| {
1018                                std::io::Error::other(format!(
1019                                    "error from JetStream subscription: {err}"
1020                                ))
1021                            })?;
1022                            let len = cmp::min(buf.remaining(), message.payload.len());
1023                            buf.put_slice(&message.payload[..len]);
1024                            if let Some(context) = &mut self.digest {
1025                                context.update(&message.payload);
1026                            }
1027                            self.remaining_bytes.extend(&message.payload[len..]);
1028
1029                            let info = message.info().map_err(|err| {
1030                                std::io::Error::other(format!(
1031                                    "error from JetStream subscription: {err}"
1032                                ))
1033                            })?;
1034                            if info.pending == 0 {
1035                                let digest = self.digest.take().map(Sha256::finish);
1036                                if let Some(digest) = digest {
1037                                    if self
1038                                        .info
1039                                        .digest
1040                                        .as_ref()
1041                                        .map(|digest_self| {
1042                                            format!("SHA-256={}", URL_SAFE.encode(digest))
1043                                                != *digest_self
1044                                        })
1045                                        .unwrap_or(false)
1046                                    {
1047                                        return Poll::Ready(Err(std::io::Error::new(
1048                                            std::io::ErrorKind::InvalidData,
1049                                            "wrong digest",
1050                                        )));
1051                                    }
1052                                } else {
1053                                    return Poll::Ready(Err(std::io::Error::new(
1054                                        std::io::ErrorKind::InvalidData,
1055                                        "digest should be Some",
1056                                    )));
1057                                }
1058                                self.has_pending_messages = false;
1059                                self.subscription = None;
1060                            }
1061                            Poll::Ready(Ok(()))
1062                        }
1063                        None => Poll::Ready(Err(std::io::Error::other(
1064                            "subscription ended before reading whole object",
1065                        ))),
1066                    },
1067                    Poll::Pending => Poll::Pending,
1068                }
1069            } else {
1070                Poll::Pending
1071            }
1072        } else {
1073            Poll::Ready(Ok(()))
1074        }
1075    }
1076}
1077
1078#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1079pub struct ObjectOptions {
1080    pub link: Option<ObjectLink>,
1081    pub max_chunk_size: Option<usize>,
1082}
1083
1084/// Meta and instance information about an object.
1085#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
1086pub struct ObjectInfo {
1087    /// Name of the object
1088    pub name: String,
1089    /// A short human readable description of the object.
1090    #[serde(default)]
1091    pub description: Option<String>,
1092    /// Metadata for given object.
1093    #[serde(default)]
1094    pub metadata: HashMap<String, String>,
1095    /// Headers for given object.
1096    #[serde(default)]
1097    pub headers: Option<HeaderMap>,
1098    /// Link this object points to, if any.
1099    #[serde(default)]
1100    pub options: Option<ObjectOptions>,
1101    /// Name of the bucket the object is stored in.
1102    pub bucket: String,
1103    /// Unique identifier used to uniquely identify this version of the object.
1104    #[serde(default)]
1105    pub nuid: String,
1106    /// Size in bytes of the object.
1107    #[serde(default)]
1108    pub size: usize,
1109    /// Number of chunks the object is stored in.
1110    #[serde(default)]
1111    pub chunks: usize,
1112    /// Date and time the object was last modified.
1113    #[serde(default, with = "rfc3339::option")]
1114    #[serde(rename = "mtime")]
1115    pub modified: Option<time::OffsetDateTime>,
1116    /// Digest of the object stream.
1117    #[serde(default, skip_serializing_if = "Option::is_none")]
1118    pub digest: Option<String>,
1119    /// Set to true if the object has been deleted.
1120    #[serde(default, skip_serializing_if = "is_default")]
1121    pub deleted: bool,
1122}
1123
1124fn is_default<T: Default + Eq>(t: &T) -> bool {
1125    t == &T::default()
1126}
1127/// A link to another object, potentially in another bucket.
1128#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1129pub struct ObjectLink {
1130    /// Name of the object
1131    pub name: Option<String>,
1132    /// Name of the bucket the object is stored in.
1133    pub bucket: String,
1134}
1135
1136#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1137pub struct UpdateMetadata {
1138    /// Name of the object
1139    pub name: String,
1140    /// A short human readable description of the object.
1141    pub description: Option<String>,
1142    /// Metadata for given object.
1143    #[serde(default)]
1144    pub metadata: HashMap<String, String>,
1145    /// Headers for given object.
1146    pub headers: Option<HeaderMap>,
1147}
1148
1149/// Meta information about an object.
1150#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1151pub struct ObjectMetadata {
1152    /// Name of the object
1153    pub name: String,
1154    /// A short human readable description of the object.
1155    pub description: Option<String>,
1156    /// Max chunk size. Default is 128k.
1157    pub chunk_size: Option<usize>,
1158    /// Metadata for given object.
1159    #[serde(default)]
1160    pub metadata: HashMap<String, String>,
1161    /// Headers for given object.
1162    pub headers: Option<HeaderMap>,
1163}
1164
1165impl From<&str> for ObjectMetadata {
1166    fn from(s: &str) -> ObjectMetadata {
1167        ObjectMetadata {
1168            name: s.to_string(),
1169            ..Default::default()
1170        }
1171    }
1172}
1173
1174pub trait AsObjectInfo {
1175    fn as_info(&self) -> &ObjectInfo;
1176}
1177
1178impl AsObjectInfo for &Object {
1179    fn as_info(&self) -> &ObjectInfo {
1180        &self.info
1181    }
1182}
1183impl AsObjectInfo for &ObjectInfo {
1184    fn as_info(&self) -> &ObjectInfo {
1185        self
1186    }
1187}
1188
1189impl From<ObjectInfo> for ObjectMetadata {
1190    fn from(info: ObjectInfo) -> Self {
1191        ObjectMetadata {
1192            name: info.name,
1193            description: info.description,
1194            metadata: info.metadata,
1195            headers: info.headers,
1196            chunk_size: None,
1197        }
1198    }
1199}
1200
1201#[derive(Debug, PartialEq, Clone)]
1202pub enum UpdateMetadataErrorKind {
1203    InvalidName,
1204    NotFound,
1205    TimedOut,
1206    Other,
1207    PublishMetadata,
1208    NameAlreadyInUse,
1209    Purge,
1210}
1211
1212impl Display for UpdateMetadataErrorKind {
1213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1214        match self {
1215            Self::InvalidName => write!(f, "invalid object name"),
1216            Self::NotFound => write!(f, "object not found"),
1217            Self::TimedOut => write!(f, "timed out"),
1218            Self::Other => write!(f, "error"),
1219            Self::PublishMetadata => {
1220                write!(f, "failed publishing metadata")
1221            }
1222            Self::NameAlreadyInUse => {
1223                write!(f, "object with updated name already exists")
1224            }
1225            Self::Purge => write!(f, "failed purging old name metadata"),
1226        }
1227    }
1228}
1229
1230impl From<InfoError> for UpdateMetadataError {
1231    fn from(error: InfoError) -> Self {
1232        match error.kind() {
1233            InfoErrorKind::InvalidName => {
1234                UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1235            }
1236            InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1237            InfoErrorKind::Other => {
1238                UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1239            }
1240            InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1241        }
1242    }
1243}
1244
1245pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1246
1247#[derive(Clone, Copy, Debug, PartialEq)]
1248pub enum InfoErrorKind {
1249    InvalidName,
1250    NotFound,
1251    Other,
1252    TimedOut,
1253}
1254
1255impl Display for InfoErrorKind {
1256    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1257        match self {
1258            Self::InvalidName => write!(f, "invalid object name"),
1259            Self::Other => write!(f, "getting info failed"),
1260            Self::NotFound => write!(f, "not found"),
1261            Self::TimedOut => write!(f, "timed out"),
1262        }
1263    }
1264}
1265
1266pub type InfoError = Error<InfoErrorKind>;
1267
1268#[derive(Clone, Copy, Debug, PartialEq)]
1269pub enum GetErrorKind {
1270    InvalidName,
1271    ConsumerCreate,
1272    NotFound,
1273    BucketLink,
1274    Other,
1275    TimedOut,
1276}
1277
1278impl Display for GetErrorKind {
1279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1280        match self {
1281            Self::ConsumerCreate => write!(f, "failed creating consumer for fetching object"),
1282            Self::Other => write!(f, "failed getting object"),
1283            Self::NotFound => write!(f, "object not found"),
1284            Self::TimedOut => write!(f, "timed out"),
1285            Self::InvalidName => write!(f, "invalid object name"),
1286            Self::BucketLink => write!(f, "object is a link to a bucket"),
1287        }
1288    }
1289}
1290
1291pub type GetError = Error<GetErrorKind>;
1292
1293crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1294crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1295
1296impl From<InfoError> for GetError {
1297    fn from(err: InfoError) -> Self {
1298        match err.kind() {
1299            InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1300            InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1301            InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1302            InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1303        }
1304    }
1305}
1306
1307#[derive(Clone, Copy, Debug, PartialEq)]
1308pub enum DeleteErrorKind {
1309    TimedOut,
1310    NotFound,
1311    Metadata,
1312    InvalidName,
1313    Chunks,
1314    Other,
1315}
1316
1317impl Display for DeleteErrorKind {
1318    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1319        match self {
1320            Self::TimedOut => write!(f, "timed out"),
1321            Self::Metadata => write!(f, "failed rolling up metadata"),
1322            Self::Chunks => write!(f, "failed purging chunks"),
1323            Self::Other => write!(f, "delete failed"),
1324            Self::NotFound => write!(f, "object not found"),
1325            Self::InvalidName => write!(f, "invalid object name"),
1326        }
1327    }
1328}
1329
1330pub type DeleteError = Error<DeleteErrorKind>;
1331
1332impl From<InfoError> for DeleteError {
1333    fn from(err: InfoError) -> Self {
1334        match err.kind() {
1335            InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1336            InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1337            InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1338            InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1339        }
1340    }
1341}
1342
1343crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1344crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1345
1346#[derive(Clone, Copy, Debug, PartialEq)]
1347pub enum PutErrorKind {
1348    InvalidName,
1349    ReadChunks,
1350    PublishChunks,
1351    PublishMetadata,
1352    PurgeOldChunks,
1353    TimedOut,
1354    Other,
1355}
1356
1357impl Display for PutErrorKind {
1358    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1359        match self {
1360            Self::PublishChunks => write!(f, "failed publishing object chunks"),
1361            Self::PublishMetadata => write!(f, "failed publishing metadata"),
1362            Self::PurgeOldChunks => write!(f, "failed purging old chunks"),
1363            Self::TimedOut => write!(f, "timed out"),
1364            Self::Other => write!(f, "error"),
1365            Self::InvalidName => write!(f, "invalid object name"),
1366            Self::ReadChunks => write!(f, "error while reading the buffer"),
1367        }
1368    }
1369}
1370
1371pub type PutError = Error<PutErrorKind>;
1372
1373pub type AddLinkError = Error<AddLinkErrorKind>;
1374
1375#[derive(Clone, Copy, Debug, PartialEq)]
1376pub enum AddLinkErrorKind {
1377    EmptyName,
1378    ObjectRequired,
1379    Deleted,
1380    LinkToLink,
1381    PublishMetadata,
1382    AlreadyExists,
1383    Other,
1384}
1385
1386impl Display for AddLinkErrorKind {
1387    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1388        match self {
1389            AddLinkErrorKind::ObjectRequired => write!(f, "cannot link to empty Object"),
1390            AddLinkErrorKind::Deleted => write!(f, "cannot link a deleted Object"),
1391            AddLinkErrorKind::LinkToLink => write!(f, "cannot link to another link"),
1392            AddLinkErrorKind::EmptyName => write!(f, "link name cannot be empty"),
1393            AddLinkErrorKind::PublishMetadata => write!(f, "failed publishing link metadata"),
1394            AddLinkErrorKind::Other => write!(f, "error"),
1395            AddLinkErrorKind::AlreadyExists => write!(f, "object already exists"),
1396        }
1397    }
1398}
1399
1400type PublishMetadataError = Error<PublishMetadataErrorKind>;
1401
1402#[derive(Clone, Copy, Debug, PartialEq)]
1403enum PublishMetadataErrorKind {
1404    PublishMetadata,
1405    Other,
1406}
1407
1408impl Display for PublishMetadataErrorKind {
1409    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1410        match self {
1411            PublishMetadataErrorKind::PublishMetadata => write!(f, "failed to publish metadata"),
1412            PublishMetadataErrorKind::Other => write!(f, "error"),
1413        }
1414    }
1415}
1416
1417impl From<PublishMetadataError> for AddLinkError {
1418    fn from(error: PublishMetadataError) -> Self {
1419        match error.kind {
1420            PublishMetadataErrorKind::PublishMetadata => {
1421                AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1422            }
1423            PublishMetadataErrorKind::Other => {
1424                AddLinkError::with_source(AddLinkErrorKind::Other, error)
1425            }
1426        }
1427    }
1428}
1429impl From<PublishMetadataError> for PutError {
1430    fn from(error: PublishMetadataError) -> Self {
1431        match error.kind {
1432            PublishMetadataErrorKind::PublishMetadata => {
1433                PutError::new(PutErrorKind::PublishMetadata)
1434            }
1435            PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1436        }
1437    }
1438}
1439
1440#[derive(Clone, Copy, Debug, PartialEq)]
1441pub enum WatchErrorKind {
1442    TimedOut,
1443    ConsumerCreate,
1444    Other,
1445}
1446
1447impl Display for WatchErrorKind {
1448    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1449        match self {
1450            Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1451            Self::Other => write!(f, "watch failed"),
1452            Self::TimedOut => write!(f, "timed out"),
1453        }
1454    }
1455}
1456
1457pub type WatchError = Error<WatchErrorKind>;
1458
1459crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
1460crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1461
1462pub type ListError = WatchError;
1463pub type ListErrorKind = WatchErrorKind;
1464
1465#[derive(Clone, Copy, Debug, PartialEq)]
1466pub enum SealErrorKind {
1467    TimedOut,
1468    Other,
1469    Info,
1470    Update,
1471}
1472
1473impl Display for SealErrorKind {
1474    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1475        match self {
1476            Self::TimedOut => write!(f, "timed out"),
1477            Self::Other => write!(f, "seal failed"),
1478            Self::Info => write!(f, "failed getting stream info before sealing bucket"),
1479            Self::Update => write!(f, "failed sealing the bucket"),
1480        }
1481    }
1482}
1483
1484pub type SealError = Error<SealErrorKind>;
1485
1486impl From<super::context::UpdateStreamError> for SealError {
1487    fn from(err: super::context::UpdateStreamError) -> Self {
1488        match err.kind() {
1489            super::context::CreateStreamErrorKind::TimedOut => {
1490                SealError::new(SealErrorKind::TimedOut)
1491            }
1492            _ => SealError::with_source(SealErrorKind::Update, err),
1493        }
1494    }
1495}
1496
1497#[derive(Clone, Copy, Debug, PartialEq)]
1498pub enum WatcherErrorKind {
1499    ConsumerError,
1500    Other,
1501}
1502
1503impl Display for WatcherErrorKind {
1504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1505        match self {
1506            Self::ConsumerError => write!(f, "watcher consumer error"),
1507            Self::Other => write!(f, "watcher error"),
1508        }
1509    }
1510}
1511
1512pub type WatcherError = Error<WatcherErrorKind>;
1513
1514impl From<OrderedError> for WatcherError {
1515    fn from(err: OrderedError) -> Self {
1516        WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1517    }
1518}
1519
1520pub type ListerError = WatcherError;
1521pub type ListerErrorKind = WatcherErrorKind;