gcs_rsync/gcp/sync/
mod.rs

1mod fs;
2mod gcs;
3
4use std::ops::Not;
5use std::path::{Path, PathBuf};
6
7use bytes::Bytes;
8use futures::future::Either;
9use futures::{Future, Stream, StreamExt, TryStreamExt};
10
11use fs::FsClient;
12use gcs::GcsClient;
13use globset::{Glob, GlobSet, GlobSetBuilder};
14
15use crate::oauth2::token::TokenGenerator;
16
17pub struct ReaderWriter {
18    inner: ReaderWriterInternal,
19}
20
21pub type Source = ReaderWriter;
22
23impl ReaderWriter {
24    fn new(inner: ReaderWriterInternal) -> Self {
25        Self { inner }
26    }
27
28    pub async fn gcs(
29        token_generator: Box<dyn TokenGenerator>,
30        bucket: &str,
31        prefix: &str,
32    ) -> RSyncResult<Self> {
33        let client = GcsClient::new(token_generator, bucket, prefix).await?;
34        Ok(Self::new(ReaderWriterInternal::Gcs(Box::new(client))))
35    }
36
37    pub fn gcs_no_auth(bucket: &str, prefix: &str) -> Self {
38        let client = GcsClient::no_auth(bucket, prefix);
39        Self::new(ReaderWriterInternal::Gcs(Box::new(client)))
40    }
41
42    pub fn fs(base_path: &Path) -> Self {
43        let client = FsClient::new(base_path);
44        Self::new(ReaderWriterInternal::Fs(Box::new(client)))
45    }
46}
47
48//TODO: replace this with trait when async trait will be more stable with method returning Trait
49enum ReaderWriterInternal {
50    Gcs(Box<GcsClient>),
51    Fs(Box<FsClient>),
52}
53
54type Size = u64;
55
56impl ReaderWriterInternal {
57    async fn is_valid(&self) -> RSyncResult<()> {
58        match self {
59            ReaderWriterInternal::Fs(client) => client.is_valid().await,
60            ReaderWriterInternal::Gcs(gcs_client) => gcs_client.is_valid().await,
61        }
62    }
63
64    async fn list(
65        &self,
66    ) -> Either<
67        impl Stream<Item = RSyncResult<RelativePath>> + '_,
68        impl Stream<Item = RSyncResult<RelativePath>> + '_,
69    > {
70        match self {
71            ReaderWriterInternal::Gcs(client) => Either::Left(client.list().await),
72            ReaderWriterInternal::Fs(client) => Either::Right(client.list().await),
73        }
74    }
75
76    async fn read(
77        &self,
78        path: &RelativePath,
79    ) -> Either<impl Stream<Item = RSyncResult<Bytes>>, impl Stream<Item = RSyncResult<Bytes>>>
80    {
81        match self {
82            ReaderWriterInternal::Gcs(client) => Either::Left(client.read(path).await),
83            ReaderWriterInternal::Fs(client) => Either::Right(client.read(path).await),
84        }
85    }
86
87    async fn get_crc32c(&self, path: &RelativePath) -> RSyncResult<Option<Entry>> {
88        match self {
89            ReaderWriterInternal::Gcs(client) => client.get_crc32c(path).await,
90            ReaderWriterInternal::Fs(client) => client.get_crc32c(path).await,
91        }
92    }
93
94    async fn write<S>(
95        &self,
96        mtime: Option<chrono::DateTime<chrono::Utc>>,
97        set_fs_mtime: bool,
98        path: &RelativePath,
99        stream: S,
100    ) -> RSyncResult<()>
101    where
102        S: futures::TryStream<Ok = bytes::Bytes, Error = RSyncError> + Send + Sync + 'static,
103    {
104        async {
105            match self {
106                ReaderWriterInternal::Gcs(client) => match mtime {
107                    Some(mtime) => client.write_mtime(mtime, path, stream).await,
108                    None => client.write(path, stream).await,
109                },
110                ReaderWriterInternal::Fs(client) => match (mtime, set_fs_mtime) {
111                    (Some(mtime), true) => client.write_mtime(mtime, path, stream).await,
112                    _ => client.write(path, stream).await,
113                },
114            }
115        }
116        .await
117    }
118
119    async fn delete(&self, path: &RelativePath) -> RSyncResult<()> {
120        match self {
121            ReaderWriterInternal::Gcs(client) => client.delete(path).await,
122            ReaderWriterInternal::Fs(client) => client.delete(path).await,
123        }
124    }
125
126    async fn exists(&self, path: &RelativePath) -> RSyncResult<bool> {
127        match self {
128            ReaderWriterInternal::Gcs(client) => client.exists(path).await,
129            ReaderWriterInternal::Fs(client) => client.exists(path).await,
130        }
131    }
132
133    async fn size_and_mt(
134        &self,
135        path: &RelativePath,
136    ) -> RSyncResult<(Option<chrono::DateTime<chrono::Utc>>, Option<Size>)> {
137        match self {
138            ReaderWriterInternal::Gcs(client) => client.size_and_mt(path).await,
139            ReaderWriterInternal::Fs(client) => client.size_and_mt(path).await,
140        }
141    }
142}
143
144pub struct RSync {
145    source: ReaderWriterInternal,
146    dest: ReaderWriterInternal,
147    restore_fs_mtime: bool,
148    includes: Option<GlobSet>,
149    excludes: Option<GlobSet>,
150}
151
152impl RSync {
153    pub fn new(source: ReaderWriter, dest: ReaderWriter) -> Self {
154        Self {
155            source: source.inner,
156            dest: dest.inner,
157            restore_fs_mtime: false,
158            includes: None,
159            excludes: None,
160        }
161    }
162
163    pub fn with_restore_fs_mtime(mut self, restore_fs_mtime: bool) -> Self {
164        self.restore_fs_mtime = restore_fs_mtime;
165        self
166    }
167
168    fn glob_set(globs: &[&str]) -> RSyncResult<Option<GlobSet>> {
169        fn glob_error(error: globset::Error) -> RSyncError {
170            RSyncError::GlobError(error.to_string())
171        }
172
173        if globs.is_empty() {
174            Ok(None)
175        } else {
176            let glob_set = {
177                let mut builer = GlobSetBuilder::new();
178                for glob in globs {
179                    builer.add(Glob::new(glob).map_err(glob_error)?);
180                }
181                builer.build().map_err(glob_error)?
182            };
183            Ok(Some(glob_set))
184        }
185    }
186
187    pub fn with_includes(mut self, includes: &[&str]) -> RSyncResult<Self> {
188        self.includes = Self::glob_set(includes)?;
189        Ok(self)
190    }
191
192    pub fn with_excludes(mut self, excludes: &[&str]) -> RSyncResult<Self> {
193        self.excludes = Self::glob_set(excludes)?;
194        Ok(self)
195    }
196
197    async fn write_entry(
198        &self,
199        mtime: Option<chrono::DateTime<chrono::Utc>>,
200        path: &RelativePath,
201    ) -> RSyncResult<()> {
202        let source = self.source.read(path).await;
203        self.dest
204            .write(mtime, self.restore_fs_mtime, path, source)
205            .await?;
206        Ok(())
207    }
208
209    async fn sync_entry_crc32c(&self, path: &RelativePath) -> RSyncResult<RSyncStatus> {
210        Ok(match self.dest.get_crc32c(path).await? {
211            None => {
212                self.write_entry(None, path).await?;
213                RSyncStatus::updated("no dest crc32c", path)
214            }
215            Some(crc32c_dest) => {
216                let crc32c_source = self.source.get_crc32c(path).await?;
217                if Some(crc32c_dest) == crc32c_source {
218                    RSyncStatus::already_synced("same crc32c", path)
219                } else {
220                    self.write_entry(None, path).await?;
221                    RSyncStatus::updated("different crc32c", path)
222                }
223            }
224        })
225    }
226
227    async fn sync_entry(&self, path: &RelativePath) -> RSyncResult<RSyncStatus> {
228        Ok(match self.dest.size_and_mt(path).await? {
229            (Some(dest_dt), Some(dest_size)) => match self.source.size_and_mt(path).await? {
230                (Some(source_dt), Some(source_size)) => {
231                    let dest_ts = dest_dt.timestamp();
232                    let source_ts = source_dt.timestamp();
233                    if dest_ts == source_ts && dest_size == source_size {
234                        RSyncStatus::already_synced("same mtime and size", path)
235                    } else {
236                        self.write_entry(Some(source_dt), path).await?;
237                        RSyncStatus::updated("different size or mtime", path)
238                    }
239                }
240                _ => self.sync_entry_crc32c(path).await?,
241            },
242            (None, None) => {
243                let (mtime, _) = self.source.size_and_mt(path).await?;
244                self.write_entry(mtime, path).await?;
245                RSyncStatus::Created(path.to_owned())
246            }
247            _ => self.sync_entry_crc32c(path).await?,
248        })
249    }
250
251    fn filter(&self, relative_path: &RelativePath) -> bool {
252        let i = self
253            .includes
254            .as_ref()
255            .map(|includes| {
256                includes
257                    .matches(relative_path.path.as_str())
258                    .is_empty()
259                    .not()
260            })
261            .unwrap_or(true);
262        let x = self
263            .excludes
264            .as_ref()
265            .map(|excludes| excludes.matches(relative_path.path.as_str()).is_empty())
266            .unwrap_or(true);
267        i && x
268    }
269
270    /// Sync synchronize source to destination by comparing crc32c if destination already exists
271    ///
272    /// Example
273    /// ```rust
274    /// use std::{path::PathBuf, str::FromStr};
275    ///
276    /// use futures::{StreamExt, TryStreamExt};
277    /// use gcs_rsync::{
278    ///     storage::credentials::authorizeduser,
279    ///     sync::{RSync, RSyncResult, ReaderWriter},
280    /// };
281    ///
282    /// #[tokio::main]
283    /// async fn main() -> RSyncResult<()> {
284    ///     let token_generator = Box::new(authorizeduser::default().await.unwrap());
285    ///
286    ///     let home_dir = ".";
287    ///     let test_prefix = "bucket_prefix_to_sync";
288    ///     let bucket = "bucket_name";
289    ///
290    ///     let source = ReaderWriter::gcs(token_generator, bucket, test_prefix)
291    ///         .await
292    ///         .unwrap();
293    ///
294    ///     let dest_folder = {
295    ///         let mut p = PathBuf::from_str(home_dir).unwrap();
296    ///         p.push(test_prefix);
297    ///         p
298    ///     };
299    ///     let dest = ReaderWriter::fs(dest_folder.as_path());
300    ///
301    ///     let rsync = RSync::new(source, dest);
302    ///
303    ///     rsync
304    ///         .sync()
305    ///         .await
306    ///         .try_buffer_unordered(12)
307    ///         .for_each(|x| {
308    ///             println!("{:?}", x);
309    ///             futures::future::ready(())
310    ///         })
311    ///         .await;
312    ///
313    ///     Ok(())
314    /// }
315    /// ```
316    pub async fn sync(
317        &self,
318    ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RSyncStatus>> + '_>> + '_
319    {
320        self.source
321            .list()
322            .await
323            .try_filter(|x| futures::future::ready(self.filter(x)))
324            .map_ok(move |path| async move { self.sync_entry(&path).await })
325    }
326
327    async fn delete_extras(
328        &self,
329    ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RMirrorStatus>> + '_>> + '_
330    {
331        self.dest.list().await.map(move |result| {
332            result.map(|path| async move {
333                if self.source.exists(&path).await?.not() || self.filter(&path).not() {
334                    self.dest.delete(&path).await?;
335                    Ok(RMirrorStatus::Deleted(path))
336                } else {
337                    Ok(RMirrorStatus::NotDeleted(path))
338                }
339            })
340        })
341    }
342
343    /// Mirror synchronize source to destination by deleting extras (destination)
344    ///
345    /// Example
346    /// ```rust
347    /// use std::{path::PathBuf, str::FromStr};
348    ///
349    /// use futures::{StreamExt, TryStreamExt};
350    /// use gcs_rsync::{
351    ///     storage::credentials::authorizeduser,
352    ///     sync::{RSync, RSyncResult, ReaderWriter},
353    /// };
354    ///
355    /// #[tokio::main]
356    /// async fn main() -> RSyncResult<()> {
357    ///     let token_generator = Box::new(authorizeduser::default().await.unwrap());
358    ///
359    ///     let home_dir = ".";
360    ///     let test_prefix = "bucket_prefix_to_sync";
361    ///     let bucket = "bucket_name";
362    ///
363    ///     let source = ReaderWriter::gcs(token_generator, bucket, test_prefix)
364    ///         .await
365    ///         .unwrap();
366    ///
367    ///     let dest_folder = {
368    ///         let mut p = PathBuf::from_str(home_dir).unwrap();
369    ///         p.push(test_prefix);
370    ///         p
371    ///     };
372    ///     let dest = ReaderWriter::fs(dest_folder.as_path());
373    ///
374    ///     let rsync = RSync::new(source, dest);
375    ///
376    ///     match rsync.mirror().await {
377    ///         Ok(mirror_result) => {
378    ///             mirror_result
379    ///                 .try_buffer_unordered(12)
380    ///                 .for_each(|x| {
381    ///                     println!("{:?}", x);
382    ///                     futures::future::ready(())
383    ///                 })
384    ///                 .await;
385    ///         }
386    ///         Err(e) => {
387    ///             println!("cannot mirror due to error {:?}", e);
388    ///         },
389    ///     };
390    ///     Ok(())
391    /// }
392    /// ```
393    pub async fn mirror(
394        &self,
395    ) -> RSyncResult<
396        impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RMirrorStatus>> + '_>> + '_,
397    > {
398        self.source.is_valid().await?;
399
400        let synced = self
401            .sync()
402            .await
403            .map_ok(|fut| async { fut.await.map(RMirrorStatus::Synced) })
404            .map_ok(futures::future::Either::Left);
405
406        let deleted = self
407            .delete_extras()
408            .await
409            .map_ok(futures::future::Either::Right);
410
411        Ok(synced.chain(deleted))
412    }
413}
414
415#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
416pub struct RelativePath {
417    path: String,
418}
419
420impl std::fmt::Debug for RelativePath {
421    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
422        write!(f, "{}", self.path)
423    }
424}
425
426impl RelativePath {
427    pub fn new(path: &str) -> RSyncResult<Self> {
428        let path = path.replace('\\', "/").to_owned();
429        if path.is_empty() || path == "/" {
430            Err(RSyncError::EmptyRelativePathError)
431        } else {
432            Ok(Self { path })
433        }
434    }
435}
436#[derive(Debug, PartialEq, Clone)]
437struct Entry {
438    path: RelativePath,
439    crc32c: u32,
440}
441
442impl Entry {
443    pub(self) fn new(path: &RelativePath, crc32c: u32) -> Self {
444        Self {
445            path: path.to_owned(),
446            crc32c,
447        }
448    }
449}
450
451#[derive(Debug)]
452pub enum RSyncError {
453    MissingFieldsInGcsResponse(String),
454    StorageError(super::storage::Error),
455    FsIoError {
456        path: PathBuf,
457        message: String,
458        error: std::io::Error,
459    },
460    EmptyRelativePathError,
461    GlobError(String),
462    InvalidRsyncSource(String),
463}
464
465impl RSyncError {
466    fn fs_io_error<T, U>(message: U, path: T, error: std::io::Error) -> RSyncError
467    where
468        T: AsRef<Path>,
469        U: AsRef<str>,
470    {
471        RSyncError::FsIoError {
472            path: path.as_ref().to_path_buf(),
473            message: message.as_ref().to_owned(),
474            error,
475        }
476    }
477}
478
479impl std::fmt::Display for RSyncError {
480    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481        write!(f, "{:?}", self)
482    }
483}
484
485impl std::error::Error for RSyncError {}
486
487#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
488pub enum RSyncStatus {
489    Created(RelativePath),
490    Updated { reason: String, path: RelativePath },
491    AlreadySynced { reason: String, path: RelativePath },
492}
493
494impl RSyncStatus {
495    fn updated(reason: &str, path: &RelativePath) -> Self {
496        let reason = reason.to_owned();
497        let path = path.to_owned();
498        Self::Updated { reason, path }
499    }
500
501    fn already_synced(reason: &str, path: &RelativePath) -> Self {
502        let reason = reason.to_owned();
503        let path = path.to_owned();
504        Self::AlreadySynced { reason, path }
505    }
506}
507
/// Outcome of one step of a mirror operation.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum RMirrorStatus {
    /// A source entry went through the sync step.
    Synced(RSyncStatus),
    /// A destination entry was deleted: it was missing from the source or rejected by the
    /// include/exclude filters.
    Deleted(RelativePath),
    /// A destination entry was kept.
    NotDeleted(RelativePath),
}

/// Result type used throughout the sync module.
pub type RSyncResult<T> = Result<T, RSyncError>;
516
#[cfg(test)]
mod tests {
    // Import from the parent module directly rather than through two different crate paths.
    use super::{RSyncError, RelativePath};

    /// True when `x` is the "empty relative path" error.
    fn is_empty_err(x: RSyncError) -> bool {
        matches!(x, RSyncError::EmptyRelativePathError)
    }

    #[test]
    fn test_relative_path() {
        assert!(
            is_empty_err(RelativePath::new("").unwrap_err()),
            "empty path is not allowed"
        );
        assert!(is_empty_err(RelativePath::new("/").unwrap_err()));
        assert_eq!(
            "/hello/world",
            RelativePath::new("/hello/world").unwrap().path
        );
        assert_eq!(
            "hello/world",
            RelativePath::new("hello/world").unwrap().path
        );
    }

    #[test]
    fn test_relative_path_normalizes_backslashes() {
        assert_eq!(
            "hello/world",
            RelativePath::new("hello\\world").unwrap().path
        );
        // A lone backslash normalizes to "/", which is rejected as empty.
        assert!(is_empty_err(RelativePath::new("\\").unwrap_err()));
    }
}