#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
extern crate colored;
extern crate url;
use std::convert::From;
use std::fs::{metadata, write, File};
use std::io::prelude::*;
use std::path::Path;
use std::str::FromStr;
use chrono::prelude::*;
use quick_xml::events::Event;
use quick_xml::Reader;
use regex::Regex;
use reqwest::{header, Client, Response};
use serde_json;
use url::Url;
mod aws;
static RESPONSE_FORMAT: &'static str =
r#""Contents":\["([^"]+?)","([^"]+?)","\\"([^"]+?)\\"",([^"]+?),"([^"]+?)"(.*?)\]"#;
#[derive(Debug, Clone, Deserialize)]
pub struct CredentialConfig {
pub host: String,
pub user: Option<String>,
pub access_key: String,
pub secret_key: String,
pub region: Option<String>,
pub s3_type: Option<String>,
}
pub enum AuthType {
AWS4,
AWS2,
}
pub enum Format {
JSON,
XML,
}
pub enum UrlStyle {
PATH,
HOST,
}
pub struct Handler<'a> {
pub host: &'a str,
pub access_key: &'a str,
pub secret_key: &'a str,
pub auth_type: AuthType,
pub format: Format,
pub url_style: UrlStyle,
pub region: Option<String>,
}
#[derive(Debug, Clone)]
pub struct S3Object {
pub bucket: Option<String>,
pub key: Option<String>,
pub mtime: Option<String>,
pub etag: Option<String>,
pub storage_class: Option<String>,
}
impl From<String> for S3Object {
fn from(s3_path: String) -> Self {
let url_parser = Url::parse(&s3_path).unwrap();
let bucket = match url_parser.host_str() {
Some(h) if h != "" => Some(h.to_string()),
_ => None,
};
match url_parser.path() {
"/" => S3Object {
bucket: bucket,
key: None,
mtime: None,
etag: None,
storage_class: None,
},
_ => S3Object {
bucket: bucket,
key: Some(url_parser.path().to_string()),
mtime: None,
etag: None,
storage_class: None,
},
}
}
}
impl From<S3Object> for String {
fn from(s3_object: S3Object) -> Self {
match s3_object.bucket {
Some(b) => match s3_object.key {
Some(k) => format!("s3://{}{}", b, k),
None => format!("s3://{}", b),
},
None => format!("s3://"),
}
}
}
pub trait S3Convert {
fn virtural_host_style_links(&self, host: String) -> (String, String);
fn path_style_links(&self, host: String) -> (String, String);
fn new_from_uri(path: String) -> Self;
fn new(
bucket: Option<String>,
key: Option<String>,
mtime: Option<String>,
etag: Option<String>,
storage_class: Option<String>,
) -> Self;
}
impl S3Convert for S3Object {
fn virtural_host_style_links(&self, host: String) -> (String, String) {
match self.bucket.clone() {
Some(b) => (
format!("{}.{}", b, host),
self.key.clone().unwrap_or("/".to_string()),
),
None => (host, "/".to_string()),
}
}
fn path_style_links(&self, host: String) -> (String, String) {
match self.bucket.clone() {
Some(b) => (
host,
format!("/{}{}", b, self.key.clone().unwrap_or("/".to_string())),
),
None => (host, "/".to_string()),
}
}
fn new_from_uri(uri: String) -> S3Object {
let re = Regex::new(r#"/?(?P<bucket>[A-Za-z0-9\-\._]+)(?P<object>[A-Za-z0-9\-\._/]*)\s*"#)
.unwrap();
let caps = re.captures(&uri).expect("S3 object uri format error.");
if &caps["object"] == "" || &caps["object"] == "/" {
S3Object {
bucket: Some(caps["bucket"].to_string()),
key: None,
mtime: None,
etag: None,
storage_class: None,
}
} else {
S3Object {
bucket: Some(caps["bucket"].to_string()),
key: Some(caps["object"].to_string()),
mtime: None,
etag: None,
storage_class: None,
}
}
}
fn new(
bucket: Option<String>,
object: Option<String>,
mtime: Option<String>,
etag: Option<String>,
storage_class: Option<String>,
) -> S3Object {
let key = match object {
None => None,
Some(b) => {
if b.starts_with("/") {
Some(b)
} else {
Some(format!("/{}", b))
}
}
};
S3Object {
bucket: bucket,
key: key,
mtime: mtime,
etag: etag,
storage_class: storage_class,
}
}
}
trait ResponseHandler {
fn handle_response(&mut self) -> (Vec<u8>, reqwest::header::HeaderMap);
}
impl ResponseHandler for Response {
fn handle_response(&mut self) -> (Vec<u8>, reqwest::header::HeaderMap) {
let mut body = Vec::new();
let _ = self.read_to_end(&mut body);
if self.status().is_success() || self.status().is_redirection() {
info!("Status: {}", self.status());
info!("Headers:\n{:?}", self.headers());
info!(
"Body:\n{}\n\n",
std::str::from_utf8(&body).expect("Body can not decode as UTF8")
);
} else {
error!("Status: {}", self.status());
error!("Headers:\n{:?}", self.headers());
error!(
"Body:\n{}\n\n",
std::str::from_utf8(&body).expect("Body can not decode as UTF8")
);
}
(body, self.headers().clone())
}
}
impl<'a> Handler<'a> {
fn aws_v2_request(
&self,
method: &str,
s3_object: &S3Object,
qs: &Vec<(&str, &str)>,
insert_headers: &Vec<(&str, &str)>,
payload: &Vec<u8>,
) -> Result<(Vec<u8>, reqwest::header::HeaderMap), &'static str> {
let utc: DateTime<Utc> = Utc::now();
let mut headers = header::HeaderMap::new();
let time_str = utc.to_rfc2822();
headers.insert("date", time_str.clone().parse().unwrap());
let mut signed_headers = vec![("Date", time_str.as_str())];
let insert_headers_name: Vec<String> = insert_headers
.into_iter()
.map(|x| x.0.to_string())
.collect();
if insert_headers_name.contains(&"delete-marker".to_string()) {
for h in insert_headers {
if h.0 == "delete-marker" {
headers.insert("x-amz-delete-marker", h.1.parse().unwrap());
signed_headers.push(("x-amz-delete-marker", h.1));
}
}
}
if insert_headers_name.contains(&"secure-delete".to_string()) {
for h in insert_headers {
if h.0 == "secure-delete" {
headers.insert("x-amz-secure-delete", h.1.parse().unwrap());
signed_headers.push(("x-amz-secure-delete", h.1));
}
}
}
let mut query_strings = vec![];
match self.format {
Format::JSON => query_strings.push(("format", "json")),
_ => {}
}
query_strings.extend(qs.iter().cloned());
let mut query = String::from_str("http://").unwrap();
let links;
match self.url_style {
UrlStyle::HOST => {
links = s3_object.virtural_host_style_links(self.host.to_string());
query.push_str(&links.0);
query.push_str(&links.1);
}
UrlStyle::PATH => {
links = s3_object.path_style_links(self.host.to_string());
query.push_str(self.host);
query.push_str(&links.1);
}
}
query.push('?');
query.push_str(&aws::canonical_query_string(&mut query_strings));
let signature = aws::aws_s3_v2_sign(
self.secret_key,
&aws::aws_s3_v2_get_string_to_signed(method, &links.1, &mut signed_headers, payload),
);
let mut authorize_string = String::from_str("AWS ").unwrap();
authorize_string.push_str(self.access_key);
authorize_string.push(':');
authorize_string.push_str(&signature);
headers.insert(header::AUTHORIZATION, authorize_string.parse().unwrap());
let client = Client::builder().default_headers(headers).build().unwrap();
let action;
match method {
"GET" => {
action = client.get(query.as_str());
}
"PUT" => {
action = client.put(query.as_str());
}
"DELETE" => {
action = client.delete(query.as_str());
}
"POST" => {
action = client.post(query.as_str());
}
_ => {
error!("unspport HTTP verb");
action = client.get(query.as_str());
}
}
match action.body((*payload).clone()).send() {
Ok(mut res) => Ok(res.handle_response()),
Err(_) => Err("Reqwest Error"),
}
}
fn _aws_v4_request(
&self,
method: &str,
s3_object: &S3Object,
qs: &Vec<(&str, &str)>,
insert_headers: &Vec<(&str, &str)>,
payload: Vec<u8>,
region: Option<String>,
endpoint: Option<String>,
) -> Result<(Vec<u8>, reqwest::header::HeaderMap), &'static str> {
let utc: DateTime<Utc> = Utc::now();
let mut headers = header::HeaderMap::new();
let time_str = utc.format("%Y%m%dT%H%M%SZ").to_string();
headers.insert("x-amz-date", time_str.clone().parse().unwrap());
let payload_hash = aws::hash_payload(&payload);
headers.insert("x-amz-content-sha256", payload_hash.parse().unwrap());
let links = match self.url_style {
UrlStyle::HOST => s3_object.virtural_host_style_links(self.host.to_string()),
UrlStyle::PATH => s3_object.path_style_links(self.host.to_string()),
};
let hostname = match endpoint {
Some(ep) => ep,
None => links.0,
};
let insert_headers_name: Vec<String> = insert_headers
.into_iter()
.map(|x| x.0.to_string())
.collect();
let mut signed_headers = vec![
("X-AMZ-Date", time_str.as_str()),
("Host", hostname.as_str()),
];
if insert_headers_name.contains(&"delete-marker".to_string()) {
for h in insert_headers {
if h.0 == "delete-marker" {
headers.insert("x-amz-delete-marker", h.1.parse().unwrap());
signed_headers.push(("x-amz-delete-marker", h.1));
}
}
}
if insert_headers_name.contains(&"secure-delete".to_string()) {
for h in insert_headers {
if h.0 == "secure-delete" {
headers.insert("x-amz-secure-delete", h.1.parse().unwrap());
signed_headers.push(("x-amz-secure-delete", h.1));
}
}
}
let mut query_strings = vec![];
match self.format {
Format::JSON => query_strings.push(("format", "json")),
_ => {}
}
query_strings.extend(qs.iter().cloned());
let mut query = String::from_str("http://").unwrap();
query.push_str(hostname.as_str());
query.push_str(links.1.as_str());
query.push('?');
query.push_str(&aws::canonical_query_string(&mut query_strings));
let signature = aws::aws_v4_sign(
self.secret_key,
aws::aws_v4_get_string_to_signed(
method,
links.1.as_str(),
&mut query_strings,
&mut signed_headers,
&payload,
utc.format("%Y%m%dT%H%M%SZ").to_string(),
region.clone(),
false,
)
.as_str(),
utc.format("%Y%m%d").to_string(),
region.clone(),
false,
);
let mut authorize_string = String::from_str("AWS4-HMAC-SHA256 Credential=").unwrap();
authorize_string.push_str(self.access_key);
authorize_string.push('/');
authorize_string.push_str(&format!(
"{}/{}/s3/aws4_request, SignedHeaders={}, Signature={}",
utc.format("%Y%m%d").to_string(),
region.clone().unwrap_or(String::from("us-east-1")),
aws::signed_headers(&mut signed_headers),
signature
));
headers.insert(header::AUTHORIZATION, authorize_string.parse().unwrap());
let client = Client::builder().default_headers(headers).build().unwrap();
let action;
match method {
"GET" => {
action = client.get(query.as_str());
}
"PUT" => {
action = client.put(query.as_str());
}
"DELETE" => {
action = client.delete(query.as_str());
}
"POST" => {
action = client.post(query.as_str());
}
_ => {
error!("unspport HTTP verb");
action = client.get(query.as_str());
}
}
match action.body(payload.clone()).send() {
Ok(mut res) => {
match res.status().is_redirection() {
true => {
let body = res.handle_response().0;
let result = std::str::from_utf8(&body).unwrap_or("");
let mut endpoint = "".to_string();
match self.format {
Format::JSON => {
unimplemented!();
}
Format::XML => {
let mut reader = Reader::from_str(&result);
let mut in_tag = false;
let mut buf = Vec::new();
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref e)) => {
if e.name() == b"Endpoint" {
in_tag = true;
}
}
Ok(Event::End(ref e)) => {
if e.name() == b"Endpoint" {
in_tag = false;
}
}
Ok(Event::Text(e)) => {
if in_tag {
endpoint = e.unescape_and_decode(&reader).unwrap();
}
}
Ok(Event::Eof) => break,
Err(e) => panic!(
"Error at position {}: {:?}",
reader.buffer_position(),
e
),
_ => (),
}
buf.clear();
}
}
}
self._aws_v4_request(
method,
s3_object,
qs,
insert_headers,
payload,
Some(
res.headers()["x-amz-bucket-region"]
.to_str()
.unwrap_or("")
.to_string(),
),
Some(endpoint),
)
}
false => Ok(res.handle_response()),
}
}
Err(_) => Err("Reqwest Error"),
}
}
fn aws_v4_request(
&self,
method: &str,
s3_object: &S3Object,
qs: &Vec<(&str, &str)>,
headers: &Vec<(&str, &str)>,
payload: Vec<u8>,
) -> Result<(Vec<u8>, reqwest::header::HeaderMap), &'static str> {
self._aws_v4_request(
method,
s3_object,
qs,
headers,
payload,
self.region.clone(),
None,
)
}
pub fn la(&self) -> Result<Vec<S3Object>, &'static str> {
let mut output = Vec::new();
let re = Regex::new(RESPONSE_FORMAT).unwrap();
let s3_object = S3Object::from("s3://".to_string());
let mut res = match self.auth_type {
AuthType::AWS4 => std::str::from_utf8(
&self
.aws_v4_request("GET", &s3_object, &Vec::new(), &Vec::new(), Vec::new())?
.0,
)
.unwrap_or("")
.to_string(),
AuthType::AWS2 => std::str::from_utf8(
&self
.aws_v2_request("GET", &s3_object, &Vec::new(), &Vec::new(), &Vec::new())?
.0,
)
.unwrap_or("")
.to_string(),
};
let mut buckets = Vec::new();
match self.format {
Format::JSON => {
let result: serde_json::Value = serde_json::from_str(&res).unwrap();
result[1].as_array().map(|bucket_list| {
buckets.extend(
bucket_list
.iter()
.map(|b| b["Name"].as_str().unwrap().to_string()),
)
});
}
Format::XML => {
let mut reader = Reader::from_str(&res);
let mut in_name_tag = false;
let mut buf = Vec::new();
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref e)) => {
if e.name() == b"Name" {
in_name_tag = true;
}
}
Ok(Event::End(ref e)) => {
if e.name() == b"Name" {
in_name_tag = false;
}
}
Ok(Event::Text(e)) => {
if in_name_tag {
buckets.push(e.unescape_and_decode(&reader).unwrap());
}
}
Ok(Event::Eof) => break,
Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e),
_ => (),
}
buf.clear();
}
}
}
for bucket in buckets {
let s3_object = S3Object::from(format!("s3://{}", bucket));
match self.auth_type {
AuthType::AWS4 => {
res = std::str::from_utf8(
&self
.aws_v4_request(
"GET",
&s3_object,
&Vec::new(),
&Vec::new(),
Vec::new(),
)?
.0,
)
.unwrap_or("")
.to_string();
match self.format {
Format::JSON => {
output.extend(re.captures_iter(&res).map(|cap| {
S3Convert::new(
Some(bucket.clone()),
Some(cap[1].to_string()),
Some(cap[2].to_string()),
Some(cap[3].to_string()),
Some(cap[5].to_string()),
)
}));
}
Format::XML => {
let mut reader = Reader::from_str(&res);
let mut in_key_tag = false;
let mut in_mtime_tag = false;
let mut in_etag_tag = false;
let mut in_storage_class_tag = false;
let mut key = String::new();
let mut mtime = String::new();
let mut etag = String::new();
let mut storage_class = String::new();
let mut buf = Vec::new();
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref e)) => match e.name() {
b"Key" => in_key_tag = true,
b"LastModified" => in_mtime_tag = true,
b"ETag" => in_etag_tag = true,
b"StorageClass" => in_storage_class_tag = true,
_ => {}
},
Ok(Event::End(ref e)) => match e.name() {
b"Contents" => output.push(S3Convert::new(
Some(bucket.clone()),
Some(key.clone()),
Some(mtime.clone()),
Some(etag[1..etag.len() - 1].to_string()),
Some(storage_class.clone()),
)),
_ => {}
},
Ok(Event::Text(e)) => {
if in_key_tag {
key = e.unescape_and_decode(&reader).unwrap();
in_key_tag = false;
}
if in_mtime_tag {
mtime = e.unescape_and_decode(&reader).unwrap();
in_mtime_tag = false;
}
if in_etag_tag {
etag = e.unescape_and_decode(&reader).unwrap();
in_etag_tag = false;
}
if in_storage_class_tag {
storage_class = e.unescape_and_decode(&reader).unwrap();
in_storage_class_tag = false;
}
}
Ok(Event::Eof) => break,
Err(e) => panic!(
"Error at position {}: {:?}",
reader.buffer_position(),
e
),
_ => (),
}
buf.clear();
}
}
}
}
AuthType::AWS2 => {
res = std::str::from_utf8(
&self
.aws_v2_request(
"GET",
&s3_object,
&Vec::new(),
&Vec::new(),
&Vec::new(),
)?
.0,
)
.unwrap_or("")
.to_string();
match self.format {
Format::JSON => {
output.extend(re.captures_iter(&res).map(|cap| {
S3Convert::new(
Some(bucket.clone()),
Some(cap[1].to_string()),
Some(cap[2].to_string()),
Some(cap[3].to_string()),
Some(cap[5].to_string()),
)
}));
}
Format::XML => {
let mut reader = Reader::from_str(&res);
let mut in_key_tag = false;
let mut in_mtime_tag = false;
let mut in_etag_tag = false;
let mut in_storage_class_tag = false;
let mut key = String::new();
let mut mtime = String::new();
let mut etag = String::new();
let mut storage_class = String::new();
let mut buf = Vec::new();
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref e)) => match e.name() {
b"Key" => in_key_tag = true,
b"LastModified" => in_mtime_tag = true,
b"ETag" => in_etag_tag = true,
b"StorageClass" => in_storage_class_tag = true,
_ => {}
},
Ok(Event::End(ref e)) => match e.name() {
b"Contents" => output.push(S3Convert::new(
Some(bucket.clone()),
Some(key.clone()),
Some(mtime.clone()),
Some(etag[1..etag.len() - 1].to_string()),
Some(storage_class.clone()),
)),
_ => {}
},
Ok(Event::Text(e)) => {
if in_key_tag {
key = e.unescape_and_decode(&reader).unwrap();
in_key_tag = false;
}
if in_mtime_tag {
mtime = e.unescape_and_decode(&reader).unwrap();
in_mtime_tag = false;
}
if in_etag_tag {
etag = e.unescape_and_decode(&reader).unwrap();
in_etag_tag = false;
}
if in_storage_class_tag {
storage_class = e.unescape_and_decode(&reader).unwrap();
in_storage_class_tag = false;
}
}
Ok(Event::Eof) => break,
Err(e) => panic!(
"Error at position {}: {:?}",
reader.buffer_position(),
e
),
_ => (),
}
buf.clear();
}
}
}
}
}
}
Ok(output)
}
pub fn ls(&self, bucket: Option<&str>) -> Result<Vec<S3Object>, &'static str> {
let mut output = Vec::new();
let res: String;
let s3_object = S3Object::from(bucket.unwrap_or("s3://").to_string());
match s3_object.bucket.clone() {
Some(b) => {
match self.auth_type {
AuthType::AWS4 => {
res = std::str::from_utf8(
&self
.aws_v4_request(
"GET",
&s3_object,
&Vec::new(),
&Vec::new(),
Vec::new(),
)?
.0,
)
.unwrap_or("")
.to_string();
}
AuthType::AWS2 => {
res = std::str::from_utf8(
&self
.aws_v2_request(
"GET",
&s3_object,
&Vec::new(),
&Vec::new(),
&Vec::new(),
)?
.0,
)
.unwrap_or("")
.to_string();
}
}
match self.format {
Format::JSON => {
let re = Regex::new(RESPONSE_FORMAT).unwrap();
output.extend(re.captures_iter(&res).map(|cap| {
S3Convert::new(
Some(b.to_string()),
Some(cap[1].to_string()),
Some(cap[2].to_string()),
Some(cap[3].to_string()),
Some(cap[5].to_string()),
)
}));
}
Format::XML => {
let mut reader = Reader::from_str(&res);
let mut in_key_tag = false;
let mut in_mtime_tag = false;
let mut in_etag_tag = false;
let mut in_storage_class_tag = false;
let mut key = String::new();
let mut mtime = String::new();
let mut etag = String::new();
let mut storage_class = String::new();
let mut buf = Vec::new();
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref e)) => match e.name() {
b"Key" => in_key_tag = true,
b"LastModified" => in_mtime_tag = true,
b"ETag" => in_etag_tag = true,
b"StorageClass" => in_storage_class_tag = true,
_ => {}
},
Ok(Event::End(ref e)) => match e.name() {
b"Contents" => output.push(S3Convert::new(
Some(b.to_string()),
Some(key.clone()),
Some(mtime.clone()),
Some(etag[1..etag.len() - 1].to_string()),
Some(storage_class.clone()),
)),
_ => {}
},
Ok(Event::Text(e)) => {
if in_key_tag {
key = e.unescape_and_decode(&reader).unwrap();
in_key_tag = false;
}
if in_mtime_tag {
mtime = e.unescape_and_decode(&reader).unwrap();
in_mtime_tag = false;
}
if in_etag_tag {
etag = e.unescape_and_decode(&reader).unwrap();
in_etag_tag = false;
}
if in_storage_class_tag {
storage_class = e.unescape_and_decode(&reader).unwrap();
in_storage_class_tag = false;
}
}
Ok(Event::Eof) => break,
Err(e) => panic!(
"Error at position {}: {:?}",
reader.buffer_position(),
e
),
_ => (),
}
buf.clear();
}
}
}
}
None => {
let s3_object = S3Object::from("s3://".to_string());
match self.auth_type {
AuthType::AWS4 => {
res = std::str::from_utf8(
&self
.aws_v4_request(
"GET",
&s3_object,
&Vec::new(),
&Vec::new(),
Vec::new(),
)?
.0,
)
.unwrap_or("")
.to_string();
}
AuthType::AWS2 => {
res = std::str::from_utf8(
&self
.aws_v2_request(
"GET",
&s3_object,
&Vec::new(),
&Vec::new(),
&Vec::new(),
)?
.0,
)
.unwrap_or("")
.to_string();
}
}
match self.format {
Format::JSON => {
let result: serde_json::Value = serde_json::from_str(&res).unwrap();
result[1].as_array().map(|bucket_list| {
output.extend(bucket_list.iter().map(|b| {
S3Convert::new(
Some(b["Name"].as_str().unwrap().to_string()),
None,
None,
None,
None,
)
}))
});
}
Format::XML => {
let mut reader = Reader::from_str(&res);
let mut in_name_tag = false;
let mut buf = Vec::new();
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref e)) => {
if e.name() == b"Name" {
in_name_tag = true
}
}
Ok(Event::End(ref e)) => {
if e.name() == b"Name" {
in_name_tag = false
}
}
Ok(Event::Text(e)) => {
if in_name_tag {
output.push(S3Convert::new(
Some(e.unescape_and_decode(&reader).unwrap()),
None,
None,
None,
None,
));
}
}
Ok(Event::Eof) => break,
Err(e) => panic!(
"Error at position {}: {:?}",
reader.buffer_position(),
e
),
_ => (),
}
buf.clear();
}
}
}
}
};
Ok(output)
}
pub fn put(&self, file: &str, dest: &str) -> Result<(), &'static str> {
if file == "" || dest == "" {
return Err("please specify the file and the destiney");
}
let mut s3_object = S3Object::from(dest.to_string());
let mut content: Vec<u8>;
if s3_object.key.is_none() {
let file_name = Path::new(file).file_name().unwrap().to_string_lossy();
s3_object.key = Some(format!("/{}", file_name));
}
if !Path::new(file).exists() && file == "test" {
content = vec![83, 51, 82, 83, 32, 116, 101, 115, 116, 10];
let _ = match self.auth_type {
AuthType::AWS4 => {
self.aws_v4_request("PUT", &s3_object, &Vec::new(), &Vec::new(), content)
}
AuthType::AWS2 => {
self.aws_v2_request("PUT", &s3_object, &Vec::new(), &Vec::new(), &content)
}
};
} else {
let file_size = match metadata(Path::new(file)) {
Ok(m) => m.len(),
Err(e) => {
error!("file meta error: {}", e);
0
}
};
debug!("upload file size: {}", file_size);
if file_size > 5242880 {
let res = match self.auth_type {
AuthType::AWS4 => std::str::from_utf8(
&self
.aws_v4_request(
"POST",
&s3_object,
&vec![("uploads", "")],
&Vec::new(),
Vec::new(),
)?
.0,
)
.unwrap_or("")
.to_string(),
AuthType::AWS2 => std::str::from_utf8(
&self
.aws_v2_request(
"POST",
&s3_object,
&vec![("uploads", "")],
&Vec::new(),
&Vec::new(),
)?
.0,
)
.unwrap_or("")
.to_string(),
};
let mut upload_id = "".to_string();
match self.format {
Format::JSON => {
error!("No JSON Multipart Implement");
}
Format::XML => {
let mut reader = Reader::from_str(&res);
let mut in_tag = false;
let mut buf = Vec::new();
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref e)) => {
if e.name() == b"UploadId" {
in_tag = true;
}
}
Ok(Event::End(ref e)) => {
if e.name() == b"UploadId" {
in_tag = false;
}
}
Ok(Event::Text(e)) => {
if in_tag {
upload_id = e.unescape_and_decode(&reader).unwrap();
}
}
Ok(Event::Eof) => break,
Err(e) => panic!(
"Error at position {}: {:?}",
reader.buffer_position(),
e
),
_ => (),
}
buf.clear();
}
}
}
info!("upload id: {}", upload_id);
let mut etags = Vec::new();
let mut part = 0u64;
let mut fin = match File::open(file) {
Ok(f) => f,
Err(_) => return Err("input file open error"),
};
loop {
let mut buffer = [0; 5242880];
match fin.read_exact(&mut buffer) {
Ok(_) => {}
Err(e) => {
error!("partial read file error: {}", e);
}
};
part += 1;
trace!("part {}, size: {}", part, buffer.to_vec().len());
let headers = match self.auth_type {
AuthType::AWS4 => {
self.aws_v4_request(
"PUT",
&s3_object,
&vec![
("uploadId", upload_id.as_str()),
("partNumber", part.to_string().as_str()),
],
&Vec::new(),
buffer.to_vec(),
)?
.1
}
AuthType::AWS2 => {
self.aws_v2_request(
"PUT",
&s3_object,
&vec![
("uploadId", upload_id.as_str()),
("partNumber", part.to_string().as_str()),
],
&Vec::new(),
&buffer.to_vec(),
)?
.1
}
};
let etag = headers[reqwest::header::ETAG]
.to_str()
.expect("unexpected etag from server");
etags.push((part.clone(), etag.to_string()));
info!("part: {} uploaded, etag: {}", part, etag);
if part * 5242880 >= file_size {
let mut content = format!("<CompleteMultipartUpload>");
for etag in etags {
content.push_str(&format!(
"<Part><PartNumber>{}</PartNumber><ETag>{}</ETag></Part>",
etag.0, etag.1
));
}
content.push_str(&format!("</CompleteMultipartUpload>"));
let _ = match self.auth_type {
AuthType::AWS4 => self.aws_v4_request(
"POST",
&s3_object,
&vec![("uploadId", upload_id.as_str())],
&Vec::new(),
content.into_bytes(),
),
AuthType::AWS2 => self.aws_v2_request(
"POST",
&s3_object,
&vec![("uploadId", upload_id.as_str())],
&Vec::new(),
&content.into_bytes(),
),
};
info!("complete multipart");
break;
}
}
} else {
content = Vec::new();
let mut fin = match File::open(file) {
Ok(f) => f,
Err(_) => return Err("input file open error"),
};
let _ = fin.read_to_end(&mut content);
let _ = match self.auth_type {
AuthType::AWS4 => {
self.aws_v4_request("PUT", &s3_object, &Vec::new(), &Vec::new(), content)
}
AuthType::AWS2 => {
self.aws_v2_request("PUT", &s3_object, &Vec::new(), &Vec::new(), &content)
}
};
};
}
Ok(())
}
pub fn get(&self, src: &str, file: Option<&str>) -> Result<(), &'static str> {
let s3_object = S3Object::from(src.to_string());
if s3_object.key.is_none() {
return Err("Please specific the object");
}
let fout = match file {
Some(fname) => fname,
None => Path::new(src)
.file_name()
.unwrap()
.to_str()
.unwrap_or("s3download"),
};
match self.auth_type {
AuthType::AWS4 => {
match write(
fout,
self.aws_v4_request("GET", &s3_object, &Vec::new(), &Vec::new(), Vec::new())?
.0,
) {
Ok(_) => return Ok(()),
Err(_) => return Err("write file error"),
}
}
AuthType::AWS2 => {
match write(
fout,
self.aws_v2_request("GET", &s3_object, &Vec::new(), &Vec::new(), &Vec::new())?
.0,
) {
Ok(_) => return Ok(()),
Err(_) => return Err("write file error"),
}
}
}
}
pub fn cat(&self, src: &str) -> Result<(), &'static str> {
let s3_object = S3Object::from(src.to_string());
if s3_object.key.is_none() {
return Err("Please specific the object");
}
match self.auth_type {
AuthType::AWS4 => {
match self.aws_v4_request("GET", &s3_object, &Vec::new(), &Vec::new(), Vec::new()) {
Ok(r) => {
println!("{}", std::str::from_utf8(&r.0).unwrap_or(""));
return Ok(());
}
Err(e) => return Err(e),
}
}
AuthType::AWS2 => {
match self.aws_v2_request("GET", &s3_object, &Vec::new(), &Vec::new(), &Vec::new())
{
Ok(r) => {
println!("{}", std::str::from_utf8(&r.0).unwrap_or(""));
return Ok(());
}
Err(e) => return Err(e),
}
}
}
}
pub fn del_with_flag(
&self,
src: &str,
headers: &Vec<(&str, &str)>,
) -> Result<(), &'static str> {
debug!("headers: {:?}", headers);
let s3_object = S3Object::from(src.to_string());
if s3_object.key.is_none() {
return Err("Please specific the object");
}
let _ = match self.auth_type {
AuthType::AWS4 => {
self.aws_v4_request("DELETE", &s3_object, &Vec::new(), headers, Vec::new())
}
AuthType::AWS2 => {
self.aws_v2_request("GET", &s3_object, &Vec::new(), headers, &Vec::new())
}
};
Ok(())
}
pub fn del(&self, src: &str) -> Result<(), &'static str> {
self.del_with_flag(src, &Vec::new())
}
pub fn mb(&self, bucket: &str) -> Result<(), &'static str> {
let s3_object = S3Object::from(bucket.to_string());
if s3_object.bucket.is_none() {
return Err("please specific the bucket name");
}
let _ = match self.auth_type {
AuthType::AWS4 => {
self.aws_v4_request("PUT", &s3_object, &Vec::new(), &Vec::new(), Vec::new())
}
AuthType::AWS2 => {
self.aws_v2_request("PUT", &s3_object, &Vec::new(), &Vec::new(), &Vec::new())
}
};
Ok(())
}
pub fn rb(&self, bucket: &str) -> Result<(), &'static str> {
let s3_object = S3Object::from(bucket.to_string());
if s3_object.bucket.is_none() {
return Err("please specific the bucket name");
}
let _ = match self.auth_type {
AuthType::AWS4 => {
self.aws_v4_request("DELETE", &s3_object, &Vec::new(), &Vec::new(), Vec::new())
}
AuthType::AWS2 => {
self.aws_v2_request("DELETE", &s3_object, &Vec::new(), &Vec::new(), &Vec::new())
}
};
Ok(())
}
pub fn list_tag(&self, target: &str) -> Result<(), &'static str> {
let res: String;
debug!("target: {:?}", target);
let s3_object = S3Object::from(target.to_string());
if s3_object.key.is_none() {
return Err("Please specific the object");
}
let query_string = vec![("tagging", "")];
res = match self.auth_type {
AuthType::AWS4 => std::str::from_utf8(
&self
.aws_v4_request("GET", &s3_object, &query_string, &Vec::new(), Vec::new())?
.0,
)
.unwrap_or("")
.to_string(),
AuthType::AWS2 => std::str::from_utf8(
&self
.aws_v2_request("GET", &s3_object, &query_string, &Vec::new(), &Vec::new())?
.0,
)
.unwrap_or("")
.to_string(),
};
println!("{}", res);
Ok(())
}
pub fn add_tag(&self, target: &str, tags: &Vec<(&str, &str)>) -> Result<(), &'static str> {
debug!("target: {:?}", target);
debug!("tags: {:?}", tags);
let s3_object = S3Object::from(target.to_string());
if s3_object.key.is_none() {
return Err("Please specific the object");
}
let mut content = format!("<Tagging><TagSet>");
for tag in tags {
content.push_str(&format!(
"<Tag><Key>{}</Key><Value>{}</Value></Tag>",
tag.0, tag.1
));
}
content.push_str(&format!("</TagSet></Tagging>"));
debug!("payload: {:?}", content);
let query_string = vec![("tagging", "")];
let _ = match self.auth_type {
AuthType::AWS4 => self.aws_v4_request(
"PUT",
&s3_object,
&query_string,
&Vec::new(),
content.into_bytes(),
),
AuthType::AWS2 => self.aws_v2_request(
"PUT",
&s3_object,
&query_string,
&Vec::new(),
&content.into_bytes(),
),
};
Ok(())
}
pub fn del_tag(&self, target: &str) -> Result<(), &'static str> {
debug!("target: {:?}", target);
let s3_object = S3Object::from(target.to_string());
if s3_object.key.is_none() {
return Err("Please specific the object");
}
let query_string = vec![("tagging", "")];
let _ = match self.auth_type {
AuthType::AWS4 => {
self.aws_v4_request("DELETE", &s3_object, &query_string, &Vec::new(), Vec::new())
}
AuthType::AWS2 => self.aws_v2_request(
"DELETE",
&s3_object,
&query_string,
&Vec::new(),
&Vec::new(),
),
};
Ok(())
}
pub fn usage(&self, target: &str, options: &Vec<(&str, &str)>) -> Result<(), &'static str> {
let s3_admin_bucket_object = S3Convert::new_from_uri("/admin/buckets".to_string());
let s3_object = S3Object::from(target.to_string());
let mut query_strings = options.clone();
if s3_object.bucket.is_none() {
return Err("S3 format error.");
};
let bucket = s3_object.bucket.unwrap();
query_strings.push(("bucket", &bucket));
let result = match self.auth_type {
AuthType::AWS4 => self.aws_v4_request(
"GET",
&s3_admin_bucket_object,
&query_strings,
&Vec::new(),
Vec::new(),
)?,
AuthType::AWS2 => self.aws_v2_request(
"GET",
&s3_admin_bucket_object,
&query_strings,
&Vec::new(),
&Vec::new(),
)?,
};
match self.format {
Format::JSON => {
let json: serde_json::Value;
json = serde_json::from_str(std::str::from_utf8(&result.0).unwrap_or("")).unwrap();
println!(
"{}",
serde_json::to_string_pretty(&json["usage"]).unwrap_or("".to_string())
);
}
Format::XML => {
}
};
Ok(())
}
pub fn url_command(&self, url: &str) -> Result<(), &'static str> {
let s3_object;
let mut raw_qs = String::new();
let mut query_strings = Vec::new();
match url.find('?') {
Some(idx) => {
s3_object = S3Object::from(url[..idx].to_string());
raw_qs.push_str(&String::from_str(&url[idx + 1..]).unwrap());
for q_pair in raw_qs.split('&') {
match q_pair.find('=') {
Some(_) => query_strings.push((
q_pair.split('=').nth(0).unwrap(),
q_pair.split('=').nth(1).unwrap(),
)),
None => query_strings.push((&q_pair, "")),
}
}
}
None => {
s3_object = S3Object::from(url.to_string());
}
}
let result = match self.auth_type {
AuthType::AWS4 => {
self.aws_v4_request("GET", &s3_object, &query_strings, &Vec::new(), Vec::new())?
}
AuthType::AWS2 => {
self.aws_v2_request("GET", &s3_object, &query_strings, &Vec::new(), &Vec::new())?
}
};
println!("{}", std::str::from_utf8(&result.0).unwrap_or(""));
Ok(())
}
pub fn change_s3_type(&mut self, command: &str) {
println!("set up s3 type as {}", command);
if command.ends_with("aws") {
self.auth_type = AuthType::AWS4;
self.format = Format::XML;
self.url_style = UrlStyle::HOST;
println!("using aws verion 4 signature, xml format, and host style url");
} else if command.ends_with("ceph") {
self.auth_type = AuthType::AWS4;
self.format = Format::JSON;
self.url_style = UrlStyle::PATH;
println!("using aws verion 4 signature, json format, and path style url");
} else {
println!("usage: s3_type [aws/ceph]");
}
}
pub fn change_auth_type(&mut self, command: &str) {
if command.ends_with("aws2") {
self.auth_type = AuthType::AWS2;
println!("using aws version 2 signature");
} else if command.ends_with("aws4") || command.ends_with("aws") {
self.auth_type = AuthType::AWS4;
println!("using aws verion 4 signature");
} else {
println!("usage: auth_type [aws4/aws2]");
}
}
pub fn change_format_type(&mut self, command: &str) {
if command.ends_with("xml") {
self.format = Format::XML;
println!("using xml format");
} else if command.ends_with("json") {
self.format = Format::JSON;
println!("using json format");
} else {
println!("usage: format_type [xml/json]");
}
}
pub fn change_url_style(&mut self, command: &str) {
if command.ends_with("path") {
self.url_style = UrlStyle::PATH;
println!("using path style url");
} else if command.ends_with("host") {
self.url_style = UrlStyle::HOST;
println!("using host style url");
} else {
println!("usage: url_style [path/host]");
}
}
pub fn init_from_config(credential: &'a CredentialConfig) -> Self {
debug!("host: {}", credential.host);
debug!("access key: {}", credential.access_key);
debug!("secret key: {}", credential.secret_key);
match credential
.clone()
.s3_type
.unwrap_or("".to_string())
.as_str()
{
"aws" => Handler {
host: &credential.host,
access_key: &credential.access_key,
secret_key: &credential.secret_key,
auth_type: AuthType::AWS4,
format: Format::XML,
url_style: UrlStyle::HOST,
region: credential.region.clone(),
},
"ceph" => Handler {
host: &credential.host,
access_key: &credential.access_key,
secret_key: &credential.secret_key,
auth_type: AuthType::AWS4,
format: Format::JSON,
url_style: UrlStyle::PATH,
region: credential.region.clone(),
},
_ => Handler {
host: &credential.host,
access_key: &credential.access_key,
secret_key: &credential.secret_key,
auth_type: AuthType::AWS4,
format: Format::XML,
url_style: UrlStyle::PATH,
region: credential.region.clone(),
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_s3object_for_dummy_folder() {
let s3_object = S3Object::from("s3://bucket/dummy_folder/".to_string());
assert_eq!(s3_object.bucket, Some("bucket".to_string()));
assert_eq!(s3_object.key, Some("/dummy_folder/".to_string()));
assert_eq!(
"s3://bucket/dummy_folder/".to_string(),
String::from(s3_object)
);
}
#[test]
fn test_s3object_for_bucket() {
let s3_object = S3Object::from("s3://bucket".to_string());
assert_eq!(s3_object.bucket, Some("bucket".to_string()));
assert_eq!(s3_object.key, None);
assert_eq!("s3://bucket".to_string(), String::from(s3_object));
}
#[test]
fn test_s3object_for_dummy_folder_from_uri() {
let s3_object: S3Object = S3Convert::new_from_uri("/bucket/dummy_folder/".to_string());
assert_eq!(
"s3://bucket/dummy_folder/".to_string(),
String::from(s3_object)
);
}
#[test]
fn test_s3object_for_root() {
let s3_object = S3Object::from("s3://".to_string());
assert_eq!(s3_object.bucket, None);
assert_eq!(s3_object.key, None);
}
#[test]
fn test_s3object_for_bucket_from_uri() {
let s3_object: S3Object = S3Convert::new_from_uri("/bucket".to_string());
assert_eq!("s3://bucket".to_string(), String::from(s3_object));
}
#[test]
fn test_s3object_for_slash_end_bucket_from_uri() {
let s3_object: S3Object = S3Convert::new_from_uri("/bucket/".to_string());
assert_eq!("s3://bucket".to_string(), String::from(s3_object));
}
#[test]
fn test_s3object_for_bucket_from_bucket_name() {
let s3_object: S3Object = S3Convert::new_from_uri("bucket".to_string());
assert_eq!("s3://bucket".to_string(), String::from(s3_object));
}
}