gcs-rsync 0.4.8

rsync support for gcs with higher perf than gsutil rsync
Documentation
use crate::storage::{Error as StorageError, Metadata, ObjectMetadata};
use bytes::Bytes;
use chrono::TimeZone;
use futures::{Stream, StreamExt, TryStreamExt};

use super::{Entry, RSyncError, RelativePath};
use crate::{
    gcp::sync::RSyncResult,
    oauth2::token::TokenGenerator,
    storage::{Object, ObjectClient, ObjectsListRequest, PartialObject},
};

pub(super) struct GcsClient {
    client: ObjectClient,
    object_prefix: ObjectPrefix,
}

#[derive(Clone)]
struct ObjectPrefix {
    bucket: String,
    prefix: String,
    objects_list_request: ObjectsListRequest,
}

impl ObjectPrefix {
    fn new(bucket: &str, prefix: &str) -> Self {
        let bucket = bucket.to_owned();
        let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_owned();

        let objects_list_request = ObjectsListRequest {
            prefix: Some(prefix.to_owned()),
            fields: Some("items(name),nextPageToken".to_owned()),
            ..Default::default()
        };

        Self {
            bucket,
            prefix,
            objects_list_request,
        }
    }

    fn try_get_folder(prefix: &str) -> Option<String> {
        prefix.rfind('/').map(|pos| prefix[..pos + 1].to_owned())
    }

    fn as_object(&self, name: &RelativePath) -> RSyncResult<Object> {
        let name = name.path.as_str();
        let name = {
            let prefix = self.prefix.as_str();
            Self::try_get_folder(prefix)
                .map(|prefix| format!("{prefix}{name}"))
                .unwrap_or_else(|| name.to_owned())
        };

        let object = Object::new(&self.bucket, name.as_str());
        object.map_err(RSyncError::StorageError)
    }

    fn as_relative_path(&self, name: &str) -> RSyncResult<RelativePath> {
        let prefix = self.prefix.as_str();
        let path = Self::try_get_folder(prefix)
            .and_then(|prefix| name.strip_prefix(prefix.as_str()))
            .unwrap_or(name);

        RelativePath::new(path)
    }
}

type Size = u64;

impl GcsClient {
    pub(super) async fn new(
        token_generator: Box<dyn TokenGenerator>,
        bucket: &str,
        prefix: &str,
    ) -> RSyncResult<Self> {
        let object_client = ObjectClient::new(token_generator)
            .await
            .map_err(RSyncError::StorageError)?;
        let object_prefix = ObjectPrefix::new(bucket, prefix);
        Ok(Self {
            client: object_client,
            object_prefix,
        })
    }

    pub(super) fn no_auth(bucket: &str, prefix: &str) -> Self {
        let object_client = ObjectClient::no_auth();
        let object_prefix = ObjectPrefix::new(bucket, prefix);
        Self {
            client: object_client,
            object_prefix,
        }
    }

    pub(super) async fn is_valid(&self) -> RSyncResult<()> {
        self.client
            .is_valid(&self.object_prefix.bucket, &self.object_prefix.prefix)
            .await
            .map_err(RSyncError::StorageError)
    }

