arche 3.0.1

An opinionated backend foundation for Axum applications, providing batteries-included integrations for cloud services, databases, authentication, middleware, and logging.
Documentation
use crate::error::AppError;
use csv_async::AsyncReader;
use serde::Deserialize;
use std::future::Future;
use std::path::PathBuf;
use tokio::io::AsyncRead;

use super::stream::CsvRecordStream;
use super::{CsvClient, map_csv_error};

pub struct CsvReadBuilder<'c> {
    pub(super) client: &'c CsvClient,
}

impl<'c> CsvReadBuilder<'c> {
    pub fn from_bytes<'d>(self, data: &'d [u8]) -> CsvBytesReader<'c, 'd> {
        CsvBytesReader {
            client: self.client,
            data,
        }
    }

    pub fn from_file(self, path: impl AsRef<std::path::Path>) -> CsvFileReader<'c> {
        CsvFileReader {
            client: self.client,
            path: path.as_ref().to_path_buf(),
        }
    }

    pub fn from_url(self, url: &str) -> CsvUrlReader<'c> {
        CsvUrlReader {
            client: self.client,
            url: url.to_owned(),
        }
    }
}

/// CSV reader over a borrowed in-memory byte slice.
///
/// Created by [`CsvReadBuilder::from_bytes`]. Every method consumes the reader
/// and parses `data` through a reader obtained from `CsvClient::build_reader`
/// (or `CsvClient::stream` for [`stream`](Self::stream)).
pub struct CsvBytesReader<'c, 'd> {
    client: &'c CsvClient,
    data: &'d [u8],
}

impl<'c, 'd> CsvBytesReader<'c, 'd> {
    /// Deserializes every row into `T`, preserving input order.
    ///
    /// # Errors
    ///
    /// Returns the first read or deserialization error encountered.
    pub async fn deserialize<T>(self) -> Result<Vec<T>, AppError>
    where
        T: for<'de> Deserialize<'de>,
    {
        let Self { client, data } = self;
        collect_deserialized(&mut client.build_reader(data)).await
    }

    /// Returns every row as a raw [`csv_async::StringRecord`].
    ///
    /// # Errors
    ///
    /// Returns the first read error encountered.
    pub async fn records(self) -> Result<Vec<csv_async::StringRecord>, AppError> {
        let Self { client, data } = self;
        collect_records(&mut client.build_reader(data)).await
    }

    /// Deserializes rows into `T` and hands them to `f` in batches of at most
    /// `batch_size`. Each call to `f` is awaited before the next batch is filled.
    ///
    /// # Errors
    ///
    /// Stops at the first read/deserialization error or the first error returned
    /// by `f`.
    pub async fn deserialize_batched<T, F, Fut>(
        self,
        batch_size: usize,
        f: F,
    ) -> Result<(), AppError>
    where
        T: for<'de> Deserialize<'de>,
        F: Fn(Vec<T>) -> Fut,
        Fut: Future<Output = Result<(), AppError>>,
    {
        let Self { client, data } = self;
        batched_deserialize(&mut client.build_reader(data), batch_size, f).await
    }

    /// Hands raw rows to `f` in batches of at most `batch_size`.
    ///
    /// # Errors
    ///
    /// Stops at the first read error or the first error returned by `f`.
    pub async fn records_batched<F, Fut>(self, batch_size: usize, f: F) -> Result<(), AppError>
    where
        F: Fn(Vec<csv_async::StringRecord>) -> Fut,
        Fut: Future<Output = Result<(), AppError>>,
    {
        let Self { client, data } = self;
        batched_records(&mut client.build_reader(data), batch_size, f).await
    }

    /// Returns a record stream over the borrowed bytes.
    pub fn stream(self) -> CsvRecordStream<&'d [u8]> {
        let Self { client, data } = self;
        client.stream(data)
    }
}

/// CSV reader over a file on disk.
///
/// Created by [`CsvReadBuilder::from_file`]. The file is opened inside whichever
/// consuming method is called. A missing or unreadable file is therefore
/// reported by that method, not when the reader is created.
pub struct CsvFileReader<'c> {
    client: &'c CsvClient,
    path: PathBuf,
}

