gcs_rsync/gcp/sync/
mod.rs

1mod fs;
2mod gcs;
3
4use std::ops::Not;
5use std::path::{Path, PathBuf};
6
7use bytes::Bytes;
8use futures::future::Either;
9use futures::{Future, Stream, StreamExt, TryStreamExt};
10
11use fs::FsClient;
12use gcs::GcsClient;
13use globset::{Glob, GlobSet, GlobSetBuilder};
14
15use crate::oauth2::token::TokenGenerator;
16
17pub struct ReaderWriter {
18    inner: ReaderWriterInternal,
19}
20
21pub type Source = ReaderWriter;
22
23impl ReaderWriter {
24    fn new(inner: ReaderWriterInternal) -> Self {
25        Self { inner }
26    }
27
28    pub async fn gcs(
29        token_generator: Box<dyn TokenGenerator>,
30        bucket: &str,
31        prefix: &str,
32    ) -> RSyncResult<Self> {
33        let client = GcsClient::new(token_generator, bucket, prefix).await?;
34        Ok(Self::new(ReaderWriterInternal::Gcs(Box::new(client))))
35    }
36
37    pub fn gcs_no_auth(bucket: &str, prefix: &str) -> Self {
38        let client = GcsClient::no_auth(bucket, prefix);
39        Self::new(ReaderWriterInternal::Gcs(Box::new(client)))
40    }
41
42    pub fn fs(base_path: &Path) -> Self {
43        let client = FsClient::new(base_path);
44        Self::new(ReaderWriterInternal::Fs(Box::new(client)))
45    }
46}
47
48//TODO: replace this with trait when async trait will be more stable with method returning Trait
49enum ReaderWriterInternal {
50    Gcs(Box<GcsClient>),
51    Fs(Box<FsClient>),
52}
53
54type Size = u64;
55
56impl ReaderWriterInternal {
57    async fn list(
58        &self,
59    ) -> Either<
60        impl Stream<Item = RSyncResult<RelativePath>> + '_,
61        impl Stream<Item = RSyncResult<RelativePath>> + '_,
62    > {
63        match self {
64            ReaderWriterInternal::Gcs(client) => Either::Left(client.list().await),
65            ReaderWriterInternal::Fs(client) => Either::Right(client.list().await),
66        }
67    }
68
69    async fn read(
70        &self,
71        path: &RelativePath,
72    ) -> Either<impl Stream<Item = RSyncResult<Bytes>>, impl Stream<Item = RSyncResult<Bytes>>>
73    {
74        match self {
75            ReaderWriterInternal::Gcs(client) => Either::Left(client.read(path).await),
76            ReaderWriterInternal::Fs(client) => Either::Right(client.read(path).await),
77        }
78    }
79
80    async fn get_crc32c(&self, path: &RelativePath) -> RSyncResult<Option<Entry>> {
81        match self {
82            ReaderWriterInternal::Gcs(client) => client.get_crc32c(path).await,
83            ReaderWriterInternal::Fs(client) => client.get_crc32c(path).await,
84        }
85    }
86
87    async fn write<S>(
88        &self,
89        mtime: Option<chrono::DateTime<chrono::Utc>>,
90        set_fs_mtime: bool,
91        path: &RelativePath,
92        stream: S,
93    ) -> RSyncResult<()>
94    where
95        S: futures::TryStream<Ok = bytes::Bytes, Error = RSyncError> + Send + Sync + 'static,
96    {
97        async {
98            match self {
99                ReaderWriterInternal::Gcs(client) => match mtime {
100                    Some(mtime) => client.write_mtime(mtime, path, stream).await,
101                    None => client.write(path, stream).await,
102                },
103                ReaderWriterInternal::Fs(client) => match (mtime, set_fs_mtime) {
104                    (Some(mtime), true) => client.write_mtime(mtime, path, stream).await,
105                    _ => client.write(path, stream).await,
106                },
107            }
108        }
109        .await
110    }
111
112    async fn delete(&self, path: &RelativePath) -> RSyncResult<()> {
113        match self {
114            ReaderWriterInternal::Gcs(client) => client.delete(path).await,
115            ReaderWriterInternal::Fs(client) => client.delete(path).await,
116        }
117    }
118
119    async fn exists(&self, path: &RelativePath) -> RSyncResult<bool> {
120        match self {
121            ReaderWriterInternal::Gcs(client) => client.exists(path).await,
122            ReaderWriterInternal::Fs(client) => client.exists(path).await,
123        }
124    }
125
126    async fn size_and_mt(
127        &self,
128        path: &RelativePath,
129    ) -> RSyncResult<(Option<chrono::DateTime<chrono::Utc>>, Option<Size>)> {
130        match self {
131            ReaderWriterInternal::Gcs(client) => client.size_and_mt(path).await,
132            ReaderWriterInternal::Fs(client) => client.size_and_mt(path).await,
133        }
134    }
135}
136
137pub struct RSync {
138    source: ReaderWriterInternal,
139    dest: ReaderWriterInternal,
140    restore_fs_mtime: bool,
141    includes: Option<GlobSet>,
142    excludes: Option<GlobSet>,
143}
144
145impl RSync {
146    pub fn new(source: ReaderWriter, dest: ReaderWriter) -> Self {
147        Self {
148            source: source.inner,
149            dest: dest.inner,
150            restore_fs_mtime: false,
151            includes: None,
152            excludes: None,
153        }
154    }
155
156    pub fn with_restore_fs_mtime(mut self, restore_fs_mtime: bool) -> Self {
157        self.restore_fs_mtime = restore_fs_mtime;
158        self
159    }
160
161    fn glob_set(globs: &[&str]) -> RSyncResult<Option<GlobSet>> {
162        fn glob_error(error: globset::Error) -> RSyncError {
163            RSyncError::GlobError(error.to_string())
164        }
165
166        if globs.is_empty() {
167            Ok(None)
168        } else {
169            let glob_set = {
170                let mut builer = GlobSetBuilder::new();
171                for glob in globs {
172                    builer.add(Glob::new(glob).map_err(glob_error)?);
173                }
174                builer.build().map_err(glob_error)?
175            };
176            Ok(Some(glob_set))
177        }
178    }
179
180    pub fn with_includes(mut self, includes: &[&str]) -> RSyncResult<Self> {
181        self.includes = Self::glob_set(includes)?;
182        Ok(self)
183    }
184
185    pub fn with_excludes(mut self, excludes: &[&str]) -> RSyncResult<Self> {
186        self.excludes = Self::glob_set(excludes)?;
187        Ok(self)
188    }
189
190    async fn write_entry(
191        &self,
192        mtime: Option<chrono::DateTime<chrono::Utc>>,
193        path: &RelativePath,
194    ) -> RSyncResult<()> {
195        let source = self.source.read(path).await;
196        self.dest
197            .write(mtime, self.restore_fs_mtime, path, source)
198            .await?;
199        Ok(())
200    }
201
202    async fn sync_entry_crc32c(&self, path: &RelativePath) -> RSyncResult<RSyncStatus> {
203        Ok(match self.dest.get_crc32c(path).await? {
204            None => {
205                self.write_entry(None, path).await?;
206                RSyncStatus::updated("no dest crc32c", path)
207            }
208            Some(crc32c_dest) => {
209                let crc32c_source = self.source.get_crc32c(path).await?;
210                if Some(crc32c_dest) == crc32c_source {
211                    RSyncStatus::already_synced("same crc32c", path)
212                } else {
213                    self.write_entry(None, path).await?;
214                    RSyncStatus::updated("different crc32c", path)
215                }
216            }
217        })
218    }
219
220    async fn sync_entry(&self, path: &RelativePath) -> RSyncResult<RSyncStatus> {
221        Ok(match self.dest.size_and_mt(path).await? {
222            (Some(dest_dt), Some(dest_size)) => match self.source.size_and_mt(path).await? {
223                (Some(source_dt), Some(source_size)) => {
224                    let dest_ts = dest_dt.timestamp();
225                    let source_ts = source_dt.timestamp();
226                    if dest_ts == source_ts && dest_size == source_size {
227                        RSyncStatus::already_synced("same mtime and size", path)
228                    } else {
229                        self.write_entry(Some(source_dt), path).await?;
230                        RSyncStatus::updated("different size or mtime", path)
231                    }
232                }
233                _ => self.sync_entry_crc32c(path).await?,
234            },
235            (None, None) => {
236                let (mtime, _) = self.source.size_and_mt(path).await?;
237                self.write_entry(mtime, path).await?;
238                RSyncStatus::Created(path.to_owned())
239            }
240            _ => self.sync_entry_crc32c(path).await?,
241        })
242    }
243
244    fn filter(&self, relative_path: &RelativePath) -> bool {
245        let i = self
246            .includes
247            .as_ref()
248            .map(|includes| {
249                includes
250                    .matches(relative_path.path.as_str())
251                    .is_empty()
252                    .not()
253            })
254            .unwrap_or(true);
255        let x = self
256            .excludes
257            .as_ref()
258            .map(|excludes| excludes.matches(relative_path.path.as_str()).is_empty())
259            .unwrap_or(true);
260        i && x
261    }
262
263    /// Sync synchronize source to destination by comparing crc32c if destination already exists
264    ///
265    /// Example
266    /// ```rust
267    /// use std::{path::PathBuf, str::FromStr};
268    ///
269    /// use futures::{StreamExt, TryStreamExt};
270    /// use gcs_rsync::{
271    ///     storage::credentials::authorizeduser,
272    ///     sync::{RSync, RSyncResult, ReaderWriter},
273    /// };
274    ///
275    /// #[tokio::main]
276    /// async fn main() -> RSyncResult<()> {
277    ///     let token_generator = Box::new(authorizeduser::default().await.unwrap());
278    ///
279    ///     let home_dir = ".";
280    ///     let test_prefix = "bucket_prefix_to_sync";
281    ///     let bucket = "bucket_name";
282    ///
283    ///     let source = ReaderWriter::gcs(token_generator, bucket, test_prefix)
284    ///         .await
285    ///         .unwrap();
286    ///
287    ///     let dest_folder = {
288    ///         let mut p = PathBuf::from_str(home_dir).unwrap();
289    ///         p.push(test_prefix);
290    ///         p
291    ///     };
292    ///     let dest = ReaderWriter::fs(dest_folder.as_path());
293    ///
294    ///     let rsync = RSync::new(source, dest);
295    ///
296    ///     rsync
297    ///         .sync()
298    ///         .await
299    ///         .try_buffer_unordered(12)
300    ///         .for_each(|x| {
301    ///             println!("{:?}", x);
302    ///             futures::future::ready(())
303    ///         })
304    ///         .await;
305    ///
306    ///     Ok(())
307    /// }
308    /// ```
309    pub async fn sync(
310        &self,
311    ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RSyncStatus>> + '_>> + '_
312    {
313        self.source
314            .list()
315            .await
316            .try_filter(|x| futures::future::ready(self.filter(x)))
317            .map_ok(move |path| async move { self.sync_entry(&path).await })
318    }
319
320    async fn delete_extras(
321        &self,
322    ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RMirrorStatus>> + '_>> + '_
323    {
324        self.dest.list().await.map(move |result| {
325            result.map(|path| async move {
326                if self.source.exists(&path).await?.not() || self.filter(&path).not() {
327                    self.dest.delete(&path).await?;
328                    Ok(RMirrorStatus::Deleted(path))
329                } else {
330                    Ok(RMirrorStatus::NotDeleted(path))
331                }
332            })
333        })
334    }
335
336    /// Mirror synchronize source to destination by deleting extras (destination)
337    ///
338    /// Example
339    /// ```rust
340    /// use std::{path::PathBuf, str::FromStr};
341    ///
342    /// use futures::{StreamExt, TryStreamExt};
343    /// use gcs_rsync::{
344    ///     storage::credentials::authorizeduser,
345    ///     sync::{RSync, RSyncResult, ReaderWriter},
346    /// };
347    ///
348    /// #[tokio::main]
349    /// async fn main() -> RSyncResult<()> {
350    ///     let token_generator = Box::new(authorizeduser::default().await.unwrap());
351    ///
352    ///     let home_dir = ".";
353    ///     let test_prefix = "bucket_prefix_to_sync";
354    ///     let bucket = "bucket_name";
355    ///
356    ///     let source = ReaderWriter::gcs(token_generator, bucket, test_prefix)
357    ///         .await
358    ///         .unwrap();
359    ///
360    ///     let dest_folder = {
361    ///         let mut p = PathBuf::from_str(home_dir).unwrap();
362    ///         p.push(test_prefix);
363    ///         p
364    ///     };
365    ///     let dest = ReaderWriter::fs(dest_folder.as_path());
366    ///
367    ///     let rsync = RSync::new(source, dest);
368    ///
369    ///     rsync
370    ///         .mirror()
371    ///         .await
372    ///         .try_buffer_unordered(12)
373    ///         .for_each(|x| {
374    ///             println!("{:?}", x);
375    ///             futures::future::ready(())
376    ///         })
377    ///         .await;
378    ///
379    ///     Ok(())
380    /// }
381    /// ```
382    pub async fn mirror(
383        &self,
384    ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RMirrorStatus>> + '_>> + '_
385    {
386        let synced = self
387            .sync()
388            .await
389            .map_ok(|fut| async { fut.await.map(RMirrorStatus::Synced) })
390            .map_ok(futures::future::Either::Left);
391
392        let deleted = self
393            .delete_extras()
394            .await
395            .map_ok(futures::future::Either::Right);
396
397        synced.chain(deleted)
398    }
399}
400
401#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
402pub struct RelativePath {
403    path: String,
404}
405
406impl std::fmt::Debug for RelativePath {
407    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
408        write!(f, "{}", self.path)
409    }
410}
411
412impl RelativePath {
413    pub fn new(path: &str) -> RSyncResult<Self> {
414        let path = path.replace('\\', "/").to_owned();
415        if path.is_empty() || path == "/" {
416            Err(RSyncError::EmptyRelativePathError)
417        } else {
418            Ok(Self { path })
419        }
420    }
421}
422#[derive(Debug, PartialEq, Clone)]
423struct Entry {
424    path: RelativePath,
425    crc32c: u32,
426}
427
428impl Entry {
429    pub(self) fn new(path: &RelativePath, crc32c: u32) -> Self {
430        Self {
431            path: path.to_owned(),
432            crc32c,
433        }
434    }
435}
436
437#[derive(Debug)]
438pub enum RSyncError {
439    MissingFieldsInGcsResponse(String),
440    StorageError(super::storage::Error),
441    FsIoError {
442        path: PathBuf,
443        message: String,
444        error: std::io::Error,
445    },
446    EmptyRelativePathError,
447    GlobError(String),
448}
449
450impl RSyncError {
451    fn fs_io_error<T, U>(message: U, path: T, error: std::io::Error) -> RSyncError
452    where
453        T: AsRef<Path>,
454        U: AsRef<str>,
455    {
456        RSyncError::FsIoError {
457            path: path.as_ref().to_path_buf(),
458            message: message.as_ref().to_owned(),
459            error,
460        }
461    }
462}
463
464impl std::fmt::Display for RSyncError {
465    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
466        write!(f, "{:?}", self)
467    }
468}
469
470impl std::error::Error for RSyncError {}
471
472#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
473pub enum RSyncStatus {
474    Created(RelativePath),
475    Updated { reason: String, path: RelativePath },
476    AlreadySynced { reason: String, path: RelativePath },
477}
478
479impl RSyncStatus {
480    fn updated(reason: &str, path: &RelativePath) -> Self {
481        let reason = reason.to_owned();
482        let path = path.to_owned();
483        Self::Updated { reason, path }
484    }
485
486    fn already_synced(reason: &str, path: &RelativePath) -> Self {
487        let reason = reason.to_owned();
488        let path = path.to_owned();
489        Self::AlreadySynced { reason, path }
490    }
491}
492
493#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
494pub enum RMirrorStatus {
495    Synced(RSyncStatus),
496    Deleted(RelativePath),
497    NotDeleted(RelativePath),
498}
499
500pub type RSyncResult<T> = Result<T, RSyncError>;
501
#[cfg(test)]
mod tests {
    use crate::{gcp::sync::RelativePath, sync::RSyncError};

    /// Empty and root-only paths are rejected; any other path is stored as given.
    #[test]
    fn test_relative_path() {
        let rejected_as_empty = |input: &str| {
            matches!(
                RelativePath::new(input).unwrap_err(),
                RSyncError::EmptyRelativePathError
            )
        };
        assert!(rejected_as_empty(""), "empty path is not allowed");
        assert!(rejected_as_empty("/"));

        for accepted in ["/hello/world", "hello/world"].iter() {
            assert_eq!(*accepted, RelativePath::new(accepted).unwrap().path);
        }
    }
}