use std::{error::Error as StdError, fmt, io::Write as _};
use actix_web::{
body::{BodyStream, MessageBody},
HttpResponse, Responder,
};
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::TryStreamExt as _;
use pin_project_lite::pin_project;
use crate::util::{InfallibleStream, MutWriter};
pin_project! {
pub struct DisplayStream<S> {
#[pin]
stream: S,
}
}
impl<S> DisplayStream<S> {
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<S> DisplayStream<S> {
pub fn new_infallible(stream: S) -> DisplayStream<InfallibleStream<S>> {
DisplayStream::new(InfallibleStream::new(stream))
}
}
impl<S, T, E> DisplayStream<S>
where
S: Stream<Item = Result<T, E>>,
T: fmt::Display,
E: Into<Box<dyn StdError>> + 'static,
{
pub fn into_body_stream(self) -> impl MessageBody {
BodyStream::new(self.into_chunk_stream())
}
pub fn into_responder(self) -> impl Responder
where
S: 'static,
T: 'static,
E: 'static,
{
HttpResponse::Ok()
.content_type(mime::TEXT_PLAIN_UTF_8)
.message_body(self.into_body_stream())
.unwrap()
}
pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
self.stream.map_ok(write_display)
}
}
fn write_display(item: impl fmt::Display) -> Bytes {
let mut buf = BytesMut::new();
let mut wrt = MutWriter(&mut buf);
writeln!(wrt, "{item}").unwrap();
buf.freeze()
}
#[cfg(test)]
mod tests {
use std::error::Error as StdError;
use actix_web::body;
use futures_util::stream;
use super::*;
#[actix_web::test]
async fn serializes_into_body() {
let ndjson_body = DisplayStream::new_infallible(stream::iter([123, 789, 345, 901, 456]))
.into_body_stream();
let body_bytes = body::to_bytes(ndjson_body)
.await
.map_err(Into::<Box<dyn StdError>>::into)
.unwrap();
const EXP_BYTES: &str = "123\n\
789\n\
345\n\
901\n\
456\n";
assert_eq!(body_bytes, EXP_BYTES);
}
}