    pub(super) async fn list(&self) -> impl Stream<Item = RSyncResult<RelativePath>> + '_ {
        self.client
            .list(
                &self.object_prefix.bucket,
                &self.object_prefix.objects_list_request,
            )
            .await
            .map_err(RSyncError::StorageError)
            .map(move |r| {
                r.and_then(|po| {
                    po.name
                        .ok_or_else(|| RSyncError::MissingFieldsInGcsResponse("name".to_owned()))
                        .and_then(|name| self.object_prefix.as_relative_path(&name))
                })
            })
    }

    pub(super) async fn read(&self, path: &RelativePath) -> impl Stream<Item = RSyncResult<Bytes>> {
        let download_result = async {
            let o = self.object_prefix.as_object(path)?;
            self.client
                .download(&o)
                .await
                .map(|x| x.map_err(RSyncError::StorageError))
                .map_err(RSyncError::StorageError)
        }
        .await;

        futures::stream::once(futures::future::ready(download_result)).try_flatten()
    }

    pub(super) async fn get_crc32c(&self, path: &RelativePath) -> RSyncResult<Option<Entry>> {
        fn to_crc32c(po: PartialObject) -> RSyncResult<u32> {
            po.crc32c
                .map(|x| x.to_u32())
                .ok_or_else(|| RSyncError::MissingFieldsInGcsResponse("crc32c".to_owned()))
        }

        let o = &self.object_prefix.as_object(path)?;
        let entry = self
            .client
            .get(o, "crc32c")
            .await
            .map_err(RSyncError::StorageError)
            .and_then(to_crc32c)
            .map(|crc32c| Entry::new(path, crc32c));

        match entry {
            Ok(e) => Ok(Some(e)),
            Err(RSyncError::StorageError(StorageError::GcsResourceNotFound { .. })) => Ok(None),
            Err(e) => Err(e),
        }
    }

    pub(super) async fn exists(&self, path: &RelativePath) -> RSyncResult<bool> {
        let o = &self.object_prefix.as_object(path)?;
        let entry = self
            .client
            .get(o, "name")
            .await
            .map_err(RSyncError::StorageError);
        match entry {
            Ok(_) => Ok(true),
            Err(RSyncError::StorageError(StorageError::GcsResourceNotFound { .. })) => Ok(false),
            Err(err) => Err(err),
        }
    }

    pub(super) async fn size_and_mt(
        &self,
        path: &RelativePath,
    ) -> RSyncResult<(Option<chrono::DateTime<chrono::Utc>>, Option<Size>)> {
        let o = &self.object_prefix.as_object(path)?;
        let entry = self
            .client
            .get(o, "size,metadata/goog-reserved-file-mtime")
            .await
            .map_err(RSyncError::StorageError);

        match entry {
            Ok(entry) => {
                let size = entry.size;
                let date_time = entry
                    .metadata
                    .and_then(|x| x.modification_time)
                    .and_then(|mtime| chrono::offset::Utc.timestamp_opt(mtime, 0).single());
                Ok((date_time, size))
            }
            Err(RSyncError::StorageError(StorageError::GcsResourceNotFound { .. })) => {
                Ok((None, None))
            }
            Err(err) => Err(err),
        }
    }

    pub(super) async fn delete(&self, path: &RelativePath) -> RSyncResult<()> {
        let o = self.object_prefix.as_object(path)?;
        let delete_result = self.client.delete(&o).await;
        match delete_result {
            Ok(_) | Err(StorageError::GcsResourceNotFound { .. }) => Ok(()),
            Err(e) => Err(RSyncError::StorageError(e)),
        }
    }

    /// The crc32 comparison is done outside to avoid crc32c calculation when remote is not found
    pub(super) async fn write<S>(&self, path: &RelativePath, stream: S) -> RSyncResult<()>
    where
        S: futures::TryStream<Ok = bytes::Bytes, Error = RSyncError> + Send + Sync + 'static,
    {
        let o = &self.object_prefix.as_object(path)?;
        self.client
            .upload(o, stream)
            .await
            .map_err(RSyncError::StorageError)
            .map(|_| ())
    }

    pub(super) async fn write_mtime<S>(
        &self,
        mtime: chrono::DateTime<chrono::Utc>,
        path: &RelativePath,
        stream: S,
    ) -> RSyncResult<()>
    where
        S: futures::TryStream<Ok = bytes::Bytes, Error = RSyncError> + Send + Sync + 'static,
    {
        let o = &self.object_prefix.as_object(path)?;
        let mtime = mtime.timestamp();
        let m = ObjectMetadata {
            metadata: {
                Metadata {
                    modification_time: Some(mtime),
                }
            },
        };
        self.client
            .upload_with_metadata(&m, o, stream)
            .await
            .map_err(RSyncError::StorageError)
            .map(|_| ())
    }
}

#[cfg(test)]
mod tests {

    use crate::{gcp::sync::RelativePath, storage::Object};

