1mod fs;
2mod gcs;
3
4use std::ops::Not;
5use std::path::{Path, PathBuf};
6
7use bytes::Bytes;
8use futures::future::Either;
9use futures::{Future, Stream, StreamExt, TryStreamExt};
10
11use fs::FsClient;
12use gcs::GcsClient;
13use globset::{Glob, GlobSet, GlobSetBuilder};
14
15use crate::oauth2::token::TokenGenerator;
16
17pub struct ReaderWriter {
18 inner: ReaderWriterInternal,
19}
20
21pub type Source = ReaderWriter;
22
23impl ReaderWriter {
24 fn new(inner: ReaderWriterInternal) -> Self {
25 Self { inner }
26 }
27
28 pub async fn gcs(
29 token_generator: Box<dyn TokenGenerator>,
30 bucket: &str,
31 prefix: &str,
32 ) -> RSyncResult<Self> {
33 let client = GcsClient::new(token_generator, bucket, prefix).await?;
34 Ok(Self::new(ReaderWriterInternal::Gcs(Box::new(client))))
35 }
36
37 pub fn gcs_no_auth(bucket: &str, prefix: &str) -> Self {
38 let client = GcsClient::no_auth(bucket, prefix);
39 Self::new(ReaderWriterInternal::Gcs(Box::new(client)))
40 }
41
42 pub fn fs(base_path: &Path) -> Self {
43 let client = FsClient::new(base_path);
44 Self::new(ReaderWriterInternal::Fs(Box::new(client)))
45 }
46}
47
48enum ReaderWriterInternal {
50 Gcs(Box<GcsClient>),
51 Fs(Box<FsClient>),
52}
53
/// Entry size as reported by `size_and_mt` (unit presumably bytes — confirm).
type Size = u64;
55
56impl ReaderWriterInternal {
57 async fn is_valid(&self) -> RSyncResult<()> {
58 match self {
59 ReaderWriterInternal::Fs(client) => client.is_valid().await,
60 ReaderWriterInternal::Gcs(gcs_client) => gcs_client.is_valid().await,
61 }
62 }
63
64 async fn list(
65 &self,
66 ) -> Either<
67 impl Stream<Item = RSyncResult<RelativePath>> + '_,
68 impl Stream<Item = RSyncResult<RelativePath>> + '_,
69 > {
70 match self {
71 ReaderWriterInternal::Gcs(client) => Either::Left(client.list().await),
72 ReaderWriterInternal::Fs(client) => Either::Right(client.list().await),
73 }
74 }
75
76 async fn read(
77 &self,
78 path: &RelativePath,
79 ) -> Either<impl Stream<Item = RSyncResult<Bytes>>, impl Stream<Item = RSyncResult<Bytes>>>
80 {
81 match self {
82 ReaderWriterInternal::Gcs(client) => Either::Left(client.read(path).await),
83 ReaderWriterInternal::Fs(client) => Either::Right(client.read(path).await),
84 }
85 }
86
87 async fn get_crc32c(&self, path: &RelativePath) -> RSyncResult<Option<Entry>> {
88 match self {
89 ReaderWriterInternal::Gcs(client) => client.get_crc32c(path).await,
90 ReaderWriterInternal::Fs(client) => client.get_crc32c(path).await,
91 }
92 }
93
94 async fn write<S>(
95 &self,
96 mtime: Option<chrono::DateTime<chrono::Utc>>,
97 set_fs_mtime: bool,
98 path: &RelativePath,
99 stream: S,
100 ) -> RSyncResult<()>
101 where
102 S: futures::TryStream<Ok = bytes::Bytes, Error = RSyncError> + Send + Sync + 'static,
103 {
104 async {
105 match self {
106 ReaderWriterInternal::Gcs(client) => match mtime {
107 Some(mtime) => client.write_mtime(mtime, path, stream).await,
108 None => client.write(path, stream).await,
109 },
110 ReaderWriterInternal::Fs(client) => match (mtime, set_fs_mtime) {
111 (Some(mtime), true) => client.write_mtime(mtime, path, stream).await,
112 _ => client.write(path, stream).await,
113 },
114 }
115 }
116 .await
117 }
118
119 async fn delete(&self, path: &RelativePath) -> RSyncResult<()> {
120 match self {
121 ReaderWriterInternal::Gcs(client) => client.delete(path).await,
122 ReaderWriterInternal::Fs(client) => client.delete(path).await,
123 }
124 }
125
126 async fn exists(&self, path: &RelativePath) -> RSyncResult<bool> {
127 match self {
128 ReaderWriterInternal::Gcs(client) => client.exists(path).await,
129 ReaderWriterInternal::Fs(client) => client.exists(path).await,
130 }
131 }
132
133 async fn size_and_mt(
134 &self,
135 path: &RelativePath,
136 ) -> RSyncResult<(Option<chrono::DateTime<chrono::Utc>>, Option<Size>)> {
137 match self {
138 ReaderWriterInternal::Gcs(client) => client.size_and_mt(path).await,
139 ReaderWriterInternal::Fs(client) => client.size_and_mt(path).await,
140 }
141 }
142}
143
144pub struct RSync {
145 source: ReaderWriterInternal,
146 dest: ReaderWriterInternal,
147 restore_fs_mtime: bool,
148 includes: Option<GlobSet>,
149 excludes: Option<GlobSet>,
150}
151
152impl RSync {
153 pub fn new(source: ReaderWriter, dest: ReaderWriter) -> Self {
154 Self {
155 source: source.inner,
156 dest: dest.inner,
157 restore_fs_mtime: false,
158 includes: None,
159 excludes: None,
160 }
161 }
162
163 pub fn with_restore_fs_mtime(mut self, restore_fs_mtime: bool) -> Self {
164 self.restore_fs_mtime = restore_fs_mtime;
165 self
166 }
167
168 fn glob_set(globs: &[&str]) -> RSyncResult<Option<GlobSet>> {
169 fn glob_error(error: globset::Error) -> RSyncError {
170 RSyncError::GlobError(error.to_string())
171 }
172
173 if globs.is_empty() {
174 Ok(None)
175 } else {
176 let glob_set = {
177 let mut builer = GlobSetBuilder::new();
178 for glob in globs {
179 builer.add(Glob::new(glob).map_err(glob_error)?);
180 }
181 builer.build().map_err(glob_error)?
182 };
183 Ok(Some(glob_set))
184 }
185 }
186
187 pub fn with_includes(mut self, includes: &[&str]) -> RSyncResult<Self> {
188 self.includes = Self::glob_set(includes)?;
189 Ok(self)
190 }
191
192 pub fn with_excludes(mut self, excludes: &[&str]) -> RSyncResult<Self> {
193 self.excludes = Self::glob_set(excludes)?;
194 Ok(self)
195 }
196
197 async fn write_entry(
198 &self,
199 mtime: Option<chrono::DateTime<chrono::Utc>>,
200 path: &RelativePath,
201 ) -> RSyncResult<()> {
202 let source = self.source.read(path).await;
203 self.dest
204 .write(mtime, self.restore_fs_mtime, path, source)
205 .await?;
206 Ok(())
207 }
208
209 async fn sync_entry_crc32c(&self, path: &RelativePath) -> RSyncResult<RSyncStatus> {
210 Ok(match self.dest.get_crc32c(path).await? {
211 None => {
212 self.write_entry(None, path).await?;
213 RSyncStatus::updated("no dest crc32c", path)
214 }
215 Some(crc32c_dest) => {
216 let crc32c_source = self.source.get_crc32c(path).await?;
217 if Some(crc32c_dest) == crc32c_source {
218 RSyncStatus::already_synced("same crc32c", path)
219 } else {
220 self.write_entry(None, path).await?;
221 RSyncStatus::updated("different crc32c", path)
222 }
223 }
224 })
225 }
226
227 async fn sync_entry(&self, path: &RelativePath) -> RSyncResult<RSyncStatus> {
228 Ok(match self.dest.size_and_mt(path).await? {
229 (Some(dest_dt), Some(dest_size)) => match self.source.size_and_mt(path).await? {
230 (Some(source_dt), Some(source_size)) => {
231 let dest_ts = dest_dt.timestamp();
232 let source_ts = source_dt.timestamp();
233 if dest_ts == source_ts && dest_size == source_size {
234 RSyncStatus::already_synced("same mtime and size", path)
235 } else {
236 self.write_entry(Some(source_dt), path).await?;
237 RSyncStatus::updated("different size or mtime", path)
238 }
239 }
240 _ => self.sync_entry_crc32c(path).await?,
241 },
242 (None, None) => {
243 let (mtime, _) = self.source.size_and_mt(path).await?;
244 self.write_entry(mtime, path).await?;
245 RSyncStatus::Created(path.to_owned())
246 }
247 _ => self.sync_entry_crc32c(path).await?,
248 })
249 }
250
251 fn filter(&self, relative_path: &RelativePath) -> bool {
252 let i = self
253 .includes
254 .as_ref()
255 .map(|includes| {
256 includes
257 .matches(relative_path.path.as_str())
258 .is_empty()
259 .not()
260 })
261 .unwrap_or(true);
262 let x = self
263 .excludes
264 .as_ref()
265 .map(|excludes| excludes.matches(relative_path.path.as_str()).is_empty())
266 .unwrap_or(true);
267 i && x
268 }
269
270 pub async fn sync(
317 &self,
318 ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RSyncStatus>> + '_>> + '_
319 {
320 self.source
321 .list()
322 .await
323 .try_filter(|x| futures::future::ready(self.filter(x)))
324 .map_ok(move |path| async move { self.sync_entry(&path).await })
325 }
326
327 async fn delete_extras(
328 &self,
329 ) -> impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RMirrorStatus>> + '_>> + '_
330 {
331 self.dest.list().await.map(move |result| {
332 result.map(|path| async move {
333 if self.source.exists(&path).await?.not() || self.filter(&path).not() {
334 self.dest.delete(&path).await?;
335 Ok(RMirrorStatus::Deleted(path))
336 } else {
337 Ok(RMirrorStatus::NotDeleted(path))
338 }
339 })
340 })
341 }
342
343 pub async fn mirror(
394 &self,
395 ) -> RSyncResult<
396 impl Stream<Item = RSyncResult<impl Future<Output = RSyncResult<RMirrorStatus>> + '_>> + '_,
397 > {
398 self.source.is_valid().await?;
399
400 let synced = self
401 .sync()
402 .await
403 .map_ok(|fut| async { fut.await.map(RMirrorStatus::Synced) })
404 .map_ok(futures::future::Either::Left);
405
406 let deleted = self
407 .delete_extras()
408 .await
409 .map_ok(futures::future::Either::Right);
410
411 Ok(synced.chain(deleted))
412 }
413}
414
/// A `/`-separated path relative to an endpoint's root.
///
/// Built through `RelativePath::new`, which normalizes `\` to `/` and rejects
/// empty paths. Ordering compares the path strings.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RelativePath {
    path: String,
}
419
420impl std::fmt::Debug for RelativePath {
421 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
422 write!(f, "{}", self.path)
423 }
424}
425
426impl RelativePath {
427 pub fn new(path: &str) -> RSyncResult<Self> {
428 let path = path.replace('\\', "/").to_owned();
429 if path.is_empty() || path == "/" {
430 Err(RSyncError::EmptyRelativePathError)
431 } else {
432 Ok(Self { path })
433 }
434 }
435}
436#[derive(Debug, PartialEq, Clone)]
437struct Entry {
438 path: RelativePath,
439 crc32c: u32,
440}
441
442impl Entry {
443 pub(self) fn new(path: &RelativePath, crc32c: u32) -> Self {
444 Self {
445 path: path.to_owned(),
446 crc32c,
447 }
448 }
449}
450
451#[derive(Debug)]
452pub enum RSyncError {
453 MissingFieldsInGcsResponse(String),
454 StorageError(super::storage::Error),
455 FsIoError {
456 path: PathBuf,
457 message: String,
458 error: std::io::Error,
459 },
460 EmptyRelativePathError,
461 GlobError(String),
462 InvalidRsyncSource(String),
463}
464
465impl RSyncError {
466 fn fs_io_error<T, U>(message: U, path: T, error: std::io::Error) -> RSyncError
467 where
468 T: AsRef<Path>,
469 U: AsRef<str>,
470 {
471 RSyncError::FsIoError {
472 path: path.as_ref().to_path_buf(),
473 message: message.as_ref().to_owned(),
474 error,
475 }
476 }
477}
478
479impl std::fmt::Display for RSyncError {
480 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481 write!(f, "{:?}", self)
482 }
483}
484
485impl std::error::Error for RSyncError {}
486
487#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
488pub enum RSyncStatus {
489 Created(RelativePath),
490 Updated { reason: String, path: RelativePath },
491 AlreadySynced { reason: String, path: RelativePath },
492}
493
494impl RSyncStatus {
495 fn updated(reason: &str, path: &RelativePath) -> Self {
496 let reason = reason.to_owned();
497 let path = path.to_owned();
498 Self::Updated { reason, path }
499 }
500
501 fn already_synced(reason: &str, path: &RelativePath) -> Self {
502 let reason = reason.to_owned();
503 let path = path.to_owned();
504 Self::AlreadySynced { reason, path }
505 }
506}
507
508#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
509pub enum RMirrorStatus {
510 Synced(RSyncStatus),
511 Deleted(RelativePath),
512 NotDeleted(RelativePath),
513}
514
515pub type RSyncResult<T> = Result<T, RSyncError>;
516
#[cfg(test)]
mod tests {
    // Import from the parent module directly instead of through two different
    // crate-level re-export paths, so the tests do not depend on crate layout.
    use super::{RSyncError, RelativePath};

    /// Returns `true` if `error` is the empty-relative-path error.
    fn is_empty_err(error: RSyncError) -> bool {
        matches!(error, RSyncError::EmptyRelativePathError)
    }

    #[test]
    fn test_relative_path() {
        assert!(
            is_empty_err(RelativePath::new("").unwrap_err()),
            "empty path is not allowed"
        );
        assert!(
            is_empty_err(RelativePath::new("/").unwrap_err()),
            "root path is not allowed"
        );
        assert_eq!(
            "/hello/world",
            RelativePath::new("/hello/world").unwrap().path
        );
        assert_eq!(
            "hello/world",
            RelativePath::new("hello/world").unwrap().path
        );
    }

    #[test]
    fn test_relative_path_normalizes_backslashes() {
        assert_eq!(
            "hello/world",
            RelativePath::new("hello\\world").unwrap().path
        );
        assert!(
            is_empty_err(RelativePath::new("\\").unwrap_err()),
            "a lone backslash normalizes to the root path"
        );
    }
}