use bytes::Bytes;
use reqwest::{self, r#async::Response};
use std::error::Error as StdError;
use crate::common::*;
/// A named stream of byte chunks.
///
/// Judging by the type name, the bytes are presumably CSV-formatted data;
/// nothing in this type enforces that.
pub struct CsvStream {
/// The name of this stream. NOTE(review): presumably derived with
/// `csv_stream_name` for file-backed inputs — confirm against callers.
pub name: String,
/// The stream contents, delivered as a boxed stream of `BytesMut` chunks.
pub data: BoxStream<BytesMut>,
}
impl CsvStream {
    /// Build a stream named `"bytes"` that yields `bytes` as a single chunk.
    ///
    /// Only compiled for tests. The data is pushed through a channel created
    /// by `bytes_channel(1)`, and the receiving end becomes the stream's data.
    ///
    /// # Panics
    ///
    /// Panics if the chunk cannot be sent to the channel.
    #[cfg(test)]
    pub(crate) async fn from_bytes<B>(bytes: B) -> Self
    where
        B: Into<BytesMut>,
    {
        use crate::tokio_glue::bytes_channel;

        let (tx, rx) = bytes_channel(1);
        // `send` consumes the sender, and the sender it hands back is dropped
        // as soon as this statement finishes.
        tx.send(Ok(bytes.into()))
            .compat()
            .await
            .expect("could not send bytes to channel");

        CsvStream {
            name: "bytes".to_owned(),
            data: Box::new(rx),
        }
    }

    /// Read the whole stream and return its chunks concatenated into a single
    /// `BytesMut`.
    ///
    /// Only compiled for tests. Progress is reported at `trace` level on a
    /// child of `ctx`.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by the underlying stream, after
    /// logging it at `error` level.
    #[cfg(test)]
    pub(crate) async fn into_bytes(self, ctx: Context) -> Result<BytesMut> {
        let ctx = ctx.child(o!("fn" => "into_bytes"));
        let mut remaining = self.data;
        let mut collected = BytesMut::new();
        loop {
            // Pull one item off the front of the stream; on success we also
            // get the rest of the stream back so we can keep going.
            let (next_chunk, rest) = match remaining.into_future().compat().await {
                Ok(pair) => pair,
                Err((err, _rest)) => {
                    error!(ctx.log(), "error reading stream: {}", err);
                    return Err(err);
                }
            };
            match next_chunk {
                Some(chunk) => {
                    trace!(ctx.log(), "received {} bytes", chunk.len());
                    remaining = rest;
                    collected.extend_from_slice(&chunk);
                }
                None => {
                    trace!(ctx.log(), "end of stream");
                    return Ok(collected);
                }
            }
        }
    }

    /// Wrap the body of an HTTP `response` as a `CsvStream` called `name`.
    ///
    /// Each body chunk is copied into a fresh `BytesMut`, and body errors are
    /// converted with `Into`. As written, this function always returns `Ok`.
    pub(crate) fn from_http_response(
        name: String,
        response: Response,
    ) -> Result<CsvStream> {
        let body = response.into_body();
        let data = body
            .map(|chunk| BytesMut::from(chunk.as_ref()))
            .map_err(|err| err.into());
        let stream = CsvStream {
            name,
            data: Box::new(data),
        };
        Ok(stream)
    }

    /// Split this value into its name and a stream built only from `Bytes`
    /// items and an error type implementing `std::error::Error`.
    ///
    /// Every chunk is frozen into an immutable `Bytes`, and every error is
    /// converted with `.compat()`.
    pub(crate) fn into_name_and_portable_stream(
        self,
    ) -> (
        String,
        impl Stream<Item = Bytes, Error = impl StdError + Send + Sized + Sync + 'static>,
    ) {
        let CsvStream { name, data } = self;
        let portable = data
            .map(|chunk| chunk.freeze())
            .map_err(|err| err.compat());
        (name, portable)
    }
}
/// Compute the stream name for `file_path`, given the `base_path` it was
/// found under.
///
/// - If `file_path == base_path` (a single-file input), the name is based on
///   the final `/`-separated component of `file_path`.
/// - Otherwise `file_path` must be located under `base_path` (which may or may
///   not end in `/`), and the name is based on the path of `file_path`
///   relative to `base_path`, so subdirectories are kept in the name.
///
/// In both cases the extension is removed: everything from the first `.` in
/// the **final path component** onward is dropped. Dots in directory names are
/// left alone, so `dir/v1.2/file.tar.csv` relative to `dir/` is named
/// `v1.2/file`.
///
/// # Errors
///
/// Returns an error if `file_path` differs from `base_path` and is not located
/// under it (for example `base_path = "dir"`, `file_path = "directory/f.csv"`).
pub(crate) fn csv_stream_name<'a>(
    base_path: &str,
    file_path: &'a str,
) -> Result<&'a str> {
    let basename_or_relative = if file_path == base_path {
        // Single-file input: keep only what follows the last '/', or the whole
        // path when there is no '/' at all (e.g. "file2.csv").
        file_path
            .rfind('/')
            .map_or(file_path, |slash| &file_path[slash + 1..])
    } else if file_path.starts_with(base_path) {
        // `starts_with` guarantees `base_path.len()` is a char boundary of
        // `file_path`, so this slice cannot panic.
        let rest = &file_path[base_path.len()..];
        if base_path.ends_with('/') {
            rest
        } else if rest.starts_with('/') {
            // Skip the separator between the directory and the relative path.
            &rest[1..]
        } else {
            // `base_path` is only a textual prefix (e.g. "dir" vs
            // "directory/..."), not a containing directory.
            return Err(format_err!(
                "expected {} to start with {}",
                file_path,
                base_path,
            ));
        }
    } else {
        return Err(format_err!(
            "expected {} to start with {}",
            file_path,
            base_path,
        ));
    };

    // Strip the extension, looking only at the final path component so that a
    // '.' in a directory name does not truncate the whole name.
    let file_start = basename_or_relative
        .rfind('/')
        .map_or(0, |slash| slash + 1);
    let name_end = basename_or_relative[file_start..]
        .find('.')
        .map_or(basename_or_relative.len(), |dot| file_start + dot);
    Ok(&basename_or_relative[..name_end])
}
/// When the base path and the file path are identical, the stream name is the
/// file's basename without its extension.
#[test]
fn csv_stream_name_handles_file_inputs() {
    // (path used as both base path and file path, expected stream name)
    let cases: [(&str, &str); 4] = [
        ("/path/to/file1.csv", "file1"),
        ("file2.csv", "file2"),
        ("s3://bucket/dir/file3.csv", "file3"),
        ("gs://bucket/dir/file4.csv", "file4"),
    ];
    for case in cases.iter() {
        let (path, want) = *case;
        let got = csv_stream_name(path, path).unwrap();
        assert_eq!(got, want);
    }
}
/// When the base path is a directory, the stream name is the file's path
/// relative to that directory, without its extension.
#[test]
fn csv_stream_name_handles_directory_inputs() {
    // (base path, file path, expected stream name)
    let cases: [(&str, &str, &str); 4] = [
        ("dir/", "dir/file1.csv", "file1"),
        ("dir", "dir/file1.csv", "file1"),
        ("dir/", "dir/subdir/file2.csv", "subdir/file2"),
        (
            "s3://bucket/dir/",
            "s3://bucket/dir/subdir/file3.csv",
            "subdir/file3",
        ),
    ];
    for case in cases.iter() {
        let (base, path, want) = *case;
        let got = csv_stream_name(base, path).unwrap();
        assert_eq!(got, want);
    }
}