connectorx 0.4.5

Load data from databases to dataframes, the fastest way.
Documentation
use arrow::array::Int64Array;
use connectorx::prelude::*;
use connectorx::{
    destinations::arrow::{ArrowDestination, ArrowTypeSystem},
    sources::{
        csv::{CSVSource, CSVTypeSystem},
        PartitionParser,
    },
    sql::CXQuery,
    transports::CSVArrowTransport,
};

/// Points a schema-less `CSVSource` at `./a_fake_file.csv` (presumably a path that does
/// not exist) and expects a panic somewhere while partitioning or counting rows.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested explicitly.
#[test]
#[should_panic]
#[ignore]
fn no_file() {
    let queries = [CXQuery::naked("./a_fake_file.csv")];

    let mut csv = CSVSource::new(&[]);
    csv.set_queries(&queries);

    // Consume every partition; any failure to count rows surfaces as a panic.
    csv.partition().unwrap().into_iter().for_each(|mut part| {
        part.result_rows().expect("run query");
    });
}

/// Reads `./tests/data/empty.csv` with no declared schema, checks that the first
/// partition reports zero rows and zero columns, then tries to produce an `i64`
/// from it. The test is expected to panic.
///
/// NOTE(review): `#[should_panic]` has no `expected = "..."`, so a panic from any
/// statement in the body (including the two `assert_eq!`s) makes this test pass.
#[test]
#[should_panic]
#[ignore]
fn empty_file() {
    let mut csv = CSVSource::new(&[]);
    csv.set_queries(&[CXQuery::naked("./tests/data/empty.csv")]);

    let mut parts = csv.partition().unwrap();
    parts.iter_mut().for_each(|part| {
        part.result_rows().expect("run query");
    });

    // Only the first partition is inspected from here on.
    let first = &mut parts[0];
    assert_eq!(0, first.nrows());
    assert_eq!(0, first.ncols());

    let mut parser = first.parser().unwrap();
    parser.fetch_next().unwrap();

    let _v: i64 = parser.produce().expect("produce from emtpy");
}

/// Parses `./tests/data/uspop_0.csv` cell by cell through the partition parser and
/// compares the flattened sequence of values with the expected three rows.
///
/// The source is declared with five columns (two strings, one `i64`, two `f64`);
/// the test asserts the partition reports 3 rows and 5 columns before parsing.
#[test]
#[ignore]
fn load_and_parse() {
    /// One parsed cell, tagged with the column it came from.
    #[derive(Debug, PartialEq)]
    enum Value {
        City(String),
        State(String),
        Population(i64),
        Longitude(f64),
        Latitude(f64),
    }

    // Declared column types, in file order.
    let column_types = [
        CSVTypeSystem::String(false),
        CSVTypeSystem::String(false),
        CSVTypeSystem::I64(false),
        CSVTypeSystem::F64(false),
        CSVTypeSystem::F64(false),
    ];
    let mut csv = CSVSource::new(&column_types);
    csv.set_queries(&[CXQuery::naked("./tests/data/uspop_0.csv")]);

    let mut parts = csv.partition().unwrap();

    let mut first = parts.remove(0);
    first.result_rows().expect("run query");

    assert_eq!(3, first.nrows());
    assert_eq!(5, first.ncols());

    let mut parsed: Vec<Value> = Vec::new();
    let mut parser = first.parser().unwrap();

    // Keep fetching batches until the parser reports the last one.
    let mut finished = false;
    while !finished {
        let (row_count, last_batch) = parser.fetch_next().unwrap();
        for _ in 0..row_count {
            // Cells must be produced in column order; the target type selects the parser.
            let city: String = parser.produce().expect("parse city");
            parsed.push(Value::City(city));
            let state: String = parser.produce().expect("parse state");
            parsed.push(Value::State(state));
            let population: i64 = parser.produce().expect("parse population");
            parsed.push(Value::Population(population));
            let longitude: f64 = parser.produce().expect("parse longitude");
            parsed.push(Value::Longitude(longitude));
            let latitude: f64 = parser.produce().expect("parse latitude");
            parsed.push(Value::Latitude(latitude));
        }
        finished = last_batch;
    }

    // Expected rows, flattened into the same cell-by-cell order as `parsed`.
    let expected: Vec<Value> = vec![
        ("Kenai", "AK", 7610, 60.5544444, -151.2583333),
        ("Selma", "AL", 18980, 32.4072222, -87.0211111),
        ("El Mirage", "AZ", 32308, 33.6130556, -112.3238889),
    ]
    .into_iter()
    .flat_map(|(city, state, population, longitude, latitude)| {
        vec![
            Value::City(String::from(city)),
            Value::State(String::from(state)),
            Value::Population(population),
            Value::Longitude(longitude),
            Value::Latitude(latitude),
        ]
    })
    .collect();

    assert_eq!(expected, parsed);
}

