arche 3.0.1

An opinionated backend foundation for Axum applications, providing batteries-included integrations for cloud services, databases, authentication, middleware, and logging.
Documentation
mod read;
mod stream;
mod writer;

pub use read::{CsvBytesReader, CsvFileReader, CsvReadBuilder, CsvUrlReader};
pub use stream::CsvRecordStream;
pub use writer::CsvRecordWriter;

use crate::error::AppError;
use csv_async::{AsyncReaderBuilder, AsyncWriterBuilder};
use serde::Serialize;
use tokio::io::{AsyncRead, AsyncWrite};

pub(crate) fn map_csv_error(err: csv_async::Error) -> AppError {
    AppError::internal_error(err.to_string(), Some("CSV processing error".to_string()))
}

#[derive(Debug, Clone)]
pub struct CsvClient {
    delimiter: u8,
    has_headers: bool,
    flexible: bool,
    quoting: bool,
    quote: u8,
    escape: Option<u8>,
    comment: Option<u8>,
}

impl Default for CsvClient {
    fn default() -> Self {
        Self {
            delimiter: b',',
            has_headers: true,
            flexible: false,
            quoting: true,
            quote: b'"',
            escape: None,
            comment: None,
        }
    }
}

impl CsvClient {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    pub fn has_headers(mut self, has_headers: bool) -> Self {
        self.has_headers = has_headers;
        self
    }

    pub fn flexible(mut self, flexible: bool) -> Self {
        self.flexible = flexible;
        self
    }

    pub fn quoting(mut self, quoting: bool) -> Self {
        self.quoting = quoting;
        self
    }

    pub fn quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    pub fn escape(mut self, escape: u8) -> Self {
        self.escape = Some(escape);
        self
    }

    pub fn comment(mut self, comment: u8) -> Self {
        self.comment = Some(comment);
        self
    }

    pub(crate) fn build_reader<R: AsyncRead + Unpin + Send>(
        &self,
        reader: R,
    ) -> csv_async::AsyncReader<R> {
        let mut builder = AsyncReaderBuilder::new();
        builder.delimiter(self.delimiter);
        builder.has_headers(self.has_headers);
        builder.flexible(self.flexible);
        builder.quoting(self.quoting);
        builder.quote(self.quote);
        builder.escape(self.escape);
        builder.comment(self.comment);
        builder.create_reader(reader)
    }

    fn build_serializer<W: AsyncWrite + Unpin + Send>(
        &self,
        writer: W,
    ) -> csv_async::AsyncSerializer<W> {
        let mut builder = AsyncWriterBuilder::new();
        builder.delimiter(self.delimiter);
        builder.has_headers(self.has_headers);
        if self.quoting {
            builder.quote_style(csv_async::QuoteStyle::Always);
        } else {
            builder.quote_style(csv_async::QuoteStyle::Never);
        }
        builder.quote(self.quote);
        if let Some(escape) = self.escape {
            builder.escape(escape);
        }
        builder.create_serializer(writer)
    }

    pub fn read(&self) -> CsvReadBuilder<'_> {
        CsvReadBuilder { client: self }
    }

    pub fn stream<R: AsyncRead + Unpin + Send>(&self, reader: R) -> CsvRecordStream<R> {
        CsvRecordStream {
            inner: self.build_reader(reader),
            headers: None,
        }
    }

    pub async fn write_all<T: Serialize>(&self, records: &[T]) -> Result<Vec<u8>, AppError> {
        let mut ser = self.build_serializer(Vec::new());
        for record in records {
            ser.serialize(record).await.map_err(map_csv_error)?;
        }
        ser.into_inner().await.map_err(|e| {
            AppError::internal_error(
                e.to_string(),
                Some("Failed to flush CSV writer".to_string()),
            )
        })
    }

    pub async fn write_file<T: Serialize>(
        &self,
        path: impl AsRef<std::path::Path>,
        records: &[T],
    ) -> Result<(), AppError> {
        let file = tokio::fs::File::create(path).await.map_err(|e| {
            AppError::internal_error(e.to_string(), Some("Failed to create CSV file".to_string()))
        })?;
        let mut ser = self.build_serializer(file);
        for record in records {
            ser.serialize(record).await.map_err(map_csv_error)?;
        }
        ser.into_inner().await.map_err(|e| {
            AppError::internal_error(
                e.to_string(),
                Some("Failed to flush CSV writer".to_string()),
            )
        })?;
        Ok(())
    }

    pub fn writer<W: AsyncWrite + Unpin + Send>(&self, writer: W) -> CsvRecordWriter<W> {
        CsvRecordWriter {
            inner: self.build_serializer(writer),
        }
    }

    pub async fn writer_to_file(
        &self,
        path: impl AsRef<std::path::Path>,
    ) -> Result<CsvRecordWriter<tokio::fs::File>, AppError> {
        let file = tokio::fs::File::create(path).await.map_err(|e| {
            AppError::internal_error(e.to_string(), Some("Failed to create CSV file".to_string()))
        })?;
        Ok(self.writer(file))
    }
}