s3find 0.7.2

A command line utility to walk an Amazon S3 hierarchy. s3find is an analog of find for Amazon S3.
Documentation
use futures::Stream;
use humansize::{file_size_opts as options, FileSize};
use rusoto_core::request::HttpClient;
use rusoto_core::Region;
use rusoto_credential::{DefaultCredentialsProvider, StaticProvider};
use rusoto_s3::*;
use rusoto_s3::{ListObjectsV2Request, Object, S3Client, Tag};
use std::fmt;
use std::ops::Add;

use crate::arg::*;
use crate::filter::Filter;
use crate::function::*;

#[derive(Clone)]
pub struct FilterList(pub Vec<Box<dyn Filter>>);

impl FilterList {
    pub async fn test_match(&self, object: Object) -> bool {
        for item in &self.0 {
            if !item.filter(&object) {
                return false;
            }
        }

        true
    }
}

#[derive(Clone)]
pub struct Find {
    pub client: S3Client,
    pub region: Region,
    pub path: S3Path,
    pub filters: FilterList,
    pub limit: Option<usize>,
    pub page_size: i64,
    pub stats: bool,
    pub summarize: bool,
    pub command: Box<dyn RunCommand>,
}

impl Find {
    #![allow(unreachable_patterns)]
    pub async fn exec(&self, acc: Option<FindStat>, list: Vec<Object>) -> Option<FindStat> {
        let status = match acc {
            Some(stat) => Some(stat + &list),
            None => None,
        };

        let region = &self.region.name();
        self.command
            .execute(&self.client, region, &self.path, &list)
            .await
            .unwrap();
        status
    }

    pub fn to_stream(&self) -> FindStream {
        FindStream {
            client: self.client.clone(),
            path: self.path.clone(),
            token: None,
            page_size: self.page_size,
            initial: true,
        }
    }
}

pub fn default_stats(summarize: bool) -> Option<FindStat> {
    if summarize {
        Some(FindStat::default())
    } else {
        None
    }
}

pub struct FindStream {
    pub client: S3Client,
    pub path: S3Path,
    pub token: Option<String>,
    pub page_size: i64,
    pub initial: bool,
}

impl FindStream {
    async fn list(mut self) -> Option<(Vec<Object>, Self)> {
        if !self.initial && self.token == None {
            return None;
        }

        let request = ListObjectsV2Request {
            bucket: self.path.bucket.clone(),
            continuation_token: self.token.clone(),
            delimiter: None,
            encoding_type: None,
            fetch_owner: None,
            max_keys: Some(self.page_size),
            prefix: self.path.prefix.clone(),
            request_payer: None,
            start_after: None,
        };

        self.initial = false;
        self.token = None;

        let (token, objects) = self
            .client
            .list_objects_v2(request)
            .await
            .map(|x| (x.next_continuation_token, x.contents))
            .unwrap();

        self.token = token;
        objects.map(|x| (x, self))
    }

    pub fn stream(self) -> impl Stream<Item = Vec<Object>> {
        futures::stream::unfold(self, |s| async { s.list().await })
    }
}

impl PartialEq for FindStream {
    fn eq(&self, other: &Self) -> bool {
        self.path == other.path
            && self.token == other.token
            && self.page_size == other.page_size
            && self.initial == other.initial
    }
}

impl fmt::Debug for FindStream {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "\
FindStream {{
    client,
    path: {:?},
    token: {:?},
    page_size: {},
    initial: {},
}}",
            self.path, self.token, self.page_size, self.initial
        )
    }
}

impl From<FindOpt> for Find {
    fn from(opts: FindOpt) -> Self {
        let filters = opts.clone().into();

        let FindOpt {
            aws_access_key,
            aws_secret_key,
            aws_region,
            path,
            cmd,
            page_size,
            summarize,
            limit,
            ..
        } = opts;

        let region = aws_region.clone();
        let client = get_client(aws_access_key, aws_secret_key, aws_region);
        let command = cmd.unwrap_or_default().downcast();

        Find {
            client,
            filters,
            region,
            path,
            command,
            page_size,
            summarize,
            limit,
            stats: summarize,
        }
    }
}

fn get_client(
    aws_access_key: Option<String>,
    aws_secret_key: Option<String>,
    region: Region,
) -> S3Client {
    let dispatcher = HttpClient::new().unwrap();
    match (aws_access_key, aws_secret_key) {
        (Some(aws_access_key), Some(aws_secret_key)) => {
            let provider = StaticProvider::new(aws_access_key, aws_secret_key, None, None);
            S3Client::new_with(dispatcher, provider, region)
        }
        _ => {
            let provider = DefaultCredentialsProvider::new().unwrap();
            S3Client::new_with(dispatcher, provider, region)
        }
    }
}

