async_nats_wrpc/jetstream/object_store/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Object Store module
15use std::collections::VecDeque;
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::{STANDARD, URL_SAFE};
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
/// Chunk size, in bytes, used by `put` when the object metadata does not specify one.
const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
/// Name of the header attached to every metadata message published by this module.
const NATS_ROLLUP: &str = "Nats-Rollup";
/// Value sent in the `Nats-Rollup` header.
// NOTE(review): presumably asks the server to roll up older messages on the same subject — confirm
// against the JetStream documentation.
const ROLLUP_SUBJECT: &str = "sub";

/// Pattern used by `is_valid_bucket_name`: one or more ASCII letters, digits, `_` or `-`.
static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
/// Pattern used by `is_valid_object_name`: one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`.
static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50    BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54    if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55        return false;
56    }
57
58    OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62    URL_SAFE.encode(object_name)
63}
64
/// Configuration values for object store buckets.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
    /// Name of the storage bucket.
    pub bucket: String,
    /// A short description of the purpose of this storage bucket.
    pub description: Option<String>,
    /// Maximum age of any value in the bucket, expressed in nanoseconds
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// The type of storage backend, `File` (default) and `Memory`
    pub storage: StorageType,
    /// How many replicas to keep for each value in a cluster, maximum 5.
    pub num_replicas: usize,
    /// Whether compression of the underlying stream is enabled.
    pub compression: bool,
    /// Cluster and tag placement.
    pub placement: Option<stream::Placement>,
}
84
/// A blob store capable of storing large objects efficiently in streams.
#[derive(Clone)]
pub struct ObjectStore {
    /// Bucket name; used as the second token of every `$O.<name>.…` subject built by this type.
    pub(crate) name: String,
    /// Stream used for all metadata and chunk operations of this bucket.
    pub(crate) stream: crate::jetstream::stream::Stream,
}
91
92impl ObjectStore {
93    /// Gets an [Object] from the [ObjectStore].
94    ///
95    /// [Object] implements [tokio::io::AsyncRead] that allows
96    /// to read the data from Object Store.
97    ///
98    /// # Examples
99    ///
100    /// ```no_run
101    /// # #[tokio::main]
102    /// # async fn main() -> Result<(), async_nats::Error> {
103    /// use tokio::io::AsyncReadExt;
104    /// let client = async_nats::connect("demo.nats.io").await?;
105    /// let jetstream = async_nats::jetstream::new(client);
106    ///
107    /// let bucket = jetstream.get_object_store("store").await?;
108    /// let mut object = bucket.get("FOO").await?;
109    ///
110    /// // Object implements `tokio::io::AsyncRead`.
111    /// let mut bytes = vec![];
112    /// object.read_to_end(&mut bytes).await?;
113    /// # Ok(())
114    /// # }
115    /// ```
116    pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
117        self.get_impl(object_name).await
118    }
119
120    fn get_impl<'bucket, 'future, T>(
121        &'bucket self,
122        object_name: T,
123    ) -> BoxFuture<'future, Result<Object, GetError>>
124    where
125        T: AsRef<str> + Send + 'future,
126        'bucket: 'future,
127    {
128        Box::pin(async move {
129            let object_info = self.info(object_name).await?;
130            if let Some(ref options) = object_info.options {
131                if let Some(link) = options.link.as_ref() {
132                    if let Some(link_name) = link.name.as_ref() {
133                        let link_name = link_name.clone();
134                        debug!("getting object via link");
135                        if link.bucket == self.name {
136                            return self.get_impl(link_name).await;
137                        } else {
138                            let bucket = self
139                                .stream
140                                .context
141                                .get_object_store(&link.bucket)
142                                .await
143                                .map_err(|err| {
144                                GetError::with_source(GetErrorKind::Other, err)
145                            })?;
146                            let object = bucket.get_impl(&link_name).await?;
147                            return Ok(object);
148                        }
149                    } else {
150                        return Err(GetError::new(GetErrorKind::BucketLink));
151                    }
152                }
153            }
154
155            debug!("not a link. Getting the object");
156            Ok(Object::new(object_info, self.stream.clone()))
157        })
158    }
159
160    /// Deletes an [Object] from the [ObjectStore].
161    ///
162    /// # Examples
163    ///
164    /// ```no_run
165    /// # #[tokio::main]
166    /// # async fn main() -> Result<(), async_nats::Error> {
167    /// let client = async_nats::connect("demo.nats.io").await?;
168    /// let jetstream = async_nats::jetstream::new(client);
169    ///
170    /// let bucket = jetstream.get_object_store("store").await?;
171    /// bucket.delete("FOO").await?;
172    /// # Ok(())
173    /// # }
174    /// ```
175    pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
176        let object_name = object_name.as_ref();
177        let mut object_info = self.info(object_name).await?;
178        object_info.chunks = 0;
179        object_info.size = 0;
180        object_info.deleted = true;
181
182        let data = serde_json::to_vec(&object_info).map_err(|err| {
183            DeleteError::with_source(
184                DeleteErrorKind::Other,
185                format!("failed deserializing object info: {}", err),
186            )
187        })?;
188
189        let mut headers = HeaderMap::default();
190        headers.insert(
191            NATS_ROLLUP,
192            HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
193                DeleteError::with_source(
194                    DeleteErrorKind::Other,
195                    format!("failed parsing header: {}", err),
196                )
197            })?,
198        );
199
200        let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
201
202        self.stream
203            .context
204            .publish_with_headers(subject, headers, data.into())
205            .await?
206            .await?;
207
208        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
209
210        self.stream.purge().filter(&chunk_subject).await?;
211
212        Ok(())
213    }
214
215    /// Retrieves [Object] [ObjectInfo].
216    ///
217    /// # Examples
218    ///
219    /// ```no_run
220    /// # #[tokio::main]
221    /// # async fn main() -> Result<(), async_nats::Error> {
222    /// let client = async_nats::connect("demo.nats.io").await?;
223    /// let jetstream = async_nats::jetstream::new(client);
224    ///
225    /// let bucket = jetstream.get_object_store("store").await?;
226    /// let info = bucket.info("FOO").await?;
227    /// # Ok(())
228    /// # }
229    /// ```
230    pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
231        let object_name = object_name.as_ref();
232        let object_name = encode_object_name(object_name);
233        if !is_valid_object_name(&object_name) {
234            return Err(InfoError::new(InfoErrorKind::InvalidName));
235        }
236
237        // Grab last meta value we have.
238        let subject = format!("$O.{}.M.{}", &self.name, &object_name);
239
240        let message = self
241            .stream
242            .get_last_raw_message_by_subject(subject.as_str())
243            .await
244            .map_err(|err| match err.kind() {
245                stream::LastRawMessageErrorKind::NoMessageFound => {
246                    InfoError::new(InfoErrorKind::NotFound)
247                }
248                _ => InfoError::with_source(InfoErrorKind::Other, err),
249            })?;
250        let decoded_payload = STANDARD
251            .decode(message.payload)
252            .map_err(|err| InfoError::with_source(InfoErrorKind::Other, err))?;
253        let object_info =
254            serde_json::from_slice::<ObjectInfo>(&decoded_payload).map_err(|err| {
255                InfoError::with_source(
256                    InfoErrorKind::Other,
257                    format!("failed to decode info payload: {}", err),
258                )
259            })?;
260
261        Ok(object_info)
262    }
263
264    /// Puts an [Object] into the [ObjectStore].
265    /// This method implements `tokio::io::AsyncRead`.
266    ///
267    /// # Examples
268    ///
269    /// ```no_run
270    /// # #[tokio::main]
271    /// # async fn main() -> Result<(), async_nats::Error> {
272    /// let client = async_nats::connect("demo.nats.io").await?;
273    /// let jetstream = async_nats::jetstream::new(client);
274    ///
275    /// let bucket = jetstream.get_object_store("store").await?;
276    /// let mut file = tokio::fs::File::open("foo.txt").await?;
277    /// bucket.put("file", &mut file).await.unwrap();
278    /// # Ok(())
279    /// # }
280    /// ```
281    pub async fn put<T>(
282        &self,
283        meta: T,
284        data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
285    ) -> Result<ObjectInfo, PutError>
286    where
287        ObjectMetadata: From<T>,
288    {
289        let object_meta: ObjectMetadata = meta.into();
290
291        let encoded_object_name = encode_object_name(&object_meta.name);
292        if !is_valid_object_name(&encoded_object_name) {
293            return Err(PutError::new(PutErrorKind::InvalidName));
294        }
295        // Fetch any existing object info, if there is any for later use.
296        let maybe_existing_object_info = match self.info(&encoded_object_name).await {
297            Ok(object_info) => Some(object_info),
298            Err(_) => None,
299        };
300
301        let object_nuid = nuid::next();
302        let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
303
304        let mut object_chunks = 0;
305        let mut object_size = 0;
306
307        let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
308        let mut buffer = BytesMut::with_capacity(chunk_size);
309        let mut sha256 = Sha256::new();
310
311        loop {
312            let n = data
313                .read_buf(&mut buffer)
314                .await
315                .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
316
317            if n == 0 {
318                break;
319            }
320
321            let payload = buffer.split().freeze();
322            sha256.update(&payload);
323
324            object_size += payload.len();
325            object_chunks += 1;
326
327            self.stream
328                .context
329                .publish(chunk_subject.clone(), payload)
330                .await
331                .map_err(|err| {
332                    PutError::with_source(
333                        PutErrorKind::PublishChunks,
334                        format!("failed chunk publish: {}", err),
335                    )
336                })?
337                .await
338                .map_err(|err| {
339                    PutError::with_source(
340                        PutErrorKind::PublishChunks,
341                        format!("failed getting chunk ack: {}", err),
342                    )
343                })?;
344        }
345        let digest = sha256.finish();
346        let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
347        let object_info = ObjectInfo {
348            name: object_meta.name,
349            description: object_meta.description,
350            options: Some(ObjectOptions {
351                max_chunk_size: Some(chunk_size),
352                link: None,
353            }),
354            bucket: self.name.clone(),
355            nuid: object_nuid.to_string(),
356            chunks: object_chunks,
357            size: object_size,
358            digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
359            modified: Some(OffsetDateTime::now_utc()),
360            deleted: false,
361        };
362
363        let mut headers = HeaderMap::new();
364        headers.insert(
365            NATS_ROLLUP,
366            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
367                PutError::with_source(
368                    PutErrorKind::Other,
369                    format!("failed parsing header: {}", err),
370                )
371            })?,
372        );
373        let data = serde_json::to_vec(&object_info).map_err(|err| {
374            PutError::with_source(
375                PutErrorKind::Other,
376                format!("failed serializing object info: {}", err),
377            )
378        })?;
379
380        // publish meta.
381        self.stream
382            .context
383            .publish_with_headers(subject, headers, data.into())
384            .await
385            .map_err(|err| {
386                PutError::with_source(
387                    PutErrorKind::PublishMetadata,
388                    format!("failed publishing metadata: {}", err),
389                )
390            })?
391            .await
392            .map_err(|err| {
393                PutError::with_source(
394                    PutErrorKind::PublishMetadata,
395                    format!("failed ack from metadata publish: {}", err),
396                )
397            })?;
398
399        // Purge any old chunks.
400        if let Some(existing_object_info) = maybe_existing_object_info {
401            let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
402
403            self.stream
404                .purge()
405                .filter(&chunk_subject)
406                .await
407                .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
408        }
409
410        Ok(object_info)
411    }
412
413    /// Creates a [Watch] stream over changes in the [ObjectStore].
414    ///
415    /// # Examples
416    ///
417    /// ```no_run
418    /// # #[tokio::main]
419    /// # async fn main() -> Result<(), async_nats::Error> {
420    /// use futures::StreamExt;
421    /// let client = async_nats::connect("demo.nats.io").await?;
422    /// let jetstream = async_nats::jetstream::new(client);
423    ///
424    /// let bucket = jetstream.get_object_store("store").await?;
425    /// let mut watcher = bucket.watch().await.unwrap();
426    /// while let Some(object) = watcher.next().await {
427    ///     println!("detected changes in {:?}", object?);
428    /// }
429    /// # Ok(())
430    /// # }
431    /// ```
432    pub async fn watch(&self) -> Result<Watch, WatchError> {
433        self.watch_with_deliver_policy(DeliverPolicy::New).await
434    }
435
436    /// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever
437    /// there are changes for that key with as well as last value.
438    pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
439        self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
440            .await
441    }
442
443    async fn watch_with_deliver_policy(
444        &self,
445        deliver_policy: DeliverPolicy,
446    ) -> Result<Watch, WatchError> {
447        let subject = format!("$O.{}.M.>", self.name);
448        let ordered = self
449            .stream
450            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
451                deliver_policy,
452                deliver_subject: self.stream.context.client.new_inbox(),
453                description: Some("object store watcher".to_string()),
454                filter_subject: subject,
455                ..Default::default()
456            })
457            .await?;
458        Ok(Watch {
459            subscription: ordered.messages().await?,
460        })
461    }
462
463    /// Returns a [List] stream with all not deleted [Objects][Object] in the [ObjectStore].
464    ///
465    /// # Examples
466    ///
467    /// ```no_run
468    /// # #[tokio::main]
469    /// # async fn main() -> Result<(), async_nats::Error> {
470    /// use futures::StreamExt;
471    /// let client = async_nats::connect("demo.nats.io").await?;
472    /// let jetstream = async_nats::jetstream::new(client);
473    ///
474    /// let bucket = jetstream.get_object_store("store").await?;
475    /// let mut list = bucket.list().await.unwrap();
476    /// while let Some(object) = list.next().await {
477    ///     println!("object {:?}", object?);
478    /// }
479    /// # Ok(())
480    /// # }
481    /// ```
482    pub async fn list(&self) -> Result<List, ListError> {
483        trace!("starting Object List");
484        let subject = format!("$O.{}.M.>", self.name);
485        let ordered = self
486            .stream
487            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
488                deliver_policy: super::consumer::DeliverPolicy::All,
489                deliver_subject: self.stream.context.client.new_inbox(),
490                description: Some("object store list".to_string()),
491                filter_subject: subject,
492                ..Default::default()
493            })
494            .await?;
495        Ok(List {
496            done: ordered.info.num_pending == 0,
497            subscription: Some(ordered.messages().await?),
498        })
499    }
500
501    /// Seals a [ObjectStore], preventing any further changes to it or its [Objects][Object].
502    ///
503    /// # Examples
504    ///
505    /// ```no_run
506    /// # #[tokio::main]
507    /// # async fn main() -> Result<(), async_nats::Error> {
508    /// use futures::StreamExt;
509    /// let client = async_nats::connect("demo.nats.io").await?;
510    /// let jetstream = async_nats::jetstream::new(client);
511    ///
512    /// let mut bucket = jetstream.get_object_store("store").await?;
513    /// bucket.seal().await.unwrap();
514    /// # Ok(())
515    /// # }
516    /// ```
517    pub async fn seal(&mut self) -> Result<(), SealError> {
518        let mut stream_config = self
519            .stream
520            .info()
521            .await
522            .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
523            .to_owned();
524        stream_config.config.sealed = true;
525
526        self.stream
527            .context
528            .update_stream(&stream_config.config)
529            .await?;
530        Ok(())
531    }
532
533    /// Updates [Object] [ObjectMetadata].
534    ///
535    /// # Examples
536    ///
537    /// ```no_run
538    /// # #[tokio::main]
539    /// # async fn main() -> Result<(), async_nats::Error> {
540    /// use async_nats::jetstream::object_store;
541    /// let client = async_nats::connect("demo.nats.io").await?;
542    /// let jetstream = async_nats::jetstream::new(client);
543    ///
544    /// let mut bucket = jetstream.get_object_store("store").await?;
545    /// bucket
546    ///     .update_metadata(
547    ///         "object",
548    ///         object_store::UpdateMetadata {
549    ///             name: "new_name".to_string(),
550    ///             description: Some("a new description".to_string()),
551    ///         },
552    ///     )
553    ///     .await?;
554    /// # Ok(())
555    /// # }
556    /// ```
557    pub async fn update_metadata<A: AsRef<str>>(
558        &self,
559        object: A,
560        metadata: UpdateMetadata,
561    ) -> Result<ObjectInfo, UpdateMetadataError> {
562        let mut info = self.info(object.as_ref()).await?;
563
564        // If name is being update, we need to check if other metadata with it already exists.
565        // If does, error. Otherwise, purge old name metadata.
566        if metadata.name != info.name {
567            tracing::info!("new metadata name is different than then old one");
568            if !is_valid_object_name(&metadata.name) {
569                return Err(UpdateMetadataError::new(
570                    UpdateMetadataErrorKind::InvalidName,
571                ));
572            }
573            match self.info(&metadata.name).await {
574                Ok(_) => {
575                    return Err(UpdateMetadataError::new(
576                        UpdateMetadataErrorKind::NameAlreadyInUse,
577                    ))
578                }
579                Err(err) => match err.kind() {
580                    InfoErrorKind::NotFound => {
581                        tracing::info!("purging old metadata: {}", info.name);
582                        self.stream
583                            .purge()
584                            .filter(format!(
585                                "$O.{}.M.{}",
586                                self.name,
587                                encode_object_name(&info.name)
588                            ))
589                            .await
590                            .map_err(|err| {
591                                UpdateMetadataError::with_source(
592                                    UpdateMetadataErrorKind::Purge,
593                                    err,
594                                )
595                            })?;
596                    }
597                    _ => {
598                        return Err(UpdateMetadataError::with_source(
599                            UpdateMetadataErrorKind::Other,
600                            err,
601                        ))
602                    }
603                },
604            }
605        }
606
607        info.name = metadata.name;
608        info.description = metadata.description;
609
610        let name = encode_object_name(&info.name);
611        let subject = format!("$O.{}.M.{}", &self.name, &name);
612
613        let mut headers = HeaderMap::new();
614        headers.insert(
615            NATS_ROLLUP,
616            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
617                UpdateMetadataError::with_source(
618                    UpdateMetadataErrorKind::Other,
619                    format!("failed parsing header: {}", err),
620                )
621            })?,
622        );
623        let data = serde_json::to_vec(&info).map_err(|err| {
624            UpdateMetadataError::with_source(
625                UpdateMetadataErrorKind::Other,
626                format!("failed serializing object info: {}", err),
627            )
628        })?;
629
630        // publish meta.
631        self.stream
632            .context
633            .publish_with_headers(subject, headers, data.into())
634            .await
635            .map_err(|err| {
636                UpdateMetadataError::with_source(
637                    UpdateMetadataErrorKind::PublishMetadata,
638                    format!("failed publishing metadata: {}", err),
639                )
640            })?
641            .await
642            .map_err(|err| {
643                UpdateMetadataError::with_source(
644                    UpdateMetadataErrorKind::PublishMetadata,
645                    format!("failed ack from metadata publish: {}", err),
646                )
647            })?;
648
649        Ok(info)
650    }
651
652    /// Adds a link to an [Object].
653    /// It creates a new [Object] in the [ObjectStore] that points to another [Object]
654    /// and does not have any contents on it's own.
655    /// Links are automatically followed (one level deep) when calling [ObjectStore::get].
656    ///
657    /// # Examples
658    ///
659    /// ```no_run
660    /// # #[tokio::main]
661    /// # async fn main() -> Result<(), async_nats::Error> {
662    /// use async_nats::jetstream::object_store;
663    /// let client = async_nats::connect("demo.nats.io").await?;
664    /// let jetstream = async_nats::jetstream::new(client);
665    /// let bucket = jetstream.get_object_store("bucket").await?;
666    /// let object = bucket.get("object").await?;
667    /// bucket.add_link("link_to_object", &object).await?;
668    /// # Ok(())
669    /// # }
670    /// ```
671    pub async fn add_link<'a, T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
672    where
673        T: ToString,
674        O: AsObjectInfo,
675    {
676        let object = object.as_info();
677        let name = name.to_string();
678        if name.is_empty() {
679            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
680        }
681        if object.name.is_empty() {
682            return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
683        }
684        if object.deleted {
685            return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
686        }
687        if let Some(ref options) = object.options {
688            if options.link.is_some() {
689                return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
690            }
691        }
692        match self.info(&name).await {
693            Ok(info) => {
694                if let Some(options) = info.options {
695                    if options.link.is_none() {
696                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
697                    }
698                } else {
699                    return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
700                }
701            }
702            Err(err) if err.kind() != InfoErrorKind::NotFound => {
703                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
704            }
705            _ => (),
706        }
707
708        let info = ObjectInfo {
709            name,
710            description: None,
711            options: Some(ObjectOptions {
712                link: Some(ObjectLink {
713                    name: Some(object.name.clone()),
714                    bucket: object.bucket.clone(),
715                }),
716                max_chunk_size: None,
717            }),
718            bucket: self.name.clone(),
719            nuid: nuid::next().to_string(),
720            size: 0,
721            chunks: 0,
722            modified: Some(OffsetDateTime::now_utc()),
723            digest: None,
724            deleted: false,
725        };
726        publish_meta(self, &info).await?;
727        Ok(info)
728    }
729
730    /// Adds a link to another [ObjectStore] bucket by creating a new [Object]
731    /// in the current [ObjectStore] that points to another [ObjectStore] and
732    /// does not contain any data.
733    ///
734    /// # Examples
735    ///
736    /// ```no_run
737    /// # #[tokio::main]
738    /// # async fn main() -> Result<(), async_nats::Error> {
739    /// use async_nats::jetstream::object_store;
740    /// let client = async_nats::connect("demo.nats.io").await?;
741    /// let jetstream = async_nats::jetstream::new(client);
742    /// let bucket = jetstream.get_object_store("bucket").await?;
743    /// bucket
744    ///     .add_bucket_link("link_to_object", "another_bucket")
745    ///     .await?;
746    /// # Ok(())
747    /// # }
748    /// ```
749    pub async fn add_bucket_link<T: ToString, U: ToString>(
750        &self,
751        name: T,
752        bucket: U,
753    ) -> Result<ObjectInfo, AddLinkError> {
754        let name = name.to_string();
755        let bucket = bucket.to_string();
756        if name.is_empty() {
757            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
758        }
759
760        match self.info(&name).await {
761            Ok(info) => {
762                if let Some(options) = info.options {
763                    if options.link.is_none() {
764                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
765                    }
766                }
767            }
768            Err(err) if err.kind() != InfoErrorKind::NotFound => {
769                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
770            }
771            _ => (),
772        }
773
774        let info = ObjectInfo {
775            name: name.clone(),
776            description: None,
777            options: Some(ObjectOptions {
778                link: Some(ObjectLink { name: None, bucket }),
779                max_chunk_size: None,
780            }),
781            bucket: self.name.clone(),
782            nuid: nuid::next().to_string(),
783            size: 0,
784            chunks: 0,
785            modified: Some(OffsetDateTime::now_utc()),
786            digest: None,
787            deleted: false,
788        };
789        publish_meta(self, &info).await?;
790        Ok(info)
791    }
792}
793
794async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
795    let encoded_object_name = encode_object_name(&info.name);
796    let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
797
798    let mut headers = HeaderMap::new();
799    headers.insert(
800        NATS_ROLLUP,
801        ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
802            PublishMetadataError::with_source(
803                PublishMetadataErrorKind::Other,
804                format!("failed parsing header: {}", err),
805            )
806        })?,
807    );
808    let data = serde_json::to_vec(&info).map_err(|err| {
809        PublishMetadataError::with_source(
810            PublishMetadataErrorKind::Other,
811            format!("failed serializing object info: {}", err),
812        )
813    })?;
814
815    store
816        .stream
817        .context
818        .publish_with_headers(subject, headers, data.into())
819        .await
820        .map_err(|err| {
821            PublishMetadataError::with_source(
822                PublishMetadataErrorKind::PublishMetadata,
823                format!("failed publishing metadata: {}", err),
824            )
825        })?
826        .await
827        .map_err(|err| {
828            PublishMetadataError::with_source(
829                PublishMetadataErrorKind::PublishMetadata,
830                format!("failed ack from metadata publish: {}", err),
831            )
832        })?;
833    Ok(())
834}
835
/// Stream of [ObjectInfo] values, returned by [ObjectStore::watch] and
/// [ObjectStore::watch_with_history].
pub struct Watch {
    // Ordered push consumer messages filtered to the bucket's metadata subjects.
    subscription: crate::jetstream::consumer::push::Ordered,
}
839
840impl Stream for Watch {
841    type Item = Result<ObjectInfo, WatcherError>;
842
843    fn poll_next(
844        mut self: std::pin::Pin<&mut Self>,
845        cx: &mut std::task::Context<'_>,
846    ) -> Poll<Option<Self::Item>> {
847        match self.subscription.poll_next_unpin(cx) {
848            Poll::Ready(message) => match message {
849                Some(message) => Poll::Ready(
850                    serde_json::from_slice::<ObjectInfo>(&message?.payload)
851                        .map_err(|err| {
852                            WatcherError::with_source(
853                                WatcherErrorKind::Other,
854                                format!("failed to deserialize object info: {}", err),
855                            )
856                        })
857                        .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
858                ),
859                None => Poll::Ready(None),
860            },
861            Poll::Pending => Poll::Pending,
862        }
863    }
864}
865
866pub struct List {
867    subscription: Option<crate::jetstream::consumer::push::Ordered>,
868    done: bool,
869}
870
871impl Stream for List {
872    type Item = Result<ObjectInfo, ListerError>;
873
874    fn poll_next(
875        mut self: std::pin::Pin<&mut Self>,
876        cx: &mut std::task::Context<'_>,
877    ) -> Poll<Option<Self::Item>> {
878        loop {
879            if self.done {
880                debug!("Object Store list done");
881                self.subscription = None;
882                return Poll::Ready(None);
883            }
884
885            if let Some(subscription) = self.subscription.as_mut() {
886                match subscription.poll_next_unpin(cx) {
887                    Poll::Ready(message) => match message {
888                        None => return Poll::Ready(None),
889                        Some(message) => {
890                            let message = message?;
891                            let info = message.info().map_err(|err| {
892                                ListerError::with_source(ListerErrorKind::Other, err)
893                            })?;
894                            trace!("num pending: {}", info.pending);
895                            if info.pending == 0 {
896                                self.done = true;
897                            }
898                            let response: ObjectInfo = serde_json::from_slice(&message.payload)
899                                .map_err(|err| {
900                                    ListerError::with_source(
901                                        ListerErrorKind::Other,
902                                        format!("failed deserializing object info: {}", err),
903                                    )
904                                })?;
905                            if response.deleted {
906                                continue;
907                            }
908                            return Poll::Ready(Some(Ok(response)));
909                        }
910                    },
911                    Poll::Pending => return Poll::Pending,
912                }
913            } else {
914                return Poll::Ready(None);
915            }
916        }
917    }
918}
919
920/// Represents an object stored in a bucket.
921pub struct Object {
922    pub info: ObjectInfo,
923    remaining_bytes: VecDeque<u8>,
924    has_pending_messages: bool,
925    digest: Option<Sha256>,
926    subscription: Option<crate::jetstream::consumer::push::Ordered>,
927    subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
928    stream: crate::jetstream::stream::Stream,
929}
930
931impl Object {
932    pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
933        Object {
934            subscription: None,
935            info,
936            remaining_bytes: VecDeque::new(),
937            has_pending_messages: true,
938            digest: Some(Sha256::new()),
939            subscription_future: None,
940            stream,
941        }
942    }
943
944    /// Returns information about the object.
945    pub fn info(&self) -> &ObjectInfo {
946        &self.info
947    }
948}
949
950impl tokio::io::AsyncRead for Object {
951    fn poll_read(
952        mut self: std::pin::Pin<&mut Self>,
953        cx: &mut std::task::Context<'_>,
954        buf: &mut tokio::io::ReadBuf<'_>,
955    ) -> std::task::Poll<std::io::Result<()>> {
956        let (buf1, _buf2) = self.remaining_bytes.as_slices();
957        if !buf1.is_empty() {
958            let len = cmp::min(buf.remaining(), buf1.len());
959            buf.put_slice(&buf1[..len]);
960            self.remaining_bytes.drain(..len);
961            return Poll::Ready(Ok(()));
962        }
963
964        if self.has_pending_messages {
965            if self.subscription.is_none() {
966                let future = match self.subscription_future.as_mut() {
967                    Some(future) => future,
968                    None => {
969                        let stream = self.stream.clone();
970                        let bucket = self.info.bucket.clone();
971                        let nuid = self.info.nuid.clone();
972                        self.subscription_future.insert(Box::pin(async move {
973                            stream
974                                .create_consumer(OrderedConfig {
975                                    deliver_subject: stream.context.client.new_inbox(),
976                                    filter_subject: format!("$O.{}.C.{}", bucket, nuid),
977                                    ..Default::default()
978                                })
979                                .await
980                                .unwrap()
981                                .messages()
982                                .await
983                        }))
984                    }
985                };
986                match future.as_mut().poll(cx) {
987                    Poll::Ready(subscription) => {
988                        self.subscription = Some(subscription.unwrap());
989                    }
990                    Poll::Pending => (),
991                }
992            }
993            if let Some(subscription) = self.subscription.as_mut() {
994                match subscription.poll_next_unpin(cx) {
995                    Poll::Ready(message) => match message {
996                        Some(message) => {
997                            let message = message.map_err(|err| {
998                                std::io::Error::new(
999                                    std::io::ErrorKind::Other,
1000                                    format!("error from JetStream subscription: {err}"),
1001                                )
1002                            })?;
1003                            let len = cmp::min(buf.remaining(), message.payload.len());
1004                            buf.put_slice(&message.payload[..len]);
1005                            if let Some(context) = &mut self.digest {
1006                                context.update(&message.payload);
1007                            }
1008                            self.remaining_bytes.extend(&message.payload[len..]);
1009
1010                            let info = message.info().map_err(|err| {
1011                                std::io::Error::new(
1012                                    std::io::ErrorKind::Other,
1013                                    format!("error from JetStream subscription: {err}"),
1014                                )
1015                            })?;
1016                            if info.pending == 0 {
1017                                let digest = self.digest.take().map(Sha256::finish);
1018                                if let Some(digest) = digest {
1019                                    if self
1020                                        .info
1021                                        .digest
1022                                        .as_ref()
1023                                        .map(|digest_self| {
1024                                            format!("SHA-256={}", URL_SAFE.encode(digest))
1025                                                != *digest_self
1026                                        })
1027                                        .unwrap_or(false)
1028                                    {
1029                                        return Poll::Ready(Err(std::io::Error::new(
1030                                            std::io::ErrorKind::InvalidData,
1031                                            "wrong digest",
1032                                        )));
1033                                    }
1034                                } else {
1035                                    return Poll::Ready(Err(std::io::Error::new(
1036                                        std::io::ErrorKind::InvalidData,
1037                                        "digest should be Some",
1038                                    )));
1039                                }
1040                                self.has_pending_messages = false;
1041                                self.subscription = None;
1042                            }
1043                            Poll::Ready(Ok(()))
1044                        }
1045                        None => Poll::Ready(Err(std::io::Error::new(
1046                            std::io::ErrorKind::Other,
1047                            "subscription ended before reading whole object",
1048                        ))),
1049                    },
1050                    Poll::Pending => Poll::Pending,
1051                }
1052            } else {
1053                Poll::Pending
1054            }
1055        } else {
1056            Poll::Ready(Ok(()))
1057        }
1058    }
1059}
1060
/// Optional settings stored alongside an object's metadata.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectOptions {
    /// Link target, if any.
    pub link: Option<ObjectLink>,
    /// Maximum chunk size, if any.
    pub max_chunk_size: Option<usize>,
}
1066
/// Meta and instance information about an object.
///
/// Fields marked `#[serde(default)]` fall back to their default value when they are
/// missing from the deserialized input.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectInfo {
    /// Name of the object.
    pub name: String,
    /// A short human readable description of the object.
    #[serde(default)]
    pub description: Option<String>,
    /// Optional settings of the object: a link target and/or a maximum chunk size.
    #[serde(default)]
    pub options: Option<ObjectOptions>,
    /// Name of the bucket the object is stored in.
    pub bucket: String,
    /// Unique identifier used to uniquely identify this version of the object.
    #[serde(default)]
    pub nuid: String,
    /// Size in bytes of the object.
    #[serde(default)]
    pub size: usize,
    /// Number of chunks the object is stored in.
    #[serde(default)]
    pub chunks: usize,
    /// Date and time the object was last modified.
    ///
    /// Serialized under the key `mtime` in RFC 3339 format.
    #[serde(default, with = "rfc3339::option")]
    #[serde(rename = "mtime")]
    pub modified: Option<time::OffsetDateTime>,
    /// Digest of the object stream; left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub digest: Option<String>,
    /// Set to true if the object has been deleted; left out of the serialized form when
    /// `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deleted: bool,
}
1100
/// Returns `true` when `t` is equal to the `Default` value of its type.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let baseline = T::default();
    *t == baseline
}
/// A link to another object, potentially in another bucket.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectLink {
    /// Name of the linked object, if any.
    // NOTE(review): `GetErrorKind::BucketLink` suggests `None` means the link targets a
    // whole bucket — confirm against the code that creates links.
    pub name: Option<String>,
    /// Name of the bucket the object is stored in.
    pub bucket: String,
}
1112
/// Name and description values used for a metadata update.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpdateMetadata {
    /// Name of the object
    pub name: String,
    /// A short human readable description of the object.
    pub description: Option<String>,
}
1120
1121/// Meta information about an object.
1122#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
1123pub struct ObjectMetadata {
1124    /// Name of the object
1125    pub name: String,
1126    /// A short human readable description of the object.
1127    pub description: Option<String>,
1128    /// Max chunk size. Default is 128k.
1129    pub chunk_size: Option<usize>,
1130}
1131
1132impl From<&str> for ObjectMetadata {
1133    fn from(s: &str) -> ObjectMetadata {
1134        ObjectMetadata {
1135            name: s.to_string(),
1136            ..Default::default()
1137        }
1138    }
1139}
1140
1141pub trait AsObjectInfo {
1142    fn as_info(&self) -> &ObjectInfo;
1143}
1144
1145impl AsObjectInfo for &Object {
1146    fn as_info(&self) -> &ObjectInfo {
1147        &self.info
1148    }
1149}
1150impl AsObjectInfo for &ObjectInfo {
1151    fn as_info(&self) -> &ObjectInfo {
1152        self
1153    }
1154}
1155
1156impl From<ObjectInfo> for ObjectMetadata {
1157    fn from(info: ObjectInfo) -> Self {
1158        ObjectMetadata {
1159            name: info.name,
1160            description: info.description,
1161            chunk_size: None,
1162        }
1163    }
1164}
1165
/// Kinds of failure reported when updating object metadata.
#[derive(Debug, PartialEq, Clone)]
pub enum UpdateMetadataErrorKind {
    /// The object name is invalid.
    InvalidName,
    /// The object was not found.
    NotFound,
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
    /// Publishing the metadata failed.
    PublishMetadata,
    /// An object with the updated name already exists.
    NameAlreadyInUse,
    /// Purging the metadata stored under the old name failed.
    Purge,
}

