use std::io::BufRead;
use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;
use wme_models::Article;
/// Namespace type for the NDJSON (newline-delimited JSON) stream helpers.
///
/// The type carries no data; it exists so the helpers can be called as
/// `NdjsonStream::…` associated functions.
#[derive(Debug, Clone, Copy, Default)]
pub struct NdjsonStream;
impl NdjsonStream {
    /// Deserializes every line of `lines` into a `T`.
    ///
    /// The returned stream produces exactly one item per input item, in the
    /// same order. Each line is handed to `serde_json::from_str` unchanged.
    ///
    /// # Errors
    ///
    /// Failures are reported per item and do not end the stream:
    /// - an input `Err` becomes `StreamError::Io` holding the error's display text;
    /// - a line rejected by `serde_json` becomes `StreamError::JsonParse` holding
    ///   the parser's message.
    pub fn parse<T>(
        lines: impl Stream<Item = Result<String, std::io::Error>>,
    ) -> impl Stream<Item = Result<T, crate::StreamError>>
    where
        T: DeserializeOwned,
    {
        lines.map(|incoming| match incoming {
            Ok(text) => serde_json::from_str::<T>(&text)
                .map_err(|err| crate::StreamError::JsonParse(err.to_string())),
            Err(io_err) => Err(crate::StreamError::Io(io_err.to_string())),
        })
    }

    /// Convenience wrapper around [`NdjsonStream::parse`] with the target type
    /// fixed to [`Article`].
    pub fn parse_articles(
        lines: impl Stream<Item = Result<String, std::io::Error>>,
    ) -> impl Stream<Item = Result<Article, crate::StreamError>> {
        Self::parse::<Article>(lines)
    }

    /// Turns a buffered reader into a stream of its lines.
    ///
    /// Lines come from `BufRead::lines`, so the reads are synchronous and run
    /// while the stream is being polled. The stream finishes when the reader is
    /// exhausted, or right after the first read error has been yielded.
    pub fn from_reader<R: BufRead + Send + 'static>(
        reader: R,
    ) -> impl Stream<Item = Result<String, std::io::Error>> {
        async_stream::try_stream! {
            for read_result in reader.lines() {
                // `?` hands the error to the consumer and stops the stream.
                let text = read_result?;
                yield text;
            }
        }
    }
}
/// Extension trait exposing NDJSON parsing as a method.
pub trait NdjsonExt {
    /// Consumes `self` and returns a stream whose items are either a value
    /// deserialized as `T` or a `StreamError`.
    fn parse_ndjson<T>(self) -> impl Stream<Item = Result<T, crate::StreamError>>
    where
        T: DeserializeOwned;
}
impl<S> NdjsonExt for S
where
S: Stream<Item = Result<String, std::io::Error>>,
{
fn parse_ndjson<T: DeserializeOwned>(
self,
) -> impl Stream<Item = Result<T, crate::StreamError>> {
NdjsonStream::parse(self)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use serde::Deserialize;
    use std::io::Error;

    /// Minimal record used as the deserialization target in these tests.
    #[derive(Deserialize, Debug, PartialEq)]
    struct TestData {
        id: u64,
        name: String,
    }

    /// Wraps a JSON literal as a successfully read input line.
    fn ok_line(json: &str) -> Result<String, Error> {
        Ok(json.to_string())
    }

    /// Well-formed lines are deserialized in input order.
    #[tokio::test]
    async fn test_parse_valid_lines() {
        let lines = vec![
            ok_line(r#"{"id":1,"name":"Alice"}"#),
            ok_line(r#"{"id":2,"name":"Bob"}"#),
        ];
        let results: Vec<TestData> = NdjsonStream::parse(stream::iter(lines))
            .map(|r| r.unwrap())
            .collect()
            .await;
        assert_eq!(results.len(), 2);
        assert_eq!(
            results[0],
            TestData {
                id: 1,
                name: "Alice".to_string()
            }
        );
        assert_eq!(
            results[1],
            TestData {
                id: 2,
                name: "Bob".to_string()
            }
        );
    }

    /// A malformed line yields `JsonParse` in its slot; neighbours still parse.
    #[tokio::test]
    async fn test_parse_invalid_json() {
        let lines = vec![
            ok_line(r#"{"id":1,"name":"Alice"}"#),
            ok_line("not valid json"),
            ok_line(r#"{"id":2,"name":"Bob"}"#),
        ];
        let results: Vec<_> = NdjsonStream::parse::<TestData>(stream::iter(lines))
            .collect()
            .await;
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        // Check the variant, not just `is_err()`, so a swapped mapping is caught.
        assert!(matches!(
            &results[1],
            Err(crate::StreamError::JsonParse(_))
        ));
        assert!(results[2].is_ok());
    }

    /// An upstream I/O error yields `Io` in its slot; neighbours still parse.
    #[tokio::test]
    async fn test_parse_io_error() {
        let lines = vec![
            ok_line(r#"{"id":1,"name":"Alice"}"#),
            Err::<String, _>(Error::other("read failed")),
            ok_line(r#"{"id":2,"name":"Bob"}"#),
        ];
        let results: Vec<_> = NdjsonStream::parse::<TestData>(stream::iter(lines))
            .collect()
            .await;
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(matches!(&results[1], Err(crate::StreamError::Io(_))));
        assert!(results[2].is_ok());
    }

    /// The extension method is callable on a line stream and actually parses
    /// when the stream is driven.
    #[tokio::test]
    async fn test_ndjson_ext() {
        let empty: Vec<Result<String, Error>> = vec![];
        let parsed: Vec<_> = stream::iter(empty)
            .parse_ndjson::<TestData>()
            .collect()
            .await;
        assert!(parsed.is_empty());

        let lines = vec![ok_line(r#"{"id":7,"name":"Eve"}"#)];
        let parsed: Vec<TestData> = stream::iter(lines)
            .parse_ndjson::<TestData>()
            .map(|r| r.unwrap())
            .collect()
            .await;
        assert_eq!(
            parsed,
            vec![TestData {
                id: 7,
                name: "Eve".to_string()
            }]
        );
    }

    /// `from_reader` yields one item per line of the reader, and its output
    /// feeds straight into the parser.
    #[tokio::test]
    async fn test_from_reader() {
        let input = [r#"{"id":1,"name":"Alice"}"#, r#"{"id":2,"name":"Bob"}"#].join("\n");
        let reader = std::io::Cursor::new(input);
        let results: Vec<TestData> = NdjsonStream::from_reader(reader)
            .parse_ndjson::<TestData>()
            .map(|r| r.unwrap())
            .collect()
            .await;
        assert_eq!(
            results,
            vec![
                TestData {
                    id: 1,
                    name: "Alice".to_string()
                },
                TestData {
                    id: 2,
                    name: "Bob".to_string()
                },
            ]
        );
    }
}