Crate csv_async

The csv-async crate provides a fast and flexible CSV reader and writer intended to run in an asynchronous environment, i.e. inside async functions called by tasks that an executor runs. The library does not require any particular executor; its unit tests and documentation snippets use either the async-std or tokio crates. This crate does not contain a synchronous interface for reading and writing CSV files; please use the csv crate for that. This crate attempts to mimic the csv crate API, with some exceptions: for example, configuration builders have create_... factory functions instead of from_... as in the csv crate.

§Brief overview

The primary types in this crate are AsyncReader and AsyncWriter, for reading and writing CSV data respectively, and AsyncDeserializer and AsyncSerializer, for reading and writing CSV data using interfaces generated by serde_derive macros.

Correspondingly, to support CSV data with custom field or record delimiters (among many other things), you should use either an AsyncReaderBuilder or an AsyncWriterBuilder, depending on whether you’re reading or writing CSV data.

The standard CSV record types are StringRecord and ByteRecord. StringRecord should be used when you know your data to be valid UTF-8. For data that may be invalid UTF-8, ByteRecord is suitable.

Finally, the set of errors is described by the Error type.

The rest of the types in this crate mostly correspond to more detailed errors, position information, configuration knobs or iterator types.

§Setup

In the root folder of your project, run cargo add csv-async or cargo add --features tokio csv-async to add this crate to your project.

§Examples

This example shows how to read and write a CSV file in an asynchronous context and inspect some record details.

Sample input file:

city,region,country,population
Southborough,MA,United States,9686
Northbridge,MA,United States,14061
Marlborough,MA,United States,38334
Springfield,MA,United States,152227
Springfield,MO,United States,150443
Springfield,NJ,United States,14976
Concord,NH,United States,42605

use std::error::Error;
use std::process;
#[cfg(not(feature = "tokio"))]
use futures::stream::StreamExt;
#[cfg(not(feature = "tokio"))]
use async_std::fs::File;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::StreamExt;
#[cfg(feature = "tokio")]
use tokio::fs::File;

async fn filter_by_region(region: &str, file_in: &str, file_out: &str) -> Result<(), Box<dyn Error>> {
    // Reads a CSV file that has a column named "region" in the second position (index = 1).
    // Writes to a new file only the rows whose region equals the passed argument,
    // and removes the region column.
    let mut rdr = csv_async::AsyncReader::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncWriter::from_writer(
        File::create(file_out).await?
    );
    wri.write_record(rdr
        .headers()
        .await?.into_iter()
        .filter(|h| *h != "region")
    ).await?;
    let mut records = rdr.records();
    while let Some(record) = records.next().await {
        let record = record?;
        match record.get(1) {
            Some(reg) if reg == region => 
                wri.write_record(record
                    .iter()
                    .enumerate()
                    .filter(|(i, _)| *i != 1)
                    .map(|(_, s)| s)
                ).await?,
            _ => {},
        }
    }
    Ok(())
}

#[cfg(not(feature = "tokio"))]
fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(feature = "tokio")]
fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
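}

The same filter can be written with serde-derived record types via AsyncDeserializer and AsyncSerializer (this variant is compiled only with the with_serde feature):

use std::error::Error;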
use std::process;
#[cfg(feature = "with_serde")]
use serde::{Deserialize, Serialize};
#[cfg(not(feature = "tokio"))]
use futures::stream::StreamExt;
#[cfg(not(feature = "tokio"))]
use async_std::fs::File;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::StreamExt;
#[cfg(feature = "tokio")]
use tokio::fs::File;

#[cfg(feature = "with_serde")]
#[derive(Deserialize, Serialize)]
struct Row {
    city: String,
    region: String,
    country: String,
    population: u64,
}

#[cfg(feature = "with_serde")]
async fn filter_by_region_serde(region: &str, file_in: &str, file_out: &str) -> Result<(), Box<dyn Error>> {
    // Reads a CSV file whose columns match the fields of Row (city, region, country, population).
    // Writes to a new file only the rows whose region equals the passed argument.
    let mut rdr = csv_async::AsyncDeserializer::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncSerializer::from_writer(
        File::create(file_out).await?
    );
    let mut records = rdr.deserialize::<Row>();
    while let Some(record) = records.next().await {
        let record = record?;
        if record.region == region {
            wri.serialize(&record).await?;
        }
    }
    Ok(())
}

#[cfg(feature = "with_serde")]
#[cfg(not(feature = "tokio"))]
fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region_serde(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region_serde: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(feature = "with_serde")]
#[cfg(feature = "tokio")]
fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        if let Err(err) = filter_by_region_serde(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region_serde: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(not(feature = "with_serde"))]
fn main() {}

Type Aliases§

  • A type alias for Result<T, csv_async::Error>.