cloud_storage_sync/
lib.rs

1#[macro_use]
2extern crate arrayref;
3
4pub mod error;
5pub mod gcs;
6pub mod local;
7
8pub use gcs::*;
9pub use local::*;
10
11mod util;
12
13use crate::error::*;
14
15pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
16
17#[cfg(test)]
18mod tests {
19    use crate::util::*;
20
21    use super::*;
22    use cloud_storage::{Client, ListRequest};
23    use futures::TryStreamExt;
24    use snafu::ResultExt;
25    use std::io::Read;
26    use std::io::Write;
27    use std::sync::Mutex;
28    use std::{
29        fs::{create_dir, remove_dir_all, File},
30        path::{Path, PathBuf},
31    };
32    use tempdir::TempDir;
33
34    lazy_static::lazy_static! {
35        // prevent error
36        // "dispatch dropped without returning error"
37        // caused by parallel tests
38        // https://github.com/ThouCheese/cloud-storage-rs/blob/master/src/lib.rs#L118
39        static ref RUNTIME: Mutex<tokio::runtime::Runtime> = Mutex::new(tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap());
40    }
41
42    #[test]
43    fn test_local_file_upload() {
44        RUNTIME.lock().unwrap().block_on(async {
45            let prefix = "local_file_upload";
46            init(prefix).await;
47
48            let client = Client::default(); // FIXME: get from local2gcs
49
50            let populated = PopulatedDir::new().unwrap();
51            let local = LocalSource::new(false, 2);
52
53            for i in 0..2 {
54                let op_count = local
55                    .to_gcs(&populated.somefile, &env_bucket(), &prefix)
56                    .await
57                    .unwrap();
58                if i == 0 {
59                    assert_eq!(op_count, 1);
60                } else {
61                    assert_eq!(op_count, 0);
62                }
63            }
64
65            let object = client
66                .object()
67                .read(&env_bucket(), &format!("{}/somefile", prefix))
68                .await
69                .unwrap();
70            assert_eq!(
71                file_crc32c(&populated.somefile).await.unwrap(),
72                object.crc32c_decode()
73            );
74            populated.remove().unwrap();
75            clear_bucket(prefix).await.unwrap();
76        });
77    }
78
79    #[test]
80    fn test_dir_sync() {
81        RUNTIME.lock().unwrap().block_on(async {
82            let prefix = "local_dir_upload";
83            init(prefix).await;
84            let populated = PopulatedDir::new().unwrap();
85
86            let gcs = GcsSource::new(false, 2);
87            let local = LocalSource::new(false, 2);
88
89            for i in 0..2 {
90                log::info!("upload iter {}", i);
91                let op_count = local
92                    .to_gcs(
93                        populated.tempdir.to_str_wrap().unwrap().to_owned(),
94                        &env_bucket(),
95                        prefix,
96                    )
97                    .await
98                    .unwrap();
99
100                if i == 0 {
101                    assert_eq!(op_count, 3);
102                } else {
103                    assert_eq!(op_count, 0);
104                }
105            }
106
107            let dir = TempDir::new("cloud-storage-sync").unwrap();
108            for i in 0..2 {
109                let op_count = gcs
110                    .to_local(&env_bucket(), prefix, dir.as_ref())
111                    .await
112                    .unwrap();
113                populated.assert_match(&dir.as_ref()).unwrap();
114
115                if i == 0 {
116                    // 2 op_count because we don't need to download an empty_dir/ object
117                    assert_eq!(op_count, 2);
118                } else {
119                    assert_eq!(op_count, 0);
120                }
121            }
122
123            populated.remove().unwrap();
124            clear_bucket(prefix).await.unwrap();
125        });
126    }
127
128    async fn init(prefix: &str) {
129        let _ = env_logger::try_init();
130        clear_bucket(prefix).await.unwrap();
131    }
132
133    async fn clear_bucket(prefix: &str) -> Result<(), cloud_storage::Error> {
134        let bucket = env_bucket();
135        let client = Client::default();
136        let objects = client
137            .object()
138            .list(
139                &bucket,
140                ListRequest {
141                    prefix: Some(prefix.to_owned()),
142                    ..Default::default()
143                },
144            )
145            .await?;
146        objects
147            .try_for_each(|objects| async {
148                for object in objects.items {
149                    log::trace!("deleting gs://{}{}", &object.bucket, &object.name);
150                    client.object().delete(&object.bucket, &object.name).await?;
151                }
152                Ok(())
153            })
154            .await?;
155        Ok(())
156    }
157
158    fn env_bucket() -> String {
159        dotenv::var("BUCKET").unwrap()
160    }
161
162    struct PopulatedDir {
163        pub tempdir: TempDir,
164        pub somefile: PathBuf,
165        pub dirpath: PathBuf,
166        pub dirfile: PathBuf,
167        pub empty: PathBuf,
168        pub dirfilecontents: String,
169    }
170
171    impl PopulatedDir {
172        fn new() -> Result<PopulatedDir, std::io::Error> {
173            let tempdir = TempDir::new("cloud-storage-sync")?;
174            let filepath = tempdir.as_ref().join("somefile");
175            let mut file = File::create(&filepath)?;
176            write!(&mut file, "somefilecontents")?;
177
178            let dirpath = tempdir.as_ref().join("somedir");
179            create_dir(&dirpath)?;
180            let dirfilepath = dirpath.join("dirfile");
181            let mut dirfile = File::create(&dirfilepath)?;
182            let mut dirfilecontents = String::new();
183            for _ in 0..1_000_000 {
184                write!(&mut dirfile, "10_bytes_")?;
185                dirfilecontents.push_str("10_bytes_");
186            }
187
188            let empty = tempdir.as_ref().join("empty_dir");
189            create_dir(&empty)?;
190            Ok(PopulatedDir {
191                tempdir,
192                somefile: filepath,
193                dirpath,
194                dirfile: dirfilepath,
195                empty,
196                dirfilecontents,
197            })
198        }
199
200        fn remove(self) -> Result<(), std::io::Error> {
201            remove_dir_all(self.tempdir)?;
202            Ok(())
203        }
204
205        #[allow(clippy::expect_fun_call)]
206        fn assert_match(&self, path: impl AsRef<Path>) -> Result<()> {
207            self.assert_file_match(&path, "somefile", "somefilecontents")?;
208            self.assert_file_match(&path, "somedir/dirfile", &self.dirfilecontents)?;
209            let empty_dir = format!("{}/empty_dir", path.as_ref().to_str().unwrap());
210            assert!(
211                std::fs::metadata(empty_dir.clone())
212                    .expect(&format!("{} should exist", empty_dir))
213                    .is_dir(),
214                "empty_dir should be a dir"
215            );
216            Ok(())
217        }
218
219        fn assert_file_match(
220            &self,
221            in_dir: impl AsRef<Path>,
222            path: impl AsRef<Path>,
223            content: &str,
224        ) -> Result<()> {
225            dotenv::dotenv().ok();
226            let path = in_dir.as_ref().join(path.as_ref());
227            let mut file = File::open(&path).context(Io { path: &path })?;
228            let mut contents = String::new();
229            file.read_to_string(&mut contents)
230                .context(Io { path: &path })?;
231            assert_eq!(contents, content);
232            Ok(())
233        }
234    }
235}