use std::sync::Arc;
use async_trait::async_trait;
use bytes::Buf;
use quick_xml::de;
use serde::Deserialize;
use super::core::S3Core;
use super::error::parse_error;
use crate::raw::*;
use crate::EntryMode;
use crate::Metadata;
use crate::Result;
/// Pager over an S3 `ListObjectsV2` result stream.
///
/// Each call to `oio::Page::next` issues one list request and converts the
/// response into `oio::Entry` values; `token`/`done` carry pagination state
/// between calls.
pub struct S3Pager {
    // Shared client/config used to issue the list requests.
    core: Arc<S3Core>,
    // Prefix (path) being listed.
    path: String,
    // Delimiter passed to S3; "/" yields directory-style CommonPrefixes.
    delimiter: String,
    // Optional max-keys per request; `None` lets the service pick.
    limit: Option<usize>,
    // Optional `start-after` key forwarded to the service.
    start_after: Option<String>,
    // Continuation token from the previous response; empty on the first call.
    token: String,
    // Set once the listing is exhausted so `next` short-circuits to `None`.
    done: bool,
}
impl S3Pager {
pub fn new(
core: Arc<S3Core>,
path: &str,
delimiter: &str,
limit: Option<usize>,
start_after: Option<&str>,
) -> Self {
Self {
core,
path: path.to_string(),
delimiter: delimiter.to_string(),
limit,
start_after: start_after.map(String::from),
token: "".to_string(),
done: false,
}
}
}
#[async_trait]
impl oio::Page for S3Pager {
    /// Fetch the next page of entries.
    ///
    /// Issues a single `ListObjectsV2` request using the continuation token
    /// saved from the previous response, converts the XML body into
    /// `oio::Entry` values, and returns `Ok(None)` once the listing is
    /// exhausted.
    ///
    /// # Errors
    ///
    /// Returns an error on non-200 responses, XML deserialization failures,
    /// or unparsable `LastModified` timestamps.
    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
        if self.done {
            return Ok(None);
        }

        let resp = self
            .core
            .s3_list_objects(
                &self.path,
                &self.token,
                &self.delimiter,
                self.limit,
                self.start_after.clone(),
            )
            .await?;

        if resp.status() != http::StatusCode::OK {
            return Err(parse_error(resp).await?);
        }

        let bs = resp.into_body().bytes().await?;
        // `mut` so the continuation token can be moved out below instead of
        // cloned (clippy::redundant_clone).
        let mut output: Output = de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;

        // Decide whether this is the final page. Prefer the explicit
        // `IsTruncated` flag; fall back to the continuation token, and
        // finally to an empty result set, since not every S3-compatible
        // service populates all fields.
        self.done = if let Some(is_truncated) = output.is_truncated {
            !is_truncated
        } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() {
            next_continuation_token.is_empty()
        } else {
            output.common_prefixes.is_empty() && output.contents.is_empty()
        };
        // Move the token out by value; it is not needed again in `output`.
        self.token = output.next_continuation_token.take().unwrap_or_default();

        let mut entries = Vec::with_capacity(output.common_prefixes.len() + output.contents.len());

        // CommonPrefixes represent "directories" under the listed prefix.
        for prefix in output.common_prefixes {
            let de = oio::Entry::new(
                &build_rel_path(&self.core.root, &prefix.prefix),
                Metadata::new(EntryMode::DIR),
            );
            entries.push(de);
        }

        for object in output.contents {
            // The listing may include the prefix itself as a zero-byte
            // "directory" placeholder object; skip it so it is not
            // reported as a file.
            if object.key.ends_with('/') {
                continue;
            }

            let mut meta = Metadata::new(EntryMode::FILE);
            meta.set_etag(&object.etag);
            // NOTE(review): the ETag equals the content MD5 only for
            // single-part, unencrypted uploads — this mirrors the
            // pre-existing behavior; confirm callers tolerate multipart
            // ETags here.
            meta.set_content_md5(object.etag.trim_matches('"'));
            meta.set_content_length(object.size);
            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);

            let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.key), meta);
            entries.push(de);
        }

        Ok(Some(entries))
    }
}
/// Deserialized `ListBucketResult` XML response body.
///
/// All fields default when absent (`#[serde(default)]`) because S3-compatible
/// services do not always emit every element.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
struct Output {
    // `IsTruncated` — `Some(true)` means more pages remain.
    is_truncated: Option<bool>,
    // `NextContinuationToken` — token for the next page, if any.
    next_continuation_token: Option<String>,
    // `CommonPrefixes` — directory-like groupings under the delimiter.
    common_prefixes: Vec<OutputCommonPrefix>,
    // `Contents` — the objects themselves.
    contents: Vec<OutputContent>,
}
/// One `Contents` element of a `ListBucketResult`: a single object.
#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct OutputContent {
    // Full object key.
    key: String,
    // Object size in bytes.
    size: u64,
    // Last-modified timestamp as an RFC 3339 string; parsed later.
    last_modified: String,
    // Explicit rename: PascalCase would produce `ETag` -> `e_tag`, but the
    // XML element is literally `ETag`.
    #[serde(rename = "ETag")]
    etag: String,
}
/// One `CommonPrefixes` element: a directory-like prefix grouping.
#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct OutputCommonPrefix {
    // The shared key prefix, ending with the delimiter (e.g. "photos/2006/").
    prefix: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Verify that a representative `ListBucketResult` document deserializes
    /// into the expected `Output` fields.
    #[test]
    fn test_parse_list_output() {
        let xml_body = bytes::Bytes::from(
            r#"<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>example-bucket</Name>
<Prefix>photos/2006/</Prefix>
<KeyCount>3</KeyCount>
<MaxKeys>1000</MaxKeys>
<Delimiter>/</Delimiter>
<IsTruncated>false</IsTruncated>
<Contents>
<Key>photos/2006</Key>
<LastModified>2016-04-30T23:51:29.000Z</LastModified>
<ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
<Size>56</Size>
<StorageClass>STANDARD</StorageClass>
</Contents>
<Contents>
<Key>photos/2007</Key>
<LastModified>2016-04-30T23:51:29.000Z</LastModified>
<ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
<Size>100</Size>
<StorageClass>STANDARD</StorageClass>
</Contents>
<CommonPrefixes>
<Prefix>photos/2006/February/</Prefix>
</CommonPrefixes>
<CommonPrefixes>
<Prefix>photos/2006/January/</Prefix>
</CommonPrefixes>
</ListBucketResult>"#,
        );

        let output: Output = de::from_reader(xml_body.reader()).expect("must success");

        // Pagination fields: not truncated, no continuation token.
        assert!(!output.is_truncated.unwrap());
        assert!(output.next_continuation_token.is_none());

        // Both directory-style prefixes survive, in document order.
        let prefixes: Vec<&str> = output
            .common_prefixes
            .iter()
            .map(|p| p.prefix.as_str())
            .collect();
        assert_eq!(prefixes, vec!["photos/2006/February/", "photos/2006/January/"]);

        // Both objects deserialize with key, size, timestamp, and raw ETag.
        let expected = vec![
            OutputContent {
                key: "photos/2006".to_string(),
                last_modified: "2016-04-30T23:51:29.000Z".to_string(),
                etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(),
                size: 56,
            },
            OutputContent {
                key: "photos/2007".to_string(),
                last_modified: "2016-04-30T23:51:29.000Z".to_string(),
                etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(),
                size: 100,
            },
        ];
        assert_eq!(output.contents, expected);
    }
}