vortex 0.70.0

Vortex file format with all built-in codecs and a sampling compressor.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

// https://github.com/rust-lang/cargo/pull/11645#issuecomment-1536905941
#![doc = include_str!(concat!("../", env!("CARGO_PKG_README")))]

// The `compute` re-export below (`vortex::compute`) is deprecated and will be ported over to expressions.
pub use vortex_array::aggregate_fn;
use vortex_array::aggregate_fn::session::AggregateFnSession;
pub use vortex_array::compute;
use vortex_array::dtype::session::DTypeSession;
// vortex::expr is in the process of having its dependencies inverted, and will eventually be
// pulled back out into a vortex_expr crate.
pub use vortex_array::expr;
use vortex_array::optimizer::kernels::ArrayKernels;
pub use vortex_array::scalar_fn;
use vortex_array::scalar_fn::session::ScalarFnSession;
use vortex_array::session::ArraySession;
use vortex_io::session::RuntimeSession;
use vortex_layout::session::LayoutSession;
use vortex_session::VortexSession;

// Each subcrate is re-exported through its own wrapper module below, so that users can search inside the subcrates when using the Rust docs.

/// Re-exports everything from the `vortex_array` crate.
pub mod array {
    pub use vortex_array::*;

    // TODO(connor): We should probably manually pull up everything we need besides the `dtype`,
    // `extension`, and `scalar` modules. Because of the glob re-export above, those three are
    // currently exported twice: as `vortex::{dtype, extension, scalar}` and again as
    // `vortex::array::{dtype, extension, scalar}`.
}

/// Re-exports everything from the `vortex_buffer` crate.
pub mod buffer {
    pub use vortex_buffer::*;
}

/// The BtrBlocks compressor, its builder, and the `Scheme` / `SchemeId` types, re-exported from
/// the `vortex_btrblocks` crate.
pub mod compressor {
    pub use vortex_btrblocks::BtrBlocksCompressor;
    pub use vortex_btrblocks::BtrBlocksCompressorBuilder;
    pub use vortex_btrblocks::Scheme;
    pub use vortex_btrblocks::SchemeId;
}

/// Re-exports everything from `vortex_array::dtype`.
pub mod dtype {
    pub use vortex_array::dtype::*;
}

/// Re-exports everything from the `vortex_error` crate.
pub mod error {
    pub use vortex_error::*;
}

/// Re-exports everything from `vortex_array::extension`.
pub mod extension {
    pub use vortex_array::extension::*;
}

/// Re-exports everything from the `vortex_file` crate.
///
/// Only available when the `files` feature is enabled.
#[cfg(feature = "files")]
pub mod file {
    pub use vortex_file::*;
}

/// Re-exports everything from the `vortex_flatbuffers` crate.
pub mod flatbuffers {
    pub use vortex_flatbuffers::*;
}

/// Re-exports everything from the `vortex_io` crate.
pub mod io {
    pub use vortex_io::*;
}

/// Re-exports everything from the `vortex_ipc` crate.
pub mod ipc {
    pub use vortex_ipc::*;
}

/// Re-exports everything from the `vortex_layout` crate.
pub mod layout {
    pub use vortex_layout::*;
}

/// Re-exports everything from the `vortex_mask` crate.
pub mod mask {
    pub use vortex_mask::*;
}

/// Re-exports everything from the `vortex_metrics` crate.
pub mod metrics {
    pub use vortex_metrics::*;
}

/// Re-exports everything from the `vortex_proto` crate.
pub mod proto {
    pub use vortex_proto::*;
}

/// Re-exports everything from `vortex_array::scalar`.
pub mod scalar {
    pub use vortex_array::scalar::*;
}

/// Re-exports everything from the `vortex_scan` crate.
pub mod scan {
    pub use vortex_scan::*;
}

/// Re-exports everything from the `vortex_session` crate.
pub mod session {
    pub use vortex_session::*;
}

/// Re-exports everything from the `vortex_utils` crate.
pub mod utils {
    pub use vortex_utils::*;
}

/// Built-in array encodings, each re-exported from its own crate.
pub mod encodings {
    /// Re-exports everything from the `vortex_alp` crate.
    pub mod alp {
        pub use vortex_alp::*;
    }

    /// Re-exports everything from the `vortex_bytebool` crate.
    pub mod bytebool {
        pub use vortex_bytebool::*;
    }

    /// Re-exports everything from the `vortex_datetime_parts` crate.
    pub mod datetime_parts {
        pub use vortex_datetime_parts::*;
    }

    /// Re-exports everything from the `vortex_decimal_byte_parts` crate.
    pub mod decimal_byte_parts {
        pub use vortex_decimal_byte_parts::*;
    }

    /// Re-exports everything from the `vortex_fastlanes` crate.
    pub mod fastlanes {
        pub use vortex_fastlanes::*;
    }

