use lazy_static::lazy_static;
use regex::Regex;
use std::{
io::BufReader,
process::{Command, Stdio},
};
use tokio::io;
use tokio_process::CommandExt;
use super::S3Locator;
use crate::common::*;
use crate::csv_stream::csv_stream_name;
use crate::tokio_glue::copy_reader_to_stream;
pub(crate) async fn local_data_helper(
ctx: Context,
url: Url,
shared_args: SharedArguments<Unverified>,
source_args: SourceArguments<Unverified>,
) -> Result<Option<BoxStream<CsvStream>>> {
let _shared_args = shared_args.verify(S3Locator::features())?;
let _source_args = source_args.verify(S3Locator::features())?;
debug!(ctx.log(), "getting CSV files from {}", url);
debug!(ctx.log(), "listing {}", url);
let mut child = Command::new("aws")
.args(&["s3", "ls", "--recursive", url.as_str()])
.stdout(Stdio::piped())
.spawn_async()
.context("error running `aws s3 ls`")?;
let child_stdout = child.stdout().take().expect("child should have stdout");
ctx.spawn_process(format!("aws s3 ls {}", url), child);
let lines = io::lines(BufReader::with_capacity(BUFFER_SIZE, child_stdout))
.map_err(|e| format_err!("error reading `aws s3 ls` output: {}", e));
let csv_streams = lines.and_then(move |line| {
let ctx = ctx.clone();
let url = url.clone();
async move {
trace!(ctx.log(), "`aws s3 ls` line: {}", line);
let bucket_url = bucket_url(&url)?;
let path = path_from_line(&line)?;
let file_url = bucket_url.join(&path)?;
let name = csv_stream_name(url.as_str(), file_url.as_str())?;
let ctx = ctx.child(
o!("stream" => name.to_owned(), "url" => file_url.as_str().to_owned()),
);
debug!(ctx.log(), "streaming from {} using `aws s3 cp`", file_url);
let mut child = Command::new("aws")
.args(&["s3", "cp", file_url.as_str(), "-"])
.stdout(Stdio::piped())
.spawn_async()
.context("error running `aws s3 cp`")?;
let child_stdout =
child.stdout().take().expect("child should have stdout");
let child_stdout = BufReader::with_capacity(BUFFER_SIZE, child_stdout);
let data = copy_reader_to_stream(ctx.clone(), child_stdout)?;
ctx.spawn_process(format!("aws s3 cp {} -", file_url), child);
Ok(CsvStream {
name: name.to_owned(),
data: Box::new(data),
})
}
.boxed()
.compat()
});
Ok(Some(Box::new(csv_streams) as BoxStream<CsvStream>))
}
fn bucket_url(url: &Url) -> Result<Url> {
let bucket = url
.host()
.ok_or_else(|| format_err!("could not find bucket name in {}", url))?;
let bucket_url = format!("s3://{}/", bucket)
.parse::<Url>()
.context("could not parse S3 URL")?;
Ok(bucket_url)
}
#[test]
fn bucket_url_extracts_bucket() {
let examples = &[
("s3://bucket", "s3://bucket/"),
("s3://bucket/", "s3://bucket/"),
("s3://bucket/dir/", "s3://bucket/"),
("s3://bucket/dir/file.csv", "s3://bucket/"),
];
for &(url, expected) in examples {
assert_eq!(
bucket_url(&url.parse::<Url>().unwrap()).unwrap().as_str(),
expected,
);
}
}
fn path_from_line(line: &str) -> Result<String> {
lazy_static! {
static ref RE: Regex = Regex::new(r#"^[-0-9]+ [:0-9]+ +[0-9]+ ([^\r\n]+)"#)
.expect("invalid regex in source");
}
let cap = RE
.captures(line)
.ok_or_else(|| format_err!("cannot parse S3 ls output: {:?}", line))?;
Ok(cap[1].to_owned())
}
#[test]
fn path_from_line_returns_entire_path() {
let examples = &[
("2013-09-02 21:37:53 10 a.txt", "a.txt"),
("2013-09-02 21:37:53 2863288 foo.zip", "foo.zip"),
(
"2013-09-02 21:32:57 23 foo/bar/.baz/a",
"foo/bar/.baz/a",
),
];
for &(line, rel_path) in examples {
assert_eq!(path_from_line(line).unwrap(), rel_path);
}
}