gcs_rsync/gcp/storage/
object.rs

1use futures::{Stream, StreamExt, TryStream, TryStreamExt};
2
3use crate::oauth2::token::TokenGenerator;
4
5use super::{
6    client::StorageClient,
7    resources::object::{ObjectMetadata, Objects},
8    Bucket, StorageResult, {Object, ObjectsListRequest, PartialObject},
9};
10
11pub struct ObjectClient {
12    storage_client: StorageClient,
13}
14
15impl ObjectClient {
16    pub async fn new(token_generator: Box<dyn TokenGenerator>) -> StorageResult<Self> {
17        Ok(Self {
18            storage_client: StorageClient::new(token_generator).await?,
19        })
20    }
21
22    pub fn no_auth() -> Self {
23        Self {
24            storage_client: StorageClient::no_auth(),
25        }
26    }
27
28    pub async fn get(&self, o: &Object, fields: &str) -> StorageResult<PartialObject> {
29        let url = o.url();
30        self.storage_client
31            .get_as_json(url.as_str(), &[("fields", fields)])
32            .await
33    }
34
35    pub async fn delete(&self, o: &Object) -> StorageResult<String> {
36        let url = o.url();
37        self.storage_client.delete(&url).await?;
38        super::StorageResult::Ok(url)
39    }
40
41    pub async fn download(
42        &self,
43        o: &Object,
44    ) -> StorageResult<impl Stream<Item = StorageResult<bytes::Bytes>>> {
45        let url = o.url();
46        self.storage_client
47            .get_as_stream(&url, &[("alt", "media")])
48            .await
49    }
50
51    pub async fn upload<S>(&self, o: &Object, stream: S) -> StorageResult<()>
52    where
53        S: futures::TryStream + Send + Sync + 'static,
54        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
55        bytes::Bytes: From<S::Ok>,
56    {
57        let url = o.upload_url("media");
58        self.storage_client.post(&url, stream).await?;
59        super::StorageResult::Ok(())
60    }
61
62    pub async fn upload_with_metadata<S>(
63        &self,
64        m: &ObjectMetadata,
65        o: &Object,
66        stream: S,
67    ) -> StorageResult<()>
68    where
69        S: TryStream<Ok = bytes::Bytes> + Send + Sync + 'static,
70        S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
71    {
72        let url = o.upload_url("multipart");
73        self.storage_client.post_multipart(&url, m, stream).await?;
74        super::StorageResult::Ok(())
75    }
76
77    fn list_url(bucket: &str) -> String {
78        format!("{}/o", Bucket::new(bucket).url())
79    }
80
81    pub async fn is_valid(&self, bucket: &str, prefix: &str) -> StorageResult<()> {
82        // For public bucket, the json api for bucket does not work to check if a bucket exists (surely for security reason).
83        // Only listing operation works. Listing the first page is quick.
84        let objects_list_request = ObjectsListRequest {
85            prefix: Some(prefix.to_owned()),
86            fields: Some("items(name)".to_owned()),
87            max_results: Some(1),
88            ..Default::default()
89        };
90
91        let url = Self::list_url(bucket);
92        self.storage_client
93            .get_as_json(&url, &objects_list_request)
94            .await
95            .map(|_: Objects| ())
96    }
97
98    pub async fn list(
99        &self,
100        bucket: &str,
101        objects_list_request: &ObjectsListRequest,
102    ) -> impl Stream<Item = StorageResult<PartialObject>> + '_ {
103        let objects_list_request = objects_list_request.to_owned();
104        let url = Self::list_url(bucket);
105        futures::stream::try_unfold(
106            (Some(objects_list_request), url),
107            move |(state, url)| async move {
108                match state {
109                    None => Ok(None),
110                    Some(state) => {
111                        let objects: Objects =
112                            self.storage_client.get_as_json(&url, &state).await?;
113                        let items = futures::stream::iter(objects.items).map(Ok);
114                        match objects.next_page_token {
115                            None => Ok(Some((items, (None, url)))),
116                            Some(next_token) => {
117                                let new_state = ObjectsListRequest {
118                                    page_token: Some(next_token),
119                                    ..state
120                                };
121                                Ok(Some((items, (Some(new_state), url))))
122                            }
123                        }
124                    }
125                }
126            },
127        )
128        .try_flatten()
129    }
130}