graph-http 0.0.2

Http client and utilities for the graph-rs project
Documentation
use crate::traits::{AsyncByteRangeRead, AsyncTryFrom, ByteRangeMultiple, ByteRangeRead};
use async_trait::async_trait;
use graph_error::{GraphFailure, GraphResult};
use std::collections::VecDeque;
use std::convert::TryFrom;
use std::fmt::{Debug, Formatter};
use std::io::{ErrorKind, Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use tokio::io::AsyncReadExt;

pub struct ByteRanges<F> {
    file: F,
    file_size: u64,
}

impl<F> Debug for ByteRanges<F>
where
    F: Debug,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ByteRanges<F>")
            .field("file_size", &self.file_size)
            .field("file", &self.file)
            .finish()
    }
}

impl<F> ByteRangeMultiple for ByteRanges<F> {
    fn file_size(&self) -> u64 {
        self.file_size
    }
}

impl ByteRanges<std::fs::File> {
    pub fn new<P: AsRef<Path>>(path: P) -> GraphResult<ByteRanges<std::fs::File>> {
        let file = std::fs::File::open(path)?;
        let file_size = file.metadata()?.len();
        Ok(ByteRanges { file, file_size })
    }
}

impl ByteRangeRead for ByteRanges<std::fs::File> {
    fn read_to_vec(mut self) -> std::io::Result<VecDeque<(u64, u64, Vec<u8>)>> {
        let byte_range = self.byte_range_multiples();
        let size = self.file_size();
        let mut byte_range_queue: VecDeque<(u64, u64, Vec<u8>)> = VecDeque::new();

        let mut end = 0;
        let mut counter = 0;
        while counter < size {
            let from_pos = counter;

            let mut v: Vec<u8> = Vec::new();
            self.file.seek(SeekFrom::Start(from_pos))?;
            if size - counter < byte_range {
                counter += size - counter;
                {
                    let reference = self.file.by_ref();
                    reference.take(counter).read_to_end(&mut v)?;
                }
            } else {
                counter += byte_range;
                {
                    let reference = self.file.by_ref();
                    reference.take(byte_range).read_to_end(&mut v)?;
                }
            }

            let range = end + v.len() as u64;
            if end != 0 && range < size {
                end += 1;
            }

            let start = end;
            if range > size {
                end += size - end;
            } else {
                end += (v.len() as u64) - 1;
            }
            byte_range_queue.push_back((start, end, v));
        }
        Ok(byte_range_queue)
    }

    fn read_to_vec_range(
        mut self,
        start: u64,
        end: u64,
    ) -> std::io::Result<VecDeque<(u64, u64, Vec<u8>)>> {
        let size = self.file_size();
        if start > size || end > size {
            return Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "start or end range is greater than file size",
            ));
        } else if end <= start {
            return Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "start range cannot be greater than or equal to end range",
            ));
        }

        let mut byte_range_queue: VecDeque<(u64, u64, Vec<u8>)> = VecDeque::new();
        let byte_range = end - start;
        let mut end = end;
        let mut counter = start;

        while counter < size {
            let from_pos = counter;
            let mut v: Vec<u8> = Vec::new();
            self.file.seek(SeekFrom::Start(from_pos))?;

            if size - counter < byte_range {
                counter += size - counter;
                self.file.by_ref().take(counter).read_to_end(&mut v)?;
            } else {
                counter += byte_range;
                self.file.by_ref().take(byte_range).read_to_end(&mut v)?;
            }

            let range = end + v.len() as u64;
            if end != 0 && range < size {
                end += 1;
            }

            let start = end;
            if range > size {
                end += size - end;
            } else {
                end += (v.len() as u64) - 1;
            }
            byte_range_queue.push_back((start, end, v));
        }
        Ok(byte_range_queue)
    }
}

impl ByteRanges<tokio::fs::File> {
    pub async fn new<P: AsRef<Path>>(path: P) -> GraphResult<ByteRanges<tokio::fs::File>> {
        let file = tokio::fs::File::open(path).await?;
        let file_size = file.metadata().await?.len();
        Ok(ByteRanges { file, file_size })
    }
}

#[async_trait]
impl AsyncByteRangeRead for ByteRanges<tokio::fs::File> {
    async fn read_to_vec(mut self) -> std::io::Result<VecDeque<(u64, u64, Vec<u8>)>> {
        let range_multiple = self.byte_range_multiples();
        let size = self.file_size();
        let mut byte_range_queue: VecDeque<(u64, u64, Vec<u8>)> = VecDeque::new();

        let mut end = 0;
        let mut counter = 0;
        while counter < size {
            let from_pos = counter;

            let mut v: Vec<u8>;
            self.file.seek(SeekFrom::Start(from_pos)).await?;
            if size - counter < range_multiple {
                let last = size - counter;
                v = vec![0u8; last as usize];
                counter += size - counter;
                self.file.read_exact(&mut v).await?;
            } else {
                counter += range_multiple;
                v = vec![0u8; range_multiple as usize];
                self.file.read_exact(&mut v).await?;
            }

            let range = end + v.len() as u64;
            if end != 0 && range < size {
                end += 1;
            }

            let start = end;
            if range > size {
                end += size - end;
            } else {
                end += (v.len() as u64) - 1;
            }
            byte_range_queue.push_back((start, end, v));
        }
        Ok(byte_range_queue)
    }

