1use crate::error::*;
2use crate::util::*;
3use crate::Result;
4use cloud_storage::{object::Object, Client, ListRequest};
5use futures::stream::FuturesUnordered;
6use futures::stream::{StreamExt, TryStreamExt};
7use snafu::{futures::TryStreamExt as SnafuTryStreamExt, ResultExt};
8use std::path::{Path, PathBuf};
9use tokio::fs::{self, File};
10use tokio::io::AsyncWriteExt;
11
/// Syncs objects stored in Google Cloud Storage either to a local directory
/// (`to_local`) or to another GCS bucket/path (`to_gcs`).
#[derive(Debug)]
pub struct GcsSource {
    /// When `true`, every object is downloaded without comparing it to the local copy, and
    /// local files standing where a directory is needed are removed instead of reported.
    pub(crate) force_overwrite: bool,
    /// Maximum number of object downloads kept in flight at once by `to_local`; the pool of
    /// pending downloads is drained at the end of each listing page.
    pub(crate) concurrency: usize,
    /// Client used to list the source objects.
    pub(crate) client: Client,
}
18
19impl GcsSource {
20 pub fn new(force_overwrite: bool, concurrency: usize) -> Self {
21 let client = Client::default();
22 Self {
23 force_overwrite,
24 concurrency,
25 client,
26 }
27 }
28
29 pub fn client(&self) -> &Client {
30 &self.client
31 }
32
33 pub async fn to_local(
37 &self,
38 bucket_src: &str,
39 path_src: &str,
40 dst_dir: impl AsRef<Path>,
41 ) -> Result<usize> {
42 log::trace!(
43 "Syncing bucket: {}, path: {} to local path: {:?}",
44 bucket_src,
45 path_src,
46 dst_dir.as_ref()
47 );
48 let dst_dir = dst_dir.as_ref();
49 log::trace!("Requesting objects");
50 let objects_src = self
51 .client
52 .object()
53 .list(
54 bucket_src,
55 ListRequest {
56 prefix: Some(path_src.to_owned()),
57 ..Default::default()
58 },
59 )
60 .await
61 .context(CloudStorage {
62 object: path_src.to_owned(),
63 op: OpSource::pre(OpSource::ListPrefix),
64 })?;
65 log::trace!("iterating objects");
66 objects_src
67 .context(CloudStorage {
68 object: path_src.to_owned(),
69 op: OpSource::ListPrefix,
70 })
71 .try_fold(
73 (0usize, dst_dir),
74 |(mut count, dst_dir), object_srcs| async move {
75 log::trace!("objects: {:?}", object_srcs);
76 let mut jobs_pool = FuturesUnordered::new();
77
78 for object_src in object_srcs.items {
79 log::trace!("object: {:?}", object_src);
80
81 if jobs_pool.len() == self.concurrency {
82 count += jobs_pool.next().await.unwrap()?;
84 }
85
86 let strip_prefix = if path_src.ends_with('/') {
87 path_src.to_owned()
88 } else {
89 format!("{}/", path_src)
90 };
91 let stripped_object_name =
92 object_src.name.strip_prefix(&strip_prefix).ok_or({
93 Error::Other {
94 message: "Failed to strip path prefix, should never happen, please report an issue",
95 }
96 })?;
97 let path_dst = dst_dir.join(stripped_object_name);
98
99 Self::create_parent_dirs(self.force_overwrite, &path_dst).await?;
100
101 if object_src.name.ends_with('/') {
102 let created =
103 Self::maybe_create_dir(self.force_overwrite, &path_dst).await?;
104 if let Some(created) = created {
105 log::trace!("Created dir {:?}", created.as_os_str());
106 }
107 continue;
108 }
109
110 let path_dst = path_dst.to_str().expect("valid utf8 file name").to_owned();
111
112 log::trace!("downloading object {:?}", object_src);
113 let job = Self::download_object(
114 self.force_overwrite,
115 bucket_src,
116 path_dst,
117 object_src,
118 );
119
120 jobs_pool.push(job);
121 }
122
123 log::trace!("waiting for jobs completion");
124 while let Some(job) = jobs_pool.next().await {
125 count += job?;
126 }
127 log::trace!("all jobs completed");
128
129 Ok((count, dst_dir))
130 },
131 )
132 .await
133 .map(|(count, _)| count)
134 }
135
136 pub async fn to_gcs(
138 &self,
139 bucket_src: &str,
140 path_src: &str,
141 bucket_dst: &str,
142 path_dst: &str,
143 ) -> Result<usize, Error> {
144 let objects_src = self
145 .client
146 .object()
147 .list(
148 bucket_src,
149 ListRequest {
150 prefix: Some(path_src.to_owned()),
151 ..Default::default()
152 },
153 )
154 .await
155 .context(CloudStorage {
156 object: path_src.to_owned(),
157 op: OpSource::pre(OpSource::ListPrefix),
158 })?;
159 objects_src
160 .context(CloudStorage {
161 object: path_src.to_owned(),
162 op: OpSource::ListPrefix,
163 })
164 .try_fold(
166 (0usize, bucket_dst, path_dst),
167 |(mut count, bucket_dst, path_dst), object_srcs| async move {
168 for object_src in object_srcs.items {
169 object_src
170 .copy(bucket_dst, path_dst)
171 .await
172 .context(CloudStorage {
173 object: path_dst.to_owned(),
174 op: OpSource::CopyObject,
175 })?;
176 count += 1;
177 }
178
179 Ok((count, bucket_dst, path_dst))
180 },
181 )
182 .await
183 .map(|(count, ..)| count)
184 }
185
186 async fn create_parent_dirs(force_overwrite: bool, path_dst: impl AsRef<Path>) -> Result<()> {
187 let path_dst = PathBuf::from(path_dst.as_ref());
188
189 if let Some(dir_dst) = path_dst.parent() {
190 if FileUtil::exists(dir_dst).await {
191 if !FileUtil::is_dir(dir_dst).await {
192 if force_overwrite {
193 fs::remove_file(dir_dst)
194 .await
195 .context(Io { path: dir_dst })?;
196 } else {
197 return Err(Error::AlreadyExists { path: path_dst });
198 }
199 }
200 } else {
201 log::trace!("Creating directory {:?}", &dir_dst);
202 fs::create_dir_all(dir_dst)
203 .await
204 .context(Io { path: dir_dst })?;
205 }
206 }
207
208 Ok(())
209 }
210
211 async fn maybe_create_dir(
212 force_overwrite: bool,
213 path_dst: impl AsRef<Path>,
214 ) -> Result<Option<PathBuf>> {
215 let path_dst = path_dst.as_ref();
216 let path_dst = PathBuf::from(path_dst);
217 let path_dst = path_dst.as_path();
218 match path_dst.metadata() {
219 Ok(md) if md.is_dir() => Ok(None),
220 Ok(_) => {
221 if force_overwrite {
222 std::fs::remove_file(path_dst).context(Io { path: path_dst })?;
223 std::fs::create_dir(path_dst).context(Io { path: path_dst })?;
224 Ok(Some(path_dst.to_owned()))
225 } else {
226 Err(Error::AlreadyExists {
227 path: PathBuf::from(path_dst),
228 })
229 }
230 }
231 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
232 std::fs::create_dir(path_dst).context(Io { path: path_dst })?;
233 Ok(Some(path_dst.to_owned()))
234 }
235 Err(err) => Err(err).context(Io { path: path_dst }),
236 }
237 }
238
239 async fn download_object(
240 force_overwrite: bool,
241 bucket_src: &str,
242 path_dst: impl AsRef<Path>,
243 object_src: Object,
244 ) -> Result<usize> {
245 let mut count = 0;
246 let path_dst = path_dst.as_ref();
247
248 if !Self::should_download(force_overwrite, &object_src, path_dst).await? {
249 log::trace!("Skip {:?}", object_src.name);
250 } else {
251 log::trace!(
252 "Copy gs://{}/{} to {:?}",
253 bucket_src,
254 object_src.name,
255 &path_dst,
256 );
257 let file_dst = File::create(path_dst)
258 .await
259 .context(Io { path: path_dst })?;
260
261 let url_src = object_src.download_url(60).context(CloudStorage {
262 object: object_src.name.to_owned(),
263 op: OpSource::DownloadUrl,
264 })?;
265 let response_src = reqwest::get(&url_src).await?;
266
267 let (file_dst, copied) = response_src
268 .bytes_stream()
269 .map_err(Error::from)
270 .try_fold((file_dst, 0), |(mut file_dst, copied), chunk| async move {
271 let copied = copied + chunk.len();
272 file_dst
273 .write_all(&chunk)
274 .await
275 .context(Io { path: path_dst })?;
276 Ok((file_dst, copied))
277 })
278 .await?;
279
280 file_dst.sync_all().await.context(Io { path: path_dst })?;
281 count += 1;
282
283 log::trace!("Copied {} bytes", copied);
284 }
285 Ok(count)
286 }
287
288 async fn should_download(
289 force_overwrite: bool,
290 object: &Object,
291 path_dst: impl AsRef<Path>,
292 ) -> Result<bool> {
293 if force_overwrite {
294 return Ok(true);
295 }
296
297 if !path_dst.as_ref().exists() {
298 return Ok(true);
299 }
300
301 let dst_len = path_dst
302 .as_ref()
303 .metadata()
304 .context(Io {
305 path: path_dst.as_ref(),
306 })?
307 .len();
308
309 if dst_len != object.size {
310 log::trace!("Size mismatch, src: {}, dst: {}", object.size, dst_len);
311 Ok(true)
312 } else if file_crc32c(path_dst.as_ref()).await.context(Io {
313 path: path_dst.as_ref(),
314 })? != object.crc32c_decode()
315 {
316 log::trace!("Crc32c mismatch");
317 Ok(true)
318 } else {
319 Ok(false)
320 }
321 }
322}