impl<'c> CsvFileReader<'c> {
    /// Opens the file and deserializes every row into `T`, preserving order.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or any row fails to read or
    /// deserialize.
    pub async fn deserialize<T>(self) -> Result<Vec<T>, AppError>
    where
        T: for<'de> Deserialize<'de>,
    {
        let Self { client, path } = self;
        let mut reader = client.build_reader(open_csv_file(path).await?);
        collect_deserialized(&mut reader).await
    }

    /// Opens the file and returns every row as a raw record.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or any row fails to read.
    pub async fn records(self) -> Result<Vec<csv_async::StringRecord>, AppError> {
        let Self { client, path } = self;
        let mut reader = client.build_reader(open_csv_file(path).await?);
        collect_records(&mut reader).await
    }

    /// Opens the file, deserializes rows into `T`, and passes them to `f` in
    /// batches of at most `batch_size`.
    ///
    /// # Errors
    ///
    /// Fails on open, on the first read/deserialization error, or on the first
    /// error returned by `f`.
    pub async fn deserialize_batched<T, F, Fut>(
        self,
        batch_size: usize,
        f: F,
    ) -> Result<(), AppError>
    where
        T: for<'de> Deserialize<'de>,
        F: Fn(Vec<T>) -> Fut,
        Fut: Future<Output = Result<(), AppError>>,
    {
        let Self { client, path } = self;
        let mut reader = client.build_reader(open_csv_file(path).await?);
        batched_deserialize(&mut reader, batch_size, f).await
    }

    /// Opens the file and passes raw rows to `f` in batches of at most
    /// `batch_size`.
    ///
    /// # Errors
    ///
    /// Fails on open, on the first read error, or on the first error returned
    /// by `f`.
    pub async fn records_batched<F, Fut>(self, batch_size: usize, f: F) -> Result<(), AppError>
    where
        F: Fn(Vec<csv_async::StringRecord>) -> Fut,
        Fut: Future<Output = Result<(), AppError>>,
    {
        let Self { client, path } = self;
        let mut reader = client.build_reader(open_csv_file(path).await?);
        batched_records(&mut reader, batch_size, f).await
    }

    /// Opens the file and returns a record stream over it.
    ///
    /// # Errors
    ///
    /// Fails only if the file cannot be opened.
    pub async fn stream(self) -> Result<CsvRecordStream<tokio::fs::File>, AppError> {
        let Self { client, path } = self;
        open_csv_file(path).await.map(|file| client.stream(file))
    }
}

/// CSV reader over a document fetched from a URL.
///
/// Created by [`CsvReadBuilder::from_url`]. Each consuming method performs its
/// own download and buffers the whole body in memory before parsing it.
pub struct CsvUrlReader<'c> {
    client: &'c CsvClient,
    url: String,
}

impl<'c> CsvUrlReader<'c> {
    /// Downloads the document and deserializes every row into `T`.
    ///
    /// # Errors
    ///
    /// Fails if the download fails or any row fails to read or deserialize.
    pub async fn deserialize<T>(self) -> Result<Vec<T>, AppError>
    where
        T: for<'de> Deserialize<'de>,
    {
        let Self { client, url } = self;
        let body = fetch_url_bytes(&url).await?;
        let mut reader = client.build_reader(&body[..]);
        collect_deserialized(&mut reader).await
    }

    /// Downloads the document and returns every row as a raw record.
    ///
    /// # Errors
    ///
    /// Fails if the download fails or any row fails to read.
    pub async fn records(self) -> Result<Vec<csv_async::StringRecord>, AppError> {
        let Self { client, url } = self;
        let body = fetch_url_bytes(&url).await?;
        let mut reader = client.build_reader(&body[..]);
        collect_records(&mut reader).await
    }