    async fn read_to_vec_range(
        mut self,
        start: u64,
        end: u64,
    ) -> std::io::Result<VecDeque<(u64, u64, Vec<u8>)>> {
        let size = self.file_size;
        if start > size || end > size {
            return Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "start or end range is greater than file size",
            ));
        } else if end <= start {
            return Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "start range cannot be greater than or equal to end range",
            ));
        }

        let mut byte_range_queue: VecDeque<(u64, u64, Vec<u8>)> = VecDeque::new();
        let byte_range = end - start;
        let mut end = end;
        let mut counter = start;

        while counter < size {
            let from_pos = counter;
            let mut v: Vec<u8>;
            self.file.seek(SeekFrom::Start(from_pos)).await?;

            if size - counter < byte_range {
                counter += size - counter;
                v = vec![0u8; counter as usize];
                self.file.read_exact(&mut v).await?;
            } else {
                counter += byte_range;
                v = vec![0u8; byte_range as usize];
                self.file.read_exact(&mut v).await?;
            }

            let range = end + v.len() as u64;
            if end != 0 && range < size {
                end += 1;
            }

            let start = end;
            if range > size {
                end += size - end;
            } else {
                end += (v.len() as u64) - 1;
            }
            byte_range_queue.push_back((start, end, v));
        }
        Ok(byte_range_queue)
    }
}

#[derive(Debug, Default)]
pub struct ByteRangeIterator {
    file_size: u64,
    byte_range: VecDeque<(u64, u64, Vec<u8>)>,
}

impl ByteRangeIterator {
    pub fn new(file_size: u64, byte_range: VecDeque<(u64, u64, Vec<u8>)>) -> ByteRangeIterator {
        ByteRangeIterator {
            file_size,
            byte_range,
        }
    }

    pub fn from_range<P: AsRef<Path>>(
        start: u64,
        end: u64,
        file: P,
    ) -> GraphResult<ByteRangeIterator> {
        let byte_range: ByteRanges<std::fs::File> = ByteRanges::<std::fs::File>::new(file)?;
        let file_size = byte_range.file_size();
        Ok(ByteRangeIterator::new(
            file_size,
            byte_range.read_to_vec_range(start, end)?,
        ))
    }

    pub async fn async_from_range<P: AsRef<Path>>(
        start: u64,
        end: u64,
        file: P,
    ) -> GraphResult<ByteRangeIterator> {
        let byte_range: ByteRanges<tokio::fs::File> =
            ByteRanges::<tokio::fs::File>::new(file).await?;
        let file_size = byte_range.file_size();
        Ok(ByteRangeIterator::new(
            file_size,
            byte_range.read_to_vec_range(start, end).await?,
        ))
    }

    pub async fn try_from_async(value: PathBuf) -> GraphResult<ByteRangeIterator> {
        let byte_range: ByteRanges<tokio::fs::File> =
            ByteRanges::<tokio::fs::File>::new(value).await?;
        let file_size = byte_range.file_size();
        let bytes = byte_range.read_to_vec().await?;
        Ok(ByteRangeIterator::new(file_size, bytes))
    }

    pub fn file_size(&self) -> u64 {
        self.file_size
    }

    pub fn is_empty(&self) -> bool {
        self.byte_range.is_empty()
    }

    pub fn pop_front(&mut self) -> Option<(Vec<u8>, u64, String)> {
        let next = self.byte_range.pop_front()?;
        let mut content_length = next.1 - next.0;
        content_length += 1;
        let start = next.0.to_string();
        let end = next.1.to_string();
        let file_size = self.file_size.to_string();
        Some((
            next.2,
            content_length,
            format!(
                "bytes {}-{}/{}",
                start.as_str(),
                end.as_str(),
                file_size.as_str()
            ),
        ))
    }

    pub fn clear(&mut self) {
        self.byte_range.clear();
    }
}

impl Iterator for ByteRangeIterator {
    type Item = (Vec<u8>, u64, String);

    fn next(&mut self) -> Option<Self::Item> {
        self.pop_front()
    }
}

impl TryFrom<PathBuf> for ByteRangeIterator {
    type Error = GraphFailure;

    fn try_from(value: PathBuf) -> Result<Self, Self::Error> {
        let byte_range = ByteRanges::<std::fs::File>::new(value)?;
        let file_size = byte_range.file_size();
        let bytes = byte_range.read_to_vec()?;
        Ok(ByteRangeIterator::new(file_size, bytes))
    }
}

#[async_trait]
impl AsyncTryFrom<PathBuf> for ByteRangeIterator {
    type Error = GraphFailure;

    async fn async_try_from(value: PathBuf) -> Result<Self, Self::Error> {
        let byte_range = ByteRanges::<tokio::fs::File>::new(value).await?;
        let file_size = byte_range.file_size();
        let bytes = byte_range.read_to_vec().await?;
        Ok(ByteRangeIterator::new(file_size, bytes))
    }
}