impl From<FindOpt> for FilterList {
    fn from(opts: FindOpt) -> Self {
        let mut list: Vec<Box<dyn Filter>> = Vec::new();

        let FindOpt {
            name,
            iname,
            regex,
            size,
            mtime,
            ..
        } = opts;

        for filter in name {
            list.push(Box::new(filter));
        }

        for filter in iname {
            list.push(Box::new(filter));
        }

        for filter in regex {
            list.push(Box::new(filter));
        }

        for filter in size {
            list.push(Box::new(filter));
        }

        for filter in mtime {
            list.push(Box::new(filter));
        }

        FilterList(list)
    }
}

impl From<FindTag> for Tag {
    fn from(tag: FindTag) -> Self {
        Tag {
            key: tag.key,
            value: tag.value,
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct FindStat {
    pub total_files: usize,
    pub total_space: i64,
    pub max_size: Option<i64>,
    pub min_size: Option<i64>,
    pub max_key: String,
    pub min_key: String,
    pub average_size: i64,
}

impl Add<&[Object]> for FindStat {
    type Output = FindStat;

    #[allow(clippy::suspicious_arithmetic_impl)]
    fn add(mut self: FindStat, list: &[Object]) -> Self {
        for x in list {
            self.total_files += 1;
            let size = x.size.as_ref().unwrap_or(&0);
            self.total_space += size;

            match self.max_size {
                None => {
                    self.max_size = Some(*size);
                    self.max_key = x.key.clone().unwrap_or_default();
                }
                Some(max_size) if max_size <= *size => {
                    self.max_size = Some(*size);
                    self.max_key = x.key.clone().unwrap_or_default();
                }
                _ => {}
            }

            match self.min_size {
                None => {
                    self.min_size = Some(*size);
                    self.min_key = x.key.clone().unwrap_or_default();
                }
                Some(min_size) if min_size > *size => {
                    self.min_size = Some(*size);
                    self.min_key = x.key.clone().unwrap_or_default();
                }
                _ => {}
            }

            self.average_size = self.total_space / (self.total_files as i64);
        }
        self
    }
}

impl Default for FindStat {
    fn default() -> Self {
        FindStat {
            total_files: 0,
            total_space: 0,
            max_size: None,
            min_size: None,
            max_key: "".to_owned(),
            min_key: "".to_owned(),
            average_size: 0,
        }
    }
}

impl fmt::Display for FindStat {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        writeln!(f)?;
        writeln!(f, "Summary")?;
        writeln!(f, "{:19} {}", "Total files:", &self.total_files)?;
        writeln!(
            f,
            "Total space:        {}",
            &self
                .total_space
                .file_size(options::CONVENTIONAL)
                .map_err(|_| fmt::Error)?
        )?;
        writeln!(f, "{:19} {}", "Largest file:", &self.max_key)?;
        writeln!(
            f,
            "{:19} {}",
            "Largest file size:",
            &self
                .max_size
                .unwrap_or_default()
                .file_size(options::CONVENTIONAL)
                .map_err(|_| fmt::Error)?
        )?;
        writeln!(f, "{:19} {}", "Smallest file:", &self.min_key)?;
        writeln!(
            f,
            "{:19} {}",
            "Smallest file size:",
            &self
                .min_size
                .unwrap_or_default()
                .file_size(options::CONVENTIONAL)
                .map_err(|_| fmt::Error)?
        )?;
        writeln!(
            f,
            "{:19} {}",
            "Average file size:",
            &self
                .average_size
                .file_size(options::CONVENTIONAL)
                .map_err(|_| fmt::Error)?
        )?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use anyhow::Error;
    use regex::Regex;
    use rusoto_mock::*;
    use std::str::FromStr;

    #[test]
    fn from_findtag() -> Result<(), Error> {
        let tag: Tag = FindTag {
            key: "tag".to_owned(),
            value: "val".to_owned(),
        }
        .into();

        assert_eq!(
            tag,
            Tag {
                key: "tag".to_owned(),
                value: "val".to_owned(),
            }
        );
        Ok(())
    }

    #[tokio::test]
    async fn from_findopt_to_findcommand() {
        let find: Find = FindOpt {
            path: S3Path {
                bucket: "bucket".to_owned(),
                prefix: Some("prefix".to_owned()),
            },
            aws_access_key: Some("access".to_owned()),
            aws_secret_key: Some("secret".to_owned()),
            aws_region: Region::UsEast1,
            name: vec![NameGlob::from_str("*ref*").unwrap()],
            iname: vec![InameGlob::from_str("Pre*").unwrap()],
            regex: vec![Regex::from_str("^pre").unwrap()],
            mtime: vec![FindTime::Lower(10)],
            size: vec![FindSize::Lower(1000)],
            limit: None,
            page_size: 1000,
            cmd: Some(Cmd::Ls(FastPrint {})),
            summarize: false,
        }
        .into();

        assert_eq!(
            find.path,
            S3Path {
                bucket: "bucket".to_owned(),
                prefix: Some("prefix".to_owned()),
            }
        );
        assert_eq!(find.region, Region::UsEast1);

        let object_ok = Object {
            key: Some("pref".to_owned()),
            size: Some(10),
            ..Default::default()
        };
        assert!(find.filters.test_match(object_ok).await);

        let object_fail = Object {
            key: Some("Refer".to_owned()),
            size: Some(10),
            ..Default::default()
        };
        assert!(!find.filters.test_match(object_fail).await);
    }

    #[test]
    fn filestat_add() -> Result<(), Error> {
        let objects = [
            Object {
                e_tag: Some("9d48114aa7c18f9d68aa20086dbb7756".to_string()),
                key: Some("sample2.txt".to_string()),
                last_modified: Some("2017-07-19T19:04:17.000Z".to_string()),
                owner: None,
                size: Some(20),
                storage_class: Some("STANDARD".to_string()),
            },
            Object {
                e_tag: Some("9d48114aa7c18f9d68aa20086dbb7756".to_string()),
                key: Some("sample1.txt".to_string()),
                last_modified: Some("2017-07-19T19:04:17.000Z".to_string()),
                owner: None,
                size: Some(10),
                storage_class: Some("STANDARD".to_string()),
            },
            Object {
                e_tag: Some("9d48114aa7c18f9d68aa20086dbb7756".to_string()),
                key: Some("sample3.txt".to_string()),
                last_modified: Some("2017-07-19T19:04:15.000Z".to_string()),
                owner: None,
                size: Some(30),
                storage_class: Some("STANDARD".to_string()),
            },
        ];

        let stat = FindStat::default() + &objects;

        let expected = FindStat {
            total_files: 3,
            total_space: 60,
            max_size: Some(30),
            min_size: Some(10),
            max_key: "sample3.txt".to_owned(),
            min_key: "sample1.txt".to_owned(),
            average_size: 20,
        };

        assert_eq!(stat, expected);

        Ok(())
    }

    #[tokio::test]
    async fn findstream_list() -> Result<(), Error> {
        let mock = MockRequestDispatcher::with_status(200).with_body(
            r#"
            <?xml version="1.0" encoding="UTF-8"?>
            <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
              <Name>test</Name>
              <Prefix></Prefix>
              <Marker></Marker>
              <MaxKeys>1</MaxKeys>
              <IsTruncated>false</IsTruncated>
              <Contents>
                <Key>key1</Key>
                <LastModified>2013-01-10T21:45:09.000Z</LastModified>
                <ETag>&quot;1d921b22129502cbbe5cbaf2c8bac682&quot;</ETag>
                <Size>10000</Size>
                <Owner>
                  <ID>1936a5d8a2b189cda450d1d1d514f3861b3adc2df515</ID>
                  <DisplayName>aws</DisplayName>
                </Owner>
                <StorageClass>STANDARD</StorageClass>
              </Contents>
              <Contents>
                <Key>key2</Key>
                <LastModified>2013-01-10T22:45:09.000Z</LastModified>
                <ETag>&quot;1d921b22129502cbbe5cbaf2c8bac682&quot;</ETag>
                <Size>1234</Size>
                <Owner>
                  <ID>1936a5d8a2b189cda450d1d1d514f3861b3adc2df515</ID>
                  <DisplayName>aws</DisplayName>
                </Owner>
                <StorageClass>STANDARD</StorageClass>
              </Contents>
            </ListBucketResult>"#,
        );
        let client = S3Client::new_with(mock, MockCredentialsProvider, Region::UsEast1);
        let path = S3Path {
            bucket: "test".to_owned(),
            prefix: None,
        };

        let command = FindStream {
            client,
            path,
            token: None,
            page_size: 1,
            initial: true,
        };

        let (objects, command2) = command.list().await.unwrap();
        assert_eq!(command2.token, None);
        assert_eq!(command2.initial, false);
        assert_eq!(objects[0].key, Some("key1".to_owned()));
        assert_eq!(objects[1].key, Some("key2".to_owned()));

        Ok(())
    }

    #[test]
    fn test_default_stats() {
        assert_eq!(default_stats(false), None);
        assert_eq!(default_stats(true), Some(Default::default()));
    }

    #[tokio::test]
    async fn test_find_exec() {
        let mock = MockRequestDispatcher::with_status(200);
        let client = S3Client::new_with(mock, MockCredentialsProvider, Region::UsEast1);
        let command: Box<dyn RunCommand> = Box::new(DoNothing {});

        let find = Find {
            client,
            region: Region::UsEast1,
            path: "s3://test/path".parse().unwrap(),
            filters: FilterList(Vec::new()),
            limit: None,
            page_size: 1000,
            stats: true,
            summarize: false,
            command,
        };

        assert_eq!(find.exec(None, Vec::new()).await, None);
        assert_eq!(
            find.exec(Some(Default::default()), Vec::new()).await,
            Some(Default::default())
        );

        let objects = vec![
            Object {
                e_tag: Some("9d48114aa7c18f9d68aa20086dbb7756".to_string()),
                key: Some("sample1.txt".to_string()),
                last_modified: Some("2017-07-19T19:04:17.000Z".to_string()),
                owner: None,
                size: Some(10),
                storage_class: Some("STANDARD".to_string()),
            },
            Object {
                e_tag: Some("9d48114aa7c18f9d68aa20086dbb7756".to_string()),
                key: Some("sample2.txt".to_string()),
                last_modified: Some("2017-07-19T19:04:17.000Z".to_string()),
                owner: None,
                size: Some(20),
                storage_class: Some("STANDARD".to_string()),
            },
            Object {
                e_tag: Some("9d48114aa7c18f9d68aa20086dbb7756".to_string()),
                key: Some("sample3.txt".to_string()),
                last_modified: Some("2017-07-19T19:04:15.000Z".to_string()),
                owner: None,
                size: Some(30),
                storage_class: Some("STANDARD".to_string()),
            },
        ];

        let stat = find.exec(Some(Default::default()), objects).await;
        assert_eq!(
            stat,
            Some(FindStat {
                total_files: 3,
                total_space: 60,
                max_size: Some(30),
                min_size: Some(10),
                max_key: "sample3.txt".to_owned(),
                min_key: "sample1.txt".to_owned(),
                average_size: 20
            })
        );
    }

    #[test]
    fn test_find_findstat() {
        let stat = FindStat {
            total_files: 3,
            total_space: 60,
            max_size: Some(30),
            min_size: Some(10),
            max_key: "sample3.txt".to_owned(),
            min_key: "sample1.txt".to_owned(),
            average_size: 20,
        };
        // smoke debug
        let stat_str = stat.to_string();
        assert!(stat_str.contains("sample1.txt"));
        assert!(stat_str.contains("sample3.txt"));
    }

    #[test]
    fn test_find_stream() {
        let mock = MockRequestDispatcher::with_status(200);
        let client = S3Client::new_with(mock, MockCredentialsProvider, Region::UsEast1);
        let command: Box<dyn RunCommand> = Box::new(DoNothing {});

        let find = Find {
            client: client.clone(),
            region: Region::UsEast1,
            path: "s3://test/path".parse().unwrap(),
            filters: FilterList(Vec::new()),
            limit: None,
            page_size: 1000,
            stats: true,
            summarize: false,
            command,
        };

        let stream = find.to_stream();
        assert_eq!(
            stream,
            FindStream {
                client,
                path: "s3://test/path".parse().unwrap(),
                token: None,
                page_size: 1000,
                initial: true,
            }
        );
    }

    #[test]
    fn test_stream_debug() {
        let mock = MockRequestDispatcher::with_status(200);
        let client = S3Client::new_with(mock, MockCredentialsProvider, Region::UsEast1);

        let stream = FindStream {
            client,
            path: "s3://test/path".parse().unwrap(),
            token: None,
            page_size: 1000,
            initial: true,
        };

        let stream_str = format!("{:?}", stream);
        assert!(stream_str.contains("FindStream"));
        assert!(stream_str.contains("S3Path"));
        assert!(stream_str.contains("test"));
        assert!(stream_str.contains("path"));
        assert!(stream_str.contains("1000"));
    }
}