ipfs-api 0.7.1

Implementation of an IPFS HTTP API client
Documentation
// Copyright 2017 rust-ipfs-api Developers
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//

use crate::{header::X_STREAM_ERROR, response::Error};
use bytes::{Bytes, BytesMut};
use futures::{
    task::{Context, Poll},
    Stream,
};
use serde::Deserialize;
use serde_json;
use std::{cmp, fmt::Display, io, marker::PhantomData, pin::Pin};
use tokio::io::AsyncRead;
use tokio_util::codec::Decoder;

/// A decoder for a response where each line is a full json object.
///
pub struct JsonLineDecoder<T> {
    /// Set to true if the stream can contain a X-Stream-Error header,
    /// which indicates an error while streaming.
    ///
    parse_stream_error: bool,

    ty: PhantomData<T>,
}

impl<T> JsonLineDecoder<T> {
    #[inline]
    pub fn new(parse_stream_error: bool) -> JsonLineDecoder<T> {
        JsonLineDecoder {
            parse_stream_error,
            ty: PhantomData,
        }
    }
}

impl<T> Decoder for JsonLineDecoder<T>
where
    for<'de> T: Deserialize<'de>,
{
    type Item = T;

    type Error = Error;

    /// Tries to find a new line character. If it does, it will split the buffer,
    /// and parse the first slice.
    ///
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let nl_index = src.iter().position(|b| *b == b'\n');

        if let Some(pos) = nl_index {
            let slice = src.split_to(pos + 1);
            let slice = &slice[..slice.len() - 1];

            match serde_json::from_slice(slice) {
                Ok(json) => Ok(json),
                // If a JSON object couldn't be parsed from the response, it is possible
                // that a stream error trailing header was returned. If the JSON decoder
                // was configured to parse these kinds of error, it should try. If a header
                // couldn't be parsed, it will return the original error.
                //
                Err(e) => {
                    if self.parse_stream_error {
                        match slice.iter().position(|&x| x == b':') {
                            Some(colon) if &slice[..colon] == X_STREAM_ERROR.as_bytes() => {
                                let e = Error::StreamError(
                                    String::from_utf8_lossy(&slice[colon + 2..]).into(),
                                );

                                Err(e)
                            }
                            _ => Err(e.into()),
                        }
                    } else {
                        Err(e.into())
                    }
                }
            }
        } else {
            Ok(None)
        }
    }
}

/// A decoder that reads a line at a time.
///
pub struct LineDecoder;

impl Decoder for LineDecoder {
    type Item = String;

    type Error = Error;

    /// Attempts to find a new line character, and returns the entire line if
    /// it finds one.
    ///
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let nl_index = src.iter().position(|b| *b == b'\n');

        if let Some(pos) = nl_index {
            let slice = src.split_to(pos + 1);

            Ok(Some(
                String::from_utf8_lossy(&slice[..slice.len() - 1]).into_owned(),
            ))
        } else {
            Ok(None)
        }
    }
}

/// Copies bytes from a Bytes chunk into a destination buffer, and returns
/// the number of bytes that were read.
///
fn copy_from_chunk_to(dest: &mut [u8], chunk: &mut Bytes, chunk_start: usize) -> usize {
    let len = cmp::min(dest.len(), chunk.len() - chunk_start);
    let chunk_end = chunk_start + len;

    dest[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);

    len
}

/// The state of a stream returning Chunks.
///
enum ReadState {
    /// A chunk is ready to be read from.
    ///
    Ready(Bytes, usize),

    /// The next chunk isn't ready yet.
    ///
    NotReady,
}

/// Reads from a stream of chunks asynchronously.
///
pub struct StreamReader<S> {
    stream: S,
    state: ReadState,
}

impl<S, E> StreamReader<S>
where
    S: Stream<Item = Result<Bytes, E>>,
    E: Display,
{
    #[inline]
    pub fn new(stream: S) -> StreamReader<S> {
        StreamReader {
            stream,
            state: ReadState::NotReady,
        }
    }
}

impl<S, E> AsyncRead for StreamReader<S>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    E: Display,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        mut buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        match self.state {
            // Stream yielded a Chunk to read.
            //
            ReadState::Ready(ref mut chunk, ref mut pos) => {
                let bytes_read = copy_from_chunk_to(buf, chunk, *pos);

                if *pos + bytes_read >= chunk.len() {
                    self.state = ReadState::NotReady;
                } else {
                    *pos += bytes_read;
                }

                return Poll::Ready(Ok(bytes_read));
            }
            // Stream is not ready, and a Chunk needs to be read.
            //
            ReadState::NotReady => {
                match Stream::poll_next(Pin::new(&mut self.stream), cx) {
                    // Polling stream yielded a Chunk that can be read from.
                    //
                    Poll::Ready(Some(Ok(mut chunk))) => {
                        let bytes_read = copy_from_chunk_to(&mut buf, &mut chunk, 0);

                        if bytes_read >= chunk.len() {
                            self.state = ReadState::NotReady;
                        } else {
                            self.state = ReadState::Ready(chunk, bytes_read);
                        }

                        return Poll::Ready(Ok(bytes_read));
                    }
                    Poll::Ready(Some(Err(e))) => {
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::Other,
                            e.to_string(),
                        )));
                    }
                    // Polling stream yielded EOF.
                    //
                    Poll::Ready(None) => return Poll::Ready(Ok(0)),
                    // Stream could not be read from.
                    //
                    Poll::Pending => (),
                }
            }
        }

        Poll::Pending
    }
}