/// Runs two all-`i64` CSV files through the dispatcher into an `ArrowDestination`
/// and checks the resulting record batches.
///
/// Exactly two batches are expected. In each batch, column `i` (for `i` in `0..5`)
/// must equal `row * 5 + i` either for rows `0..4` or for rows `4..=10`; either
/// range is accepted for either batch (presumably because batch order is not
/// fixed — TODO confirm).
#[test]
#[ignore]
fn test_csv() {
    let schema = [CSVTypeSystem::I64(false); 5];
    let files = [
        CXQuery::naked("./tests/data/uint_0.csv"),
        CXQuery::naked("./tests/data/uint_1.csv"),
    ];
    let source = CSVSource::new(&schema);

    let mut destination = ArrowDestination::new();
    let dispatcher =
        Dispatcher::<_, _, CSVArrowTransport>::new(source, &mut destination, &files, None);

    dispatcher.run().expect("run dispatcher");

    let result = destination.arrow().unwrap();

    // `assert_eq!` prints both operands on failure, so no separate debug print is needed.
    assert_eq!(result.len(), 2, "unexpected number of record batches");

    for (batch_idx, rb) in result.iter().enumerate() {
        for i in 0..5 {
            let col = rb
                .column(i)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("column should be an Int64Array");

            // `i` is below 5, so the cast to i64 cannot truncate.
            let offset = i as i64;
            let upper_rows = Int64Array::from_iter_values((4i64..=10).map(|v| v * 5 + offset));
            let lower_rows = Int64Array::from_iter_values((0i64..4).map(|v| v * 5 + offset));

            assert!(
                col.eq(&upper_rows) || col.eq(&lower_rows),
                "batch {} column {} matched neither expected row range: {:?}",
                batch_idx,
                i,
                col
            );
        }
    }
}

/// Dispatches `./tests/data/infer_0.csv` from a `CSVSource` created with an empty
/// schema and asserts the schema reported by the Arrow destination afterwards.
///
/// The expected result is eight columns mixing `Int64`, `Float64`, `Boolean` and
/// `LargeUtf8`, each with the flag value listed below.
#[test]
#[ignore]
fn test_csv_infer_schema() {
    // What the destination should report once the dispatcher has run.
    let expected_schema = vec![
        ArrowTypeSystem::Int64(false),
        ArrowTypeSystem::Float64(false),
        ArrowTypeSystem::Boolean(true),
        ArrowTypeSystem::LargeUtf8(true),
        ArrowTypeSystem::Float64(false),
        ArrowTypeSystem::LargeUtf8(true),
        ArrowTypeSystem::LargeUtf8(false),
        ArrowTypeSystem::LargeUtf8(true),
    ];

    let queries = [CXQuery::naked("./tests/data/infer_0.csv")];
    let csv = CSVSource::new(&[]);
    let mut arrow_dest = ArrowDestination::new();

    Dispatcher::<_, _, CSVArrowTransport>::new(csv, &mut arrow_dest, &queries, None)
        .run()
        .expect("run dispatcher");

    assert_eq!(expected_schema, arrow_dest.schema());
}