    /// Re-exports everything from the `vortex_fsst` crate.
    pub mod fsst {
        pub use vortex_fsst::*;
    }

    /// Re-exports everything from the `vortex_pco` crate.
    pub mod pco {
        pub use vortex_pco::*;
    }

    /// Re-exports everything from the `vortex_runend` crate.
    pub mod runend {
        pub use vortex_runend::*;
    }

    /// Re-exports everything from the `vortex_sequence` crate.
    pub mod sequence {
        pub use vortex_sequence::*;
    }

    /// Re-exports everything from the `vortex_sparse` crate.
    pub mod sparse {
        pub use vortex_sparse::*;
    }

    /// Re-exports everything from the `vortex_zigzag` crate.
    pub mod zigzag {
        pub use vortex_zigzag::*;
    }

    /// Re-exports everything from the `vortex_zstd` crate.
    ///
    /// Only available when the `zstd` feature is enabled.
    #[cfg(feature = "zstd")]
    pub mod zstd {
        pub use vortex_zstd::*;
    }
}

/// Extension trait that builds a default, fully populated [`VortexSession`].
///
/// Bring this trait into scope (`use vortex::VortexSessionDefault;`) to call
/// `VortexSession::default()`.
pub trait VortexSessionDefault {
    /// Creates a Vortex session with the standard built-in sub-sessions registered: dtypes
    /// (`DTypeSession`), arrays (`ArraySession`), layouts (`LayoutSession`), scalar functions
    /// (`ScalarFnSession`), optimizer kernels (`ArrayKernels`), aggregate functions
    /// (`AggregateFnSession`), and runtime support (`RuntimeSession`).
    ///
    /// When the `files` feature is enabled, the default file encodings are registered as well.
    fn default() -> VortexSession;
}

impl VortexSessionDefault for VortexSession {
    /// Starts from an empty session, layers on every built-in sub-session, and (with the
    /// `files` feature) registers the default file encodings before handing the session back.
    fn default() -> Self {
        // Dtype, array, and layout sessions.
        let base = Self::empty()
            .with::<DTypeSession>()
            .with::<ArraySession>()
            .with::<LayoutSession>();

        // Scalar-function, kernel, aggregate-function, and runtime sessions, in the same order.
        let session = base
            .with::<ScalarFnSession>()
            .with::<ArrayKernels>()
            .with::<AggregateFnSession>()
            .with::<RuntimeSession>();

        // The `file` module only exists when the `files` feature is enabled.
        #[cfg(feature = "files")]
        {
            file::register_default_encodings(&session);
        }

        session
    }
}

/// These tests are included in the getting started documentation, so be mindful of which imports
/// to keep inside the test functions, and which to just use from the outer scope. The examples
/// get too verbose if we include _everything_.
///
/// The `// [tag]` comment pairs inside the test bodies appear to delimit the snippets that the
/// documentation extracts. Every opening marker needs a matching closing marker, and any code or
/// comment placed between a pair is shown to readers verbatim.
#[cfg(test)]
mod test {
    use std::path::PathBuf;

    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::dtype::FieldNames;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::root;
    use vortex_array::expr::select;
    use vortex_array::stream::ArrayStreamExt;
    use vortex_array::validity::Validity;
    use vortex_btrblocks::BtrBlocksCompressorBuilder;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;
    use vortex_file::OpenOptionsSessionExt;
    use vortex_file::WriteOptionsSessionExt;
    use vortex_file::WriteStrategyBuilder;
    use vortex_session::VortexSession;

    // Alias the crate so the snippets can spell paths as `vortex::...`.
    use crate as vortex;
    use crate::VortexSessionDefault;

    /// Converts the example Parquet file into a single chunked Vortex array via Arrow record
    /// batches, and checks that the result has 1000 rows.
    #[test]
    fn convert() -> anyhow::Result<()> {
        // [convert]
        use std::fs::File;

        use arrow_array::RecordBatchReader;
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use vortex::array::arrays::ChunkedArray;
        use vortex::array::arrow::FromArrowArray;
        use vortex::dtype::DType;
        use vortex::dtype::arrow::FromArrowType;

        let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(
            "../docs/_static/example.parquet",
        )?)?
        .build()?;

        let dtype = DType::from_arrow(reader.schema());
        let chunks: Vec<_> = reader
            .map(|record_batch| {
                let batch = record_batch?;
                ArrayRef::from_arrow(batch, false)
            })
            .collect::<VortexResult<_>>()?;
        let vortex_array = ChunkedArray::try_new(chunks, dtype)?.into_array();
        // [convert]

        assert_eq!(vortex_array.len(), 1000);