    /// Downloads the document, deserializes rows into `T`, and passes them to
    /// `f` in batches of at most `batch_size`.
    ///
    /// # Errors
    ///
    /// Fails on download, on the first read/deserialization error, or on the
    /// first error returned by `f`.
    pub async fn deserialize_batched<T, F, Fut>(
        self,
        batch_size: usize,
        f: F,
    ) -> Result<(), AppError>
    where
        T: for<'de> Deserialize<'de>,
        F: Fn(Vec<T>) -> Fut,
        Fut: Future<Output = Result<(), AppError>>,
    {
        let Self { client, url } = self;
        let body = fetch_url_bytes(&url).await?;
        let mut reader = client.build_reader(&body[..]);
        batched_deserialize(&mut reader, batch_size, f).await
    }

    /// Downloads the document and passes raw rows to `f` in batches of at most
    /// `batch_size`.
    ///
    /// # Errors
    ///
    /// Fails on download, on the first read error, or on the first error
    /// returned by `f`.
    pub async fn records_batched<F, Fut>(self, batch_size: usize, f: F) -> Result<(), AppError>
    where
        F: Fn(Vec<csv_async::StringRecord>) -> Fut,
        Fut: Future<Output = Result<(), AppError>>,
    {
        let Self { client, url } = self;
        let body = fetch_url_bytes(&url).await?;
        let mut reader = client.build_reader(&body[..]);
        batched_records(&mut reader, batch_size, f).await
    }
}

/// Downloads the full response body at `url` so it can be parsed as CSV.
///
/// # Errors
///
/// Returns an internal [`AppError`] when the request cannot be sent, when the
/// server answers with a 4xx/5xx status, or when the body cannot be read.
async fn fetch_url_bytes(url: &str) -> Result<Vec<u8>, AppError> {
    let response = reqwest::get(url).await.map_err(|e| {
        AppError::internal_error(
            e.to_string(),
            Some("Failed to fetch CSV from URL".to_string()),
        )
    })?;
    // `reqwest` treats any completed response as success, including 404/500.
    // Without this check an error page (e.g. an HTML 404 body) would be fed to
    // the CSV parser and surface as confusing parse errors or bogus rows.
    let response = response.error_for_status().map_err(|e| {
        AppError::internal_error(
            e.to_string(),
            Some("CSV URL responded with an error status".to_string()),
        )
    })?;
    let body = response.bytes().await.map_err(|e| {
        AppError::internal_error(
            e.to_string(),
            Some("Failed to read CSV response body".to_string()),
        )
    })?;
    Ok(body.to_vec())
}

/// Opens `path` for asynchronous reading.
///
/// # Errors
///
/// Any I/O failure from `tokio::fs::File::open` is converted into an internal
/// [`AppError`] carrying the original error text.
async fn open_csv_file(path: impl AsRef<std::path::Path>) -> Result<tokio::fs::File, AppError> {
    match tokio::fs::File::open(path).await {
        Ok(file) => Ok(file),
        Err(io_err) => Err(AppError::internal_error(
            io_err.to_string(),
            Some(String::from("Failed to open CSV file")),
        )),
    }
}

/// Reads every remaining record from `rdr` and deserializes each into `T`.
///
/// If the reader treats its first row as a header row, that row is used to
/// match fields by name. Otherwise, or if the header row is empty, fields are
/// matched by position.
///
/// # Errors
///
/// Returns the first read or deserialization failure, converted with
/// `map_csv_error`. Records collected before the failure are discarded.
async fn collect_deserialized<T, R>(rdr: &mut AsyncReader<R>) -> Result<Vec<T>, AppError>
where
    T: for<'de> Deserialize<'de>,
    R: AsyncRead + Unpin + Send,
{
    // `headers()` returns the first row even when the reader was configured
    // with `has_headers(false)`. In that mode the row is *not* consumed and is
    // also yielded again by `read_record`. Using it as field names would
    // mis-map every record, so only consult it when it really is a header.
    let headers = if rdr.has_headers() {
        let row = rdr.headers().await.map_err(map_csv_error)?.clone();
        (!row.is_empty()).then_some(row)
    } else {
        None
    };
    let mut records = Vec::new();
    let mut record = csv_async::StringRecord::new();
    while rdr.read_record(&mut record).await.map_err(map_csv_error)? {
        let item: T = record.deserialize(headers.as_ref()).map_err(map_csv_error)?;
        records.push(item);
    }
    Ok(records)
}

