gcs_rsync/gcp/storage/
object.rs1use futures::{Stream, StreamExt, TryStream, TryStreamExt};
2
3use crate::oauth2::token::TokenGenerator;
4
5use super::{
6 client::StorageClient,
7 resources::object::{ObjectMetadata, Objects},
8 Bucket, StorageResult, {Object, ObjectsListRequest, PartialObject},
9};
10
11pub struct ObjectClient {
12 storage_client: StorageClient,
13}
14
15impl ObjectClient {
16 pub async fn new(token_generator: Box<dyn TokenGenerator>) -> StorageResult<Self> {
17 Ok(Self {
18 storage_client: StorageClient::new(token_generator).await?,
19 })
20 }
21
22 pub fn no_auth() -> Self {
23 Self {
24 storage_client: StorageClient::no_auth(),
25 }
26 }
27
28 pub async fn get(&self, o: &Object, fields: &str) -> StorageResult<PartialObject> {
29 let url = o.url();
30 self.storage_client
31 .get_as_json(url.as_str(), &[("fields", fields)])
32 .await
33 }
34
35 pub async fn delete(&self, o: &Object) -> StorageResult<String> {
36 let url = o.url();
37 self.storage_client.delete(&url).await?;
38 super::StorageResult::Ok(url)
39 }
40
41 pub async fn download(
42 &self,
43 o: &Object,
44 ) -> StorageResult<impl Stream<Item = StorageResult<bytes::Bytes>>> {
45 let url = o.url();
46 self.storage_client
47 .get_as_stream(&url, &[("alt", "media")])
48 .await
49 }
50
51 pub async fn upload<S>(&self, o: &Object, stream: S) -> StorageResult<()>
52 where
53 S: futures::TryStream + Send + Sync + 'static,
54 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
55 bytes::Bytes: From<S::Ok>,
56 {
57 let url = o.upload_url("media");
58 self.storage_client.post(&url, stream).await?;
59 super::StorageResult::Ok(())
60 }
61
62 pub async fn upload_with_metadata<S>(
63 &self,
64 m: &ObjectMetadata,
65 o: &Object,
66 stream: S,
67 ) -> StorageResult<()>
68 where
69 S: TryStream<Ok = bytes::Bytes> + Send + Sync + 'static,
70 S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
71 {
72 let url = o.upload_url("multipart");
73 self.storage_client.post_multipart(&url, m, stream).await?;
74 super::StorageResult::Ok(())
75 }
76
77 fn list_url(bucket: &str) -> String {
78 format!("{}/o", Bucket::new(bucket).url())
79 }
80
81 pub async fn is_valid(&self, bucket: &str, prefix: &str) -> StorageResult<()> {
82 let objects_list_request = ObjectsListRequest {
85 prefix: Some(prefix.to_owned()),
86 fields: Some("items(name)".to_owned()),
87 max_results: Some(1),
88 ..Default::default()
89 };
90
91 let url = Self::list_url(bucket);
92 self.storage_client
93 .get_as_json(&url, &objects_list_request)
94 .await
95 .map(|_: Objects| ())
96 }
97
98 pub async fn list(
99 &self,
100 bucket: &str,
101 objects_list_request: &ObjectsListRequest,
102 ) -> impl Stream<Item = StorageResult<PartialObject>> + '_ {
103 let objects_list_request = objects_list_request.to_owned();
104 let url = Self::list_url(bucket);
105 futures::stream::try_unfold(
106 (Some(objects_list_request), url),
107 move |(state, url)| async move {
108 match state {
109 None => Ok(None),
110 Some(state) => {
111 let objects: Objects =
112 self.storage_client.get_as_json(&url, &state).await?;
113 let items = futures::stream::iter(objects.items).map(Ok);
114 match objects.next_page_token {
115 None => Ok(Some((items, (None, url)))),
116 Some(next_token) => {
117 let new_state = ObjectsListRequest {
118 page_token: Some(next_token),
119 ..state
120 };
121 Ok(Some((items, (Some(new_state), url))))
122 }
123 }
124 }
125 }
126 },
127 )
128 .try_flatten()
129 }
130}