    use super::ObjectPrefix;

    #[test]
    fn test_try_get_parent() {
        assert_eq!(Some("/".to_owned()), ObjectPrefix::try_get_folder("/"));
        assert_eq!(
            Some("/hello/world/".to_owned()),
            ObjectPrefix::try_get_folder("/hello/world/hello")
        );
        assert_eq!(
            Some("/hello/world/".to_owned()),
            ObjectPrefix::try_get_folder("/hello/world/")
        );
    }

    #[test]
    fn test_object_prefix_as_object() {
        assert_eq!(
            Object::new("bucket", "hello").unwrap(),
            ObjectPrefix::new("bucket", "")
                .as_object(&RelativePath::new("hello").unwrap())
                .unwrap()
        );

        assert_eq!(
            Object::new("bucket", "hello").unwrap(),
            ObjectPrefix::new("bucket", "")
                .as_object(&RelativePath::new("hello").unwrap())
                .unwrap()
        );

        assert_eq!(
            Object::new("bucket", "hello").unwrap(),
            ObjectPrefix::new("bucket", "prefix")
                .as_object(&RelativePath::new("hello").unwrap())
                .unwrap()
        );

        assert_eq!(
            Object::new("bucket", "prefix/hello").unwrap(),
            ObjectPrefix::new("bucket", "/")
                .as_object(&RelativePath::new("prefix/hello").unwrap())
                .unwrap()
        );

        assert_eq!(
            Object::new("bucket", "prefix/hello").unwrap(),
            ObjectPrefix::new("bucket", "")
                .as_object(&RelativePath::new("prefix/hello").unwrap())
                .unwrap()
        );

        assert_eq!(
            Object::new("bucket", "prefix/prefix2/hello").unwrap(),
            ObjectPrefix::new("bucket", "/prefix/")
                .as_object(&RelativePath::new("prefix2/hello").unwrap())
                .unwrap()
        );

        assert_eq!(
            Object::new("bucket", "prefix/world").unwrap(),
            ObjectPrefix::new("bucket", "/prefix/hello")
                .as_object(&RelativePath::new("world").unwrap())
                .unwrap()
        );
    }

    #[test]
    fn test_object_prefix_as_relative_path() {
        assert_eq!(
            RelativePath::new("hello").unwrap(),
            ObjectPrefix::new("bucket", "")
                .as_relative_path("hello")
                .unwrap()
        );

        assert_eq!(
            RelativePath::new("/hello").unwrap(),
            ObjectPrefix::new("bucket", "")
                .as_relative_path("/hello")
                .unwrap()
        );

        assert_eq!(
            RelativePath::new("hello").unwrap(),
            ObjectPrefix::new("bucket", "/prefix/")
                .as_relative_path("hello")
                .unwrap()
        );

        assert_eq!(
            RelativePath::new("prefix/hello").unwrap(),
            ObjectPrefix::new("bucket", "prefix")
                .as_relative_path("prefix/hello")
                .unwrap()
        );

        assert_eq!(
            RelativePath::new("hello").unwrap(),
            ObjectPrefix::new("bucket", "prefix/")
                .as_relative_path("hello")
                .unwrap()
        );

        assert_eq!(
            RelativePath::new("hello/world").unwrap(),
            ObjectPrefix::new("bucket", "prefix/hello")
                .as_relative_path("hello/world")
                .unwrap()
        );

        assert_eq!(
            RelativePath::new("world").unwrap(),
            ObjectPrefix::new("bucket", "prefix/hello/")
                .as_relative_path("prefix/hello/world")
                .unwrap()
        );

        assert_eq!(
            RelativePath::new("hello/world").unwrap(),
            ObjectPrefix::new("bucket", "prefix/")
                .as_relative_path("prefix/hello/world")
                .unwrap()
        );

        assert_eq!(
            RelativePath::new("world").unwrap(),
            ObjectPrefix::new("bucket", "/prefix/hello/")
                .as_relative_path("prefix/hello/world")
                .unwrap()
        );
    }
}