cloud_storage_sync/
lib.rs1#[macro_use]
2extern crate arrayref;
3
4pub mod error;
5pub mod gcs;
6pub mod local;
7
8pub use gcs::*;
9pub use local::*;
10
11mod util;
12
13use crate::error::*;
14
15pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
16
#[cfg(test)]
mod tests {
    //! Integration tests that upload to and download from a real bucket.
    //!
    //! The bucket name is read from the `BUCKET` variable through `dotenv`
    //! (see `env_bucket`). Every test works under its own object-name prefix
    //! and clears that prefix before and after running.

    use crate::util::*;

    use super::*;
    use cloud_storage::{Client, ListRequest};
    use futures::TryStreamExt;
    use snafu::ResultExt;
    use std::io::Read;
    use std::io::Write;
    use std::sync::Mutex;
    use std::{
        fs::{create_dir, remove_dir_all, File},
        path::{Path, PathBuf},
    };
    use tempdir::TempDir;

    lazy_static::lazy_static! {
        /// Current-thread Tokio runtime shared by all tests. The mutex means
        /// only one test body runs on it at a time.
        static ref RUNTIME: Mutex<tokio::runtime::Runtime> = Mutex::new(
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap(),
        );
    }

    /// Runs `test_body` to completion on the shared [`RUNTIME`].
    ///
    /// A failed assertion inside one test unwinds while the lock is held and
    /// poisons the mutex. Recovering the guard here keeps the remaining tests
    /// reporting their own result instead of a secondary `PoisonError` panic.
    fn run_on_runtime<F: std::future::Future>(test_body: F) -> F::Output {
        RUNTIME
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .block_on(test_body)
    }

    /// Uploads a single local file twice and checks that only the first pass
    /// performs an operation, then compares the CRC32C of the local file with
    /// the one stored on the uploaded object.
    #[test]
    fn test_local_file_upload() {
        run_on_runtime(async {
            let prefix = "local_file_upload";
            init(prefix).await;

            let bucket = env_bucket();
            let client = Client::default();
            let populated = PopulatedDir::new().unwrap();
            let local = LocalSource::new(false, 2);

            for i in 0..2 {
                let op_count = local
                    .to_gcs(&populated.somefile, &bucket, prefix)
                    .await
                    .unwrap();
                // The second pass must find the object already up to date.
                let expected = if i == 0 { 1 } else { 0 };
                assert_eq!(op_count, expected, "upload pass {}", i);
            }

            let object = client
                .object()
                .read(&bucket, &format!("{}/somefile", prefix))
                .await
                .unwrap();
            assert_eq!(
                file_crc32c(&populated.somefile).await.unwrap(),
                object.crc32c_decode()
            );

            populated.remove().unwrap();
            clear_bucket(prefix).await.unwrap();
        });
    }

    /// Uploads a populated directory twice, downloads it twice into a fresh
    /// temporary directory, and checks both the reported operation counts and
    /// that the downloaded tree matches the original.
    #[test]
    fn test_dir_sync() {
        run_on_runtime(async {
            let prefix = "local_dir_upload";
            init(prefix).await;
            let bucket = env_bucket();
            let populated = PopulatedDir::new().unwrap();

            let gcs = GcsSource::new(false, 2);
            let local = LocalSource::new(false, 2);

            for i in 0..2 {
                log::info!("upload iter {}", i);
                let op_count = local
                    .to_gcs(
                        populated.tempdir.to_str_wrap().unwrap().to_owned(),
                        &bucket,
                        prefix,
                    )
                    .await
                    .unwrap();

                // First pass reports three operations, the repeat pass none.
                let expected = if i == 0 { 3 } else { 0 };
                assert_eq!(op_count, expected, "upload pass {}", i);
            }

            let dir = TempDir::new("cloud-storage-sync").unwrap();
            for i in 0..2 {
                let op_count = gcs
                    .to_local(&bucket, prefix, dir.path())
                    .await
                    .unwrap();
                populated.assert_match(dir.path()).unwrap();

                // First pass reports two operations, the repeat pass none.
                let expected = if i == 0 { 2 } else { 0 };
                assert_eq!(op_count, expected, "download pass {}", i);
            }

            populated.remove().unwrap();
            clear_bucket(prefix).await.unwrap();
        });
    }

    /// Per-test setup: initialises logging (ignoring the error returned when
    /// a logger is already installed) and empties `prefix` in the bucket.
    async fn init(prefix: &str) {
        let _ = env_logger::try_init();
        clear_bucket(prefix).await.unwrap();
    }

