cloud_storage_sync/
local.rs

1use crate::error::*;
2use crate::util::*;
3use crate::Result;
4use cloud_storage::{object::Object, Client};
5use futures::future::{BoxFuture, FutureExt};
6use futures::stream::TryStreamExt;
7use snafu::{futures::TryStreamExt as SnafuTryStreamExt, ResultExt};
8use std::path::{Path, PathBuf};
9use tokio::fs::{self, File};
10
11#[derive(Debug)]
12pub struct LocalSource {
13    pub(crate) force_overwrite: bool,
14    pub(crate) concurrency: usize,
15    pub(crate) client: Client,
16}
17
18impl LocalSource {
19    pub fn new(force_overwrite: bool, concurrency: usize) -> Self {
20        let client = Client::default();
21        Self {
22            force_overwrite,
23            concurrency,
24            client,
25        }
26    }
27
28    pub fn client(&self) -> &Client {
29        &self.client
30    }
31
32    /// Syncs local file or directory to Gcs bucket
33    /// if path_src is a file then the resulting object will be [bucket_dst]/[path_dst]/[filename]
34    /// where [filename] is a string after the last "/" of the path_src
35    pub async fn to_gcs(
36        &self,
37        path_src: impl AsRef<Path>,
38        bucket_dst: &str,
39        path_dst: &str,
40    ) -> Result<usize, Error> {
41        let path_buf = PathBuf::from(path_src.as_ref());
42        if path_buf.is_dir() {
43            self.sync_local_dir_to_gcs(
44                path_src.to_str_wrap()?.to_owned(),
45                bucket_dst.to_owned(),
46                path_dst.to_owned(),
47            )
48            .await
49        } else {
50            let filename = path_buf.file_name().ok_or(Error::Other {
51                message: "path_src is not a file, should never happen, please report an issue",
52            })?;
53            let path_dst = PathBuf::from(path_dst).join(filename);
54            let gcs_path_dst = path_dst.to_str_wrap()?;
55            self.sync_local_file_to_gcs(path_src, bucket_dst, gcs_path_dst)
56                .await
57        }
58    }
59
60    /// Syncs local directory to gcs bucket
61    /// the resulting filenames will be [path_dst]/[filename]
62    /// where [filename] is path relative to the path_src
63    fn sync_local_dir_to_gcs(
64        &self,
65        // path_src: impl AsRef<Path>,
66        path_src: String,
67        bucket: String,
68        path_dst: String,
69    ) -> BoxFuture<Result<usize>> {
70        async move {
71            // get dir entries
72            let entries = fs::read_dir(&path_src).await.context(TokioIo {
73                path: path_src.clone(),
74            })?;
75            // convert to stream
76            let entries = tokio_stream::wrappers::ReadDirStream::new(entries);
77
78            let (entry_count, op_count) = entries
79                .context(Io { path: path_src })
80                .map_ok(|entry| (entry, bucket.clone(), path_dst.clone()))
81                .and_then(|(entry, bucket, path_dst)| async move {
82                    let entry_path = entry.path();
83                    let path_dst = PathBuf::from(&path_dst).join(entry.file_name());
84                    let path_dst = path_dst.to_str_wrap()?.to_owned();
85                    if entry_path.is_dir() {
86                        self.sync_local_dir_to_gcs(
87                            entry_path.to_str_wrap()?.to_owned(),
88                            bucket.clone(),
89                            path_dst.clone(),
90                        )
91                        .await
92                    } else {
93                        self.sync_local_file_to_gcs(&entry_path, &bucket, &path_dst)
94                            .await
95                    }
96                })
97                .try_fold(
98                    (0usize, 0usize),
99                    |(entry_count, op_count), entry_op_count| async move {
100                        Ok((entry_count + 1, op_count + entry_op_count))
101                    },
102                )
103                .await?;
104
105            if entry_count == 0 {
106                // empty directory, create an object/
107                let dir_object = format!("{}/", path_dst);
108                match Object::read(&bucket, &dir_object).await {
109                    Ok(_) => Ok(0),
110                    Err(cloud_storage::Error::Google(response))
111                        if response.errors_has_reason(&cloud_storage::Reason::NotFound) =>
112                    {
113                        log::trace!("Creating gs://{}{}", bucket, dir_object);
114                        Object::create(&bucket, vec![], &dir_object, "")
115                            .await
116                            .context(CloudStorage {
117                                object: dir_object,
118                                op: OpSource::CreateObject,
119                            })?;
120                        Ok(1)
121                    }
122                    Err(e) => Err(e).context(CloudStorage {
123                        object: dir_object,
124                        op: OpSource::ReadObject,
125                    }),
126                }
127            } else {
128                Ok(op_count)
129            }
130            // .map(|(count, ..)| count);
131        }
132        .boxed()
133    }
134
135    /// Syncs local file and remote object
136    async fn sync_local_file_to_gcs(
137        &self,
138        path_src: impl AsRef<Path>,
139        bucket: &str,
140        filename: &str,
141    ) -> Result<usize> {
142        if !self
143            .should_upload_local(path_src.as_ref(), bucket, filename)
144            .await?
145        {
146            log::trace!("Skip {:?}", path_src.as_ref());
147            Ok(0)
148        } else {
149            log::trace!(
150                "Copy {:?} to gs://{}/{}",
151                path_src.as_ref(),
152                bucket,
153                filename,
154            );
155            let file_src = File::open(path_src.as_ref()).await.context(Io {
156                path: path_src.as_ref(),
157            })?;
158            let metadata = file_src.metadata().await.context(Io {
159                path: path_src.as_ref(),
160            })?;
161            let length = metadata.len();
162            // let stream = ByteStream(Pin::new(Box::new(file_src)));
163            let stream = tokio_util::io::ReaderStream::new(file_src);
164            // let reader = BufReader::new(file_src);
165            let mime_type =
166                mime_guess::from_path(path_src).first_or(mime::APPLICATION_OCTET_STREAM);
167            let mime_type_str = mime_type.essence_str();
168            Object::create_streamed(bucket, stream, length, filename, mime_type_str)
169                .await
170                .context(CloudStorage {
171                    object: filename.to_owned(),
172                    op: OpSource::CreateObject,
173                })?;
174            Ok(1)
175        }
176    }
177
178    async fn should_upload_local(
179        &self,
180        path_src: impl AsRef<Path>,
181        bucket: &str,
182        filename: &str,
183    ) -> Result<bool> {
184        if self.force_overwrite {
185            return Ok(true);
186        }
187
188        let src_len = path_src
189            .as_ref()
190            .metadata()
191            .context(Io {
192                path: path_src.as_ref(),
193            })?
194            .len();
195        if let Ok(object) = self.client.object().read(bucket, filename).await {
196            if object.size != src_len {
197                log::trace!("Size mismatch, src: {}, dst: {}", src_len, object.size);
198                Ok(true)
199            } else if file_crc32c(path_src.as_ref()).await.context(Io {
200                path: path_src.as_ref(),
201            })? != object.crc32c_decode()
202            {
203                log::trace!("Crc32c mismatch");
204                Ok(true)
205            } else {
206                Ok(false)
207            }
208        } else {
209            // cloud-sync-rs don't provide semantic errors, so on any error we assume here that file does not exists in a bucket
210            Ok(true)
211        }
212    }
213}
214
215pub(crate) trait ToStrWrap {
216    fn to_str_wrap(&self) -> Result<&str>;
217}
218
219impl<P: AsRef<Path>> ToStrWrap for P {
220    fn to_str_wrap(&self) -> Result<&str> {
221        self.as_ref().to_str().ok_or(Error::Other {
222            message: "Can't convert Path to &str, should never happen, please report an issue",
223        })
224    }
225}