use std::{convert::Infallible, error::Error as StdError};
use actix_web::{
body::{BodyStream, MessageBody},
HttpResponse, Responder,
};
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::TryStreamExt as _;
use mime::Mime;
use pin_project_lite::pin_project;
use serde::Serialize;
use crate::util::{InfallibleStream, MutWriter};
pin_project! {
pub struct Csv<S> {
#[pin]
stream: S,
}
}
impl<S> Csv<S> {
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<S> Csv<S> {
pub fn new_infallible(stream: S) -> Csv<InfallibleStream<S>> {
Csv::new(InfallibleStream::new(stream))
}
}
impl<S, T, E> Csv<S>
where
S: Stream<Item = Result<T, E>>,
T: Serialize,
E: Into<Box<dyn StdError>> + 'static,
{
pub fn into_body_stream(self) -> impl MessageBody {
BodyStream::new(self.into_chunk_stream())
}
pub fn into_responder(self) -> impl Responder
where
S: 'static,
T: 'static,
E: 'static,
{
HttpResponse::Ok()
.content_type(mime::TEXT_CSV_UTF_8)
.message_body(self.into_body_stream())
.unwrap()
}
pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
self.stream.map_ok(serialize_csv_row)
}
}
impl Csv<Infallible> {
pub fn mime() -> Mime {
mime::TEXT_CSV_UTF_8
}
}
fn serialize_csv_row(item: impl Serialize) -> Bytes {
let mut buf = BytesMut::new();
let wrt = MutWriter(&mut buf);
let mut csv_wrt = csv::Writer::from_writer(wrt);
csv_wrt.serialize(&item).unwrap();
csv_wrt.flush().unwrap();
drop(csv_wrt);
buf.freeze()
}
#[cfg(test)]
mod tests {
use std::error::Error as StdError;
use actix_web::body;
use futures_util::stream;
use super::*;
#[actix_web::test]
async fn serializes_into_body() {
let ndjson_body = Csv::new_infallible(stream::iter([
[123, 456],
[789, 12],
[345, 678],
[901, 234],
[456, 789],
]))
.into_body_stream();
let body_bytes = body::to_bytes(ndjson_body)
.await
.map_err(Into::<Box<dyn StdError>>::into)
.unwrap();
const EXP_BYTES: &str = "123,456\n\
789,12\n\
345,678\n\
901,234\n\
456,789\n";
assert_eq!(body_bytes, EXP_BYTES);
}
}