impl Display for UpdateMetadataErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "object not found",
            Self::TimedOut => "timed out",
            Self::Other => "error",
            Self::PublishMetadata => "failed publishing metadata",
            Self::NameAlreadyInUse => "object with updated name already exists",
            Self::Purge => "failed purging old name metadata",
        };
        f.write_str(description)
    }
}
1194
1195impl From<InfoError> for UpdateMetadataError {
1196    fn from(error: InfoError) -> Self {
1197        match error.kind() {
1198            InfoErrorKind::InvalidName => {
1199                UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1200            }
1201            InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1202            InfoErrorKind::Other => {
1203                UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1204            }
1205            InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1206        }
1207    }
1208}
1209
1210pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1211
/// Kinds of failure reported when getting object info.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum InfoErrorKind {
    /// The object name is invalid.
    InvalidName,
    /// The object was not found.
    NotFound,
    /// Any other failure.
    Other,
    /// The operation timed out.
    TimedOut,
}

impl Display for InfoErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "not found",
            Self::Other => "getting info failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
1230
/// Error type whose kind is an [`InfoErrorKind`].
pub type InfoError = Error<InfoErrorKind>;
1232
/// Kinds of failure reported when getting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GetErrorKind {
    /// The object name is invalid.
    InvalidName,
    /// Creating the consumer used for fetching the object failed.
    ConsumerCreate,
    /// The object was not found.
    NotFound,
    /// The object is a link to a bucket.
    BucketLink,
    /// Any other failure.
    Other,
    /// The operation timed out.
    TimedOut,
}

