1mod fs;
2mod gcs;
3
4use std::ops::Not;
5use std::path::{Path, PathBuf};
6
7use bytes::Bytes;
8use futures::future::Either;
9use futures::{Future, Stream, StreamExt, TryStreamExt};
10
11use fs::FsClient;
12use gcs::GcsClient;
13use globset::{Glob, GlobSet, GlobSetBuilder};
14
15use crate::oauth2::token::TokenGenerator;
16
/// A storage backend that can act as either the source or the destination of
/// an [`RSync`] job.
///
/// Construct one with [`ReaderWriter::gcs`], [`ReaderWriter::gcs_no_auth`] or
/// [`ReaderWriter::fs`]; the concrete backend is kept private.
pub struct ReaderWriter {
    inner: ReaderWriterInternal,
}
20
/// Alias for [`ReaderWriter`] (the same type is used for sources and destinations).
pub type Source = ReaderWriter;
22
23impl ReaderWriter {
24 fn new(inner: ReaderWriterInternal) -> Self {
25 Self { inner }
26 }
27
28 pub async fn gcs(
29 token_generator: Box<dyn TokenGenerator>,
30 bucket: &str,
31 prefix: &str,
32 ) -> RSyncResult<Self> {
33 let client = GcsClient::new(token_generator, bucket, prefix).await?;
34 Ok(Self::new(ReaderWriterInternal::Gcs(Box::new(client))))
35 }
36
37 pub fn gcs_no_auth(bucket: &str, prefix: &str) -> Self {
38 let client = GcsClient::no_auth(bucket, prefix);
39 Self::new(ReaderWriterInternal::Gcs(Box::new(client)))
40 }
41
42 pub fn fs(base_path: &Path) -> Self {
43 let client = FsClient::new(base_path);
44 Self::new(ReaderWriterInternal::Fs(Box::new(client)))
45 }
46}
47
/// The concrete backend behind a [`ReaderWriter`]: Google Cloud Storage or a
/// local file-system client.
///
/// The clients are boxed, so every variant is pointer-sized.
enum ReaderWriterInternal {
    Gcs(Box<GcsClient>),
    Fs(Box<FsClient>),
}
53
/// Entry size as reported by a backend's `size_and_mt` (presumably bytes — confirm).
type Size = u64;
55
56impl ReaderWriterInternal {
57 async fn list(
58 &self,
59 ) -> Either<
60 impl Stream<Item = RSyncResult<RelativePath>> + '_,
61 impl Stream<Item = RSyncResult<RelativePath>> + '_,
62 > {
63 match self {
64 ReaderWriterInternal::Gcs(client) => Either::Left(client.list().await),
65 ReaderWriterInternal::Fs(client) => Either::Right(client.list().await),
66 }
67 }
68
69 async fn read(
70 &self,
71 path: &RelativePath,
72 ) -> Either<impl Stream<Item = RSyncResult<Bytes>>, impl Stream<Item = RSyncResult<Bytes>>>
73 {
74 match self {
75 ReaderWriterInternal::Gcs(client) => Either::Left(client.read(path).await),
76 ReaderWriterInternal::Fs(client) => Either::Right(client.read(path).await),
77 }
78 }
79
80 async fn get_crc32c(&self, path: &RelativePath) -> RSyncResult<Option<Entry>> {
81 match self {
82 ReaderWriterInternal::Gcs(client) => client.get_crc32c(path).await,
83 ReaderWriterInternal::Fs(client) => client.get_crc32c(path).await,
84 }
85 }
86
87 async fn write<S>(
88 &self,
89 mtime: Option<chrono::DateTime<chrono::Utc>>,
90 set_fs_mtime: bool,
91 path: &RelativePath,
92 stream: S,
93 ) -> RSyncResult<()>
94 where
95 S: futures::TryStream<Ok = bytes::Bytes, Error = RSyncError> + Send + Sync + 'static,
96 {
97 async {
98 match self {
99 ReaderWriterInternal::Gcs(client) => match mtime {
100 Some(mtime) => client.write_mtime(mtime, path, stream).await,
101 None => client.write(path, stream).await,
102 },
103 ReaderWriterInternal::Fs(client) => match (mtime, set_fs_mtime) {
104 (Some(mtime), true) => client.write_mtime(mtime, path, stream).await,
105 _ => client.write(path, stream).await,
106 },
107 }
108 }
109 .await
110 }
111
112 async fn delete(&self, path: &RelativePath) -> RSyncResult<()> {
113 match self {
114 ReaderWriterInternal::Gcs(client) => client.delete(path).await,
115 ReaderWriterInternal::Fs(client) => client.delete(path).await,
116 }
117 }
118
119 async fn exists(&self, path: &RelativePath) -> RSyncResult<bool> {
120 match self {
121 ReaderWriterInternal::Gcs(client) => client.exists(path).await,
122 ReaderWriterInternal::Fs(client) => client.exists(path).await,
123 }
124 }
125
126 async fn size_and_mt(
127 &self,
128 path: &RelativePath,
129 ) -> RSyncResult<(Option<chrono::DateTime<chrono::Utc>>, Option<Size>)> {
130 match self {
131 ReaderWriterInternal::Gcs(client) => client.size_and_mt(path).await,
132 ReaderWriterInternal::Fs(client) => client.size_and_mt(path).await,
133 }
134 }
135}
136
/// A one-way synchronisation job from `source` to `dest`.
///
/// Build it with [`RSync::new`] and the `with_*` methods, then drive
/// [`RSync::sync`] or [`RSync::mirror`].
pub struct RSync {
    source: ReaderWriterInternal,
    dest: ReaderWriterInternal,
    /// Forwarded to the destination writer as `set_fs_mtime`.
    /// Only file-system destinations use it.
    restore_fs_mtime: bool,
    /// Compiled include patterns; `None` means every path is included.
    includes: Option<GlobSet>,
    /// Compiled exclude patterns; `None` means nothing is excluded.
    excludes: Option<GlobSet>,
}
144
145impl RSync {
146 pub fn new(source: ReaderWriter, dest: ReaderWriter) -> Self {
147 Self {
148 source: source.inner,
149 dest: dest.inner,
150 restore_fs_mtime: false,
151 includes: None,
152 excludes: None,
153 }
154 }
155
156 pub fn with_restore_fs_mtime(mut self, restore_fs_mtime: bool) -> Self {
157 self.restore_fs_mtime = restore_fs_mtime;
158 self
159 }
160
161 fn glob_set(globs: &[&str]) -> RSyncResult<Option<GlobSet>> {
162 fn glob_error(error: globset::Error) -> RSyncError {
163 RSyncError::GlobError(error.to_string())
164 }
165
166 if globs.is_empty() {
167 Ok(None)
168 } else {
169 let glob_set = {
170 let mut builer = GlobSetBuilder::new();
171 for glob in globs {
172 builer.add(Glob::new(glob).map_err(glob_error)?);
173 }
174 builer.build().map_err(glob_error)?
175 };
176 Ok(Some(glob_set))
177 }
178 }
179
180 pub fn with_includes(mut self, includes: &[&str]) -> RSyncResult<Self> {
181 self.includes = Self::glob_set(includes)?;
182 Ok(self)
183 }
184
185 pub fn with_excludes(mut self, excludes: &[&str]) -> RSyncResult<Self> {
186 self.excludes = Self::glob_set(excludes)?;
187 Ok(self)
188 }
189
190 async fn write_entry(
191 &self,
192 mtime: Option<chrono::DateTime<chrono::Utc>>,
193 path: &RelativePath,
194 ) -> RSyncResult<()> {
195 let source = self.source.read(path).await;
196 self.dest
197 .write(mtime, self.restore_fs_mtime, path, source)
198 .await?;
199 Ok(())
200 }
201
202 async fn sync_entry_crc32c(&self, path: &RelativePath) -> RSyncResult<RSyncStatus> {
203 Ok(match self.dest.get_crc32c(path).await? {
204 None => {
205 self.write_entry(None, path).await?;
206 RSyncStatus::updated("no dest crc32c", path)
207 }
208 Some(crc32c_dest) => {
209 let crc32c_source = self.source.get_crc32c(path).await?;
210 if Some(crc32c_dest) == crc32c_source {
211 RSyncStatus::already_synced("same crc32c", path)
212 } else {
213 self.write_entry(None, path).await?;
214 RSyncStatus::updated("different crc32c", path)
215 }
216 }
217 })
218 }
219
220 async fn sync_entry(&self, path: &RelativePath) -> RSyncResult<RSyncStatus> {
221 Ok(match self.dest.size_and_mt(path).await? {
222 (Some(dest_dt), Some(dest_size)) => match self.source.size_and_mt(path).await? {
223 (Some(source_dt), Some(source_size)) => {
224 let dest_ts = dest_dt.timestamp();
225 let source_ts = source_dt.timestamp();
226 if dest_ts == source_ts && dest_size == source_size {
227 RSyncStatus::already_synced("same mtime and size", path)
228 } else {
229 self.write_entry(Some(source_dt), path).await?;
230 RSyncStatus::updated("different size or mtime", path)
231 }
232 }
233 _ => self.sync_entry_crc32c(path).await?,
234 },
235 (None, None) => {
236 let (mtime, _) = self.source.size_and_mt(path).await?;
237 self.write_entry(mtime, path).await?;
238 RSyncStatus::Created(path.to_owned())
239 }
240 _ => self.sync_entry_crc32c(path).await?,
241 })
242 }
243
244 fn filter(&self, relative_path: &RelativePath) -> bool {
245 let i = self
246 .includes
247 .as_ref()
248 .map(|includes| {
249 includes
250 .matches(relative_path.path.as_str())
251 .is_empty()
252 .not()
253 })
254 .unwrap_or(true);
255 let x = self
256 .excludes
257 .as_ref()
258 .map(|excludes| excludes.matches(relative_path.path.as_str()).is_empty())
259 .unwrap_or(true);
260 i && x
261 }
262
263 pub async fn sync(
310 &self,
311 ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RSyncStatus>> + '_>> + '_
312 {
313 self.source
314 .list()
315 .await
316 .try_filter(|x| futures::future::ready(self.filter(x)))
317 .map_ok(move |path| async move { self.sync_entry(&path).await })
318 }
319
320 async fn delete_extras(
321 &self,
322 ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RMirrorStatus>> + '_>> + '_
323 {
324 self.dest.list().await.map(move |result| {
325 result.map(|path| async move {
326 if self.source.exists(&path).await?.not() || self.filter(&path).not() {
327 self.dest.delete(&path).await?;
328 Ok(RMirrorStatus::Deleted(path))
329 } else {
330 Ok(RMirrorStatus::NotDeleted(path))
331 }
332 })
333 })
334 }
335
336 pub async fn mirror(
383 &self,
384 ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RMirrorStatus>> + '_>> + '_
385 {
386 let synced = self
387 .sync()
388 .await
389 .map_ok(|fut| async { fut.await.map(RMirrorStatus::Synced) })
390 .map_ok(futures::future::Either::Left);
391
392 let deleted = self
393 .delete_extras()
394 .await
395 .map_ok(futures::future::Either::Right);
396
397 synced.chain(deleted)
398 }
399}
400
/// `/`-separated path of an entry, as used by both backends.
///
/// Construct it with [`RelativePath::new`], which:
/// - converts `\` to `/`;
/// - rejects empty paths and a bare `/`.
///
/// A leading `/` is otherwise kept. Equality and ordering compare the path string.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RelativePath {
    path: String,
}
405
406impl std::fmt::Debug for RelativePath {
407 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
408 write!(f, "{}", self.path)
409 }
410}
411
412impl RelativePath {
413 pub fn new(path: &str) -> RSyncResult<Self> {
414 let path = path.replace('\\', "/").to_owned();
415 if path.is_empty() || path == "/" {
416 Err(RSyncError::EmptyRelativePathError)
417 } else {
418 Ok(Self { path })
419 }
420 }
421}
/// A path paired with its CRC32C checksum, as returned by `get_crc32c`.
///
/// The derived `PartialEq` compares both the path and the checksum.
#[derive(Debug, PartialEq, Clone)]
struct Entry {
    path: RelativePath,
    crc32c: u32,
}
427
428impl Entry {
429 pub(self) fn new(path: &RelativePath, crc32c: u32) -> Self {
430 Self {
431 path: path.to_owned(),
432 crc32c,
433 }
434 }
435}
436
/// Errors produced while listing, reading, writing or comparing entries.
#[derive(Debug)]
pub enum RSyncError {
    /// A GCS response lacked expected fields (presumably; payload is a description — confirm).
    MissingFieldsInGcsResponse(String),
    /// Wraps an error from the sibling `storage` module.
    StorageError(super::storage::Error),
    /// A file-system I/O failure on `path`, with a context `message`.
    /// Built via `RSyncError::fs_io_error`.
    FsIoError {
        path: PathBuf,
        message: String,
        error: std::io::Error,
    },
    /// Returned by `RelativePath::new` for an empty path or a bare `/`.
    EmptyRelativePathError,
    /// An include/exclude pattern failed to compile; carries the rendered globset error.
    GlobError(String),
}
449
450impl RSyncError {
451 fn fs_io_error<T, U>(message: U, path: T, error: std::io::Error) -> RSyncError
452 where
453 T: AsRef<Path>,
454 U: AsRef<str>,
455 {
456 RSyncError::FsIoError {
457 path: path.as_ref().to_path_buf(),
458 message: message.as_ref().to_owned(),
459 error,
460 }
461 }
462}
463
464impl std::fmt::Display for RSyncError {
465 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
466 write!(f, "{:?}", self)
467 }
468}
469
470impl std::error::Error for RSyncError {}
471
/// Outcome of syncing a single entry.
///
/// Variant order matters: it defines the derived `Ord`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum RSyncStatus {
    /// The destination reported neither mtime nor size, and the entry was copied.
    Created(RelativePath),
    /// The entry was (re)copied; `reason` says which comparison triggered it.
    Updated { reason: String, path: RelativePath },
    /// No copy was needed; `reason` says which comparison matched.
    AlreadySynced { reason: String, path: RelativePath },
}
478
479impl RSyncStatus {
480 fn updated(reason: &str, path: &RelativePath) -> Self {
481 let reason = reason.to_owned();
482 let path = path.to_owned();
483 Self::Updated { reason, path }
484 }
485
486 fn already_synced(reason: &str, path: &RelativePath) -> Self {
487 let reason = reason.to_owned();
488 let path = path.to_owned();
489 Self::AlreadySynced { reason, path }
490 }
491}
492
/// Outcome of one step of `RSync::mirror`.
///
/// Variant order matters: it defines the derived `Ord`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum RMirrorStatus {
    /// Result of syncing a source entry.
    Synced(RSyncStatus),
    /// Destination entry removed: it was filtered out or missing from the source.
    Deleted(RelativePath),
    /// Destination entry kept.
    NotDeleted(RelativePath),
}
499
/// Result type used throughout the sync module.
pub type RSyncResult<T> = Result<T, RSyncError>;
501
#[cfg(test)]
mod tests {
    // Import through `super` so the test always refers to this module's types,
    // whatever path the crate re-exports them under.
    use super::{RSyncError, RelativePath};

    /// `true` when `x` is the error `RelativePath::new` returns for empty input.
    fn is_empty_err(x: RSyncError) -> bool {
        matches!(x, RSyncError::EmptyRelativePathError)
    }

    #[test]
    fn test_relative_path() {
        assert!(
            is_empty_err(RelativePath::new("").unwrap_err()),
            "empty path is not allowed"
        );
        assert!(is_empty_err(RelativePath::new("/").unwrap_err()));
        assert_eq!(
            "/hello/world",
            RelativePath::new("/hello/world").unwrap().path
        );
        assert_eq!(
            "hello/world",
            RelativePath::new("hello/world").unwrap().path
        );
    }

    #[test]
    fn test_relative_path_normalizes_backslashes() {
        assert_eq!(
            "hello/world",
            RelativePath::new("hello\\world").unwrap().path
        );
        // A lone backslash normalises to "/" and is therefore rejected.
        assert!(is_empty_err(RelativePath::new("\\").unwrap_err()));
    }
}