#![doc = include_str!(concat!("../", env!("CARGO_PKG_README")))]
pub use vortex_array::aggregate_fn;
use vortex_array::aggregate_fn::session::AggregateFnSession;
pub use vortex_array::compute;
use vortex_array::dtype::session::DTypeSession;
pub use vortex_array::expr;
pub use vortex_array::scalar_fn;
use vortex_array::scalar_fn::session::ScalarFnSession;
use vortex_array::session::ArraySession;
use vortex_io::session::RuntimeSession;
use vortex_layout::session::LayoutSession;
use vortex_session::VortexSession;
pub mod array {
pub use vortex_array::*;
}
pub mod buffer {
pub use vortex_buffer::*;
}
pub mod compressor {
pub use vortex_btrblocks::BtrBlocksCompressor;
pub use vortex_btrblocks::BtrBlocksCompressorBuilder;
pub use vortex_btrblocks::FloatCode;
pub use vortex_btrblocks::IntCode;
pub use vortex_btrblocks::StringCode;
}
pub mod dtype {
pub use vortex_array::dtype::*;
}
pub mod error {
pub use vortex_error::*;
}
pub mod extension {
pub use vortex_array::extension::*;
}
#[cfg(feature = "files")]
pub mod file {
pub use vortex_file::*;
}
pub mod flatbuffers {
pub use vortex_flatbuffers::*;
}
pub mod io {
pub use vortex_io::*;
}
pub mod ipc {
pub use vortex_ipc::*;
}
pub mod layout {
pub use vortex_layout::*;
}
pub mod mask {
pub use vortex_mask::*;
}
pub mod metrics {
pub use vortex_metrics::*;
}
pub mod proto {
pub use vortex_proto::*;
}
pub mod scalar {
pub use vortex_array::scalar::*;
}
pub mod scan {
pub use vortex_scan::*;
}
pub mod session {
pub use vortex_session::*;
}
pub mod utils {
pub use vortex_utils::*;
}
pub mod encodings {
pub mod alp {
pub use vortex_alp::*;
}
pub mod bytebool {
pub use vortex_bytebool::*;
}
pub mod datetime_parts {
pub use vortex_datetime_parts::*;
}
pub mod decimal_byte_parts {
pub use vortex_decimal_byte_parts::*;
}
pub mod fastlanes {
pub use vortex_fastlanes::*;
}
pub mod fsst {
pub use vortex_fsst::*;
}
pub mod pco {
pub use vortex_pco::*;
}
pub mod runend {
pub use vortex_runend::*;
}
pub mod sequence {
pub use vortex_sequence::*;
}
pub mod sparse {
pub use vortex_sparse::*;
}
pub mod zigzag {
pub use vortex_zigzag::*;
}
#[cfg(feature = "zstd")]
pub mod zstd {
pub use vortex_zstd::*;
}
}
pub trait VortexSessionDefault {
fn default() -> VortexSession;
}
impl VortexSessionDefault for VortexSession {
#[allow(unused_mut)]
fn default() -> VortexSession {
let mut session = VortexSession::empty()
.with::<DTypeSession>()
.with::<ArraySession>()
.with::<LayoutSession>()
.with::<ScalarFnSession>()
.with::<AggregateFnSession>()
.with::<RuntimeSession>();
#[cfg(feature = "files")]
file::register_default_encodings(&mut session);
session
}
}
#[cfg(test)]
mod test {
use std::path::PathBuf;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray;
use vortex_array::dtype::FieldNames;
use vortex_array::expr::gt;
use vortex_array::expr::lit;
use vortex_array::expr::root;
use vortex_array::expr::select;
use vortex_array::stream::ArrayStreamExt;
use vortex_array::validity::Validity;
use vortex_array::vtable::ValidityHelper;
use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_file::OpenOptionsSessionExt;
use vortex_file::WriteOptionsSessionExt;
use vortex_file::WriteStrategyBuilder;
use vortex_session::VortexSession;
use crate as vortex;
use crate::VortexSessionDefault;
#[test]
fn convert() -> anyhow::Result<()> {
use std::fs::File;
use arrow_array::RecordBatchReader;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use vortex::array::DynArray;
use vortex::array::arrays::ChunkedArray;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex_array::arrow::FromArrowArray;
let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(
"../docs/_static/example.parquet",
)?)?
.build()?;
let dtype = DType::from_arrow(reader.schema());
let chunks: Vec<_> = reader
.map(|record_batch| {
let batch = record_batch?;
ArrayRef::from_arrow(batch, false)
})
.collect::<VortexResult<_>>()?;
let vortex_array = ChunkedArray::try_new(chunks, dtype)?.into_array();
assert_eq!(vortex_array.len(), 1000);
Ok(())
}
#[test]
fn compress() -> VortexResult<()> {
use vortex::compressor::BtrBlocksCompressor;
let array = PrimitiveArray::new(buffer![42u64; 100_000], Validity::NonNullable);
let compressed = BtrBlocksCompressor::default().compress(&array.clone().into_array())?;
println!(
"BtrBlocks size: {} / {}",
compressed.nbytes(),
array.nbytes()
);
Ok(())
}
#[tokio::test]
async fn read_write() -> VortexResult<()> {
let session = VortexSession::default();
let array = PrimitiveArray::new(buffer![0u64, 1, 2, 3, 4], Validity::NonNullable);
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("example.vortex");
session
.write_options()
.write(
&mut tokio::fs::File::create(&path).await?,
array.to_array_stream(),
)
.await?;
let array = session
.open_options()
.open_path(path.clone())
.await?
.scan()?
.with_filter(gt(root(), lit(2u64)))
.into_array_stream()?
.read_all()
.await?;
assert_eq!(array.len(), 2);
std::fs::remove_file(&path)?;
Ok(())
}
#[tokio::test]
async fn compact_read_write() -> VortexResult<()> {
let session = VortexSession::default();
let array = PrimitiveArray::new(buffer![0u64, 1, 2, 3, 4], Validity::NonNullable);
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("example_compact.vortex");
session
.write_options()
.with_strategy(
WriteStrategyBuilder::default()
.with_compact_encodings()
.build(),
)
.write(
&mut tokio::fs::File::create(&path).await?,
array.to_array_stream(),
)
.await?;
let recovered_array = session
.open_options()
.open_path(path.clone())
.await?
.scan()?
.into_array_stream()?
.read_all()
.await?;
assert_eq!(recovered_array.len(), array.len());
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let recovered_primitive = recovered_array.execute::<PrimitiveArray>(&mut ctx)?;
assert!(
recovered_primitive
.validity()
.mask_eq(array.validity(), &mut ctx)?
);
assert_eq!(
recovered_primitive.to_buffer::<u64>(),
array.to_buffer::<u64>()
);
std::fs::remove_file(&path)?;
Ok(())
}
#[tokio::test]
async fn projection_read_write() -> VortexResult<()> {
let session = VortexSession::default();
let ids = PrimitiveArray::new(buffer![1u64, 2, 3, 4, 5], Validity::NonNullable);
let values = PrimitiveArray::new(buffer![10u64, 20, 30, 40, 50], Validity::NonNullable);
let array = StructArray::try_new(
FieldNames::from(["id", "value"]),
vec![ids.into_array(), values.into_array()],
5,
Validity::NonNullable,
)?
.into_array();
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("example_projection.vortex");
session
.write_options()
.write(
&mut tokio::fs::File::create(&path).await?,
array.to_array_stream(),
)
.await?;
let projected = session
.open_options()
.open_path(path.clone())
.await?
.scan()?
.with_projection(select(["value"], root()))
.into_array_stream()?
.read_all()
.await?;
assert_eq!(projected.len(), 5);
std::fs::remove_file(&path)?;
Ok(())
}
}