impl Display for GetErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidName => "invalid object name",
            Self::ConsumerCreate => "failed creating consumer for fetching object",
            Self::NotFound => "object not found",
            Self::BucketLink => "object is a link to a bucket",
            Self::Other => "failed getting object",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
1255
1256pub type GetError = Error<GetErrorKind>;
1257
1258crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1259crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1260
1261impl From<InfoError> for GetError {
1262    fn from(err: InfoError) -> Self {
1263        match err.kind() {
1264            InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1265            InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1266            InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1267            InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1268        }
1269    }
1270}
1271
/// Kinds of failure reported when deleting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DeleteErrorKind {
    /// The operation timed out.
    TimedOut,
    /// The object was not found.
    NotFound,
    /// Rolling up the metadata failed.
    Metadata,
    /// The object name is invalid.
    InvalidName,
    /// Purging the chunks failed.
    Chunks,
    /// Any other failure.
    Other,
}

impl Display for DeleteErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "timed out",
            Self::NotFound => "object not found",
            Self::Metadata => "failed rolling up metadata",
            Self::InvalidName => "invalid object name",
            Self::Chunks => "failed purging chunks",
            Self::Other => "delete failed",
        };
        f.write_str(description)
    }
}
1294
1295pub type DeleteError = Error<DeleteErrorKind>;
1296
1297impl From<InfoError> for DeleteError {
1298    fn from(err: InfoError) -> Self {
1299        match err.kind() {
1300            InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1301            InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1302            InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1303            InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1304        }
1305    }
1306}
1307
1308crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1309crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1310
/// Kinds of failure reported when putting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The object name is invalid.
    InvalidName,
    /// Reading from the input buffer failed.
    ReadChunks,
    /// Publishing the object chunks failed.
    PublishChunks,
    /// Publishing the metadata failed.
    PublishMetadata,
    /// Purging the old chunks failed.
    PurgeOldChunks,
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
}

