// arche 2.4.0
//
// An opinionated backend foundation for Axum applications, providing batteries-included integrations for cloud services, databases, authentication, middleware, and logging.
// Documentation
use crate::error::AppError;
use csv_async::AsyncReader;
use serde::Deserialize;
use tokio::io::AsyncRead;

use super::map_csv_error;

/// Pull-based wrapper around a `csv_async::AsyncReader` that hands out CSV
/// rows one at a time or in batches, either as raw `StringRecord`s or
/// deserialized into a caller-chosen type.
///
/// Both fields are `pub(super)`, so instances are presumably built by the
/// parent module — verify against the constructor there.
pub struct CsvRecordStream<R: AsyncRead + Unpin + Send> {
    /// The underlying asynchronous CSV reader that all rows are pulled from.
    pub(super) inner: AsyncReader<R>,
    /// Cached copy of the header record; filled in lazily by the methods of
    /// this type the first time they need it (`None` until then).
    pub(super) headers: Option<csv_async::StringRecord>,
}

impl<R: AsyncRead + Unpin + Send> CsvRecordStream<R> {
    /// Reads the next row from the underlying reader and deserializes it
    /// into `T`.
    ///
    /// If no header record is cached yet, it is fetched from the reader and
    /// stored before the row is read. A cached header record that is empty
    /// is treated as "no headers" and the row is deserialized without one.
    ///
    /// # Returns
    ///
    /// * `Some(Ok(value))` — a row was read and deserialized.
    /// * `Some(Err(_))` — fetching the headers, reading the row, or
    ///   deserializing it failed; the error is converted by `map_csv_error`.
    /// * `None` — the reader reported that there is no further record.
    pub async fn next_deserialized<T>(&mut self) -> Option<Result<T, AppError>>
    where
        T: for<'de> Deserialize<'de>,
    {
        // Populate the header cache on first use.
        if self.headers.is_none() {
            let fetched = match self.inner.headers().await {
                Ok(found) => found.clone(),
                Err(err) => return Some(Err(map_csv_error(err))),
            };
            self.headers = Some(fetched);
        }

        let mut row = csv_async::StringRecord::new();
        let has_row = match self.inner.read_record(&mut row).await {
            Ok(flag) => flag,
            Err(err) => return Some(Err(map_csv_error(err))),
        };
        if !has_row {
            return None;
        }

        // An empty header record carries no field names, so pass `None`.
        // NOTE(review): a non-empty cached record is always used for field
        // mapping, even if the reader was built with headers disabled —
        // confirm that is intended by whoever constructs this stream.
        let names = self.headers.as_ref().filter(|names| !names.is_empty());
        Some(row.deserialize(names).map_err(map_csv_error))
    }

    pub async fn next_record(&mut self) -> Option<Result<csv_async::StringRecord, AppError>> {
        let mut record = csv_async::StringRecord::new();
        match self.inner.read_record(&mut record).await {
            Ok(true) => Some(Ok(record)),
            Ok(false) => None,
            Err(e) => Some(Err(map_csv_error(e))),
        }
    }

    /// Returns the header record, fetching it from the underlying reader
    /// and caching it on the first call.
    ///
    /// # Errors
    ///
    /// Returns the `map_csv_error`-converted error if the reader fails to
    /// produce the headers; the cache is left empty in that case, so a
    /// later call tries again.
    pub async fn headers(&mut self) -> Result<&csv_async::StringRecord, AppError> {
        // Move the cached value out (if any); on the fetch path the cache
        // was already empty, so an early `?` return leaves it unchanged.
        let resolved = match self.headers.take() {
            Some(existing) => existing,
            None => self.inner.headers().await.map_err(map_csv_error)?.clone(),
        };
        let stored: &csv_async::StringRecord = self.headers.insert(resolved);
        Ok(stored)
    }

    /// Reads up to `batch_size` rows and deserializes each into `T`.
    ///
    /// Rows are pulled one at a time via `next_deserialized` until the
    /// batch is full or the stream reports no further record.
    ///
    /// # Returns
    ///
    /// * `Some(Ok(batch))` — between 1 and `batch_size` deserialized rows.
    /// * `Some(Err(_))` — a row failed to read or deserialize. Rows already
    ///   collected for this batch are discarded along with the error.
    /// * `None` — no row could be collected: either the stream is exhausted
    ///   or `batch_size` is `0`.
    pub async fn next_batch_deserialized<T>(
        &mut self,
        batch_size: usize,
    ) -> Option<Result<Vec<T>, AppError>>
    where
        T: for<'de> Deserialize<'de>,
    {
        // Cap the eager allocation: `batch_size` is caller-controlled, and
        // reserving it verbatim would over-allocate (or panic with a
        // capacity overflow) for very large values even on a tiny input.
        // The vector still grows on demand up to `batch_size` elements.
        const MAX_PREALLOCATED_ROWS: usize = 1024;

        let mut batch = Vec::with_capacity(batch_size.min(MAX_PREALLOCATED_ROWS));
        while batch.len() < batch_size {
            match self.next_deserialized::<T>().await {
                Some(Ok(item)) => batch.push(item),
                Some(Err(e)) => return Some(Err(e)),
                None => break,
            }
        }

        if batch.is_empty() {
            None
        } else {
            Some(Ok(batch))
        }
    }

    /// Reads up to `batch_size` raw rows.
    ///
    /// Rows are pulled one at a time via `next_record` until the batch is
    /// full or the stream reports no further record.
    ///
    /// # Returns
    ///
    /// * `Some(Ok(batch))` — between 1 and `batch_size` records.
    /// * `Some(Err(_))` — a row failed to read. Records already collected
    ///   for this batch are discarded along with the error.
    /// * `None` — no record could be collected: either the stream is
    ///   exhausted or `batch_size` is `0`.
    pub async fn next_batch(
        &mut self,
        batch_size: usize,
    ) -> Option<Result<Vec<csv_async::StringRecord>, AppError>> {
        // Cap the eager allocation: `batch_size` is caller-controlled, and
        // reserving it verbatim would over-allocate (or panic with a
        // capacity overflow) for very large values even on a tiny input.
        // The vector still grows on demand up to `batch_size` elements.
        const MAX_PREALLOCATED_ROWS: usize = 1024;

        let mut batch = Vec::with_capacity(batch_size.min(MAX_PREALLOCATED_ROWS));
        while batch.len() < batch_size {
            match self.next_record().await {
                Some(Ok(record)) => batch.push(record),
                Some(Err(e)) => return Some(Err(e)),
                None => break,
            }
        }

        if batch.is_empty() {
            None
        } else {
            Some(Ok(batch))
        }
    }

    pub fn into_inner(self) -> AsyncReader<R> {
        self.inner
    }
}