Crate csv_async

The csv-async crate provides a fast and flexible CSV reader and writer intended to run in an asynchronous environment, i.e. inside async functions called by tasks that an executor runs. The library does not require any particular executor; unit tests and documentation snippets use either the async-std or tokio crates. This crate does not contain a synchronous interface for reading and writing CSV files; use the csv crate for that. The API attempts to mimic that of the csv crate, with some exceptions: for example, configuration builders have create_... factory functions instead of the from_... functions found in csv.

Brief overview

The primary types in this crate are AsyncReader and AsyncWriter, for reading and writing CSV data respectively, and AsyncDeserializer and AsyncSerializer, for reading and writing CSV data through interfaces generated by serde_derive macros.

Correspondingly, to support CSV data with custom field or record delimiters (among many other things), use either an AsyncReaderBuilder or an AsyncWriterBuilder, depending on whether you’re reading or writing CSV data.

The standard CSV record types are StringRecord and ByteRecord. StringRecord should be used when you know your data to be valid UTF-8. For data that may be invalid UTF-8, ByteRecord is suitable.
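
The choice between the two record types comes down to whether every field is valid UTF-8. The sketch below uses only the standard library, not csv-async itself, to show that check; the `describe_field` helper is hypothetical and exists purely for illustration.

```rust
use std::str;

/// Hypothetical helper: tries to view a raw field as UTF-8 text.
/// `Ok` is the case where a string-based record fits; `Err` carries the
/// length of the longest valid UTF-8 prefix, and the field should stay bytes.
fn describe_field(raw: &[u8]) -> Result<&str, usize> {
    str::from_utf8(raw).map_err(|e| e.valid_up_to())
}

fn main() {
    let valid = "Zürich".as_bytes();
    // 0xFF can never appear in well-formed UTF-8.
    let invalid = [b'M', b'A', 0xFF, b'!'];

    match describe_field(valid) {
        Ok(text) => println!("valid UTF-8, fits a string record: {}", text),
        Err(n) => println!("invalid UTF-8 after {} bytes, keep raw bytes", n),
    }
    match describe_field(&invalid) {
        Ok(text) => println!("valid UTF-8, fits a string record: {}", text),
        Err(n) => println!("invalid UTF-8 after {} bytes, keep raw bytes", n),
    }
}
```

When the encoding of the input is not under your control, reading raw bytes first and validating afterwards, as this sketch does, avoids failing on the first malformed field.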

Finally, the set of errors is described by the Error type.

The rest of the types in this crate mostly correspond to more detailed errors, position information, configuration knobs or iterator types.

Setup

Add this to your Cargo.toml:

[dependencies]
csv-async = "1.1"

Examples

This example shows how to read and write a CSV file in an asynchronous context and how to inspect some record details.

Sample input file:

city,region,country,population
Southborough,MA,United States,9686
Northbridge,MA,United States,14061
Marlborough,MA,United States,38334
Springfield,MA,United States,152227
Springfield,MO,United States,150443
Springfield,NJ,United States,14976
Concord,NH,United States,42605

use std::error::Error;
use std::process;
#[cfg(not(feature = "tokio"))]
use futures::stream::StreamExt;
#[cfg(not(feature = "tokio"))]
use async_std::fs::File;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::StreamExt;
#[cfg(feature = "tokio")]
use tokio::fs::File;

async fn filter_by_region(region: &str, file_in: &str, file_out: &str) -> Result<(), Box<dyn Error>> {
    // Reads a CSV file that has a column named "region" in the second position (index = 1).
    // Writes to a new file only the rows whose region equals the passed argument,
    // and removes the region column.
    let mut rdr = csv_async::AsyncReader::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncWriter::from_writer(
        File::create(file_out).await?
    );
    wri.write_record(rdr
        .headers()
        .await?.into_iter()
        .filter(|h| *h != "region")
    ).await?;
    let mut records = rdr.records();
    while let Some(record) = records.next().await {
        let record = record?;
        match record.get(1) {
            Some(reg) if reg == region => 
                wri.write_record(record
                    .iter()
                    .enumerate()
                    .filter(|(i, _)| *i != 1)
                    .map(|(_, s)| s)
                ).await?,
            _ => {},
        }
    }
    Ok(())
}

#[cfg(not(feature = "tokio"))]
fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(feature = "tokio")]
fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
}
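
The enumerate / filter / map chain that removes the region column can be tried in isolation on plain string slices. This is a standard-library-only sketch; `drop_column` is a hypothetical helper name, not part of csv-async.

```rust
/// Hypothetical helper: returns the fields of a row with one column removed.
/// An out-of-range index leaves the row unchanged.
fn drop_column<'a>(fields: &[&'a str], skip: usize) -> Vec<&'a str> {
    fields
        .iter()
        .enumerate()
        // Keep every field whose position differs from the skipped index.
        .filter(|(i, _)| *i != skip)
        // Discard the index again and keep only the field text.
        .map(|(_, s)| *s)
        .collect()
}

fn main() {
    let row = ["Southborough", "MA", "United States", "9686"];
    let kept = drop_column(&row, 1);
    println!("{}", kept.join(","));
}
```

Filtering by position rather than by header name keeps the per-row work free of string comparisons, at the cost of assuming the column order is fixed.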
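
The same filter can also be written with serde-derived types via AsyncDeserializer and AsyncSerializer (enabled by the with_serde feature); here the region column is kept:

use std::error::Error;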
use std::process;
#[cfg(feature = "with_serde")]
use serde::{Deserialize, Serialize};
#[cfg(not(feature = "tokio"))]
use futures::stream::StreamExt;
#[cfg(not(feature = "tokio"))]
use async_std::fs::File;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::StreamExt;
#[cfg(feature = "tokio")]
use tokio::fs::File;

#[cfg(feature = "with_serde")]
#[derive(Deserialize, Serialize)]
struct Row {
    city: String,
    region: String,
    country: String,
    population: u64,
}

#[cfg(feature = "with_serde")]
async fn filter_by_region_serde(region: &str, file_in: &str, file_out: &str) -> Result<(), Box<dyn Error>> {
    // Reads a CSV file that has a column named "region" in the second position (index = 1).
    // Writes to a new file only the rows whose region equals the passed argument.
    let mut rdr = csv_async::AsyncDeserializer::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncSerializer::from_writer(
        File::create(file_out).await?
    );
    let mut records = rdr.deserialize::<Row>();
    while let Some(record) = records.next().await {
        let record = record?;
        if record.region == region {
            wri.serialize(&record).await?;
        }
    }
    Ok(())
}

#[cfg(feature = "with_serde")]
#[cfg(not(feature = "tokio"))]
fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region_serde(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region_serde: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(feature = "with_serde")]
#[cfg(feature = "tokio")]
fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        if let Err(err) = filter_by_region_serde(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region_serde: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(not(feature = "with_serde"))]
fn main() {}

Structs

An already configured CSV serde deserializer.

An already configured CSV reader.

Builds a CSV reader with various configuration knobs.

An already configured CSV serde serializer.

An already configured CSV writer.

Builds a CSV writer with various configuration knobs.

A single CSV record stored as raw bytes.

A double-ended iterator over the fields in a byte record.

An owned stream of records as raw bytes.

A borrowed stream of records as raw bytes.

An owned stream of deserialized records.

An owned stream of pairs: deserialized records and position in stream before reading record.

A borrowed stream of deserialized records.

A borrowed stream of pairs: deserialized records and position in stream before reading record.

An error that can occur when processing CSV data.

A UTF-8 validation error during record conversion.

IntoInnerError occurs when consuming a Writer fails.

A position in CSV data.

A single CSV record stored as valid UTF-8 bytes.

An iterator over the fields in a string record.

An owned stream of records as strings.

A borrowed stream of records as strings.

A UTF-8 validation error.

Enums

The specific type of an error.

The quoting style to use when writing CSV data.

A record terminator.

The whitespace preservation behavior when reading CSV data.

Type Definitions

A type alias for Result<T, csv_async::Error>.
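
A crate-level Result alias of this kind fixes the error type so that function signatures only name the success type. The following is a minimal standard-library sketch of the pattern, not the crate's own definition; the `mini_csv` module, its `Error` struct, and `parse_population` are all hypothetical.

```rust
/// Hypothetical miniature crate; the names only echo the csv_async layout.
mod mini_csv {
    use std::{error, fmt, result};

    /// Stand-in error type that carries a message.
    #[derive(Debug, PartialEq)]
    pub struct Error(pub String);

    impl fmt::Display for Error {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    impl error::Error for Error {}

    /// Alias that fixes the error type, so callers name only the success type.
    pub type Result<T> = result::Result<T, Error>;
}

/// Parses a population field, reporting failures through the alias.
fn parse_population(field: &str) -> mini_csv::Result<u64> {
    field
        .trim()
        .parse::<u64>()
        .map_err(|e| mini_csv::Error(format!("bad population {:?}: {}", field, e)))
}

fn main() {
    match parse_population("9686") {
        Ok(n) => println!("population = {}", n),
        Err(e) => println!("error: {}", e),
    }
    if let Err(e) = parse_population("n/a") {
        println!("error: {}", e);
    }
}
```

Keeping the alias inside its own module means it does not shadow the prelude's two-parameter Result in calling code.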