impl Display for PutErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidName => "invalid object name",
            Self::ReadChunks => "error while reading the buffer",
            Self::PublishChunks => "failed publishing object chunks",
            Self::PublishMetadata => "failed publishing metadata",
            Self::PurgeOldChunks => "failed purging old chunks",
            Self::TimedOut => "timed out",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1335
/// Error type whose kind is a [`PutErrorKind`].
pub type PutError = Error<PutErrorKind>;

/// Error type whose kind is an [`AddLinkErrorKind`].
pub type AddLinkError = Error<AddLinkErrorKind>;
1339
/// Kinds of failure reported when adding a link.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AddLinkErrorKind {
    /// The link name is empty.
    EmptyName,
    /// The link target is an empty object.
    ObjectRequired,
    /// The link target is a deleted object.
    Deleted,
    /// The link target is itself a link.
    LinkToLink,
    /// Publishing the link metadata failed.
    PublishMetadata,
    /// An object with that name already exists.
    AlreadyExists,
    /// Any other failure.
    Other,
}

impl Display for AddLinkErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::EmptyName => "link name cannot be empty",
            Self::ObjectRequired => "cannot link to empty Object",
            Self::Deleted => "cannot link a deleted Object",
            Self::LinkToLink => "cannot link to another link",
            Self::PublishMetadata => "failed publishing link metadata",
            Self::AlreadyExists => "object already exists",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1364
