vortex_serde/
lib.rs

1#![doc(html_logo_url = "/vortex/docs/_static/vortex_spiral_logo.svg")]
2
3use messages::reader::*;
4use messages::writer::*;
5
6pub mod chunked_reader;
7mod dtype_reader;
8pub mod file;
9pub mod io;
10mod messages;
11pub mod stream_reader;
12pub mod stream_writer;
13pub use dtype_reader::*;
14
/// Alignment constant (value `64`) exported as part of this crate's public API.
///
/// NOTE(review): presumably a byte alignment applied to serialized buffers; neither the
/// unit nor the code that consumes this constant is visible from here — confirm.
15pub const ALIGNMENT: usize = 64;
16
#[cfg(test)]
// The tests return `VortexResult<()>` so they can use `?`, but they still rely on
// `unwrap`/`expect`/`assert_eq!` for failures that should abort the test immediately.
#[allow(clippy::panic_in_result_fn)]
mod test {
    use std::sync::Arc;

    use futures_executor::block_on;
    use futures_util::io::Cursor;
    use futures_util::{pin_mut, StreamExt, TryStreamExt};
    use itertools::Itertools;
    use vortex_array::array::{ChunkedArray, PrimitiveArray, PrimitiveEncoding};
    use vortex_array::encoding::ArrayEncoding;
    use vortex_array::stream::ArrayStreamExt;
    use vortex_array::{ArrayDType, Context, IntoArray};
    use vortex_error::VortexResult;

    use crate::io::FuturesAdapter;
    use crate::stream_reader::StreamArrayReader;
    use crate::stream_writer::StreamArrayWriter;

    /// Writes `array` through a [`StreamArrayWriter`] backed by an in-memory `Vec<u8>`
    /// and returns the bytes that were produced.
    ///
    /// # Panics
    ///
    /// Panics if the writer reports an error.
    fn write_ipc<A: IntoArray>(array: A) -> Vec<u8> {
        let writer = StreamArrayWriter::new(Vec::<u8>::new());
        let finished = block_on(writer.write_array(array.into_array())).unwrap();
        finished.into_inner()
    }

    /// Streams a 3,000,000-element `i32` array back from memory, takes rows 1, 2 and 10,
    /// and checks that the first result chunk can be produced without an error.
    ///
    /// NOTE(review): despite the name, the index array used here is not empty, and the
    /// taken values themselves are not asserted.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_empty_index() -> VortexResult<()> {
        let values = PrimitiveArray::from((0i32..3_000_000).collect_vec());
        let encoded = write_ipc(values);

        let row_ids = PrimitiveArray::from(vec![1, 2, 10]).into_array();

        let ctx = Arc::new(Context::default());
        let stream_reader = block_on(async {
            let source = FuturesAdapter(Cursor::new(encoded));
            let opened = StreamArrayReader::try_new(source, ctx).await.unwrap();
            opened.load_dtype().await.unwrap()
        });

        let taken = stream_reader.into_array_stream().take_rows(row_ids)?;
        pin_mut!(taken);

        // Only the ability to pull the first chunk is exercised; its contents are ignored.
        let _first_chunk = block_on(taken.next()).unwrap().unwrap();
        Ok(())
    }

    /// Writes a two-chunk array, reads it back as a stream and takes rows that fall in
    /// both chunks, verifying the values returned for each chunk.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_write_read_chunked() -> VortexResult<()> {
        let row_ids = PrimitiveArray::from(vec![
            10u32, 11, 12, 13, 100_000, 2_999_999, 2_999_999, 3_000_000,
        ])
        .into_array();

        // NB: both chunks hold their values in descending order, so a value never equals
        // its own position and returning indexes instead of values would be detected.
        let first_half = PrimitiveArray::from((0i32..3_000_000).rev().collect_vec()).into_array();
        let second_half =
            PrimitiveArray::from((3_000_000i32..6_000_000).rev().collect_vec()).into_array();
        let dtype = first_half.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![first_half, second_half], dtype)?;
        let encoded = write_ipc(chunked);

        let ctx = Arc::new(Context::default());
        let stream_reader = block_on(async {
            let source = FuturesAdapter(Cursor::new(encoded));
            let opened = StreamArrayReader::try_new(source, ctx).await.unwrap();
            opened.load_dtype().await.unwrap()
        });

        let taken = stream_reader.into_array_stream().take_rows(row_ids)?;
        pin_mut!(taken);

        // First result chunk: the seven row ids that fall below 3,000,000.
        let first_chunk = block_on(taken.try_next())?.expect("Expected a chunk");
        assert_eq!(first_chunk.encoding().id(), PrimitiveEncoding.id());
        assert_eq!(
            first_chunk.as_primitive().maybe_null_slice::<i32>(),
            vec![2999989, 2999988, 2999987, 2999986, 2899999, 0, 0]
        );

        // Second result chunk: row 3,000,000 is the first element of the second input chunk.
        let second_chunk = block_on(taken.try_next())?.expect("Expected a chunk");
        assert_eq!(
            second_chunk.as_primitive().maybe_null_slice::<i32>(),
            vec![5999999]
        );

        Ok(())
    }
}