    /// Deletes every object in the test bucket whose name starts with
    /// `prefix`.
    ///
    /// # Errors
    ///
    /// Returns the first `cloud_storage::Error` produced while listing or
    /// deleting objects.
    async fn clear_bucket(prefix: &str) -> Result<(), cloud_storage::Error> {
        let bucket = env_bucket();
        let client = Client::default();
        let pages = client
            .object()
            .list(
                &bucket,
                ListRequest {
                    prefix: Some(prefix.to_owned()),
                    ..Default::default()
                },
            )
            .await?;
        // The listing is a stream of pages; delete the items of each page in turn.
        pages
            .try_for_each(|page| async {
                for object in page.items {
                    log::trace!("deleting gs://{}/{}", object.bucket, object.name);
                    client.object().delete(&object.bucket, &object.name).await?;
                }
                Ok(())
            })
            .await?;
        Ok(())
    }

    /// Name of the bucket the tests run against, read from `BUCKET`.
    ///
    /// # Panics
    ///
    /// Panics when `dotenv::var` cannot provide `BUCKET`.
    fn env_bucket() -> String {
        dotenv::var("BUCKET")
            .expect("BUCKET must be set in the environment or a .env file to run these tests")
    }

    /// A temporary directory pre-filled with a small file, a sub-directory
    /// holding a large file, and an empty sub-directory.
    struct PopulatedDir {
        /// Root of the fixture tree.
        pub tempdir: TempDir,
        /// `<root>/somefile`, containing `somefilecontents`.
        pub somefile: PathBuf,
        /// `<root>/somedir`.
        pub dirpath: PathBuf,
        /// `<root>/somedir/dirfile`.
        pub dirfile: PathBuf,
        /// `<root>/empty_dir`.
        pub empty: PathBuf,
        /// Exact contents written to `dirfile`, kept for later comparison.
        pub dirfilecontents: String,
    }

    impl PopulatedDir {
        /// Creates the fixture tree in a new temporary directory.
        ///
        /// # Errors
        ///
        /// Returns the first I/O error hit while creating directories or
        /// writing files.
        fn new() -> Result<PopulatedDir, std::io::Error> {
            let tempdir = TempDir::new("cloud-storage-sync")?;

            let somefile = tempdir.path().join("somefile");
            File::create(&somefile)?.write_all(b"somefilecontents")?;

            let dirpath = tempdir.path().join("somedir");
            create_dir(&dirpath)?;
            let dirfile = dirpath.join("dirfile");
            // One million repetitions of a 9-byte token (about 9 MB). The
            // string is built once and written with a single call instead of
            // one unbuffered write per repetition.
            let dirfilecontents = "10_bytes_".repeat(1_000_000);
            File::create(&dirfile)?.write_all(dirfilecontents.as_bytes())?;

            let empty = tempdir.path().join("empty_dir");
            create_dir(&empty)?;

            Ok(PopulatedDir {
                tempdir,
                somefile,
                dirpath,
                dirfile,
                empty,
                dirfilecontents,
            })
        }

        /// Consumes the fixture and removes its directory tree, reporting any
        /// I/O error from the removal.
        fn remove(self) -> Result<(), std::io::Error> {
            remove_dir_all(self.tempdir)?;
            Ok(())
        }

        /// Asserts that the tree rooted at `path` mirrors this fixture: both
        /// files have the expected contents and `empty_dir` is a directory.
        ///
        /// # Errors
        ///
        /// Returns an error when one of the expected files cannot be read.
        ///
        /// # Panics
        ///
        /// Panics when contents differ, or when `empty_dir` is missing or is
        /// not a directory.
        fn assert_match(&self, path: impl AsRef<Path>) -> Result<()> {
            let root = path.as_ref();
            self.assert_file_match(root, "somefile", "somefilecontents")?;
            self.assert_file_match(root, "somedir/dirfile", &self.dirfilecontents)?;

            let empty_dir = root.join("empty_dir");
            let metadata = std::fs::metadata(&empty_dir)
                .unwrap_or_else(|err| panic!("{} should exist: {:?}", empty_dir.display(), err));
            assert!(metadata.is_dir(), "empty_dir should be a dir");
            Ok(())
        }

        /// Asserts that the file `path`, resolved relative to `in_dir`, holds
        /// exactly `content`.
        ///
        /// # Errors
        ///
        /// Returns an `Io` error carrying the path when the file cannot be
        /// opened or read.
        fn assert_file_match(
            &self,
            in_dir: impl AsRef<Path>,
            path: impl AsRef<Path>,
            content: &str,
        ) -> Result<()> {
            // Best-effort load of a `.env` file; a failure is ignored.
            dotenv::dotenv().ok();
            let path = in_dir.as_ref().join(path.as_ref());
            let mut file = File::open(&path).context(Io { path: &path })?;
            let mut contents = String::new();
            file.read_to_string(&mut contents)
                .context(Io { path: &path })?;
            assert_eq!(contents, content, "unexpected contents in {}", path.display());
            Ok(())
        }
    }
}