/// Module-private error type whose kind is a [`PublishMetadataErrorKind`].
type PublishMetadataError = Error<PublishMetadataErrorKind>;
1366
/// Kinds of failure reported when publishing object metadata.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PublishMetadataErrorKind {
    /// Publishing the metadata failed.
    PublishMetadata,
    /// Any other failure.
    Other,
}

impl Display for PublishMetadataErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::PublishMetadata => "failed to publish metadata",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1381
1382impl From<PublishMetadataError> for AddLinkError {
1383    fn from(error: PublishMetadataError) -> Self {
1384        match error.kind {
1385            PublishMetadataErrorKind::PublishMetadata => {
1386                AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1387            }
1388            PublishMetadataErrorKind::Other => {
1389                AddLinkError::with_source(AddLinkErrorKind::Other, error)
1390            }
1391        }
1392    }
1393}
1394impl From<PublishMetadataError> for PutError {
1395    fn from(error: PublishMetadataError) -> Self {
1396        match error.kind {
1397            PublishMetadataErrorKind::PublishMetadata => {
1398                PutError::new(PutErrorKind::PublishMetadata)
1399            }
1400            PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1401        }
1402    }
1403}
1404
/// Kinds of failure reported when starting a watch.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// Any other failure.
    Other,
}

impl Display for WatchErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(description)
    }
}
1421
/// Error type whose kind is a [`WatchErrorKind`].
pub type WatchError = Error<WatchErrorKind>;

