#![cfg_attr(docsrs, doc(cfg(feature = "async")))]
use futures_core::Stream;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::io::Result;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, Lines};
pin_project! {
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AsyncJsonLinesReader<R> {
#[pin]
inner: R,
}
}
impl<R> AsyncJsonLinesReader<R> {
pub fn new(reader: R) -> Self {
AsyncJsonLinesReader { inner: reader }
}
pub fn into_inner(self) -> R {
self.inner
}
pub fn get_ref(&self) -> &R {
&self.inner
}
pub fn get_mut(&mut self) -> &mut R {
&mut self.inner
}
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().inner
}
}
impl<R: AsyncBufRead> AsyncJsonLinesReader<R> {
#[allow(clippy::future_not_send)] pub async fn read<T>(&mut self) -> Result<Option<T>>
where
T: DeserializeOwned,
R: Unpin,
{
let mut s = String::new();
let r = self.inner.read_line(&mut s).await?;
if r == 0 {
Ok(None)
} else {
Ok(Some(serde_json::from_str::<T>(&s)?))
}
}
pub fn read_all<T>(self) -> JsonLinesStream<R, T> {
JsonLinesStream {
inner: self.inner.lines(),
_output: PhantomData,
}
}
}
pin_project! {
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct JsonLinesStream<R, T> {
#[pin]
inner: Lines<R>,
_output: PhantomData<T>,
}
}
impl<R: AsyncBufRead, T> Stream for JsonLinesStream<R, T>
where
T: DeserializeOwned,
{
type Item = Result<T>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.project().inner.poll_next_line(cx)) {
Ok(Some(line)) => Some(serde_json::from_str::<T>(&line).map_err(Into::into)).into(),
Ok(None) => None.into(),
Err(e) => Some(Err(e)).into(),
}
}
}
pin_project! {
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AsyncJsonLinesWriter<W> {
#[pin]
inner: W,
}
}
impl<W> AsyncJsonLinesWriter<W> {
pub fn new(writer: W) -> Self {
AsyncJsonLinesWriter { inner: writer }
}
pub fn into_inner(self) -> W {
self.inner
}
pub fn get_ref(&self) -> &W {
&self.inner
}
pub fn get_mut(&mut self) -> &mut W {
&mut self.inner
}
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
self.project().inner
}
pub fn into_sink<T>(self) -> JsonLinesSink<W, T> {
JsonLinesSink::new(self.inner)
}
}
impl<W: AsyncWrite> AsyncJsonLinesWriter<W> {
#[allow(clippy::future_not_send)] pub async fn write<T>(&mut self, value: &T) -> Result<()>
where
T: ?Sized + Serialize,
W: Unpin,
{
let mut buf = serde_json::to_vec(value)?;
buf.push(b'\n');
self.inner.write_all(&buf).await?;
Ok(())
}
#[allow(clippy::future_not_send)] pub async fn flush(&mut self) -> Result<()>
where
W: Unpin,
{
self.inner.flush().await
}
}
pin_project! {
#[derive(Clone, Debug, Eq, PartialEq)]
#[must_use = "sinks do nothing unless polled"]
pub struct JsonLinesSink<W, T> {
#[pin]
inner: W,
buffer: Option<Vec<u8>>,
offset: usize,
_input: PhantomData<T>,
}
}
impl<W, T> JsonLinesSink<W, T> {
fn new(writer: W) -> Self {
JsonLinesSink {
inner: writer,
buffer: None,
offset: 0,
_input: PhantomData,
}
}
fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>
where
W: AsyncWrite,
{
let mut this = self.project();
if let Some(buffer) = this.buffer {
loop {
let written = ready!(this.inner.as_mut().poll_write(cx, &buffer[*this.offset..]))?;
*this.offset += written;
if *this.offset == buffer.len() {
break;
}
}
}
*this.buffer = None;
Poll::Ready(Ok(()))
}
}
impl<W: AsyncWrite, T> Sink<T> for JsonLinesSink<W, T>
where
T: Serialize,
{
type Error = std::io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.poll_flush_buffer(cx)
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<()> {
debug_assert!(
self.buffer.is_none(),
"buffer should be None after calling poll_ready()"
);
let this = self.project();
let mut buf = serde_json::to_vec(&item)?;
buf.push(b'\n');
*this.buffer = Some(buf);
*this.offset = 0;
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
ready!(self.as_mut().poll_flush_buffer(cx))?;
ready!(self.project().inner.poll_flush(cx))?;
Poll::Ready(Ok(()))
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
ready!(self.as_mut().poll_flush_buffer(cx))?;
ready!(self.project().inner.poll_shutdown(cx))?;
Poll::Ready(Ok(()))
}
}
pub trait AsyncBufReadJsonLines: AsyncBufRead {
fn json_lines<T>(self) -> JsonLinesStream<Self, T>
where
Self: Sized,
{
JsonLinesStream {
inner: self.lines(),
_output: PhantomData,
}
}
}
impl<R: AsyncBufRead> AsyncBufReadJsonLines for R {}
pub trait AsyncWriteJsonLines: AsyncWrite {
fn into_json_lines_sink<T>(self) -> JsonLinesSink<Self, T>
where
Self: Sized,
{
JsonLinesSink::new(self)
}
}
impl<W: AsyncWrite> AsyncWriteJsonLines for W {}
#[cfg(test)]
mod tests {
use super::*;
fn require_send<T: Send>(_t: T) {}
#[test]
fn test_read_is_send_if_r_is_send() {
let mut ajreader = AsyncJsonLinesReader::new(tokio::io::empty());
let fut = ajreader.read::<String>();
require_send(fut);
}
#[test]
fn test_write_is_send_if_w_is_send() {
let mut ajwriter = AsyncJsonLinesWriter::new(tokio::io::sink());
let s = String::from("This is test text.");
let fut = ajwriter.write(&s);
require_send(fut);
}
#[test]
fn test_flush_is_send_if_w_is_send() {
let mut ajwriter = AsyncJsonLinesWriter::new(tokio::io::sink());
let fut = ajwriter.flush();
require_send(fut);
}
}