cloud_storage_sync/
gcs.rs

1use crate::error::*;
2use crate::util::*;
3use crate::Result;
4use cloud_storage::{object::Object, Client, ListRequest};
5use futures::stream::FuturesUnordered;
6use futures::stream::{StreamExt, TryStreamExt};
7use snafu::{futures::TryStreamExt as SnafuTryStreamExt, ResultExt};
8use std::path::{Path, PathBuf};
9use tokio::fs::{self, File};
10use tokio::io::AsyncWriteExt;
11
12#[derive(Debug)]
13pub struct GcsSource {
14    pub(crate) force_overwrite: bool,
15    pub(crate) concurrency: usize,
16    pub(crate) client: Client,
17}
18
19impl GcsSource {
20    pub fn new(force_overwrite: bool, concurrency: usize) -> Self {
21        let client = Client::default();
22        Self {
23            force_overwrite,
24            concurrency,
25            client,
26        }
27    }
28
29    pub fn client(&self) -> &Client {
30        &self.client
31    }
32
33    /// Syncs remote Gcs bucket path to a local path
34    ///
35    /// Returns actual downloads count
36    pub async fn to_local(
37        &self,
38        bucket_src: &str,
39        path_src: &str,
40        dst_dir: impl AsRef<Path>,
41    ) -> Result<usize> {
42        log::trace!(
43            "Syncing bucket: {}, path: {} to local path: {:?}",
44            bucket_src,
45            path_src,
46            dst_dir.as_ref()
47        );
48        let dst_dir = dst_dir.as_ref();
49        log::trace!("Requesting objects");
50        let objects_src = self
51            .client
52            .object()
53            .list(
54                bucket_src,
55                ListRequest {
56                    prefix: Some(path_src.to_owned()),
57                    ..Default::default()
58                },
59            )
60            .await
61            .context(CloudStorage {
62                object: path_src.to_owned(),
63                op: OpSource::pre(OpSource::ListPrefix),
64            })?;
65        log::trace!("iterating objects");
66        objects_src
67            .context(CloudStorage {
68                object: path_src.to_owned(),
69                op: OpSource::ListPrefix,
70            })
71            // .map_err(Error::from)
72            .try_fold(
73                (0usize, dst_dir),
74                |(mut count, dst_dir), object_srcs| async move {
75                    log::trace!("objects: {:?}", object_srcs);
76                    let mut jobs_pool = FuturesUnordered::new();
77
78                    for object_src in object_srcs.items {
79                        log::trace!("object: {:?}", object_src);
80
81                        if jobs_pool.len() == self.concurrency {
82                            // unwrap because it's not empty
83                            count += jobs_pool.next().await.unwrap()?;
84                        }
85
86                        let strip_prefix = if path_src.ends_with('/') {
87                            path_src.to_owned()
88                        } else {
89                            format!("{}/", path_src)
90                        };
91                        let stripped_object_name =
92                            object_src.name.strip_prefix(&strip_prefix).ok_or({
93                                Error::Other {
94                message: "Failed to strip path prefix, should never happen, please report an issue",
95            }
96                            })?;
97                        let path_dst = dst_dir.join(stripped_object_name);
98
99                        Self::create_parent_dirs(self.force_overwrite, &path_dst).await?;
100
101                        if object_src.name.ends_with('/') {
102                            let created =
103                                Self::maybe_create_dir(self.force_overwrite, &path_dst).await?;
104                            if let Some(created) = created {
105                                log::trace!("Created dir {:?}", created.as_os_str());
106                            }
107                            continue;
108                        }
109
110                        let path_dst = path_dst.to_str().expect("valid utf8 file name").to_owned();
111
112                        log::trace!("downloading object {:?}", object_src);
113                        let job = Self::download_object(
114                            self.force_overwrite,
115                            bucket_src,
116                            path_dst,
117                            object_src,
118                        );
119
120                        jobs_pool.push(job);
121                    }
122
123                    log::trace!("waiting for jobs completion");
124                    while let Some(job) = jobs_pool.next().await {
125                        count += job?;
126                    }
127                    log::trace!("all jobs completed");
128
129                    Ok((count, dst_dir))
130                },
131            )
132            .await
133            .map(|(count, _)| count)
134    }
135
136    /// Copies remote Gcs bucket file or directory to another remote Gcs bucket file or directory
137    pub async fn to_gcs(
138        &self,
139        bucket_src: &str,
140        path_src: &str,
141        bucket_dst: &str,
142        path_dst: &str,
143    ) -> Result<usize, Error> {
144        let objects_src = self
145            .client
146            .object()
147            .list(
148                bucket_src,
149                ListRequest {
150                    prefix: Some(path_src.to_owned()),
151                    ..Default::default()
152                },
153            )
154            .await
155            .context(CloudStorage {
156                object: path_src.to_owned(),
157                op: OpSource::pre(OpSource::ListPrefix),
158            })?;
159        objects_src
160            .context(CloudStorage {
161                object: path_src.to_owned(),
162                op: OpSource::ListPrefix,
163            })
164            // .map_err(Error::from)
165            .try_fold(
166                (0usize, bucket_dst, path_dst),
167                |(mut count, bucket_dst, path_dst), object_srcs| async move {
168                    for object_src in object_srcs.items {
169                        object_src
170                            .copy(bucket_dst, path_dst)
171                            .await
172                            .context(CloudStorage {
173                                object: path_dst.to_owned(),
174                                op: OpSource::CopyObject,
175                            })?;
176                        count += 1;
177                    }
178
179                    Ok((count, bucket_dst, path_dst))
180                },
181            )
182            .await
183            .map(|(count, ..)| count)
184    }
185
186    async fn create_parent_dirs(force_overwrite: bool, path_dst: impl AsRef<Path>) -> Result<()> {
187        let path_dst = PathBuf::from(path_dst.as_ref());
188
189        if let Some(dir_dst) = path_dst.parent() {
190            if FileUtil::exists(dir_dst).await {
191                if !FileUtil::is_dir(dir_dst).await {
192                    if force_overwrite {
193                        fs::remove_file(dir_dst)
194                            .await
195                            .context(Io { path: dir_dst })?;
196                    } else {
197                        return Err(Error::AlreadyExists { path: path_dst });
198                    }
199                }
200            } else {
201                log::trace!("Creating directory {:?}", &dir_dst);
202                fs::create_dir_all(dir_dst)
203                    .await
204                    .context(Io { path: dir_dst })?;
205            }
206        }
207
208        Ok(())
209    }
210
211    async fn maybe_create_dir(
212        force_overwrite: bool,
213        path_dst: impl AsRef<Path>,
214    ) -> Result<Option<PathBuf>> {
215        let path_dst = path_dst.as_ref();
216        let path_dst = PathBuf::from(path_dst);
217        let path_dst = path_dst.as_path();
218        match path_dst.metadata() {
219            Ok(md) if md.is_dir() => Ok(None),
220            Ok(_) => {
221                if force_overwrite {
222                    std::fs::remove_file(path_dst).context(Io { path: path_dst })?;
223                    std::fs::create_dir(path_dst).context(Io { path: path_dst })?;
224                    Ok(Some(path_dst.to_owned()))
225                } else {
226                    Err(Error::AlreadyExists {
227                        path: PathBuf::from(path_dst),
228                    })
229                }
230            }
231            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
232                std::fs::create_dir(path_dst).context(Io { path: path_dst })?;
233                Ok(Some(path_dst.to_owned()))
234            }
235            Err(err) => Err(err).context(Io { path: path_dst }),
236        }
237    }
238
239    async fn download_object(
240        force_overwrite: bool,
241        bucket_src: &str,
242        path_dst: impl AsRef<Path>,
243        object_src: Object,
244    ) -> Result<usize> {
245        let mut count = 0;
246        let path_dst = path_dst.as_ref();
247
248        if !Self::should_download(force_overwrite, &object_src, path_dst).await? {
249            log::trace!("Skip {:?}", object_src.name);
250        } else {
251            log::trace!(
252                "Copy gs://{}/{} to {:?}",
253                bucket_src,
254                object_src.name,
255                &path_dst,
256            );
257            let file_dst = File::create(path_dst)
258                .await
259                .context(Io { path: path_dst })?;
260
261            let url_src = object_src.download_url(60).context(CloudStorage {
262                object: object_src.name.to_owned(),
263                op: OpSource::DownloadUrl,
264            })?;
265            let response_src = reqwest::get(&url_src).await?;
266
267            let (file_dst, copied) = response_src
268                .bytes_stream()
269                .map_err(Error::from)
270                .try_fold((file_dst, 0), |(mut file_dst, copied), chunk| async move {
271                    let copied = copied + chunk.len();
272                    file_dst
273                        .write_all(&chunk)
274                        .await
275                        .context(Io { path: path_dst })?;
276                    Ok((file_dst, copied))
277                })
278                .await?;
279
280            file_dst.sync_all().await.context(Io { path: path_dst })?;
281            count += 1;
282
283            log::trace!("Copied {} bytes", copied);
284        }
285        Ok(count)
286    }
287
288    async fn should_download(
289        force_overwrite: bool,
290        object: &Object,
291        path_dst: impl AsRef<Path>,
292    ) -> Result<bool> {
293        if force_overwrite {
294            return Ok(true);
295        }
296
297        if !path_dst.as_ref().exists() {
298            return Ok(true);
299        }
300
301        let dst_len = path_dst
302            .as_ref()
303            .metadata()
304            .context(Io {
305                path: path_dst.as_ref(),
306            })?
307            .len();
308
309        if dst_len != object.size {
310            log::trace!("Size mismatch, src: {}, dst: {}", object.size, dst_len);
311            Ok(true)
312        } else if file_crc32c(path_dst.as_ref()).await.context(Io {
313            path: path_dst.as_ref(),
314        })? != object.crc32c_decode()
315        {
316            log::trace!("Crc32c mismatch");
317            Ok(true)
318        } else {
319            Ok(false)
320        }
321    }
322}