1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
use futures::{Stream, StreamExt, TryStream, TryStreamExt};

use crate::oauth2::token::TokenGenerator;

use super::{
    client::StorageClient,
    resources::object::{ObjectMetadata, Objects},
    Bucket, StorageResult, {Object, ObjectsListRequest, PartialObject},
};

/// Client for operations on individual storage objects.
///
/// Wraps a [`StorageClient`] and delegates every request to it. `T` is the
/// type of the token generator the wrapped client is built from.
pub struct ObjectClient<T> {
    /// Underlying client through which all requests are issued.
    storage_client: StorageClient<T>,
}

impl<T> ObjectClient<T>
where
    T: TokenGenerator,
{
    /// Builds an `ObjectClient` on top of a newly created [`StorageClient`].
    ///
    /// `token_generator` is handed over to `StorageClient::new` unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `StorageClient::new`.
    pub async fn new(token_generator: T) -> StorageResult<Self> {
        let storage_client = StorageClient::new(token_generator).await?;
        Ok(Self { storage_client })
    }
}

impl<T> ObjectClient<T>
where
    T: TokenGenerator,
{
    pub async fn get(&self, o: &Object, fields: &str) -> StorageResult<PartialObject> {
        let url = o.url();
        self.storage_client
            .get_as_json(url.as_str(), &[("fields", fields)])
            .await
    }

    pub async fn delete(&self, o: &Object) -> StorageResult<String> {
        let url = o.url();
        self.storage_client.delete(&url).await?;
        super::StorageResult::Ok(url)
    }

    pub async fn download(
        &self,
        o: &Object,
    ) -> StorageResult<impl Stream<Item = StorageResult<bytes::Bytes>>> {
        let url = o.url();
        Ok(self
            .storage_client
            .get_as_stream(&url, &[("alt", "media")])
            .await?)
    }

    pub async fn upload<S>(&self, o: &Object, stream: S) -> StorageResult<()>
    where
        S: futures::TryStream + Send + Sync + 'static,
        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        bytes::Bytes: From<S::Ok>,
    {
        let url = o.upload_url("media");
        self.storage_client.post(&url, stream).await?;
        super::StorageResult::Ok(())
    }

    pub async fn upload_with_metadata<S>(
        &self,
        m: &ObjectMetadata,
        o: &Object,
        stream: S,
    ) -> StorageResult<()>
    where
        S: TryStream<Ok = bytes::Bytes> + Send + Sync + 'static,
        S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
    {
        let url = o.upload_url("multipart");
        self.storage_client.post_multipart(&url, m, stream).await?;
        super::StorageResult::Ok(())
    }

    pub async fn list(
        &self,
        bucket: &str,
        objects_list_request: &ObjectsListRequest,
    ) -> impl Stream<Item = StorageResult<PartialObject>> + '_ {
        let objects_list_request = objects_list_request.to_owned();
        let url = Bucket::new(bucket).url();
        futures::stream::try_unfold(
            (Some(objects_list_request), url),
            move |(state, url)| async move {
                match state {
                    None => Ok(None),
                    Some(state) => {
                        let objects: Objects =
                            self.storage_client.get_as_json(&url, &state).await?;
                        let items = futures::stream::iter(objects.items).map(Ok);
                        match objects.next_page_token {
                            None => Ok(Some((items, (None, url)))),
                            Some(next_token) => {
                                let new_state = ObjectsListRequest {
                                    page_token: Some(next_token),
                                    ..state
                                };
                                Ok(Some((items, (Some(new_state), url))))
                            }
                        }
                    }
                }
            },
        )
        .try_flatten()
    }
}