async fn collect_records<R>(
    rdr: &mut AsyncReader<R>,
) -> Result<Vec<csv_async::StringRecord>, AppError>
where
    R: AsyncRead + Unpin + Send,
{
    let mut records = Vec::new();
    let mut record = csv_async::StringRecord::new();
    while rdr.read_record(&mut record).await.map_err(map_csv_error)? {
        records.push(record.clone());
    }
    Ok(records)
}

/// Deserializes records from `rdr` into `T` and passes them to `f` in batches.
///
/// A batch is flushed as soon as it holds `batch_size` items. A final, possibly
/// shorter batch is flushed at end of input, and `f` is never called with an
/// empty batch. A `batch_size` of 0 is treated as 1. Each call to `f` is
/// awaited before reading continues.
///
/// If the reader treats its first row as a header row, that row is used to
/// match fields by name. Otherwise fields are matched by position.
///
/// # Errors
///
/// Stops at, and returns, the first read/deserialization error (converted with
/// `map_csv_error`) or the first error returned by `f`.
async fn batched_deserialize<T, R, F, Fut>(
    rdr: &mut AsyncReader<R>,
    batch_size: usize,
    f: F,
) -> Result<(), AppError>
where
    T: for<'de> Deserialize<'de>,
    R: AsyncRead + Unpin + Send,
    F: Fn(Vec<T>) -> Fut,
    Fut: Future<Output = Result<(), AppError>>,
{
    // `headers()` returns the first row even with `has_headers(false)`, without
    // consuming it. Treating that data row as field names would mis-map every
    // record, so only use it when the reader really has a header row.
    let headers = if rdr.has_headers() {
        let row = rdr.headers().await.map_err(map_csv_error)?.clone();
        (!row.is_empty()).then_some(row)
    } else {
        None
    };
    // With 0 the `>=` check below already flushed after every record. Making
    // that explicit keeps the capacity hint meaningful.
    let batch_size = batch_size.max(1);
    let mut batch = Vec::with_capacity(batch_size);
    let mut record = csv_async::StringRecord::new();
    while rdr.read_record(&mut record).await.map_err(map_csv_error)? {
        let item: T = record.deserialize(headers.as_ref()).map_err(map_csv_error)?;
        batch.push(item);
        if batch.len() >= batch_size {
            let full = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
            f(full).await?;
        }
    }
    if !batch.is_empty() {
        f(batch).await?;
    }
    Ok(())
}

/// Passes raw records from `rdr` to `f` in batches of at most `batch_size`.
///
/// A batch is flushed as soon as it reaches `batch_size` records. A final,
/// possibly shorter batch is flushed at end of input, and `f` is never called
/// for empty input. With a `batch_size` of 0 every record is flushed on its own.
///
/// # Errors
///
/// Stops at, and returns, the first read error (converted with
/// `map_csv_error`) or the first error returned by `f`.
async fn batched_records<R, F, Fut>(
    rdr: &mut AsyncReader<R>,
    batch_size: usize,
    f: F,
) -> Result<(), AppError>
where
    R: AsyncRead + Unpin + Send,
    F: Fn(Vec<csv_async::StringRecord>) -> Fut,
    Fut: Future<Output = Result<(), AppError>>,
{
    let mut pending = Vec::with_capacity(batch_size);
    let mut row = csv_async::StringRecord::new();
    loop {
        let more = rdr.read_record(&mut row).await.map_err(map_csv_error)?;
        if !more {
            break;
        }
        pending.push(row.clone());
        if pending.len() < batch_size {
            continue;
        }
        let full = std::mem::replace(&mut pending, Vec::with_capacity(batch_size));
        f(full).await?;
    }
    if pending.is_empty() {
        return Ok(());
    }
    f(pending).await
}