1use crate::error::*;
2use crate::util::*;
3use crate::Result;
4use cloud_storage::{object::Object, Client};
5use futures::future::{BoxFuture, FutureExt};
6use futures::stream::TryStreamExt;
7use snafu::{futures::TryStreamExt as SnafuTryStreamExt, ResultExt};
8use std::path::{Path, PathBuf};
9use tokio::fs::{self, File};
10
/// Syncs files and directories from the local filesystem to Google Cloud Storage.
#[derive(Debug)]
pub struct LocalSource {
    /// When `true`, every file is uploaded without first comparing it
    /// (size, then CRC32C) against the existing destination object.
    pub(crate) force_overwrite: bool,
    /// Configured concurrency level.
    /// NOTE(review): not read by the local-to-GCS code in this file, which
    /// walks directory entries sequentially — confirm it is used elsewhere.
    pub(crate) concurrency: usize,
    /// Cloud Storage client owned by this source (exposed via `client()`).
    pub(crate) client: Client,
}
17
18impl LocalSource {
19 pub fn new(force_overwrite: bool, concurrency: usize) -> Self {
20 let client = Client::default();
21 Self {
22 force_overwrite,
23 concurrency,
24 client,
25 }
26 }
27
28 pub fn client(&self) -> &Client {
29 &self.client
30 }
31
32 pub async fn to_gcs(
36 &self,
37 path_src: impl AsRef<Path>,
38 bucket_dst: &str,
39 path_dst: &str,
40 ) -> Result<usize, Error> {
41 let path_buf = PathBuf::from(path_src.as_ref());
42 if path_buf.is_dir() {
43 self.sync_local_dir_to_gcs(
44 path_src.to_str_wrap()?.to_owned(),
45 bucket_dst.to_owned(),
46 path_dst.to_owned(),
47 )
48 .await
49 } else {
50 let filename = path_buf.file_name().ok_or(Error::Other {
51 message: "path_src is not a file, should never happen, please report an issue",
52 })?;
53 let path_dst = PathBuf::from(path_dst).join(filename);
54 let gcs_path_dst = path_dst.to_str_wrap()?;
55 self.sync_local_file_to_gcs(path_src, bucket_dst, gcs_path_dst)
56 .await
57 }
58 }
59
60 fn sync_local_dir_to_gcs(
64 &self,
65 path_src: String,
67 bucket: String,
68 path_dst: String,
69 ) -> BoxFuture<Result<usize>> {
70 async move {
71 let entries = fs::read_dir(&path_src).await.context(TokioIo {
73 path: path_src.clone(),
74 })?;
75 let entries = tokio_stream::wrappers::ReadDirStream::new(entries);
77
78 let (entry_count, op_count) = entries
79 .context(Io { path: path_src })
80 .map_ok(|entry| (entry, bucket.clone(), path_dst.clone()))
81 .and_then(|(entry, bucket, path_dst)| async move {
82 let entry_path = entry.path();
83 let path_dst = PathBuf::from(&path_dst).join(entry.file_name());
84 let path_dst = path_dst.to_str_wrap()?.to_owned();
85 if entry_path.is_dir() {
86 self.sync_local_dir_to_gcs(
87 entry_path.to_str_wrap()?.to_owned(),
88 bucket.clone(),
89 path_dst.clone(),
90 )
91 .await
92 } else {
93 self.sync_local_file_to_gcs(&entry_path, &bucket, &path_dst)
94 .await
95 }
96 })
97 .try_fold(
98 (0usize, 0usize),
99 |(entry_count, op_count), entry_op_count| async move {
100 Ok((entry_count + 1, op_count + entry_op_count))
101 },
102 )
103 .await?;
104
105 if entry_count == 0 {
106 let dir_object = format!("{}/", path_dst);
108 match Object::read(&bucket, &dir_object).await {
109 Ok(_) => Ok(0),
110 Err(cloud_storage::Error::Google(response))
111 if response.errors_has_reason(&cloud_storage::Reason::NotFound) =>
112 {
113 log::trace!("Creating gs://{}{}", bucket, dir_object);
114 Object::create(&bucket, vec![], &dir_object, "")
115 .await
116 .context(CloudStorage {
117 object: dir_object,
118 op: OpSource::CreateObject,
119 })?;
120 Ok(1)
121 }
122 Err(e) => Err(e).context(CloudStorage {
123 object: dir_object,
124 op: OpSource::ReadObject,
125 }),
126 }
127 } else {
128 Ok(op_count)
129 }
130 }
132 .boxed()
133 }
134
135 async fn sync_local_file_to_gcs(
137 &self,
138 path_src: impl AsRef<Path>,
139 bucket: &str,
140 filename: &str,
141 ) -> Result<usize> {
142 if !self
143 .should_upload_local(path_src.as_ref(), bucket, filename)
144 .await?
145 {
146 log::trace!("Skip {:?}", path_src.as_ref());
147 Ok(0)
148 } else {
149 log::trace!(
150 "Copy {:?} to gs://{}/{}",
151 path_src.as_ref(),
152 bucket,
153 filename,
154 );
155 let file_src = File::open(path_src.as_ref()).await.context(Io {
156 path: path_src.as_ref(),
157 })?;
158 let metadata = file_src.metadata().await.context(Io {
159 path: path_src.as_ref(),
160 })?;
161 let length = metadata.len();
162 let stream = tokio_util::io::ReaderStream::new(file_src);
164 let mime_type =
166 mime_guess::from_path(path_src).first_or(mime::APPLICATION_OCTET_STREAM);
167 let mime_type_str = mime_type.essence_str();
168 Object::create_streamed(bucket, stream, length, filename, mime_type_str)
169 .await
170 .context(CloudStorage {
171 object: filename.to_owned(),
172 op: OpSource::CreateObject,
173 })?;
174 Ok(1)
175 }
176 }
177
178 async fn should_upload_local(
179 &self,
180 path_src: impl AsRef<Path>,
181 bucket: &str,
182 filename: &str,
183 ) -> Result<bool> {
184 if self.force_overwrite {
185 return Ok(true);
186 }
187
188 let src_len = path_src
189 .as_ref()
190 .metadata()
191 .context(Io {
192 path: path_src.as_ref(),
193 })?
194 .len();
195 if let Ok(object) = self.client.object().read(bucket, filename).await {
196 if object.size != src_len {
197 log::trace!("Size mismatch, src: {}, dst: {}", src_len, object.size);
198 Ok(true)
199 } else if file_crc32c(path_src.as_ref()).await.context(Io {
200 path: path_src.as_ref(),
201 })? != object.crc32c_decode()
202 {
203 log::trace!("Crc32c mismatch");
204 Ok(true)
205 } else {
206 Ok(false)
207 }
208 } else {
209 Ok(true)
211 }
212 }
213}
214
/// Fallible conversion of a path-like value to `&str`.
pub(crate) trait ToStrWrap {
    /// Returns the value as `&str`, or a crate `Error` instead of `None` when
    /// the value is not valid UTF-8.
    fn to_str_wrap(&self) -> Result<&str>;
}
218
219impl<P: AsRef<Path>> ToStrWrap for P {
220 fn to_str_wrap(&self) -> Result<&str> {
221 self.as_ref().to_str().ok_or(Error::Other {
222 message: "Can't convert Path to &str, should never happen, please report an issue",
223 })
224 }
225}