// Conversions from consumer and stream errors, generated by `from_with_timeout!`.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it maps timeouts
// onto `TimedOut` — confirm against its definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);

/// Alias of [`WatchError`] under a list-specific name.
pub type ListError = WatchError;
/// Alias of [`WatchErrorKind`] under a list-specific name.
pub type ListErrorKind = WatchErrorKind;
1429
/// Kinds of failure reported when sealing a bucket.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SealErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
    /// Getting the stream info before sealing failed.
    Info,
    /// Sealing the bucket failed.
    Update,
}

impl Display for SealErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "timed out",
            Self::Other => "seal failed",
            Self::Info => "failed getting stream info before sealing bucket",
            Self::Update => "failed sealing the bucket",
        };
        f.write_str(description)
    }
}
1448
1449pub type SealError = Error<SealErrorKind>;
1450
1451impl From<super::context::UpdateStreamError> for SealError {
1452    fn from(err: super::context::UpdateStreamError) -> Self {
1453        match err.kind() {
1454            super::context::CreateStreamErrorKind::TimedOut => {
1455                SealError::new(SealErrorKind::TimedOut)
1456            }
1457            _ => SealError::with_source(SealErrorKind::Update, err),
1458        }
1459    }
1460}
1461
/// Kinds of failure yielded by the watch and list streams.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The underlying consumer reported an error.
    ConsumerError,
    /// Any other failure.
    Other,
}

impl Display for WatcherErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::ConsumerError => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(description)
    }
}
1476
1477pub type WatcherError = Error<WatcherErrorKind>;
1478
1479impl From<OrderedError> for WatcherError {
1480    fn from(err: OrderedError) -> Self {
1481        WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1482    }
1483}
1484
1485pub type ListerError = WatcherError;
1486pub type ListerErrorKind = WatcherErrorKind;