[][src]Module s3_ext::iter

Iterators over Objects

Example

use futures::stream::{Stream, StreamExt, TryStreamExt};
use futures::future::try_join_all;
use std::future::Future;
use rand::RngCore;
use rusoto_core::Region;
use rusoto_s3::{CreateBucketRequest, PutObjectRequest, S3, S3Client};
use s3_ext::S3Ext;
use std::env;
use tokio::io::AsyncReadExt;

use s3_ext::error::S3ExtError;

#[tokio::main]
async fn main() -> Result<(), S3ExtError> {
    let bucket = format!("iter-module-example-{}", rand::thread_rng().next_u64());

    // setup client

    let access_key = "ANTN35UAENTS5UIAEATD".to_string();
    let secret_key = "TtnuieannGt2rGuie2t8Tt7urarg5nauedRndrur".to_string();
    let endpoint = env::var("S3_ENDPOINT").unwrap_or_else(|_| "http://localhost:9000".to_string());
    let region = Region::Custom {
        name: "eu-west-1".to_string(),
        endpoint,
    };
    let client = s3_ext::new_s3client_with_credentials(region, access_key, secret_key)?;

    // create bucket

    client
        .create_bucket(CreateBucketRequest {
            bucket: bucket.clone(),
            ..Default::default()
        })
        .await?;

    // create test objects

    for obj in (0..5).map(|n| format!("object_{:02}", n)) {
        client
            .put_object(PutObjectRequest {
                bucket: bucket.clone(),
                key: obj.to_string(),
                body: Some(obj.as_bytes().to_vec().into()),
                ..Default::default()
            })
            .await?;
    }

    // iterate over objects objects (sorted alphabetically)

   let objects: Vec<_> = client
        .stream_objects(&bucket)
        .map(|res| res.map(|obj| obj.key))
        .try_collect()
        .await?;
   let objects: Vec<_> = objects.into_iter().filter_map(|x| x).collect();

    assert_eq!(
        objects.as_slice(),
        &[
            "object_00",
            "object_01",
            "object_02",
            "object_03",
            "object_04",
        ]
    );

    // iterate object and fetch content on the fly (sorted alphabetically)
    let results: Result<Vec<_>, _> = client
        .stream_get_objects(&bucket)
        .map(|res| res.map(|(key, obj)| (key, obj.body)))
        .try_collect()
        .await;

    let futures: Vec<_> = results?
        .into_iter()
        .map(|(key, body)| async move {
            let mut buf = Vec::new();
            if let Some(body) = body {
                match body.into_async_read().read_to_end(&mut buf).await {
                    Ok(_) => Ok(Some((key, buf))),
                    Err(e) => Err(e),
                }
            } else {
                Ok(None)
            }
        })
        .collect();
    let results: Result<Vec<_>, _> = try_join_all(futures).await;
    let objects: Vec<_> = results?.into_iter().filter_map(|x| x).collect();

    for (i, (key, body)) in objects.iter().enumerate() {
        let expected = format!("object_{:02}", i);
        assert_eq!(key, &expected);
        assert_eq!(body.as_slice(), expected.as_bytes());
    }
    Ok(())
}

Structs

GetObjectIter

Iterator-like object retrieving all objects or objects with a given prefix

GetObjectStream

Stream which retrieves objects

ObjectIter

Iterator-like objects, forms the basis of ObjectStream

ObjectStream

Stream over objects