        Ok(())
    }

    /// Compresses a constant `u64` array in memory with the default BtrBlocks compressor and
    /// prints the compressed vs. uncompressed byte sizes (no assertion is made on them).
    #[test]
    fn compress() -> VortexResult<()> {
        // [compress]
        use vortex::compressor::BtrBlocksCompressor;

        let array = PrimitiveArray::new(buffer![42u64; 100_000], Validity::NonNullable);

        // You can compress an array in-memory with the BtrBlocks compressor
        let session = VortexSession::default();
        let compressed = BtrBlocksCompressor::default().compress(
            &array.clone().into_array(),
            &mut session.create_execution_ctx(),
        )?;
        println!(
            "BtrBlocks size: {} / {}",
            compressed.nbytes(),
            array.into_array().nbytes()
        );
        // [compress]

        Ok(())
    }

    /// Writes a small `u64` array to a Vortex file with the default write options, then reads it
    /// back with a "greater than 2" filter and checks that two rows survive.
    #[tokio::test]
    async fn read_write() -> VortexResult<()> {
        let session = VortexSession::default();

        // [write]
        let array = PrimitiveArray::new(buffer![0u64, 1, 2, 3, 4], Validity::NonNullable);

        // Write a Vortex file with the default compression and layout strategy.
        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("example.vortex");

        session
            .write_options()
            .write(
                &mut tokio::fs::File::create(&path).await?,
                array.into_array().to_array_stream(),
            )
            .await?;

        // [write]

        // [read]
        let array = session
            .open_options()
            .open_path(path.clone())
            .await?
            .scan()?
            .with_filter(gt(root(), lit(2u64)))
            .into_array_stream()?
            .read_all()
            .await?;

        assert_eq!(array.len(), 2);

        // [read]

        // Clean up the example file. An earlier `?` or failed assertion returns before this
        // line and leaves the file behind in the crate directory.
        std::fs::remove_file(&path)?;

        Ok(())
    }

    /// Round-trips an array through a file written with the compact BtrBlocks configuration and
    /// checks that its length, validity, and `u64` values are preserved.
    #[tokio::test]
    async fn compact_read_write() -> VortexResult<()> {
        let session = VortexSession::default();

        // [compact write]
        let array = PrimitiveArray::new(buffer![0u64, 1, 2, 3, 4], Validity::NonNullable);

        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("example_compact.vortex");

        session
            .write_options()
            .with_strategy(
                WriteStrategyBuilder::default()
                    .with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact())
                    .build(),
            )
            .write(
                &mut tokio::fs::File::create(&path).await?,
                array.clone().into_array().to_array_stream(),
            )
            .await?;

        // [compact write]

        // [compact read]
        let recovered_array = session
            .open_options()
            .open_path(path.clone())
            .await?
            .scan()?
            .into_array_stream()?
            .read_all()
            .await?;

        assert_eq!(recovered_array.len(), array.len());

        // [compact read]

        // Decode the recovered array and compare validity and values with the original.
        // NOTE(review): this uses `LEGACY_SESSION` rather than the `session` created above (the
        // `compress` test uses `session.create_execution_ctx()`); confirm this is intentional.
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let recovered_primitive = recovered_array.execute::<PrimitiveArray>(&mut ctx)?;
        assert!(
            recovered_primitive
                .validity()?
                .mask_eq(&array.validity()?, &mut ctx)?
        );
        assert_eq!(
            recovered_primitive.to_buffer::<u64>(),
            array.to_buffer::<u64>()
        );

        std::fs::remove_file(&path)?;

        Ok(())
    }

    /// Writes a two-column `{ id, value }` struct array to a file and reads it back projected to
    /// just the `value` column.
    #[tokio::test]
    async fn projection_read_write() -> VortexResult<()> {
        let session = VortexSession::default();

        // Build a simple two-column struct array: { id: u64, value: u64 }
        let ids = PrimitiveArray::new(buffer![1u64, 2, 3, 4, 5], Validity::NonNullable);
        let values = PrimitiveArray::new(buffer![10u64, 20, 30, 40, 50], Validity::NonNullable);

        let array = StructArray::try_new(
            FieldNames::from(["id", "value"]),
            vec![ids.into_array(), values.into_array()],
            5,
            Validity::NonNullable,
        )?
        .into_array();

        // Write a Vortex file containing both columns.
        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("example_projection.vortex");

        session
            .write_options()
            .write(
                &mut tokio::fs::File::create(&path).await?,
                array.into_array().to_array_stream(),
            )
            .await?;

        // Read the file back, but project down to just the "value" column.
        let projected = session
            .open_options()
            .open_path(path.clone())
            .await?
            .scan()?
            .with_projection(select(["value"], root()))
            .into_array_stream()?
            .read_all()
            .await?;

        // Projection keeps the same number of rows but only one column.
        // NOTE(review): only the row count is asserted; the single-column claim is not checked.
        assert_eq!(projected.len(), 5);

        std::fs::remove_file(&path)?;

        Ok(())
    }
}